原创作者: 大师
接到某单位通知让查找中国具有SYNful Knock后门的CISCO路由器,按照曼迪安特分析的报告称中国已经发现3台具有SYNful Knock后门的路由器,如何快速从全国3亿IP地址中快速查找出3个IP地址难度还是十分的大啊,而我经过查找发现中国已经有4个IP被植入了后门,现将检测过程分享给大家。
一、获取IP地址
为保证中国IP的全面性,从apnic重新获取亚洲区域所分配到的IP,过滤出CN的IP,结果如下。apnic文件中每行为一个IP地址段,以"|"作为分隔,第四个字段为IP起始地址,第五个字段为IP地址数量。
二、IP地址格式调整
将IP地址格式调整成zmap的CIDR格式,如下:
三、使用zmap检测80端口开放ip
命令:zmap -w china_ip_cidr.txt -p 80 -o 80.txt
检测出5184575个开放80端口的IP地址。
四、POC制作思路
互联网搜索发现还没有此后门的POC (现在CISCO已经发布自己的POC,后期我的POC也参考CISCO的POC做了适当调整) ,没办法自给自足仔细研读了曼迪安特的报告,经过多次改版最终POC思路如下:
(一)伪造SYN报文,使seq和ack_seq之间的差为0xC123D大批量发送给目标主机。
(二)单独监听收到的网络报文,将seq和ack_seq之间的差为0xC123E、urgent flag未设置、urgent pointer=0×0001、hard-coded TCP options=“02 04 05 b4 01 01 04 02 01 03 03 05”的网络报文过滤出来保存。
五、POC分析如下:
(一)SYN报文部分
def create_tcp_syn_header(source_port,source_ip, dest_ip, dest_port): # tcp 头部选项 source = source_port # 随机化一个源端口 seq = 791104 ack_seq = 3 doff = 5 # tcp flags fin = 0 syn = 1 rst = 0 psh = 0 ack = 0 urg = 0 window = socket.htons (8192) # 最大窗口大小 check = 0 urg_ptr = 0 offset_res = (doff << 4) + 0 tcp_flags = fin + (syn<<1) + (rst<<2) + (psh<<3) + (ack<<4) + (urg<<5) tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, check, urg_ptr) # 伪头部选项 source_address = socket.inet_aton( source_ip ) dest_address = socket.inet_aton( dest_ip ) placeholder = 0 protocol = socket.IPPROTO_TCP tcp_length = len(tcp_header) psh = pack('!4s4sBBH', source_address, dest_address, placeholder, protocol, tcp_length) psh = psh + tcp_header tcp_checksum = checksum(psh) # 重新打包TCP头部,并填充正确地校验和 tcp_header = pack('!HHLLBBHHH', source, dest_port, seq, ack_seq, offset_res, tcp_flags, window, tcp_checksum, urg_ptr) return tcp_header
修改SYN seq和ack_seq差值,使其差值为0xC123D,并直接发送SYN,不等待接收SYN ACK报文,由网络监听部分进行SYN ACK报文的拦截,由于不接收SYN ACK报文,发送速度提升了N倍。考虑到发送完关闭过快在每个报文发送完毕后增加了0.1秒的等待。
(二)网络监听部分
此部分借鉴了CISCO发布的检测脚本,但是CISCO检测脚本检测性能较差,我将检测POC分成了2部分,将SYN报文发送部分POC放入检测框架批量执行,网络监听部分单独执行分析SYN ACK的报文特征符合性,代码如下:
class KnockScanner(object): """Scans for routers loaded with SYNKnock malware """ ACK_OFFSET = 0xC123D TCP_OPTIONS_STRING = "/x02/x04/x05/xb4/x01/x01/x04/x02/x01/x03/x03/x05" DEST_PORT = 80 LISTEN_FILTER = "icmp or (tcp and src port %s)" % DEST_PORT def __init__(self): self.iface=scapy.all.conf.iface @classmethod def ioc_offset_correct(cls, tcp_pkt): """ACK/SYN sequence number set to static offset """ original_seq = tcp_pkt.ack - 1 # seq number was bumped by the ack # not clear if offset is always directional, so just use abs value offset = abs(tcp_pkt.seq - original_seq) if offset == cls.ACK_OFFSET: return (True, "ACK/SYN seq numbers set to static offset") return (False, "ACK/SYN seq number offset not right (0x%X, " "looking for 0x%X)" % (offset, cls.ACK_OFFSET)) @classmethod def ioc_urg_pointer(cls, tcp_pkt): """Urgent flag turned off but urgent pointer turned on """ # This isn't a reliable check, have seen replies with no urg pointer # set at all. But if pointer *is* set and no flags, it's highly # suspicious if (tcp_pkt.flags & TCPFLAG_URG): if tcp_pkt.urgptr: return (False, "URG ptr=%r with flag set (normal)" % tcp_pkt.urgptr) else: return (True, "URG Flag set but no URG ptr. " "Not a known indicator, but not normal") else: if tcp_pkt.urgptr: return (True, "URG flag off but URG ptr=%r" % tcp_pkt.urgptr) else: # No flag, no pointer, not interesting return (False, None) @classmethod def ioc_tcp_options(cls, tcp_pkt): """Static TCP options set """ if str(tcp_pkt)[20:].startswith(cls.TCP_OPTIONS_STRING): return (True, "Static set of TCP options found") return (False, "Static set of TCP options not found") def tcp_is_knock_reply(self, tcp_pkt): """Looks at a TCP packet and determines if it is a SYNKnock reply """ if tcp_pkt.flags & TCPFLAG_RST: return (False, "TCP Reset received") details = [] offset, ioc_details = self.ioc_offset_correct(tcp_pkt) if ioc_details: details.append(ioc_details) urg, ioc_details = self.ioc_urg_pointer(tcp_pkt) if ioc_details: details.append(ioc_details) options, ioc_details = self.ioc_tcp_options(tcp_pkt) if ioc_details: details.append(ioc_details) if any([offset, urg, options]): return (True, "SYNKnock indicators found: %s" % "; ".join(details)) return (False, "No indicators found: %s" % "; ".join(details)) def listener(self, output_pipe): """Sniffer looking for replies from scanned hosts/networks """ def reply_handler(pkt): """Per packet sniffer callback """ ip_src = pkt[IP].src if IP in pkt else None if TCP in pkt: tcp_pkt = pkt[TCP] is_knock, mesg = self.tcp_is_knock_reply(tcp_pkt) output_pipe.send((is_knock, mesg, ip_src)) else: output_pipe.send( (False, "Unexpected packet: %s" % pkt.summary(), ip_src) ) def packet_filter(pkt): """Screens packets for TCP SYN|ACK packets from target port """ # ideally, the bpf listen filter should do most of this, but that's # been highly unreliable through scapy if TCP not in pkt: return False tcp_pkt = pkt[TCP] return (tcp_pkt.sport == self.DEST_PORT and tcp_pkt.flags & TCPFLAG_SYN and tcp_pkt.flags & TCPFLAG_ACK) sniff(store=False, prn=reply_handler, filter=self.LISTEN_FILTER, lfilter=packet_filter, iface=self.iface, promisc=False)
五、批量执行
(一)将待检测IP入库,祭出我编写的神器pwscan大规模检测框架,设定进程数1000,启动检测框架如下:
框架启动了1000个扫描引擎。
奔跑吧小驴子,很快检测完成,如下:
六、检测结果
对全网检测5遍后结果如下 ( 不好意思IP要隐藏:) ) :
七、漏洞验证
单独进行漏洞验证如下:
(一)脚本验证
弱口令成功登陆,看到了曼迪安特说的"#"号
执行show platform查看文件被修改情况,找到曼迪安特说的RW标致