编辑代码

import pandas as pd
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from datetime import datetime, timedelta


class CountIfsApp:
    def __init__(self, root):
        self.root = root
        self.root.title("多条件统计工具")

        # 文件选择部分
        self.file_frame = ttk.LabelFrame(root, text="选择Excel文件")
        self.file_frame.pack(padx=10, pady=5, fill="x")

        self.file_path = tk.StringVar()
        ttk.Entry(self.file_frame, textvariable=self.file_path, state="readonly").pack(side="left", fill="x", expand=True, padx=5)
        ttk.Button(self.file_frame, text="浏览...", command=self.select_file).pack(side="right", padx=5)

        # 参数设置部分
        self.param_frame = ttk.LabelFrame(root, text="统计条件设置")
        self.param_frame.pack(padx=10, pady=5, fill="both", expand=True)

        # V列设置
        ttk.Label(self.param_frame, text="发热聚集性监测(多个值用逗号分隔):").grid(row=0, column=0, sticky="w", padx=5, pady=2)
        self.v_text = ttk.Entry(self.param_frame)
        self.v_text.grid(row=0, column=1, sticky="we", padx=5, pady=2)
        self.v_text.insert(0, "≥38.6")

        ttk.Label(self.param_frame, text="监测周期:").grid(row=1, column=0, sticky="w", padx=5, pady=2)
        self.v_days = ttk.Entry(self.param_frame)
        self.v_days.grid(row=1, column=1, sticky="we", padx=5, pady=2)
        self.v_days.insert(0, "30")

        # X列设置
        ttk.Label(self.param_frame, text="红肿聚集性监测(多个值用逗号分隔):").grid(row=2, column=0, sticky="w", padx=5, pady=2)
        self.x_text = ttk.Entry(self.param_frame)
        self.x_text.grid(row=2, column=1, sticky="we", padx=5, pady=2)
        self.x_text.insert(0, "2.6 - 5.0")

        ttk.Label(self.param_frame, text="监测周期:").grid(row=3, column=0, sticky="w", padx=5, pady=2)
        self.x_days = ttk.Entry(self.param_frame)
        self.x_days.grid(row=3, column=1, sticky="we", padx=5, pady=2)
        self.x_days.insert(0, "30")

        # Z列设置
        ttk.Label(self.param_frame, text="硬结聚集性监测(多个值用逗号分隔):").grid(row=4, column=0, sticky="w", padx=5, pady=2)
        self.z_text = ttk.Entry(self.param_frame)
        self.z_text.grid(row=4, column=1, sticky="we", padx=5, pady=2)
        self.z_text.insert(0, "条件1")

        ttk.Label(self.param_frame, text="监测周期:").grid(row=5, column=0, sticky="w", padx=5, pady=2)
        self.z_days = ttk.Entry(self.param_frame)
        self.z_days.grid(row=5, column=1, sticky="we", padx=5, pady=2)
        self.z_days.insert(0, "30")

        # 按钮部分
        self.button_frame = ttk.Frame(root)
        self.button_frame.pack(padx=10, pady=5, fill="x")

        ttk.Button(self.button_frame, text="开始统计", command=self.calculate).pack(side="right", padx=5)
        ttk.Button(self.button_frame, text="退出", command=root.quit).pack(side="right", padx=5)

        # 进度条
        self.progress = ttk.Progressbar(root, orient="horizontal", mode="determinate")
        self.progress.pack(padx=10, pady=5, fill="x")

        # 配置网格权重
        self.param_frame.columnconfigure(1, weight=1)

    def select_file(self):
        file_path = filedialog.askopenfilename(
            title="选择Excel文件",
            filetypes=[("Excel文件", "*.xlsx;*.xls"), ("CSV文件", "*.csv"), ("所有文件", "*.*")]
        )
        if file_path:
            self.file_path.set(file_path)

    def parse_date(self, date_str):
        """处理S列日期文本,转换为日期对象"""
        try:
            # 尝试分割日期和时间部分
            parts = str(date_str).split()
            if len(parts) > 1:
                # 包含时间的日期
                return datetime.strptime(parts[0], "%Y-%m-%d").date()
            else:
                # 只有日期的文本
                return datetime.strptime(parts[0], "%Y-%m-%d").date()
        except ValueError:
            # 如果格式不匹配,尝试其他常见格式
            try:
                return datetime.strptime(str(date_str), "%Y/%m/%d").date()
            except:
                return None

    def calculate(self):
        # 获取文件路径
        file_path = self.file_path.get()
        if not file_path:
            messagebox.showerror("错误", "请先选择Excel文件")
            return

        # 获取参数
        try:
            v_criteria = self.v_text.get().replace(" ", "").split(",")
            days_range_v = int(self.v_days.get())
            x_criteria = self.x_text.get().replace(" ", "").split(",")
            days_range_x = int(self.x_days.get())
            z_criteria = self.z_text.get().replace(" ", "").split(",")
            days_range_z = int(self.z_days.get())
        except ValueError:
            messagebox.showerror("错误", "天数必须为整数")
            return

        # 读取数据
        try:
            if file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
            else:
                df = pd.read_excel(file_path)
        except Exception as e:
            messagebox.showerror("错误", f"读取文件失败: {str(e)}")
            return

        # 检查必要列是否存在
        required_columns = [18, 4, 6, 21, 23, 25]
        for col in required_columns:
            if col not in df.columns:
                messagebox.showerror("错误", f"数据文件中缺少必要的列: {col}")
                return

        # 更新进度条
        self.progress["value"] = 10
        self.root.update_idletasks()

        # 处理S列日期 - 不修改原始列,只创建用于计算的日期列
        df['计算日期'] = df[18].apply(self.parse_date)
        if df['计算日期'].isnull().any():
            messagebox.showwarning("警告", "部分日期格式无法识别,这些行将被跳过")

        # 定义一个函数来创建字典
        def create_dict(df, criteria, col_index):
            result_dict = {}
            for index, row in df.iterrows():
                calc_date = row['计算日期']
                if pd.isnull(calc_date):
                    continue

                value = row[col_index]
                match_found = any(crit in str(value) for crit in criteria)
                if match_found:
                    patient_id = row[6]
                    doctor_id = row[4]
                    key = f"{patient_id}|{doctor_id}"
                    if key not in result_dict:
                        result_dict[key] = []
                    result_dict[key].append(calc_date)
            return result_dict

        # 更新进度条
        self.progress["value"] = 30
        self.root.update_idletasks()

        # 创建 V、X、Z 列的字典
        dict_v = create_dict(df, v_criteria, 21)
        dict_x = create_dict(df, x_criteria, 23)
        dict_z = create_dict(df, z_criteria, 25)

        # 更新进度条
        self.progress["value"] = 50
        self.root.update_idletasks()

        # 初始化 BD、BE、BF 列
        df['BD列'] = ""
        df['BE列'] = ""
        df['BF列'] = ""

        # 遍历每一行进行计算
        total_rows = len(df)
        for index, row in df.iterrows():
            calc_date = row['计算日期']
            if pd.isnull(calc_date):
                continue

            # 计算 V 列的频次
            v_value = row[21]
            match_found = any(crit in str(v_value) for crit in v_criteria)
            if match_found:
                current_date = calc_date
                start_date = current_date - timedelta(days=days_range_v)
                patient_id = row[6]
                doctor_id = row[4]
                key = f"{patient_id}|{doctor_id}"
                count_v = 0
                if key in dict_v:
                    for visit_date in dict_v[key]:
                        if start_date < visit_date <= current_date:
                            count_v += 1
                if count_v > 0:
                    df.at[index, 'BD列'] = count_v

            # 计算 X 列的频次
            x_value = row[23]
            match_found = any(crit in str(x_value) for crit in x_criteria)
            if match_found:
                current_date = calc_date
                start_date = current_date - timedelta(days=days_range_x)
                patient_id = row[6]
                doctor_id = row[4]
                key = f"{patient_id}|{doctor_id}"
                count_x = 0
                if key in dict_x:
                    for visit_date in dict_x[key]:
                        if start_date < visit_date <= current_date:
                            count_x += 1
                if count_x > 0:
                    df.at[index, 'BE列'] = count_x

            # 计算 Z 列的频次
            z_value = row[25]
            match_found = any(crit in str(z_value) for crit in z_criteria)
            if match_found:
                current_date = calc_date
                start_date = current_date - timedelta(days=days_range_z)
                patient_id = row[6]
                doctor_id = row[4]
                key = f"{patient_id}|{doctor_id}"
                count_z = 0
                if key in dict_z:
                    for visit_date in dict_z[key]:
                        if start_date < visit_date <= current_date:
                            count_z += 1
                if count_z > 0:
                    df.at[index, 'BF列'] = count_z

            # 更新进度条
            self.progress["value"] = 50 + (index / total_rows) * 50
            self.root.update_idletasks()

        # 删除临时列
        df.drop('计算日期', axis=1, inplace=True)

        # 保存结果
        try:
            save_path = filedialog.asksaveasfilename(
                title="保存结果文件",
                defaultextension=".xlsx",
                filetypes=[("Excel文件", "*.xlsx"), ("CSV文件", "*.csv")]
            )
            if save_path:
                if save_path.endswith('.csv'):
                    df.to_csv(save_path, index=False)
                else:
                    df.to_excel(save_path, index=False)
                messagebox.showinfo("完成", f"统计完成,结果已保存到 {save_path}")
        except Exception as e:
            messagebox.showerror("错误", f"保存文件失败: {str(e)}")

        # 重置进度条
        self.progress["value"] = 0


if __name__ == "__main__":
    root = tk.Tk()
    app = CountIfsApp(root)
    root.mainloop()