import time
import os
import logging
import traceback
import configparser
from threading import Thread, Event
import pymysql
# ---------------------------------------------------------------------------
# Static configuration
# ---------------------------------------------------------------------------

# INI file (relative to the working directory) that persists the monitor's
# state between runs; it is read and written by read_config()/update_config().
CONFIG_FILE = "db_monitor.ini"

# Connection parameters of the MySQL server whose availability is checked.
# NOTE(review): host and credentials are hard-coded in source control;
# consider loading them from the environment or a protected config file.
DB_HOST = "101.227.76.153"
DB_PORT = 33060
DB_USER = "root"
DB_PASSWORD = "eruo0303"
DB_NAME = "mysql"

# Directory tree scanned by replace_in_files(), and the directory that
# receives one timestamped sub-directory of backups per replacement run.
SEARCH_DIR = r"D:\StarEpochs"
BACKUP_DIR = r"D:\StarEpochs\backup"

# Seconds between two consecutive connection checks.
CHECK_INTERVAL = 60

# Log file, relative to the working directory.
LOG_FILE = "db_monitor.log"

# ---------------------------------------------------------------------------
# Logging: DEBUG and above go to LOG_FILE, INFO and above are mirrored on the
# console using the same line format.
# ---------------------------------------------------------------------------
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)-8s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S"
)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s [%(levelname)-8s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
console.setFormatter(formatter)
logging.getLogger().addHandler(console)

# exist_ok=True avoids the check-then-create race of a separate
# os.path.exists() test and is a no-op when the directory is already there.
os.makedirs(BACKUP_DIR, exist_ok=True)
def log_step(step, status, details=""):
    """Emit one INFO log line describing the outcome of a step.

    Args:
        step: Step description; left-justified and padded to 40 characters.
        status: Truthy logs ``SUCCESS``, falsy logs ``FAILED``.
        details: Optional extra text, appended after `` | `` when non-empty.
    """
    if status:
        outcome = "SUCCESS"
    else:
        outcome = "FAILED"

    parts = [f"STEP: {step.ljust(40)} [{outcome}]"]
    if details:
        parts.append(f"{details}")
    logging.info(" | ".join(parts))
def read_config():
    """Load the monitor state file, creating or repairing it when needed.

    The state lives in the ``STATUS`` section of ``CONFIG_FILE``. When the
    file does not exist, or exists without a ``STATUS`` section, the section
    is (re)created with its defaults (``error_replaced = no`` and an empty
    ``last_backup``) and the file is written back, so callers can always
    ``config.set('STATUS', ...)`` without hitting ``NoSectionError``.

    Returns:
        configparser.ConfigParser: The parsed configuration, always containing
        a ``STATUS`` section.

    Raises:
        configparser.Error: If the existing file cannot be parsed.
        OSError: If the defaults have to be written and the file cannot be
            opened for writing.
    """
    config = configparser.ConfigParser()
    # ConfigParser.read() ignores files that cannot be opened, so a missing
    # file simply leaves the parser empty and is handled by the check below.
    config.read(CONFIG_FILE)
    if not config.has_section('STATUS'):
        config['STATUS'] = {
            'error_replaced': 'no',
            'last_backup': ''
        }
        with open(CONFIG_FILE, 'w') as configfile:
            config.write(configfile)
    return config
def update_config(setting, value):
    """Persist one key of the ``STATUS`` section to ``CONFIG_FILE``.

    The current configuration is re-read, the key is set, and the whole file
    is rewritten; the change is then reported through ``log_step``.

    Args:
        setting: Option name inside the ``STATUS`` section.
        value: New value; must be a string (a ``ConfigParser.set``
            requirement).
    """
    config = read_config()
    # An existing file may lack the STATUS section (e.g. hand-edited or
    # truncated); create it instead of failing with NoSectionError.
    if not config.has_section('STATUS'):
        config.add_section('STATUS')
    config.set('STATUS', setting, value)
    with open(CONFIG_FILE, 'w') as configfile:
        config.write(configfile)
    log_step(f"更新配置文件 {setting}", True, f"新值: {value}")
def check_db_connection():
    """Check whether the configured MySQL server accepts connections.

    Opens a connection with the module-level ``DB_*`` settings (5 second
    connect timeout), runs ``SELECT 1`` and verifies the result. Every
    outcome is reported through ``log_step``; tracebacks go to the DEBUG log.
    The connection, if one was opened, is always closed before returning.

    Returns:
        bool: ``True`` when the query returned ``1``; ``False`` when the
        query returned something else or any exception occurred.
    """
    start_time = time.time()
    conn = None
    try:
        log_step("尝试连接MySQL数据库", True,
                 f"host={DB_HOST}, port={DB_PORT}, user={DB_USER}, database={DB_NAME}")
        conn = pymysql.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASSWORD,
            database=DB_NAME,
            connect_timeout=5
        )
        with conn.cursor() as cursor:
            cursor.execute("SELECT 1")
            result = cursor.fetchone()
        if result and result[0] == 1:
            log_step("数据库连接验证成功", True,
                     f"耗时: {(time.time()-start_time)*1000:.2f}ms")
            return True
        log_step("数据库连接验证失败", False, "查询未返回预期结果")
        return False
    except pymysql.MySQLError as e:
        # Not every MySQLError carries exactly (code, message); unpacking
        # e.args blindly would raise ValueError from inside this handler and
        # escape the function.
        if len(e.args) >= 2:
            details = f"MySQL错误 {e.args[0]}: {e.args[1]}"
        else:
            details = f"MySQL错误 {type(e).__name__}: {e}"
        log_step("数据库连接失败", False, details)
        logging.debug(f"错误详情:\n{traceback.format_exc()}")
        return False
    except Exception as e:
        log_step("数据库连接失败", False, f"{type(e).__name__}: {str(e)}")
        logging.debug(f"错误详情:\n{traceback.format_exc()}")
        return False
    finally:
        if conn is not None:
            try:
                conn.close()
                log_step("关闭数据库连接", True)
            except Exception:
                # Best effort: a failing close must not mask the check result,
                # but KeyboardInterrupt/SystemExit are no longer swallowed.
                log_step("关闭数据库连接", False, "关闭连接时出错")
