import numpy as np
from scipy.signal import medfilt, butter, filtfilt, savgol_filter
import matplotlib.pyplot as plt
def validate_input(time, flow):
"""验证输入数据的有效性"""
if len(time) != len(flow):
raise ValueError("时间序列和流量序列长度必须一致")
if len(time) < 5:
raise ValueError("数据量过少,至少需要5个数据点")
time_diff = np.diff(time)
if not np.allclose(time_diff, time_diff[0], rtol=1e-3):
raise ValueError("时间序列必须是等间隔采样")
try:
time_array = np.array(time, dtype=float)
flow_array = np.array(flow, dtype=float)
except ValueError:
raise ValueError("输入包含非数值类型")
def main():
try:
time = [
0.125, 0.25, 0.375, 0.5, 0.625, 0.75, 0.875, 1.0,
1.125, 1.25, 1.375, 1.5, 1.625, 1.75, 1.875, 2.0,
2.125, 2.25, 2.375, 2.5, 2.625, 2.75, 2.875, 3.0
]
flow = [
11.313, 11.484, 11.477, 11.47, 11.464, 11.458, 11.454, 11.45,
11.447, 11.445, 11.444, 11.445, 11.446, 11.448, 11.449, 11.45,
11.452, 11.453, 11.455, 11.456, 11.457, 11.456, 11.455, 11.455
]
validate_input(time, flow)
time_array = np.array(time, dtype=float)
flow_array = np.array(flow, dtype=float)
sampling_interval = time[1] - time[0]
sampling_freq = 1 / sampling_interval
flow_clean = medfilt(flow_array, kernel_size=5)
def butter_lowpass(data, cutoff, fs, order=4):
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='low')
return filtfilt(b, a, data)
filtered_lowpass = butter_lowpass(flow_clean, cutoff=0.5, fs=sampling_freq)
window_length = min(25, len(flow_array)//2*2-1)
filtered_final = savgol_filter(filtered_lowpass,
window_length=window_length,
polyorder=3)
plt.figure(figsize=(12, 6))
plt.plot(time_array, flow_array,
label='原始数据',
alpha=0.4,
linewidth=1)
plt.plot(time_array, filtered_final,
label=f'滤波后 (截止频率0.5Hz + SG{window_length}窗口)',
linewidth=2,
color='red')
plt.xlabel('时间 (秒)', fontsize=12)
plt.ylabel('流量 (L/min)', fontsize=12)
plt.title('GP22流量数据滤波效果', fontsize=14)
plt.grid(True, linestyle='--', alpha=0.6)
plt.legend()
plt.tight_layout()
plt.savefig('滤波结果.png', dpi=300)
plt.show()
except Exception as e:
print("\n错误详情:")
print(f"❌ {str(e)}")
print("\n故障排除指南:")
print("1. 检查时间/流量数据是否一一对应")
print("2. 确认所有数据都是数字(无中文符号)")
print("3. 时间序列必须等间隔且升序排列")
print("4. 数据总量应至少是窗口长度(默认25)的两倍")
print("5. 推荐先使用示例数据测试,确认无误后再替换真实数据")
if __name__ == "__main__":
main()