2023新年第一天,python+scapy监听 分析封包

60次阅读
没有评论

两种方式获得数据,1.实时抓包,2.读取 pcap文件

#scapy抓包使用 sniff() 函数
def sniff(count=0, store=1, offline=None, prn=None,filter=None, L2socket=None, timeout=None, opened_socket=None, stop_filter=None, iface=None,*args,**kargs)

count:抓包的数量,0表示无限制;
store:保存抓取的数据包或者丢弃,1保存,0丢弃
offline:从 pcap 文件读取数据包,而不进行嗅探,默认为None
prn:为每一个数据包定义一个函数,如果返回了什么,则显示。例如:prn = lambda x: x.summary();    ( packct.summar()函数返回的是对包的统计性信息 )
filter:过滤规则,使用wireshark里面的过滤语法(详见Scapy Sniffer的filter语法)
L2socket:使用给定的 L2socket
timeout:在给定的时间后停止嗅探,默认为 None
opened_socket:对指定的对象使用 .recv() 进行读取;
stop_filter:定义一个函数,决定在抓到指定数据包后停止抓包,如:stop_filter = lambda x: x.haslayer(TCP);
iface:指定抓包的接口(在命令行下执行‘ipconfig /add’ 获得的网卡描述,代码如下)

import scapy
from scapy.all import *
#显示网卡信息
show_interfaces()
# !/usr/bin/env python

import os
from scapy.all import *
from scapy.utils import PcapReader
import dpkt
import socket
import numpy as np

pkts = []
count = 0
pcapnum = 0

def write_cap(x):
    global pkts
    global count
    global pcapnum
    pkts.append(x)
    count += 1
    if count == 10:
        pcapnum += 1
        pname = "pcap%d.pcap" % pcapnum
        wrpcap(pname, pkts)
        pkts = []
        count = 0

# 测试抓包解析
def test_dump_file():
    print ("Testing the dump file...")
    dump_file = "./pcap1.pcap"
    if os.path.exists(dump_file):
        print ("dump fie %s found." %dump_file)
        pkts = sniff(offline=dump_file)
        count = 0
        while (count <=2):
            print ("----Dumping pkt:%s----" %count)
            print (hexdump(pkts[count]))
            count += 1
        
    else:
        print ("dump fie %s not found." %dump_file)

if __name__ == '__main__':
#监听加显示截取的封包
#     print ("Started packet capturing and dumping... Press CTRL+C to exit")
#     sniff(filter="host 172.16.19.3 and tcp",iface="Realtek PCIe GbE Family Controller",prn=write_cap)
#     test_dump_file()
#监听加分析指定的封包
with open("pcap3.pcap", 'rb') as fr:
    pcap = dpkt.pcap.Reader(fr)
    for timestamp, buffer in pcap:
        ethernet = dpkt.ethernet.Ethernet(buffer)
        # 我们仅需要TCP的包
        if not isinstance(ethernet.data, dpkt.ip.IP):
            continue
        ip = ethernet.data
        if not isinstance(ip.data, dpkt.tcp.TCP):
            continue
        tcp = ip.data
        # 过滤掉内容为空的包
        if len(tcp.data) == 0:
            continue
        # 发送方的IP
        src = socket.inet_ntoa(ip.src)
        if src=='172.0.0.30':
            # 接收方的IP
            dst = socket.inet_ntoa(ip.dst)
            if dst=='172.0.0.3':
                print('发送方:'+src,'接收方:'+dst)
                # 报文内容(byte数组)
                byteArray = tcp.data
                print(byteArray)


实时监听解包

from scapy.all import *
from scapy.utils import PcapReader
import numpy as np
import re

def processStr(data):
    pattern = re.compile('[0-9]{9,}', re.S)#正则提取我需要的编码
    res = re.findall(pattern, str(data))
    return res


# sniff(filter="host 172.0.0.3 and tcp",iface="Realtek PCIe GbE Family Controller",count=2,prn=lambda x:x[IP].src+'---->'+x[IP].dst)
pcap = sniff(filter="dst host 172.0.0.3 and tcp",count=10)
#dst host 172.0.0.3 指定

for packet in pcap:
    if packet['IP'].payload:
        src=packet['IP'].src
        dst=packet['IP'].dst
        dtlen=packet['IP'].len
        if dst=='172.0.0.3':
            if dtlen>1000:
                if 'Raw' in packet:
                    load = packet['Raw'].load
                items = processStr(load)
                for i in items:
                    print(i)
正文完