import pandas as pd
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from datetime import datetime, timedelta
class CountIfsApp:
    """Tkinter front end for a COUNTIFS-style cluster count over a spreadsheet.

    For each of three indicator columns the user supplies a list of text
    criteria and a look-back window in days.  A row "matches" an indicator
    when its cell text contains any of the criteria.  For every matching row
    the tool counts how many matching rows with the same grouping key have a
    date inside ``(row_date - days, row_date]`` and writes that count to an
    output column (``BD列`` / ``BE列`` / ``BF列``).

    Columns are addressed by zero-based position in the sheet (position 18 is
    the column the original code calls "S列", i.e. Excel column S).  If the
    table happens to carry a column whose *label* is that integer, the label
    takes precedence.
    """

    # Zero-based column positions read from the input table.
    DATE_COL = 18
    # The grouping key is built from these two columns, in this order
    # (the original code named them patient id and doctor id respectively).
    GROUP_COLS = (6, 4)
    V_COL = 21
    X_COL = 23
    Z_COL = 25
    # Order matters: the first missing entry is the one reported to the user.
    REQUIRED_COLUMNS = (18, 4, 6, 21, 23, 25)

    def __init__(self, root):
        """Build all widgets inside *root* (the Tk top-level window)."""
        self.root = root
        self.root.title("多条件统计工具")

        # --- file picker -------------------------------------------------
        self.file_frame = ttk.LabelFrame(root, text="选择Excel文件")
        self.file_frame.pack(padx=10, pady=5, fill="x")
        self.file_path = tk.StringVar()
        ttk.Entry(
            self.file_frame, textvariable=self.file_path, state="readonly"
        ).pack(side="left", fill="x", expand=True, padx=5)
        ttk.Button(
            self.file_frame, text="浏览...", command=self.select_file
        ).pack(side="right", padx=5)

        # --- criteria / window inputs -----------------------------------
        self.param_frame = ttk.LabelFrame(root, text="统计条件设置")
        self.param_frame.pack(padx=10, pady=5, fill="both", expand=True)

        self.v_text = self._add_entry(0, "发热聚集性监测(多个值用逗号分隔):", "≥38.6")
        self.v_days = self._add_entry(1, "监测周期:", "30")
        self.x_text = self._add_entry(2, "红肿聚集性监测(多个值用逗号分隔):", "2.6 - 5.0")
        self.x_days = self._add_entry(3, "监测周期:", "30")
        self.z_text = self._add_entry(4, "硬结聚集性监测(多个值用逗号分隔):", "条件1")
        self.z_days = self._add_entry(5, "监测周期:", "30")

        # --- action buttons ---------------------------------------------
        self.button_frame = ttk.Frame(root)
        self.button_frame.pack(padx=10, pady=5, fill="x")
        ttk.Button(
            self.button_frame, text="开始统计", command=self.calculate
        ).pack(side="right", padx=5)
        ttk.Button(
            self.button_frame, text="退出", command=root.quit
        ).pack(side="right", padx=5)

        self.progress = ttk.Progressbar(root, orient="horizontal", mode="determinate")
        self.progress.pack(padx=10, pady=5, fill="x")

        # Let the entry column absorb horizontal resizing.
        self.param_frame.columnconfigure(1, weight=1)

    def _add_entry(self, row, label, default):
        """Add a ``label: entry`` pair on grid *row* and return the entry."""
        ttk.Label(self.param_frame, text=label).grid(
            row=row, column=0, sticky="w", padx=5, pady=2
        )
        entry = ttk.Entry(self.param_frame)
        entry.grid(row=row, column=1, sticky="we", padx=5, pady=2)
        entry.insert(0, default)
        return entry

    def select_file(self):
        """Ask the user for an input file and remember the chosen path."""
        file_path = filedialog.askopenfilename(
            title="选择Excel文件",
            # Tk expects whitespace-separated patterns; a ';' separator is
            # not portable across platforms.
            filetypes=[
                ("Excel文件", "*.xlsx *.xls"),
                ("CSV文件", "*.csv"),
                ("所有文件", "*.*"),
            ],
        )
        if file_path:
            self.file_path.set(file_path)

    def parse_date(self, date_str):
        """Convert a date cell to a ``datetime.date``.

        The value is stringified and only its first whitespace-separated
        token is parsed, so a trailing time part is ignored.  Accepted
        layouts are ``YYYY-MM-DD`` and ``YYYY/MM/DD``.

        Returns:
            The parsed date, or ``None`` when the value is empty or is in
            neither layout (this includes missing cells, whose text is
            ``"nan"`` / ``"NaT"``).
        """
        parts = str(date_str).split()
        if not parts:
            # Empty or whitespace-only text: nothing to parse.
            return None
        for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
            try:
                return datetime.strptime(parts[0], fmt).date()
            except ValueError:
                continue
        return None

    @staticmethod
    def _parse_criteria(text):
        """Split a comma-separated criteria string into non-empty tokens.

        Spaces are removed first.  Empty tokens are dropped because an empty
        string is a substring of every value and would match every row.
        """
        return [token for token in text.replace(" ", "").split(",") if token]

    @staticmethod
    def _resolve_columns(df, positions):
        """Map each required column position to a real column label of *df*.

        Returns:
            ``(resolved, missing)`` where ``resolved`` maps position -> label
            and ``missing`` lists, in input order, the positions that could
            not be resolved.
        """
        resolved = {}
        missing = []
        width = len(df.columns)
        for pos in positions:
            if pos in df.columns:
                # A column literally labelled with this integer exists.
                resolved[pos] = pos
            elif 0 <= pos < width:
                resolved[pos] = df.columns[pos]
            else:
                missing.append(pos)
        return resolved, missing

    @staticmethod
    def _matches(value, criteria):
        """Return True when the cell text contains any of *criteria*."""
        if pd.isnull(value):
            # Do not let the text "nan" of an empty cell match a criterion.
            return False
        text = str(value)
        return any(crit in text for crit in criteria)

    @classmethod
    def _cluster_counts(cls, keys, dates, values, criteria, days):
        """Compute one output column.

        Args:
            keys: grouping key per row.
            dates: parsed date per row (``None`` = unusable row).
            values: indicator cell per row.
            criteria: substrings that make a cell "match".
            days: look-back window length in days.

        Returns:
            A list with one entry per row: the number of matching rows with
            the same key dated in ``(date - days, date]``, or ``""`` when the
            row does not match, has no date, or the count is zero.
        """
        hits = [
            date is not None and cls._matches(value, criteria)
            for date, value in zip(dates, values)
        ]

        # Dates of all matching rows, grouped by key.
        history = {}
        for key, date, hit in zip(keys, dates, hits):
            if hit:
                history.setdefault(key, []).append(date)

        window = timedelta(days=days)
        counts = []
        for key, date, hit in zip(keys, dates, hits):
            if not hit:
                counts.append("")
                continue
            start = date - window
            count = sum(1 for seen in history[key] if start < seen <= date)
            # With a window of zero or fewer days nothing falls inside it.
            counts.append(count if count > 0 else "")
        return counts

    def _set_progress(self, value):
        """Move the progress bar and let Tk redraw it."""
        self.progress["value"] = value
        self.root.update_idletasks()

    def calculate(self):
        """Read the selected file, compute the three count columns, save.

        All failures that the user can act on (no file chosen, non-integer
        window, unreadable file, too few columns, save error) are reported
        through message boxes; the method then returns without raising.
        """
        file_path = self.file_path.get()
        if not file_path:
            messagebox.showerror("错误", "请先选择Excel文件")
            return

        try:
            days_range_v = int(self.v_days.get())
            days_range_x = int(self.x_days.get())
            days_range_z = int(self.z_days.get())
        except ValueError:
            messagebox.showerror("错误", "天数必须为整数")
            return
        v_criteria = self._parse_criteria(self.v_text.get())
        x_criteria = self._parse_criteria(self.x_text.get())
        z_criteria = self._parse_criteria(self.z_text.get())

        try:
            if file_path.lower().endswith('.csv'):
                df = pd.read_csv(file_path)
            else:
                df = pd.read_excel(file_path)
        except Exception as e:
            messagebox.showerror("错误", f"读取文件失败: {str(e)}")
            return

        resolved, missing = self._resolve_columns(df, self.REQUIRED_COLUMNS)
        if missing:
            messagebox.showerror("错误", f"数据文件中缺少必要的列: {missing[0]}")
            return

        try:
            self._set_progress(10)

            # Pull everything out of the frame before any column is written,
            # so output columns can never shadow an input column.
            dates = [self.parse_date(cell) for cell in df[resolved[self.DATE_COL]]]
            if any(date is None for date in dates):
                messagebox.showwarning("警告", "部分日期格式无法识别,这些行将被跳过")
            first_col, second_col = self.GROUP_COLS
            keys = [
                f"{first}|{second}"
                for first, second in zip(df[resolved[first_col]], df[resolved[second_col]])
            ]
            indicators = [
                ("BD列", list(df[resolved[self.V_COL]]), v_criteria, days_range_v),
                ("BE列", list(df[resolved[self.X_COL]]), x_criteria, days_range_x),
                ("BF列", list(df[resolved[self.Z_COL]]), z_criteria, days_range_z),
            ]
            self._set_progress(30)

            for step, (out_col, values, criteria, days) in enumerate(indicators, start=1):
                df[out_col] = self._cluster_counts(keys, dates, values, criteria, days)
                self._set_progress(30 + 70 * step / len(indicators))

            try:
                save_path = filedialog.asksaveasfilename(
                    title="保存结果文件",
                    defaultextension=".xlsx",
                    filetypes=[("Excel文件", "*.xlsx"), ("CSV文件", "*.csv")]
                )
                if save_path:
                    if save_path.lower().endswith('.csv'):
                        df.to_csv(save_path, index=False)
                    else:
                        df.to_excel(save_path, index=False)
                    messagebox.showinfo("完成", f"统计完成,结果已保存到 {save_path}")
            except Exception as e:
                messagebox.showerror("错误", f"保存文件失败: {str(e)}")
        finally:
            # Always leave the bar idle, even if something above raised.
            self.progress["value"] = 0
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    # Keep a reference for the lifetime of the event loop.
    application = CountIfsApp(window)
    window.mainloop()


if __name__ == "__main__":
    main()