# Extensions that are never opened for replacement.
_SKIPPED_EXTENSIONS = frozenset(
    ['.exe', '.dll', '.png', '.jpg', '.jpeg', '.gif', '.zip', '.rar', '.7z']
)

# (search text, replacement text) pairs, applied to each file in this order.
_REPLACEMENTS = (
    ("127.0.0.1:3306", "192.168.31.11:3306"),
    ("localhost:3306", "192.168.31.11:3306"),
    ("localhost", "192.168.31.11"),
    ("127.0.0.1", "192.168.31.11"),
    ("eruo0303", "airio0303"),
)

# Same pairs pre-encoded once. All patterns are pure ASCII, so matching on
# raw bytes finds the same occurrences as matching on decoded UTF-8 text.
_REPLACEMENTS_BYTES = tuple(
    (old, old.encode("ascii"), new.encode("ascii")) for old, new in _REPLACEMENTS
)


def replace_in_files():
    """Rewrite connection strings in every file under ``SEARCH_DIR``.

    A new timestamped directory is created under ``BACKUP_DIR``. Every file
    below ``SEARCH_DIR`` (except those inside the backup tree or with a
    skipped extension) is searched for the patterns in ``_REPLACEMENTS``.
    Files containing at least one pattern are first copied byte-for-byte into
    the backup directory (same relative path) and then rewritten.

    Files are handled as raw bytes: bytes that are not valid UTF-8 and the
    file's line endings are left untouched, and the backup is an exact copy of
    the original.

    Per-file failures are logged and counted without aborting the run.
    Afterwards ``error_replaced`` is set to ``yes`` and ``last_backup`` to the
    new backup directory in the state file.

    Returns:
        int: Number of files that were modified.
    """
    start_time = time.time()
    log_step("开始文件替换操作", True, f"目录: {SEARCH_DIR}")
    backup_timestamp = time.strftime("%Y%m%d_%H%M%S")
    current_backup = os.path.join(BACKUP_DIR, backup_timestamp)
    os.makedirs(current_backup, exist_ok=True)
    total_files = 0
    processed_files = 0
    modified_files = 0
    error_files = 0
    try:
        log_step("扫描目录结构", True)
        all_files = []
        for root, dirs, files in os.walk(SEARCH_DIR):
            if BACKUP_DIR in root:
                # Everything below this directory would be skipped by the same
                # test, so stop descending instead of walking every backup.
                dirs[:] = []
                continue
            all_files.extend(os.path.join(root, filename) for filename in files)
        total_files = len(all_files)
        log_step(f"找到 {total_files} 个文件", True)
        for filepath in all_files:
            processed_files += 1
            file_start = time.time()
            try:
                if not os.path.isfile(filepath):
                    log_step(f"跳过非文件: {filepath}", True)
                    continue
                _, ext = os.path.splitext(filepath)
                if ext.lower() in _SKIPPED_EXTENSIONS:
                    log_step(f"跳过二进制文件: {filepath}", True)
                    continue
                with open(filepath, "rb") as f:
                    original_content = f.read()
                file_size = len(original_content)
                log_step(f"读取文件: {filepath}", True,
                         f"大小: {file_size / 1024:.1f}KB, 耗时: {(time.time()-file_start)*1000:.2f}ms")
                content = original_content
                replace_counts = {}
                for old_str, old_bytes, new_bytes in _REPLACEMENTS_BYTES:
                    count = content.count(old_bytes)
                    if count:
                        content = content.replace(old_bytes, new_bytes)
                        replace_counts[old_str] = count
                if replace_counts:
                    relative_path = os.path.relpath(filepath, SEARCH_DIR)
                    backup_path = os.path.join(current_backup, relative_path)
                    os.makedirs(os.path.dirname(backup_path), exist_ok=True)
                    # Write the backup before touching the original so a
                    # failure here leaves the source file unmodified.
                    with open(backup_path, "wb") as f:
                        f.write(original_content)
                    with open(filepath, "wb") as f:
                        f.write(content)
                    modified_files += 1
                    replace_details = ", ".join([f"{k}:{v}" for k, v in replace_counts.items()])
                    log_step(f"修改文件: {filepath}", True,
                             f"替换: {replace_details}, 备份: {backup_path}, 耗时: {(time.time()-file_start)*1000:.2f}ms")
                else:
                    log_step(f"无需修改: {filepath}", True,
                             f"耗时: {(time.time()-file_start)*1000:.2f}ms")
            except Exception as e:
                error_files += 1
                error_details = f"{type(e).__name__}: {str(e)}"
                log_step(f"处理文件出错: {filepath}", False, error_details)
                logging.debug(f"文件错误详情:\n{traceback.format_exc()}")
    except Exception as e:
        log_step("目录扫描失败", False, f"{type(e).__name__}: {str(e)}")
        logging.debug(f"目录扫描错误详情:\n{traceback.format_exc()}")
    elapsed = time.time() - start_time
    summary = (f"处理完成! 文件总数: {total_files}, 已处理: {processed_files}, "
               f"已修改: {modified_files}, 错误: {error_files}, 总耗时: {elapsed:.2f}秒")
    log_step("文件替换操作完成", True, summary)
    update_config('error_replaced', 'yes')
    update_config('last_backup', current_backup)
    return modified_files
