fix:基本实现相关功能
This commit is contained in:
126
src/main/manager/readingReflectionsTaskManager.ts
Normal file
126
src/main/manager/readingReflectionsTaskManager.ts
Normal file
@@ -0,0 +1,126 @@
|
||||
import { EventEmitter } from 'events'
|
||||
import pLimit from 'p-limit' // 建议使用 v2.2.0 以兼容 CJS
|
||||
import { readingReflectionGraph } from '@main/services/ai/graph/readingReflectionGraph'
|
||||
import { AppDataSource } from '@main/db/data-source'
|
||||
import { ReadingReflectionTaskBatch } from '@main/db/entities/ReadingReflectionTaskBatch'
|
||||
import { ReadingReflectionTaskItem } from '@main/db/entities/ReadingReflectionTaskItem'
|
||||
|
||||
export const readingReflectionTaskEvent = new EventEmitter()
|
||||
|
||||
class TaskManager {
|
||||
private limit = pLimit(2)
|
||||
private batchRepo = AppDataSource.getRepository(ReadingReflectionTaskBatch)
|
||||
private itemRepo = AppDataSource.getRepository(ReadingReflectionTaskItem)
|
||||
|
||||
/**
|
||||
* 更新主任务汇总进度
|
||||
*/
|
||||
private async updateBatchStatus(batchId: string) {
|
||||
const items = await this.itemRepo.find({ where: { batch: { id: batchId } } })
|
||||
if (items.length === 0) return
|
||||
|
||||
const avgProgress = Math.round(items.reduce((acc, i) => acc + i.progress, 0) / items.length)
|
||||
let status = 'PROCESSING'
|
||||
if (avgProgress === 100) status = 'COMPLETED'
|
||||
if (items.every((i) => i.status === 'FAILED')) status = 'FAILED'
|
||||
|
||||
await this.batchRepo.update(batchId, { progress: avgProgress, status })
|
||||
|
||||
// 发送给左侧列表订阅者
|
||||
readingReflectionTaskEvent.emit('batchProgressUpdate', {
|
||||
batchId,
|
||||
progress: avgProgress,
|
||||
status
|
||||
})
|
||||
}
|
||||
|
||||
async startBatchTask(taskId: string, task: any) {
|
||||
const total = task.quantity || 1
|
||||
|
||||
// 1. 初始化主任务
|
||||
const batch = this.batchRepo.create({ id: taskId, bookName: task.bookName, totalCount: total })
|
||||
await this.batchRepo.save(batch)
|
||||
// 发送给左侧列表订阅者
|
||||
readingReflectionTaskEvent.emit('batchProgressUpdate', {
|
||||
batchId: taskId,
|
||||
progress: 0,
|
||||
status: 'PROCESSING'
|
||||
})
|
||||
|
||||
const promises = Array.from({ length: total }).map((_, index) => {
|
||||
const subTaskId = total === 1 ? taskId : `${taskId}-${index}`
|
||||
|
||||
return this.limit(async () => {
|
||||
try {
|
||||
const item = this.itemRepo.create({ id: subTaskId, batch: batch, status: 'PENDING' })
|
||||
await this.itemRepo.save(item)
|
||||
|
||||
const stream = await readingReflectionGraph.stream(
|
||||
{ ...task },
|
||||
{ configurable: { thread_id: subTaskId } }
|
||||
)
|
||||
|
||||
let finalResult: any = {}
|
||||
|
||||
for await (const chunk of stream) {
|
||||
// 处理生成正文节点
|
||||
if (chunk.generateReadingReflectionContent) {
|
||||
const contentData = chunk.generateReadingReflectionContent
|
||||
await this.itemRepo.update(subTaskId, {
|
||||
status: 'WRITING',
|
||||
progress: 50,
|
||||
content: contentData.content,
|
||||
title: contentData.title
|
||||
})
|
||||
finalResult = { ...finalResult, ...contentData }
|
||||
await this.updateBatchStatus(taskId)
|
||||
this.emitProgress(taskId, index, total, 60, '正文已生成...')
|
||||
}
|
||||
|
||||
// 处理生成摘要节点
|
||||
if (chunk.generateReadingReflectionSummary) {
|
||||
const summaryData = chunk.generateReadingReflectionSummary
|
||||
finalResult = { ...finalResult, ...summaryData }
|
||||
await this.itemRepo.update(subTaskId, {
|
||||
status: 'COMPLETED',
|
||||
progress: 100,
|
||||
summary: summaryData.summary,
|
||||
title: finalResult.title,
|
||||
keywords: summaryData.keywords
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
await this.updateBatchStatus(taskId)
|
||||
this.emitProgress(taskId, index, total, 100, '生成成功', finalResult)
|
||||
} catch (error) {
|
||||
await this.itemRepo.update(subTaskId, { status: 'FAILED', progress: 0 })
|
||||
await this.updateBatchStatus(taskId)
|
||||
this.emitProgress(taskId, index, total, 0, '生成失败')
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
await Promise.all(promises)
|
||||
}
|
||||
|
||||
private emitProgress(
|
||||
taskId: string,
|
||||
index: number,
|
||||
total: number,
|
||||
progress: number,
|
||||
status: string,
|
||||
result?: any
|
||||
) {
|
||||
const displayId = total === 1 ? taskId : `${taskId}-${index}`
|
||||
readingReflectionTaskEvent.emit('readingReflectionTaskProgress', {
|
||||
taskId: displayId,
|
||||
progress,
|
||||
status: status, // 传枚举 Key
|
||||
statusText: `[任务${index + 1}/${total}] ${status}`, // 传描述文字
|
||||
result
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export const readingReflectionsTaskManager = new TaskManager()
|
||||
Reference in New Issue
Block a user