编辑代码

from scapy.all import *
from scapy.contrib.modbus import ModbusADURequest, ModbusADUResponse
from scapy.layers.inet import TCP
from collections import defaultdict

# 状态定义(细化到功能码类别)
class State:
    IDLE = "Idle"
    AWAITING_READ = "Awaiting_Read_Response"    # 等待读操作响应
    AWAITING_WRITE = "Awaiting_Write_Response"  # 等待写操作响应
    COMPLETED = "Completed"
    ERROR = "Error"

# 功能码分类
READ_FUNCS = {0x01, 0x02, 0x03, 0x04}    # 读操作功能码
WRITE_FUNCS = {0x05, 0x06, 0x10}         # 写操作功能码

class ModbusSession:
    def __init__(self, trans_id, start_time, func_code):
        self.trans_id = trans_id
        self.start_time = start_time
        self.func_code = func_code
        # 根据功能码类型设置初始状态和超时
        if func_code in READ_FUNCS:
            self.state = State.AWAITING_READ
            self.timeout = 2.0  # 读操作超时
        elif func_code in WRITE_FUNCS:
            self.state = State.AWAITING_WRITE
            self.timeout = 3.0  # 写操作超时
        else:
            self.state = State.AWAITING_READ  # 默认处理
            self.timeout = 2.0

class ModbusFSM:
    def __init__(self):
        self.sessions = defaultdict(lambda: None)
        self.current_state = State.IDLE

    def process_packet(self, pkt):
        if TCP in pkt and (pkt[TCP].dport == 502 or pkt[TCP].sport == 502):
            if ModbusADURequest in pkt:
                self._handle_request(pkt)
            elif ModbusADUResponse in pkt:
                self._handle_response(pkt)
            self._cleanup_timeout_sessions(pkt.time)

    def _handle_request(self, pkt):
        trans_id = pkt.transId
        func_code = pkt.funcCode
        print(f"[Request] 时间: {pkt.time:.6f}s | TransID: {trans_id} | 功能码: 0x{func_code:02x}")
        self.sessions[trans_id] = ModbusSession(trans_id, pkt.time, func_code)
        # 更新全局状态(示例简化处理,实际应根据业务需求调整)
        self.current_state = State.AWAITING_READ if func_code in READ_FUNCS else State.AWAITING_WRITE

    def _handle_response(self, pkt):
        trans_id = pkt.transId
        session = self.sessions.get(trans_id)
        if not session:
            print(f"[Warning] 未知事务ID: {trans_id}")
            return
        # 获取请求时的功能码
        original_func = session.func_code
        if session.state in [State.AWAITING_READ, State.AWAITING_WRITE]:
            if pkt.funcCode & 0x80:
                # 异常响应需匹配原功能码
                if (pkt.funcCode & 0x7F) != original_func:
                    print(f"[Error] 事务ID: {trans_id} | 异常功能码不匹配")
                else:
                    print(f"[Error] 事务ID: {trans_id} | 功能码异常: 0x{pkt.funcCode:02x}")
                session.state = State.ERROR
            else:
                if pkt.funcCode != original_func:
                    print(f"[Error] 事务ID: {trans_id} | 功能码不匹配")
                    session.state = State.ERROR
                else:
                    # 根据操作类型打印状态转移
                    if session.state == State.AWAITING_READ:
                        print(f"[Response] 事务ID: {trans_id} | 读操作完成")
                    else:
                        print(f"[Response] 事务ID: {trans_id} | 写操作完成")
                    session.state = State.COMPLETED
            del self.sessions[trans_id]

    def _cleanup_timeout_sessions(self, current_time):
        timeout_trans = []
        for trans_id, session in self.sessions.items():
            if session and (current_time - session.start_time > session.timeout):
                state_type = "读" if session.func_code in READ_FUNCS else "写"
                print(f"[Timeout] 事务ID: {trans_id} | {state_type}操作超时")
                timeout_trans.append(trans_id)
        for trans_id in timeout_trans:
            del self.sessions[trans_id]

if __name__ == "__main__":
    #packets = rdpcap("F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/characterization_modbus_6RTU_with_operate.pcap")
    packets = rdpcap("F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/CnC_uploading_exe_modbus_6RTU_with_operate.pcap")
    fsm = ModbusFSM()
    sorted_packets = sorted(packets, key=lambda x: x.time)
    for pkt in sorted_packets:
        fsm.process_packet(pkt)