from scapy.all import *
from scapy.contrib.modbus import ModbusADURequest, ModbusADUResponse
from scapy.layers.inet import TCP
from collections import defaultdict
class State:
    """String labels for the states a Modbus transaction tracker can be in."""

    # No request is outstanding.
    IDLE = "Idle"
    # A request was seen and its response has not arrived yet.
    AWAITING_READ = "Awaiting_Read_Response"
    AWAITING_WRITE = "Awaiting_Write_Response"
    # Terminal outcomes of a transaction.
    COMPLETED = "Completed"
    ERROR = "Error"
# Modbus function codes treated as read operations.
READ_FUNCS = {
    0x01,  # Read Coils
    0x02,  # Read Discrete Inputs
    0x03,  # Read Holding Registers
    0x04,  # Read Input Registers
}

# Modbus function codes treated as write operations.
# NOTE(review): 0x0F (Write Multiple Coils) is not listed - confirm that is
# intentional.
WRITE_FUNCS = {
    0x05,  # Write Single Coil
    0x06,  # Write Single Register
    0x10,  # Write Multiple Registers
}
class ModbusSession:
    """Bookkeeping for one outstanding Modbus request.

    Attributes:
        trans_id: Transaction ID taken from the request.
        start_time: Timestamp of the request packet.
        func_code: Function code carried by the request.
        state: ``State.AWAITING_WRITE`` for codes in ``WRITE_FUNCS``,
            otherwise ``State.AWAITING_READ``.
        timeout: Seconds to wait for the response (3.0 for writes, 2.0 for
            everything else).
    """

    def __init__(self, trans_id, start_time, func_code):
        """Record the request and pick the awaiting state and timeout."""
        self.trans_id = trans_id
        self.start_time = start_time
        self.func_code = func_code

        # Read codes and unrecognised codes share the read state/timeout;
        # only a recognised write code gets the write variant.
        is_write = func_code not in READ_FUNCS and func_code in WRITE_FUNCS
        self.state, self.timeout = (
            (State.AWAITING_WRITE, 3.0) if is_write else (State.AWAITING_READ, 2.0)
        )
class ModbusFSM:
    """Match Modbus/TCP requests with their responses in a packet stream.

    Packets are fed one at a time, in timestamp order, to
    :meth:`process_packet`.  Each request opens a ``ModbusSession`` keyed by
    its transaction ID; the matching response (or a timeout) closes it.
    Results are reported with ``print``.

    Attributes:
        sessions: Pending sessions keyed by transaction ID.
        current_state: ``State.IDLE`` while nothing is pending, otherwise the
            awaiting state of the most recent request.

    NOTE(review): sessions are keyed by transaction ID only, so two TCP
    connections that reuse the same ID would overwrite each other - confirm
    whether the captures analysed can contain that.
    """

    def __init__(self):
        # Reading a missing key through ``[]`` stores and returns None, so
        # code that walks the values must tolerate None entries.
        self.sessions = defaultdict(lambda: None)
        self.current_state = State.IDLE

    def process_packet(self, pkt):
        """Advance the tracker with one captured packet.

        Args:
            pkt: A scapy packet; ``pkt.time`` is used as the current time.
        """
        # Expire overdue sessions *before* dispatching, so that a response
        # arriving after its deadline is not reported as a success.
        self._cleanup_timeout_sessions(pkt.time)

        if TCP not in pkt:
            return
        tcp = pkt[TCP]
        if tcp.dport != 502 and tcp.sport != 502:
            return

        if ModbusADURequest in pkt:
            self._handle_request(pkt)
        elif ModbusADUResponse in pkt:
            self._handle_response(pkt)

    def _extract_ids(self, pkt):
        """Return ``(transId, funcCode)`` or None if either field is absent."""
        trans_id = getattr(pkt, "transId", None)
        func_code = getattr(pkt, "funcCode", None)
        if trans_id is None or func_code is None:
            # A truncated or undecodable ADU must not abort the whole replay.
            print("[Warning] Modbus报文缺少事务ID或功能码,已忽略")
            return None
        return trans_id, func_code

    def _refresh_idle_state(self):
        """Fall back to ``State.IDLE`` once no session is pending."""
        if all(session is None for session in self.sessions.values()):
            self.current_state = State.IDLE

    def _handle_request(self, pkt):
        """Open a session for a request packet."""
        ids = self._extract_ids(pkt)
        if ids is None:
            return
        trans_id, func_code = ids

        print(f"[Request] 时间: {pkt.time:.6f}s | TransID: {trans_id} | 功能码: 0x{func_code:02x}")
        session = ModbusSession(trans_id, pkt.time, func_code)
        self.sessions[trans_id] = session
        # Take the classification from the session so that unrecognised
        # function codes are labelled the same way everywhere.
        self.current_state = session.state

    def _handle_response(self, pkt):
        """Close the session that a response packet belongs to."""
        ids = self._extract_ids(pkt)
        if ids is None:
            return
        trans_id, func_code = ids

        session = self.sessions.get(trans_id)
        if session is None:
            print(f"[Warning] 未知事务ID: {trans_id}")
            return

        expected_func = session.func_code
        if session.state in (State.AWAITING_READ, State.AWAITING_WRITE):
            if func_code & 0x80:
                # High bit set: exception response; the low 7 bits should
                # echo the request's function code.
                if (func_code & 0x7F) != expected_func:
                    print(f"[Error] 事务ID: {trans_id} | 异常功能码不匹配")
                else:
                    print(f"[Error] 事务ID: {trans_id} | 功能码异常: 0x{func_code:02x}")
                session.state = State.ERROR
            elif func_code != expected_func:
                print(f"[Error] 事务ID: {trans_id} | 功能码不匹配")
                session.state = State.ERROR
            else:
                if session.state == State.AWAITING_READ:
                    print(f"[Response] 事务ID: {trans_id} | 读操作完成")
                else:
                    print(f"[Response] 事务ID: {trans_id} | 写操作完成")
                session.state = State.COMPLETED

        # The transaction is finished either way; stop tracking it.
        del self.sessions[trans_id]
        self._refresh_idle_state()

    def _cleanup_timeout_sessions(self, current_time):
        """Report and drop every session older than its timeout.

        Args:
            current_time: Timestamp to measure session age against.
        """
        # Collect first: the dict must not change size while it is iterated.
        expired = [
            trans_id
            for trans_id, session in self.sessions.items()
            if session is not None
            and current_time - session.start_time > session.timeout
        ]
        for trans_id in expired:
            session = self.sessions.pop(trans_id)
            # Label from the session's own state, matching the response path.
            state_type = "写" if session.state == State.AWAITING_WRITE else "读"
            print(f"[Timeout] 事务ID: {trans_id} | {state_type}操作超时")
        if expired:
            self._refresh_idle_state()
if __name__ == "__main__":
    import sys

    # Capture to analyse: the first command-line argument if one is given,
    # otherwise the original default dataset file.
    pcap_path = (
        sys.argv[1]
        if len(sys.argv) > 1
        else "F:/研究生/数据集/Modbus_dataset-master/Modbus_dataset-master/CnC_uploading_exe_modbus_6RTU_with_operate.pcap"
    )
    packets = rdpcap(pcap_path)
    fsm = ModbusFSM()
    # Replay in timestamp order; the timeout logic compares packet times and
    # therefore needs a monotonically advancing clock.
    sorted_packets = sorted(packets, key=lambda p: p.time)
    for pkt in sorted_packets:
        fsm.process_packet(pkt)