from scapy.all import *
from scapy.contrib.modbus import ModbusADUResponse
from scapy.layers.inet import TCP, IP
from collections import defaultdict
class ModbusFSM:
def __init__(self):
self.current_state = None
self.cycle_count = 0
self.state_start = 0.0
self.state_end = 0.0
self.state_stats = defaultdict(lambda: {'entries': 0, 'total_duration': 0.0})
def process_packet(self, pkt):
"""处理所有Modbus响应(包含异常)"""
if IP in pkt and TCP in pkt and pkt[TCP].sport == 502 and ModbusADUResponse in pkt:
self._process_modbus_response(pkt)
def _process_modbus_response(self, pkt):
src_ip = pkt[IP].src
unit_id = pkt.unitId
raw_func = pkt.funcCode
new_state = (src_ip, unit_id, raw_func)
if new_state != self.current_state:
self._record_previous_state()
self._init_new_state(new_state, pkt.time)
else:
self._update_current_state(pkt.time)
def _record_previous_state(self):
if self.current_state:
duration = self.state_end - self.state_start
self.state_stats[self.current_state]['entries'] += 1
self.state_stats[self.current_state]['total_duration'] += duration
src_ip, unit_id, func = self.current_state
status = "异常" if func & 0x80 else "正常"
print(
f"[状态结束] 从站 {src_ip}:{unit_id} | 功能码 0x{func:02x}({status}) | "
f"循环 {self.cycle_count} 次 | 持续时间 {duration:.3f}s"
)
def _init_new_state(self, new_state, timestamp):
self.current_state = new_state
self.cycle_count = 1
self.state_start = timestamp
self.state_end = timestamp
src_ip, unit_id, func = new_state
status = "异常" if func & 0x80 else "正常"
print(
f"[新状态开始] 从站 {src_ip}:{unit_id} | 功能码 0x{func:02x}({status}) | "
f"时间 {timestamp:.3f}s"
)
def _update_current_state(self, timestamp):
self.cycle_count += 1
self.state_end = timestamp
def finalize(self):
self._record_previous_state()
def print_statistics(self):
print("\n=== 状态统计汇总 ===")
print("IP地址 | Unit ID | 功能码 | 状态 | 进入次数 | 总持续时间(s)")
print("-" * 70)
for (src_ip, unit_id, func), stats in sorted(self.state_stats.items()):
status = "异常" if func & 0x80 else "正常"
base_func = func & 0x7F
print(
f"{src_ip:15} | {unit_id:6} | 0x{base_func:02x}/0x{func:02x} | {status:5} | "
f"{stats['entries']:8} | {stats['total_duration']:15.3f}"
)
if __name__ == "__main__":
packets = rdpcap(
"F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/characterization_modbus_6RTU_with_operate.pcap")
fsm = ModbusFSM()
sorted_pkts = sorted(packets, key=lambda x: x.time)
for pkt in sorted_pkts:
fsm.process_packet(pkt)
fsm.finalize()
fsm.print_statistics()