1.drops之前的文档《SQLMAP进阶使用》介绍过SQLMAP的高级使用方法,网上也有几篇介绍SQLMAP源码的文章(如“曾是土木人”的文章),都写得非常好,建议大家都看一下。
2.在菜单栏Run->Edit Configurations。点击左侧的“+”,选择Python,Script中选择sqlmap.py的路径,Script parameters中填入注入时的命令,如下图。 3.打开sqlmap.py,开始函数是main函数,在main函数处下断点。
4.右键Debug 'sqlmap',然后程序就自动跳到我们下断点的main()函数处,后面可以继续添加断点进行调试。如下图,左边红色的代表跳转到下一个断点处,上面红色的表示跳到下一句代码处
#!python paths.SQLMAP_ROOT_PATH = modulePath() setPaths()
#!python paths.SQLMAP_EXTRAS_PATH = os.path.join(paths.SQLMAP_ROOT_PATH, "extra") paths.SQLMAP_PROCS_PATH = os.path.join(paths.SQLMAP_ROOT_PATH, "procs") paths.SQLMAP_SHELL_PATH = os.path.join(paths.SQLMAP_ROOT_PATH, "shell") paths.SQLMAP_TAMPER_PATH = os.path.join(paths.SQLMAP_ROOT_PATH, "tamper") paths.SQLMAP_WAF_PATH = os.path.join(paths.SQLMAP_ROOT_PATH, "waf")
接下来的78行函数initOptions(cmdLineOptions),包含了三个函数,作用如流程图所示:设置conf、kb等参数。conf会保存用户输入的一些参数,比如url、端口;kb会保存注入时的一些参数,其中有两个比较特殊:kb.chars.start和kb.chars.stop,这两个是随机字符串,后面会有介绍。
#!python _setConfAttributes() _setKnowledgeBaseAttributes() _mergeOptions(inputOptions, overrideOptions)
#!python if conf.direct: initTargetEnv() setupTargetEnv() action() return True
#!python python sqlmap.py -d "mysql://admin:[email protected]
:3306/testdb" -f --banner --dbs --user #!python if conf.url and not any((conf.forms, conf.crawlDepth)): kb.targets.add((conf.url, conf.method, conf.data, conf.cookie, None))
#!python for targetUrl, targetMethod, targetData, targetCookie, targetHeaders in kb.targets:
#!python def setupTargetEnv(): _createTargetDirs() _setRequestParams() _setHashDB() _resumeHashDBValues() _setResultsFile() _setAuthCred()
#!python checkWaf() if conf.identifyWaf: identifyWaf()
377行checkWaf()是检测是否有WAF,检测方法是NMAP的http-waf-detect.nse,比如页面为index.php?id=1,那现在添加一个随机变量index.php?id=1&aaa=2,设置payload类似为 AND 1=1 UNION ALL SELECT 1,2,3,table_name FROM information_schema.tables WHERE 2>1-- ../../../etc/passwd
#!python __product__ = "360 Web Application Firewall (360)" def detect(get_page): retval = False for vector in WAF_ATTACK_VECTORS: page, headers, code = get_page(get=vector) retval = re.search(r"wangzhan\.360\.cn", headers.get("X-Powered-By-360wzb", ""), re.I) is not None if retval: break return retval if (len(kb.injections) == 0 or (len(kb.injections) == 1 and kb.injections[0].place is None)) \ and (kb.injection.place is None or kb.injection.parameter is None):
#!python for place in parameters: # Test User-Agent and Referer headers only if # --level >= 3 skip = (place == PLACE.USER_AGENT and conf.level < 3) skip |= (place == PLACE.REFERER and conf.level < 3) # Test Host header only if # --level >= 5 skip |= (place == PLACE.HOST and conf.level < 5) # Test Cookie header only if --level >= 2 skip |= (place == PLACE.COOKIE and conf.level < 2)
#!python check = checkDynParam(place, parameter, value)
#!python check = heuristicCheckSqlInjection(place, parameter)
#!python if conf.prefix or conf.suffix: if conf.prefix: prefix = conf.prefix if conf.suffix: suffix = conf.suffix randStr = "" while '\'' not in randStr: randStr = randomStr(length=10, alphabet=HEURISTIC_CHECK_ALPHABET) kb.heuristicMode = True payload = "%s%s%s" % (prefix, randStr, suffix) payload = agent.payload(place, parameter, newValue=payload) page, _ = Request.queryPage(payload, place, content=True, raise404=False) kb.heuristicMode = False parseFilePaths(page) result = wasLastResponseDBMSError()
首先conf.prefix和conf.suffix代表用户指定的前缀和后缀;在 while '\'' not in randStr
中,从'"', '\'', ')', '(', ',', '.'这些字符中随机选取10个组成字符串,并且其中必须包含单引号。接下来生成一个payload,类似u'name=__PAYLOAD_DELIMITER__1)."."."\'."__PAYLOAD_DELIMITER__'。其中__PAYLOAD_DELIMITER__是标记payload边界的定界符,两个定界符之间、原参数值之后的部分就是上面生成的随机字符串。请求网页后,调用parseFilePaths进行解析,查看是否爆出绝对路径,而wasLastResponseDBMSError是判断response中是否包含了数据库的报错信息。
#!python value = "%s%s%s" % (randomStr(), DUMMY_XSS_CHECK_APPENDIX, randomStr()) payload = "%s%s%s" % (prefix, "'%s" % value, suffix) payload = agent.payload(place, parameter, newValue=payload) page, _ = Request.queryPage(payload, place, content=True, raise404=False) paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place if value in (page or ""): infoMsg = "heuristic (XSS) test shows that %s parameter " % paramType infoMsg += "'%s' might be vulnerable to XSS attacks" % parameter logger.info(infoMsg) kb.heuristicMode = False
上面的代码是从888行开始,DUMMY_XSS_CHECK_APPENDIX = "<'\">",如果输入的字符串在页面中原样返回了,会提示可能存在XSS漏洞。
#!python if testSqlInj: ...... injection = checkSqlInjection(place, parameter, value)
#!python paramType = conf.method if conf.method not in (None, HTTPMETHOD.GET, HTTPMETHOD.POST) else place tests = getSortedInjectionTests()
#!python if conf.dbms is None: if not injection.dbms and PAYLOAD.TECHNIQUE.BOOLEAN in injection.data: if not Backend.getIdentifiedDbms() and kb.heuristicDbms is False: kb.heuristicDbms = heuristicCheckDbms(injection) if kb.reduceTests is None and not conf.testFilter and (intersect(Backend.getErrorParsedDBMSes(), / SUPPORTED_DBMS, True) or kb.heuristicDbms or injection.dbms): msg = "it looks like the back-end DBMS is '%s'. " % (Format.getErrorParsedDBMSes() or kb.heuristicDbms or injection.dbms) msg += "Do you want to skip test payloads specific for other DBMSes? [Y/n]" kb.reduceTests = (Backend.getErrorParsedDBMSes() or [kb.heuristicDbms]) if readInput(msg, default='Y').upper() == 'Y' else [] if kb.extendTests is None and not conf.testFilter and (conf.level < 5 or conf.risk < 3) / and (intersect(Backend.getErrorParsedDBMSes(), SUPPORTED_DBMS, True) or / kb.heuristicDbms or injection.dbms): msg = "for the remaining tests, do you want to include all tests " msg += "for '%s' extending provided " % (Format.getErrorParsedDBMSes() or kb.heuristicDbms or injection.dbms) msg += "level (%d)" % conf.level if conf.level < 5 else "" msg += " and " if conf.level < 5 and conf.risk < 3 else "" msg += "risk (%d)" % conf.risk if conf.risk < 3 else "" msg += " values? [Y/n]" if conf.level < 5 and conf.risk < 3 else " value? [Y/n]" kb.extendTests = (Backend.getErrorParsedDBMSes() or [kb.heuristicDbms]) if readInput(msg, default='Y').upper() == 'Y' else []
140行if stype == PAYLOAD.TECHNIQUE.UNION:会判断是不是union注入,这个stype就是payload文件夹下面xml文件中的stype,如果是union,就会进入,然后配置列的数量等,今天先介绍流程,union注入以后会介绍。
#!python if conf.tech and isinstance(conf.tech, list) and stype not in conf.tech: debugMsg = "skipping test '%s' because the user " % title debugMsg += "specified to test only for " debugMsg += "%s techniques" % " & ".join(map(lambda x: PAYLOAD.SQLINJECTION[x], conf.tech)) logger.debug(debugMsg) continue
B: Boolean-based blind SQL injection(布尔型注入) E: Error-based SQL injection(报错型注入) U: UNION query SQL injection(可联合查询注入) S: Stacked queries SQL injection(可多语句查询注入) T: Time-based blind SQL injection(基于时间延迟注入) Q: inline_query(内联查询)
#!python fstPayload = agent.cleanupPayload(test.request.payload, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) else None)
test.request.payload为'AND [RANDNUM]=[RANDNUM]'(相应payload.xml中的request值)。这段代码会把其中的[RANDNUM]替换为随机数,得到如fstPayload=u'AND 2876=2876'。302行:
#!python for boundary in boundaries: injectable = False if boundary.level > conf.level and not (kb.extendTests and intersect(payloadDbms, kb.extendTests, True)): continue
#!python clauseMatch = False for clauseTest in test.clause: if clauseTest in boundary.clause: clauseMatch = True break if test.clause != [0] and boundary.clause != [0] and not clauseMatch: continue whereMatch = False for where in test.where: if where in boundary.where: whereMatch = True break if not whereMatch: continue
首先,循环遍历test.clause(payload中的clause值),如果clauseTest在boundary的clause中,则设置clauseMatch = True,代表此条boundary可以使用。 接下来循环匹配where(payload中的where值),如果存在这样的where,设置whereMatch = True。clause和where中只要有一个没有匹配成功,就会执行continue跳过当前的boundary,进入下一个boundary的测试。
#!python prefix = boundary.prefix if boundary.prefix else "" suffix = boundary.suffix if boundary.suffix else "" ptype = boundary.ptype prefix = conf.prefix if conf.prefix is not None else prefix suffix = conf.suffix if conf.suffix is not None else suffix comment = None if conf.suffix is not None else comment
#!python for where in test.where: if where == PAYLOAD.WHERE.ORIGINAL or conf.prefix: ...... elif where == PAYLOAD.WHERE.NEGATIVE: ...... elif where == PAYLOAD.WHERE.REPLACE: ......
1:表示将我们的payload直接添加在值的后面[此处指的应该是检测的参数的值] 如我们写的参数是id=1,设置
2:表示将检测的参数的值更换为一个整数,然后将payload添加在这个整数的后面。 如我们写的参数是id=1,设置
#!python boundPayload = agent.prefixQuery(fstPayload, prefix, where, clause) boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where) reqPayload = agent.payload(place, parameter, newValue=boundPayload, where=where)
#!python for method, check in test.response.items(): check = agent.cleanupPayload(check, origValue=value if place not in (PLACE. URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) else None) if method == PAYLOAD.METHOD.COMPARISON: def genCmpPayload(): sndPayload = agent.cleanupPayload(test.response.comparison, origValue=value if place not in (PLACE.URI, PLACE.CUSTOM_POST, PLACE.CUSTOM_HEADER) else None) boundPayload = agent.prefixQuery(sndPayload, prefix, where, clause) boundPayload = agent.suffixQuery(boundPayload, comment, suffix, where) cmpPayload = agent.payload(place, parameter, newValue=boundPayload, where=where) return cmpPayload kb.matchRatio = None kb.negativeLogic = (where == PAYLOAD.WHERE.NEGATIVE) Request.queryPage(genCmpPayload(), place, raise404=False) falsePage = threadData.lastComparisonPage or "" trueResult = Request.queryPage(reqPayload, place, raise404=False) truePage = threadData.lastComparisonPage or "" if trueResult: falseResult = Request.queryPage(genCmpPayload(), place, raise404=False) if not falseResult: infoMsg = "%s parameter '%s' seems to be '%s' injectable " % ( paramType, parameter, title) logger.info(infoMsg) injectable = True if not injectable and not any((conf.string, conf.notString, conf. regexp)) and kb.pageStable: trueSet = set(extractTextTagContent(truePage)) falseSet = set(extractTextTagContent(falsePage)) candidates = filter(None, (_.strip() if _.strip() in (kb. pageTemplate or "") and _.strip() not in falsePage and _.strip() not in threadData.lastComparisonHeaders else None for _ in ( trueSet - falseSet))) if candidates: conf.string = candidates[0] infoMsg = "%s parameter '%s' seems to be '%s' injectable (with --string=/"%s/")" % (paramType, parameter, title, repr(conf. string).lstrip('u').strip("'")) logger.info(infoMsg) injectable = True elif method == PAYLOAD.METHOD.GREP: try: page, headers = Request.queryPage(reqPayload, place, content=True, raise404=False) output = extractRegexResult(check, page, re.DOTALL | re. 
IGNORECASE) / or extractRegexResult(check, listToStrValue( / [headers[key] for key in headers.keys() if key.lower() != URI_HTTP_HEADER.lower()] / if headers else None), re.DOTALL | re.IGNORECASE) / or extractRegexResult(check, threadData.lastRedirectMsg[1] / if threadData.lastRedirectMsg and threadData. lastRedirectMsg[0] == / threadData.lastRequestUID else None, re.DOTALL | re. IGNORECASE) if output: result = output == "1" if result: infoMsg = "%s parameter '%s' is '%s' injectable " % ( paramType, parameter, title) logger.info(infoMsg) injectable = True except SqlmapConnectionException, msg: debugMsg = "problem occurred most likely because the " debugMsg += "server hasn't recovered as expected from the " debugMsg += "error-based payload used ('%s')" % msg logger.debug(debugMsg) elif method == PAYLOAD.METHOD.TIME: trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False) if trueResult: # Confirm test's results trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True, raise404=False) if trueResult: infoMsg = "%s parameter '%s' seems to be '%s' injectable " % ( paramType, parameter, title) logger.info(infoMsg) injectable = True elif method == PAYLOAD.METHOD.UNION: configUnion(test.request.char, test.request.columns) if not Backend.getIdentifiedDbms(): if kb.heuristicDbms is None: warnMsg = "using unescaped version of the test " warnMsg += "because of zero knowledge of the " warnMsg += "back-end DBMS. 
You can try to " warnMsg += "explicitly set it using option '--dbms'" singleTimeWarnMessage(warnMsg) else: Backend.forceDbms(kb.heuristicDbms) if unionExtended: infoMsg = "automatically extending ranges for UNION " infoMsg += "query injection technique tests as " infoMsg += "there is at least one other (potential) " infoMsg += "technique found" singleTimeLogMessage(infoMsg) reqPayload, vector = unionTest(comment, place, parameter, value, prefix, suffix) if isinstance(reqPayload, basestring): infoMsg = "%s parameter '%s' is '%s' injectable" % (paramType, parameter, title) logger.info(infoMsg) injectable = True # Overwrite 'where' because it can be set # by unionTest() directly where = vector[6] kb.previousMethod = method
1.method为PAYLOAD.METHOD.COMPARISON:bool类型盲注 2.method为PAYLOAD.METHOD.GREP:基于错误的sql注入 3.method为PAYLOAD.METHOD.TIME:基于时间的盲注 4.method为PAYLOAD.METHOD.UNION:union联合查询
4.where字段有三个值1:表示将我们的payload直接添加在值的后面[此处指的应该是检测的参数的值] 如我们写的参数是id=1,设置
5.最终的payload = url参数 + boundary.prefix+test.payload+boundary.suffix
#!python if conf.getDbs: conf.dumper.dbs(conf.dbmsHandler.getDbs()) if conf.getTables: conf.dumper.dbTables(conf.dbmsHandler.getTables()) if conf.commonTables: conf.dumper.dbTables(tableExists(paths.COMMON_TABLES))
#!python _saveToResultsFile() _saveToHashDB() _showInjections() _selectInjection()
#!python query = queries[Backend.getIdentifiedDbms()].current_db.query
#!python if not value and not abortedFlag: output = _oneShotUnionUse(expression, unpack) value = parseUnionPage(output)
#!python retVal = hashDBRetrieve("%s%s" % (conf.hexConvert, expression), checkConf=True)
#!python def hashKey(key): key = key.encode(UNICODE_ENCODING) if isinstance(key, unicode) else repr(key) retVal = int(hashlib.md5(key).hexdigest()[:12], 16) #注释:hash的算法,对应数据库中的id。md5后,转换为10进制,就是session中的id return retVal def retrieve(self, key, unserialize=False): retVal = None if key and (self._write_cache or os.path.isfile(self.filepath)): hash_ = HashDB.hashKey(key) retVal = self._write_cache.get(hash_) if not retVal: while True: try: for row in self.cursor.execute("SELECT value FROM storage WHERE id=?", (hash_,)): retVal = row[0] except sqlite3.OperationalError, ex: if not "locked" in ex.message: raise except sqlite3.DatabaseError, ex: errMsg = "error occurred while accessing session file '%s' ('%s'). " % (self.filepath, ex) errMsg += "If the problem persists please rerun with `--flush-session`" raise SqlmapDataException, errMsg else: break return retVal if not unserialize else unserializeObject(retVal)
#!python def hashDBRetrieve(key, unserialize=False, checkConf=False): _ = "%s%s%s" % (conf.url or "%s%s" % (conf.hostname, conf.port), key, HASHDB_MILESTONE_VALUE) retVal = conf.hashDB.retrieve(_, unserialize) if kb.resumeValues and not (checkConf and any((conf.flushSession, conf.freshQueries))) else None if not kb.inferenceMode and not kb.fileReadMode and any(_ in (retVal or "") for _ in (PARTIAL_VALUE_MARKER, PARTIAL_HEX_VALUE_MARKER)): retVal = None return retVal
此函数用于生成hash的key,生成方法为url+'None'+命令+HASHDB_MILESTONE_VALUE,比如u''。此key经过int(hashlib.md5(key).hexdigest()[:12], 16),就是对应session中的id
#!python vector = kb.injection.data[PAYLOAD.TECHNIQUE.UNION].vector kb.unionDuplicates = vector[7] kb.forcePartialUnion = vector[8] query = agent.forgeUnionQuery(injExpression, vector[0], vector[1], vector[2], vector[3], vector[4], vector[5], vector[6], None, limited) where = PAYLOAD.WHERE.NEGATIVE if conf.limitStart or conf.limitStop else vector[6] payload = agent.payload(newValue=query, where=where)
最终的值通过解析session中的记录value = parseUnionPage(output),找到kb.chars.start和kb.chars.stop中间的值,就是结果。