import os import threading import time import traceback import comtypes.client import pandas as pd import pythoncom from loguru import logger from pptx import Presentation from config.config import load_config from utils.agent_utils import generate_comment from utils.file_utils import check_file_exists, get_output_pptx_files from utils.growt_utils import ( replace_one_page, replace_two_page, replace_three_page, replace_four_page, replace_five_page, ) from utils.image_utils import find_image_path from utils.zodiac_utils import calculate_zodiac # ========================================== # 1. 生成模板(根据names.xlsx文件生成名字图片文件夹) # ========================================== def generate_template(stop_event: threading.Event = None, progress_callback=None): """ " 根据学生姓名生成相对应的以学生姓名的存放照片的文件夹 :params stop_event 任务是否停止事件(监听UI的事件监听) :params progress_callback 进度回调函数 """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) try: # 1. 读取数据 df = pd.read_excel(config["excel_file"], sheet_name="Sheet1") # 2. 获取姓名数据 datas = df["姓名"].values.tolist() total_count = len(datas) logger.info(f"开始生成学生模版文件,共 {total_count} 位学生...") # 3. 循环处理 for i, name in enumerate(datas): # 判断是否有停止事件 if stop_event and stop_event.is_set(): logger.warning("任务正在停止中,正在中断中.....") return # 停止任务 # 添加进度条 if progress_callback: progress_callback(i + 1, total_count, "生成学生图片文件夹") logger.info(f"[{i + 1}/{len(datas)}] 正在生成: {name}") # 确保 name 是字符串且去除了空格 (增加健壮性) name = str(name).strip() # 生成学生图片的文件夹 student_folder = os.path.join(config["image_folder"], name) if os.path.exists(student_folder): logger.info(f"学生图片文件夹已存在 {student_folder}") else: logger.info(f"正在生成学生图片文件夹 {student_folder}") os.makedirs(student_folder, exist_ok=True) # 更新进度条为100% if progress_callback: progress_callback(total_count, total_count, "生成学生图片文件夹") logger.success("✅ 所有学生模版文件已生成完毕") except Exception as e: logger.error(f"程序运行出错: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) # ========================================== # 2. 生成评语(根据names.xlsx文件生成评价) # ========================================== def generate_comment_all(stop_event: threading.Event = None, progress_callback=None): """ 根据学生姓名生成评价 :params stop_event 任务是否停止事件(监听UI的事件监听) :params progress_callback 进度回调函数 """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) try: # 1. 读取数据 excel_path = config["excel_file"] df = pd.read_excel(excel_path, sheet_name="Sheet1") # 检查是否存在"评价"列,不存在则新建(防止报错) if "评价" not in df.columns: df["评价"] = "" # 获取学生数据行数 total_count = len(df) logger.info(f"开始生成学生评语,共 {len(df)} 位学生...") # 强制将“评价”列转换为 object 类型 df["评价"] = df["评价"].astype("object") # --- 遍历 DataFrame 的索引 (index) --- for i in df.index: if stop_event and stop_event.is_set(): logger.warning("任务正在停止中,正在中断中.....") return # 停止任务 # 获取学生姓名 name = df.at[i, "姓名"] if pd.isna(name): continue else: name = str(name).strip() # 获取性别 sex = pd.isna(df.at[i, "性别"]) if "男" else str(df.at[i, "性别"]).strip() # 获取当前行的特征(如果Excel里有“特征”这一列就读,没有就用默认值) # 假设Excel里有一列叫 "表现特征",如果没有则用默认的 "有礼貌..." traits = ( df.at[i, "表现特征"] if "表现特征" in df.columns and not pd.isna(df.at[i, "表现特征"]) else "有礼貌、守纪律" ) # 优化:如果“评价”列已经有内容了,跳过不生成(节省API费用) current_comment = df.at[i, "评价"] if not pd.isna(current_comment) and str(current_comment).strip() != "": logger.info(f"[{i + 1}/{total_count}] {name} 已有评语,跳过。") continue # 添加进度条 if progress_callback: progress_callback( i + 1, total_count, f"[{i + 1}/{total_count}] 正在生成评价: {name}" ) logger.info(f"[{i + 1}/{total_count}] 正在生成评价: {name}") try: # 调用AI大模型生成内容 generated_text = generate_comment( name, config["age_group"], traits, sex ) df.at[i, "评价"] = generated_text if str(generated_text).strip() else "" logger.success(f"学生:{name},评语生成完毕") # 可选:每生成 5 个就保存一次 if (i + 1) % 5 == 0: df.to_excel(excel_path, index=False) logger.success("✅ 阶段性保存成功") # 避免触发API速率限制 time.sleep(1) except Exception as e: logger.error(f"学生:{name},生成评语出错: {str(e)}") # --- 循环结束后最终保存文件 --- # index=False 表示不把 pandas 的索引 (0,1,2...) 写到 Excel 第一列 df.to_excel(excel_path, index=False) logger.success(f"所有评语已生成并写入文件:{excel_path}") if progress_callback: progress_callback(total_count, total_count, "生成学生评语") except PermissionError: logger.error(f"保存失败!请先关闭 Excel 文件:{config['excel_file']}") except Exception as e: logger.error(f"程序运行出错: {str(e)}") logger.error(traceback.format_exc()) # ========================================== # 3. 生成成长报告(根据names.xlsx文件生成) # ========================================== def generate_report(stop_event: threading.Event = None, progress_callback=None): """ 根据学生姓名生成成长报告 :params stop_event 任务是否停止事件(监听UI的事件监听) :params progress_callback 进度回调函数 """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) # 1. 检查模版文件是否存在 if not os.path.exists(config["source_file"]): logger.info(f"错误: 找不到模版文件 {config["source_file"]}") # 2. 检查数据文件是否存在 if not os.path.exists(config["excel_file"]): logger.info(f"错误: 找不到数据文件 {config['excel_file']}") try: # 1. 读取数据 df = pd.read_excel(config["excel_file"], sheet_name="Sheet1") # 2. 确保列名对应 columns = [ "姓名", "英文名", "性别", "生日", "属相", "我的好朋友", "我的爱好", "喜欢的游戏", "喜欢吃的食物", "评价", ] # 获取数据列表 datas = df[columns].values.tolist() total_count = len(datas) # 获取配置文件的教师签名 teacher_names_str = " ".join(config["teachers"]) logger.info(f"开始处理,共 {total_count} 位学生...") # 3. 循环处理 for i, row_data in enumerate(datas): if stop_event and stop_event.is_set(): logger.warning("任务正在停止中,正在中断中.....") return # 解包数据 ( name, english_name, sex, birthday, zodiac, friend, hobby, game, food, comments, ) = row_data # 更新进度条 if progress_callback: progress_callback( i + 1, total_count, f"[{i + 1}/{len(datas)}] 正在生成: 【{name}】 成长报告", ) logger.info(f"[{i + 1}/{len(datas)}] 正在生成: {name}") # 每次循环重新加载模版 prs = Presentation(config["source_file"]) # --- 页面 1 --- replace_one_page(prs, name, config["class_name"]) # --- 页面 2 --- replace_two_page(prs, comments, teacher_names_str) # --- 页面 3 --- student_image_folder = os.path.join(config["image_folder"], name) logger.info(f"学生:{name},图片文件夹: {student_image_folder}") # 判断学生图片文件夹是否存在 if not os.path.exists(student_image_folder): logger.warning( f"⚠️ 警告: 学生:{name},学生图片文件夹不存在 {student_image_folder}" ) continue # 检查姓名是否为空 if not name: logger.error(f"⚠️ 警告: 学生:{name},姓名为空,跳过") break # 构造学生信息字典 student_info_dict = { "name": name, "english_name": english_name if pd.notna(english_name) else " ", "sex": sex if pd.notna(sex) else "男", "birthday": ( birthday.strftime("%Y-%m-%d") if pd.notna(birthday) else " " ), "zodiac": ( str(zodiac).strip() if str(zodiac).strip() or not str(zodiac).strip().lower() else " " ), "friend": ( str(friend).strip() if str(friend).strip() or not str(friend).strip().lower() else " " ), "hobby": ( str(hobby).strip() if str(hobby).strip() or not str(hobby).strip().lower() else " " ), "game": ( str(game).strip() if str(game).strip() or not str(game).strip().lower() else " " ), "food": ( str(food).strip() if str(food).strip() or not str(food).strip().lower() else " " ), } # 获取学生个人照片路径 me_image_path = find_image_path(student_image_folder, "me") # 检查学生图片是否存在,若不存在则跳过 if check_file_exists(me_image_path): replace_three_page(prs, student_info_dict, me_image_path) else: replace_three_page(prs, student_info_dict) logger.warning(f"⚠️ 警告: 学生图片文件不存在 {me_image_path}") # --- 页面 4 --- class_image_path = find_image_path( config["image_folder"], config["class_name"] ) print(config["image_folder"], config["class_name"]) # 添加检查班级图片是否存在,若不存在则跳过 if check_file_exists(class_image_path): replace_four_page(prs, class_image_path) else: logger.warning(f"⚠️ 警告: 班级图片文件不存在 {class_image_path}") # --- 页面 5 --- if os.path.exists(student_image_folder): img1_path = find_image_path(student_image_folder, "1") img2_path = find_image_path(student_image_folder, "2") # 逻辑优化: # 情况A: 两张都找到了 -> 正常插入 if img1_path and img2_path: replace_five_page(prs, img1_path, img2_path) # 情况B: 只找到了 1 -> 两张图都用 1 (避免报错) elif img1_path and not img2_path: replace_five_page(prs, img1_path, img1_path) # 情况C: 一张都没找到 else: logger.warning( f"⚠️ 警告: {name} 缺少作品照片 (1.jpg/png 或 2.jpg/png)[/]" ) else: logger.warning(f"错误: 学生图片文件夹不存在 {student_image_folder}") # --- 保存文件 --- file_ext = os.path.splitext(config["source_file"])[1] safe_name = str(name).strip() new_filename = f"{config['class_name']} {safe_name} 幼儿成长报告{file_ext}" output_path = os.path.join(config["output_folder"], new_filename) try: prs.save(output_path) logger.success(f"学生:{name},保存成功: {new_filename}") except PermissionError: logger.error( f"保存失败: 文件 {new_filename} 可能已被打开,请关闭后重试。" ) if progress_callback: progress_callback(total_count, total_count, "生成报告") logger.success("所有报告生成完毕!") except Exception as e: logger.error(f"程序运行出错: {str(e)}") logger.error(traceback.format_exc()) # ========================================== # 4. 转换格式(根据names.xlsx文件生成PPT转PDF) # ========================================== def generate_convert_pdf(stop_event: threading.Event = None, progress_callback=None): """ 批量转换文件夹下的所有 PPT :params folder_path 需要转换的PPT文件夹 :params stop_event 任务是否停止事件(监听UI的事件监听) :params progress_callback 进度回调函数 """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) # 子线程初始化 COM 组件 pythoncom.CoInitialize() try: folder_path = os.path.abspath(config.get("output_folder")) if not os.path.exists(folder_path): logger.error(f"文件夹不存在: {folder_path}") return # 获取所有 ppt/pptx 文件 files = [ f for f in os.listdir(folder_path) if f.lower().endswith((".ppt", ".pptx")) ] if not files: logger.warning("没有找到 PPT 文件") return total_count = len(files) logger.info(f"发现 {total_count} 个文件,准备开始转换...") powerpoint = None try: # 1. 启动应用 (只启动一次) powerpoint = comtypes.client.CreateObject("PowerPoint.Application") # 设置是否显示转化页面,但如果遇到转换卡死,可以尝试去掉下面这行的注释,让它显示出来 # powerpoint.Visible = 1 for filename in files: if stop_event and stop_event.is_set(): logger.warning("任务正在停止中,正在中断中.....") return # 添加进度条 if progress_callback: progress_callback(files.index(filename), total_count, "转换PDF") ppt_path = os.path.join(folder_path, filename) pdf_path = os.path.splitext(ppt_path)[0] + ".pdf" # 如果 PDF 已存在,可以选择跳过 if os.path.exists(pdf_path): logger.info(f"[跳过] 已存在: {filename}") continue logger.info( f"[{files.index(filename)}/{total_count}]正在转换: {filename} ..." ) try: # 打开 -> 另存为 -> 关闭 deck = powerpoint.Presentations.Open(ppt_path) deck.SaveAs(pdf_path, 32) # 32 代表 PDF 格式 deck.Close() except Exception as e: logger.error(f"文件 {filename} 转换出错: {e}") # 添加进度条 if progress_callback: progress_callback(total_count, total_count, "转换PDF") except Exception as e: logger.error(f"PowerPoint 进程启动出错: {e}") finally: # 2. 退出应用 if powerpoint: try: powerpoint.Quit() except: pass logger.success("PowerPoint 已关闭,批量转换完成。") except Exception as e: logger.error(f"未知错误: {e}") finally: # 【核心修复 2】释放资源 pythoncom.CoUninitialize() # ========================================== # 5. 生成属相(根据names.xlsx文件生成属相) # ========================================== def generate_zodiac(stop_event: threading.Event = None, progress_callback=None): """ 生成学生属相,如果“生日”列为空,则跳过该学生。 :params stop_event 任务是否停止事件(监听UI的事件监听) :params progress_callback 进度回调函数 """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) try: # 1. 读取数据 excel_path = config["excel_file"] df = pd.read_excel(excel_path, sheet_name="Sheet1") # 2. 检查必要的列 date_column = "生日" target_column = "属相" if date_column not in df.columns: logger.error(f"Excel中找不到列名:【{date_column}】,请检查表头。") return if target_column not in df.columns: df[target_column] = "" total_count = len(df) logger.info(f"开始生成学生属相,共 {total_count} 位学生...") # 3. 预处理:将“生日”列转换为 datetime 格式 df["temp_date"] = pd.to_datetime(df[date_column], errors="coerce") # 4. 遍历 DataFrame 并计算/更新数据 for i, row in df.iterrows(): # 关键点 1: 检查停止信号 if stop_event and stop_event.is_set(): logger.warning("任务已接收到停止信号,正在中断...") return # 添加进度条 if progress_callback: progress_callback(i + 1, total_count, "生成属相") name = row.get("姓名", f"学生_{i + 1}") date = row["temp_date"] logger.info(f"[{i + 1}/{total_count}] 正在处理学生:{name}...") # === 关键点 2: 检查生日是否为空 === if pd.isna(date): # 记录警告日志并跳过当前循环迭代 logger.warning(f"跳过:学生【{name}】的生日数据为空或格式错误。") # 可以选择将属相字段清空或设置为特定值,此处设置为“待补充” df.loc[i, target_column] = "待补充" continue # 跳到下一个学生 # ================================= # 5. 计算并赋值 zodiac = calculate_zodiac(date) df.loc[i, target_column] = zodiac logger.info(f" -> 属相计算成功:{name} ,属相: {zodiac}") # 6. 清理和保存结果 df = df.drop(columns=["temp_date"]) save_path = excel_path try: df.to_excel(save_path, sheet_name="Sheet1", index=False) logger.success(f"所有属相已更新并写入文件:{save_path}") except PermissionError: logger.error(f"保存失败!请先关闭 Excel 文件:{save_path}") # 添加进度条 if progress_callback: progress_callback(total_count, total_count, "生成属相") except FileNotFoundError: logger.error(f"找不到文件 {config.get('excel_file')}") logger.error(traceback.format_exc()) except Exception as e: logger.error(f"程序运行出错: {str(e)}") logger.error(traceback.format_exc()) # ========================================== # 6. 一键生成园长签名(根据输出文件夹生成签名) # ========================================== def generate_signature(progress_callback=None) -> str: """ 生成园长签名(不依赖占位符,直接在指定位置添加) """ # 1. 加载配置文件 try: config = load_config() except Exception as e: logger.error(f"配置文件获取失败: {str(e)}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) try: # 获取所有的PPT (此时返回的是文件名或路径的列表) pptx_files = get_output_pptx_files(config["output_folder"]) if not pptx_files: logger.warning("没有找到 PPT 文件") return "未找到文件" logger.info(f"开始生成签名,共 {len(pptx_files)} 个 PPT 文件...") img_path = config.get("signature_image") # 签名图片路径 if not img_path or not os.path.exists(img_path): logger.error(f"签名图片不存在: {img_path}") logger.warning(f"⚠️ 警告: 缺少签名照片('signature')") return logger.info(f"签名图片存在: {img_path}") # 从配置文件获取签名位置信息,如果没有则使用默认值 signature_left = config.get("signature_left", 2987040) # 左位置 signature_top = config.get("signature_top", 8273415) # 上位置 signature_width = config.get("signature_width", 1800000) # 宽度 signature_height = config.get("signature_height", 720000) # 高度 # 导入必要的模块 from utils.image_utils import get_corrected_image_stream for i, filename in enumerate(pptx_files): # 获取完整绝对路径 pptx_path = os.path.join(config["output_folder"], filename) # 打开 PPT 对象 prs = Presentation(pptx_path) # 获取第二张幻灯片 (索引为1) slide = prs.slides[1] # 获取修正后的图片流 img_stream = get_corrected_image_stream(img_path) # 直接在指定位置添加签名图片 slide.shapes.add_picture( img_stream, signature_left, signature_top, signature_width, signature_height, ) logger.info(f"在幻灯片 1 上添加签名图片") # 保存修改后的 PPT prs.save(pptx_path) # 更新进度条 (如果有 callback) if progress_callback: progress_callback( i + 1, len(pptx_files), f"[{i + 1}/{len(pptx_files)}] 生成签名完成: {filename}", ) logger.success(f"[{i + 1}/{len(pptx_files)}] 生成签名完成: {filename}") if progress_callback: progress_callback(len(pptx_files), len(pptx_files), "签名生成完成") except Exception as e: logger.error(f"generate_signature 发生未知错误: {e}") # 打印详细报错位置,方便调试 logger.error(traceback.format_exc()) return str(e)