编辑代码

from scapy.all import *
from scapy.contrib.modbus import ModbusADURequest, ModbusADUResponse
from scapy.layers.inet import TCP
from collections import defaultdict

# 状态定义
class State:
    IDLE = "Idle"    # 空闲状态(无活跃事务)
    AWAITING_RESPONSE = "Awaiting_Response"  # 已发送请求,等待响应
    COMPLETED = "Completed"    # 事务正常完成
    ERROR = "Error"            # 事务异常终止

class ModbusSession:
    def __init__(self, trans_id, start_time):
        self.trans_id = trans_id     # 事务ID(Modbus核心标识)
        self.start_time = start_time   # 时间戳(浮点数,单位秒)
        self.state = State.AWAITING_RESPONSE    # 初始状态 已发送请求,等待响应
        self.timeout = 2.0         # 超时阈值(工业网络典型值)

class ModbusFSM:
    def __init__(self):
        self.sessions = defaultdict(lambda: None) #字典结构,键为事务ID,值为会话对象
        self.current_state = State.IDLE #监控协议栈整体运行状态

    def process_packet(self, pkt):  #数据包处理
        # 步骤1:过滤Modbus/TCP流量(源或目标端口为502)
        if TCP in pkt and (pkt[TCP].dport == 502 or pkt[TCP].sport == 502):
            # 步骤2:判断数据包类型
            if ModbusADURequest in pkt:   # 请求包处理
                self._handle_request(pkt)
            elif ModbusADUResponse in pkt:  # 响应包处理
                self._handle_response(pkt)
                # 步骤3:每次处理完数据包后清理超时会话
            self._cleanup_timeout_sessions(pkt.time)

    def _handle_request(self, pkt):  #处理请求
        trans_id = pkt.transId # 提取事务ID(Scapy自动解析)
        print(f"[Request] 时间: {pkt.time:.6f}s | TransID: {trans_id} | 功能码: 0x{pkt.funcCode:02x}")#.6f 表示将浮点数格式化为6位小数精度。  02x 表示将整数格式化为2位十六进制数,不足两位时左侧补零。
        self.sessions[trans_id] = ModbusSession(trans_id, pkt.time) #sessions 是一个键值对容器,键(key)是[trans_id],中括号是键,值(value)是ModbusSession对象
        self.current_state = State.AWAITING_RESPONSE   # 已发送请求,等待响应

    def _handle_response(self, pkt): #处理响应
        trans_id = pkt.transId
        session = self.sessions.get(trans_id) #get是取字典中键[trans_id]对应的值
        #value1 = sessions[100]  自动添加键 100,值为 None,value = sessions.get(100) 获取键[100]的值
        if not session:
            print(f"[Warning] 未知事务ID: {trans_id}")
            return
        if session.state == State.AWAITING_RESPONSE:
            if pkt.funcCode & 0x80:
                print(f"[Error] 事务ID: {trans_id} | 异常功能码: 0x{pkt.funcCode:02x}")
                session.state = State.ERROR
            else:
                print(f"[Response] 事务ID: {trans_id} | 正常响应")
                session.state = State.COMPLETED
            del self.sessions[trans_id]

    def _cleanup_timeout_sessions(self, current_time):
        timeout_trans = []
        for trans_id, session in self.sessions.items():#sessions.items() 是字典(dict)对象的键值对遍历方法,用于获取字典中所有键值对的动态视图。
            if session and (current_time - session.start_time > session.timeout): #如果session存在并且xxx
                print(f"[Timeout] 事务ID: {trans_id} | 等待超时")
                timeout_trans.append(trans_id) #将trans_id添加到timeout_trans列表中
        for trans_id in timeout_trans:
            del self.sessions[trans_id] #删除字典中的某个键值对

if __name__ == "__main__":
   # packets = rdpcap("F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/characterization_modbus_6RTU_with_operate.pcap")
   packets = rdpcap("F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/CnC_uploading_exe_modbus_6RTU_with_operate.pcap")
   #函数返回一个包含所有数据包的列表,每个数据包都是一个Scapy对象。类型:PacketList(Scapy 自定义类,继承自 Python 的 list)。
   fsm = ModbusFSM()
   sorted_packets = sorted(packets, key=lambda x: x.time)
   for pkt in sorted_packets:
       fsm.process_packet(pkt)