import { EventEmitter } from 'events' import pLimit from 'p-limit' // 建议使用 v2.2.0 以兼容 CJS import { readingReflectionGraph } from '@main/services/ai/graph/readingReflectionGraph' import { AppDataSource } from '@main/db/data-source' import { ReadingReflectionTaskBatch } from '@main/db/entities/ReadingReflectionTaskBatch' import { ReadingReflectionTaskItem } from '@main/db/entities/ReadingReflectionTaskItem' import Store from 'electron-store' import { CONFIG_STORE_KEY } from '@rpc/constants/store_key' import { Notification } from 'electron' export const readingReflectionTaskEvent = new EventEmitter() // 兼容性处理获取 Store 构造函数 const StoreClass = (Store as any).default || Store const store = new StoreClass({ encryptionKey: CONFIG_STORE_KEY }) class TaskManager { private limit = pLimit(2) private batchRepo = AppDataSource.getRepository(ReadingReflectionTaskBatch) private itemRepo = AppDataSource.getRepository(ReadingReflectionTaskItem) /** * 更新主任务汇总进度 */ private async updateBatchStatus(batchId: string) { const items = await this.itemRepo.find({ where: { batch: { id: batchId } } }) if (items.length === 0) return const avgProgress = Math.round(items.reduce((acc, i) => acc + i.progress, 0) / items.length) let status = 'PROCESSING' if (avgProgress === 100) status = 'COMPLETED' if (items.every((i) => i.status === 'FAILED')) status = 'FAILED' await this.batchRepo.update(batchId, { progress: avgProgress, status }) // 发送给左侧列表订阅者 readingReflectionTaskEvent.emit('batchProgressUpdate', { batchId, progress: avgProgress, status }) } async startBatchTask(taskId: string, task: any) { const total = task.quantity || 1 // 1. 初始化主任务 const batch = this.batchRepo.create({ id: taskId, bookName: task.bookName, totalCount: total }) await this.batchRepo.save(batch) // 发送给左侧列表订阅者 readingReflectionTaskEvent.emit('batchProgressUpdate', { batchId: taskId, progress: 0, status: 'PROCESSING' }) const promises = Array.from({ length: total }).map((_, index) => { const subTaskId = total === 1 ? taskId : `${taskId}-${index}` return this.limit(async () => { try { const item = this.itemRepo.create({ id: subTaskId, batch: batch, status: 'PENDING' }) await this.itemRepo.save(item) const stream = await readingReflectionGraph.stream( { ...task }, { configurable: { thread_id: subTaskId } } ) let finalResult: any = {} for await (const chunk of stream) { // 处理生成正文节点 if (chunk.generateReadingReflectionContent) { const contentData = chunk.generateReadingReflectionContent await this.itemRepo.update(subTaskId, { status: 'WRITING', progress: 50, content: contentData.content, title: contentData.title }) finalResult = { ...finalResult, ...contentData } await this.updateBatchStatus(taskId) this.emitProgress(taskId, index, total, 60, '正文已生成...') } // 处理生成摘要节点 if (chunk.generateReadingReflectionSummary) { const summaryData = chunk.generateReadingReflectionSummary finalResult = { ...finalResult, ...summaryData } await this.itemRepo.update(subTaskId, { status: 'COMPLETED', progress: 100, summary: summaryData.summary, title: finalResult.title, keywords: summaryData.keywords }) } } await this.updateBatchStatus(taskId) this.emitProgress(taskId, index, total, 100, '生成成功', finalResult) } catch (error) { await this.itemRepo.update(subTaskId, { status: 'FAILED', progress: 0 }) await this.updateBatchStatus(taskId) this.emitProgress(taskId, index, total, 0, '生成失败') } }) }) await Promise.all(promises) } private emitProgress( taskId: string, index: number, total: number, progress: number, status: string, result?: any ) { const displayId = total === 1 ? taskId : `${taskId}-${index}` //发送 tRPC 实时事件(驱动前端 UI 进度条) readingReflectionTaskEvent.emit('readingReflectionTaskProgress', { taskId: displayId, progress, status: status, // 传枚举 Key statusText: `[任务${index + 1}/${total}] ${status}`, // 传描述文字 result }) // 2. 添加任务状态通知判断 this.handleNotification(status, progress, total, index) } /** * 内部私有方法:处理通知逻辑 */ private handleNotification(status: string, progress: number, total: number, index: number) { // 从 electron-store 获取用户偏好 const config = store.get('notification') || { masterSwitch: true, taskCompleted: true, taskFailed: true } // 如果总开关关闭,直接拦截 if (!config.masterSwitch) return // 场景 A: 任务全部完成 (100%) if (progress === 100 && config.taskCompleted) { // 只有当所有子任务都完成,或者当前是单任务时才弹出 // 如果是批量任务,你可以选择在最后一个子任务完成时通知 if (index + 1 === total) { new Notification({ title: '🎉 读书心得已生成', body: total > 1 ? `共 ${total} 篇心得已全部处理完成。` : '您的书籍心得已准备就绪。', silent: config.silentMode }).show() } } // 场景 B: 任务失败 (假设你传入的 status 是 'FAILED') if (status === 'FAILED' && config.taskFailed) { new Notification({ title: '❌ 任务生成失败', body: `第 ${index + 1} 项任务执行异常,请检查网络或 API 余额。`, silent: config.silentMode }).show() } } } export const readingReflectionsTaskManager = new TaskManager()