def restore_files():
    """Copy the files of the last backup back over their originals.

    The backup directory is taken from ``last_backup`` in the ``STATUS``
    section of the state file. Each file found below it is copied
    byte-for-byte to the same relative path under ``SEARCH_DIR``. Files whose
    original no longer exists are skipped. Per-file failures are logged and
    counted without aborting the run. On completion ``error_replaced`` is set
    to ``no`` in the state file.

    Returns:
        int: Number of files restored; ``0`` when no usable backup directory
        is recorded (in which case the state file is left unchanged).
    """
    config = read_config()
    backup_dir = config.get('STATUS', 'last_backup', fallback='')
    if not backup_dir or not os.path.exists(backup_dir):
        log_step("恢复文件失败", False, "未找到有效的备份目录")
        return 0
    start_time = time.time()
    log_step("开始文件恢复操作", True, f"备份目录: {backup_dir}")
    restored_files = 0
    error_files = 0
    try:
        for root, _, files in os.walk(backup_dir):
            for filename in files:
                backup_path = os.path.join(root, filename)
                # Label for the error log if the target path cannot even be
                # computed; prevents an unbound or stale name in the handler.
                original_path = backup_path
                file_start = time.time()
                try:
                    relative_path = os.path.relpath(backup_path, backup_dir)
                    original_path = os.path.join(SEARCH_DIR, relative_path)
                    if not os.path.exists(original_path):
                        log_step(f"跳过恢复: {original_path}", True, "原始文件不存在")
                        continue
                    # Copy raw bytes: a text-mode round trip would drop bytes
                    # that are not valid UTF-8 and could alter line endings.
                    with open(backup_path, "rb") as f:
                        backup_content = f.read()
                    with open(original_path, "wb") as f:
                        f.write(backup_content)
                    restored_files += 1
                    log_step(f"恢复文件: {original_path}", True,
                             f"耗时: {(time.time()-file_start)*1000:.2f}ms")
                except Exception as e:
                    error_files += 1
                    error_details = f"{type(e).__name__}: {str(e)}"
                    log_step(f"恢复文件出错: {original_path}", False, error_details)
                    logging.debug(f"恢复错误详情:\n{traceback.format_exc()}")
    except Exception as e:
        log_step("恢复操作失败", False, f"{type(e).__name__}: {str(e)}")
        logging.debug(f"恢复错误详情:\n{traceback.format_exc()}")
    elapsed = time.time() - start_time
    summary = f"恢复完成! 已恢复: {restored_files} 个文件, 错误: {error_files}, 耗时: {elapsed:.2f}秒"
    log_step("文件恢复操作完成", True, summary)
    update_config('error_replaced', 'no')
    return restored_files
