const cron = require('node-cron');
const logger = require('../utils/logger');
const WeChatService = require('./WeChatService');
const WebhookService = require('./WebhookService');
const { getDatabase } = require('../config/database');
class SchedulerService {
constructor() {
this.jobs = new Map();
this.isRunning = false;
}
init() {
logger.info('初始化任务调度服务');
this.loadActiveConfigs();
}
async loadActiveConfigs() {
try {
const configs = await this.getActiveConfigs();
for (const config of configs) {
await this.startJob(config);
}
if (configs.length > 0) {
this.isRunning = true;
logger.info(`已启动 ${configs.length} 个定时任务`);
}
} catch (error) {
logger.error('加载配置失败:', error);
}
}
getActiveConfigs() {
const db = getDatabase();
return new Promise((resolve, reject) => {
const sql = 'SELECT * FROM configs WHERE is_active = 1';
db.all(sql, (err, rows) => {
if (err) {
reject(err);
} else {
resolve(rows);
}
});
});
}
async startJob(config) {
const jobId = `config_${config.id}`;
if (this.jobs.has(jobId)) {
this.stopJob(jobId);
}
const cronExpression = `*/${config.interval_minutes} * * * *`;
const job = cron.schedule(cronExpression, async () => {
await this.executeTokenRefresh(config);
}, {
scheduled: false
});
this.jobs.set(jobId, {
job: job,
config: config
});
job.start();
logger.info(`任务已启动 - AppID: ${config.appid}, 间隔: ${config.interval_minutes}分钟`);
await this.executeTokenRefresh(config);
}
stopJob(jobId) {
if (this.jobs.has(jobId)) {
const jobInfo = this.jobs.get(jobId);
jobInfo.job.stop();
this.jobs.delete(jobId);
logger.info(`任务已停止 - JobID: ${jobId}`);
}
}
stop() {
for (const [jobId, jobInfo] of this.jobs) {
jobInfo.job.stop();
}
this.jobs.clear();
this.isRunning = false;
logger.info('所有定时任务已停止');
}
async executeTokenRefresh(config) {
try {
logger.info(`执行定时任务 - AppID: ${config.appid}`);
await this.logOperation('token_refresh', `开始获取AccessToken - AppID: ${config.appid}`, 'running');
const tokenData = await WeChatService.getAccessToken(config.appid, config.appsecret);
if (config.webhook_url) {
await WebhookService.push(config.webhook_url, {
appid: config.appid,
access_token: tokenData.access_token,
expires_in: tokenData.expires_in,
expire_time: tokenData.expire_time
});
}
await this.logOperation('token_refresh', `AccessToken获取成功 - AppID: ${config.appid}`, 'success', {
token_length: tokenData.access_token.length,
expires_in: tokenData.expires_in
});
} catch (error) {
logger.error(`定时任务执行失败 - AppID: ${config.appid}:`, error);
await this.logOperation('token_refresh', `AccessToken获取失败 - AppID: ${config.appid}: ${error.message}`, 'error', {
error: error.message
});
}
}
async manualRefresh(configId) {
const config = await this.getConfigById(configId);
if (!config) {
throw new Error('配置不存在');
}
await this.executeTokenRefresh(config);
return { success: true, message: '手动刷新完成' };
}
getConfigById(configId) {
const db = getDatabase();
return new Promise((resolve, reject) => {
const sql = 'SELECT * FROM configs WHERE id = ?';
db.get(sql, [configId], (err, row) => {
if (err) {
reject(err);
} else {
resolve(row);
}
});
});
}
async reloadConfig(configId) {
const config = await this.getConfigById(configId);
if (!config) {
throw new Error('配置不存在');
}
const jobId = `config_${configId}`;
if (config.is_active) {
await this.startJob(config);
} else {
this.stopJob(jobId);
}
return { success: true, message: '配置已重新加载' };
}
async logOperation(type, message, status, details = null) {
const db = getDatabase();
return new Promise((resolve, reject) => {
const sql = `
INSERT INTO operation_logs (type, message, status, details)
VALUES (?, ?, ?, ?)
`;
db.run(sql, [
type,
message,
status,
details ? JSON.stringify(details) : null
], function(err) {
if (err) {
logger.error('记录操作日志失败:', err);
reject(err);
} else {
resolve(this.lastID);
}
});
});
}
getStatus() {
return {
isRunning: this.isRunning,
activeJobs: this.jobs.size,
jobs: Array.from(this.jobs.entries()).map(([jobId, jobInfo]) => ({
jobId,
appid: jobInfo.config.appid,
interval: jobInfo.config.interval_minutes,
webhookUrl: jobInfo.config.webhook_url
}))
};
}
}
module.exports = new SchedulerService();