编辑代码

from scapy.all import *
from scapy.contrib.modbus import ModbusADUResponse
from scapy.layers.inet import TCP, IP
from collections import defaultdict


class ModbusFSM:
    def __init__(self):
        self.current_state = None  # (src_ip, unit_id, raw_func_code)# 三元组:(源IP, Unit ID, 原始功能码)
        self.cycle_count = 0       # 当前状态循环次数
        self.state_start = 0.0     # 状态开始时间戳
        self.state_end = 0.0       # 状态结束时间戳
        self.state_stats = defaultdict(lambda: {'entries': 0, 'total_duration': 0.0})# 统计存储
#键 (Key):三元组 (src_ip, unit_id, func),值 (Value):统计字典:entries:该状态被进入的次数;total_duration:该状态的总持续时间(秒)
    def process_packet(self, pkt):
        """处理所有Modbus响应(包含异常)"""
        if IP in pkt and TCP in pkt and pkt[TCP].sport == 502 and ModbusADUResponse in pkt:
            self._process_modbus_response(pkt)

    def _process_modbus_response(self, pkt):
        src_ip = pkt[IP].src # 提取源IP
        unit_id = pkt.unitId # 提取Unit ID
        raw_func = pkt.funcCode  # 保留原始功能码(含异常标志)

        new_state = (src_ip, unit_id, raw_func)

        # 状态变更处理
        if new_state != self.current_state:
            self._record_previous_state()
            self._init_new_state(new_state, pkt.time)
        else:
            self._update_current_state(pkt.time)

    def _record_previous_state(self):
        if self.current_state:
            duration = self.state_end - self.state_start
            self.state_stats[self.current_state]['entries'] += 1
            self.state_stats[self.current_state]['total_duration'] += duration

            src_ip, unit_id, func = self.current_state
            status = "异常" if func & 0x80 else "正常"
            print(
                f"[状态结束] 从站 {src_ip}:{unit_id} | 功能码 0x{func:02x}({status}) | "
                f"循环 {self.cycle_count} 次 | 持续时间 {duration:.3f}s"
            )

    def _init_new_state(self, new_state, timestamp):
        self.current_state = new_state
        self.cycle_count = 1
        self.state_start = timestamp
        self.state_end = timestamp
        src_ip, unit_id, func = new_state
        status = "异常" if func & 0x80 else "正常"
        print(
            f"[新状态开始] 从站 {src_ip}:{unit_id} | 功能码 0x{func:02x}({status}) | "
            f"时间 {timestamp:.3f}s"
        )

    def _update_current_state(self, timestamp):
        self.cycle_count += 1
        self.state_end = timestamp

    def finalize(self):
        self._record_previous_state()

    def print_statistics(self):
        print("\n=== 状态统计汇总 ===")
        print("IP地址        | Unit ID | 功能码    | 状态   | 进入次数 | 总持续时间(s)")
        print("-" * 70)
        for (src_ip, unit_id, func), stats in sorted(self.state_stats.items()):
            status = "异常" if func & 0x80 else "正常"
            base_func = func & 0x7F  # 显示基础功能码
            print(
                f"{src_ip:15} | {unit_id:6} | 0x{base_func:02x}/0x{func:02x} | {status:5} | "
                f"{stats['entries']:8} | {stats['total_duration']:15.3f}"
            )


if __name__ == "__main__":
    # 示例数据路径需替换为实际路径
    packets = rdpcap(
        "F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/characterization_modbus_6RTU_with_operate.pcap")

    fsm = ModbusFSM()
    sorted_pkts = sorted(packets, key=lambda x: x.time)

    for pkt in sorted_pkts:
        fsm.process_packet(pkt)

    fsm.finalize()
    fsm.print_statistics()