def monitor_loop(stop_event):
    """Run the periodic database check until *stop_event* is set.

    On start-up the persisted ``error_replaced`` flag is read so that a
    replacement performed by a previous run is remembered. Each cycle checks
    the database connection and then:

    * connection failed and files not yet replaced -> ``replace_in_files()``
    * connection OK and files currently replaced   -> ``restore_files()``

    An unexpected exception inside a cycle is logged and the loop continues
    with the next cycle instead of terminating the thread.

    Args:
        stop_event: ``threading.Event``; setting it ends the loop, including
            during the wait between two cycles.
    """
    logging.info("="*80)
    logging.info("数据库监控服务启动")
    logging.info(f"监控数据库: {DB_HOST}:{DB_PORT} (用户: {DB_USER})")
    logging.info(f"文件目录: {SEARCH_DIR}")
    logging.info(f"备份目录: {BACKUP_DIR}")
    logging.info(f"检测间隔: {CHECK_INTERVAL}秒")
    logging.info("="*80)
    config = read_config()
    error_replaced = config.get('STATUS', 'error_replaced', fallback='no') == 'yes'
    if error_replaced:
        logging.warning("警告: 检测到上次未完成的替换状态,服务将以替换状态启动")
    check_count = 0
    while not stop_event.is_set():
        check_count += 1
        logging.info(f"\n{'='*40} 检测周期 #{check_count} {'='*40}")
        try:
            log_step("开始数据库连接验证", True)
            current_state = check_db_connection()
            log_step(f"当前数据库状态: {'连接正常' if current_state else '连接失败'}", True)
            if not current_state and not error_replaced:
                logging.warning("!"*60)
                logging.warning("! 数据库连接失败,触发文件替换操作")
                logging.warning("!"*60)
                replace_in_files()
                error_replaced = True
            elif current_state and error_replaced:
                logging.warning("!"*60)
                logging.warning("! 数据库连接恢复,触发文件恢复操作")
                logging.warning("!"*60)
                restore_files()
                error_replaced = False
        except Exception as e:
            # Thread boundary: without this an unexpected error would end the
            # monitor thread while the main thread still reports "running".
            log_step("检测周期出错", False, f"{type(e).__name__}: {str(e)}")
            logging.debug(f"错误详情:\n{traceback.format_exc()}")
        log_step(f"等待下一个检测周期 ({CHECK_INTERVAL}秒)", True)
        # Event.wait() returns as soon as the event is set, so a stop request
        # is honoured immediately instead of after a full sleep interval.
        stop_event.wait(CHECK_INTERVAL)
def main():
    """Start the monitor in a background thread and wait for Enter.

    Prints an error and returns when ``pymysql`` cannot be imported.
    Otherwise starts ``monitor_loop`` in a daemon thread, prints the active
    settings and blocks on ``input()``. On Enter or Ctrl+C the stop event is
    set and the thread is given up to 10 seconds to finish.
    """
    try:
        import pymysql
    except ImportError:
        print("错误: 缺少pymysql模块,请先安装: pip install pymysql")
        return

    stop_event = Event()
    monitor_thread = Thread(target=monitor_loop, args=(stop_event,), daemon=True)
    monitor_thread.start()

    try:
        banner = (
            "数据库监控服务正在运行...",
            f"日志文件: {os.path.abspath(LOG_FILE)}",
            f"配置文件: {os.path.abspath(CONFIG_FILE)}",
            f"监控数据库: {DB_HOST}:{DB_PORT} (用户: {DB_USER})",
            f"文件目录: {SEARCH_DIR}",
            f"备份目录: {BACKUP_DIR}",
            "\n按 Enter 键停止服务...",
        )
        for line in banner:
            print(line)
        input()
    except KeyboardInterrupt:
        # Ctrl+C is an accepted way to stop; fall through to the shutdown.
        pass
    finally:
        logging.info("接收停止信号,正在关闭服务...")
        stop_event.set()
        monitor_thread.join(timeout=10)
        for line in ("=" * 80, "数据库监控服务已停止", "=" * 80):
            logging.info(line)
        print("服务已停止")


if __name__ == "__main__":
    main()