220 lines
7.8 KiB
Python
220 lines
7.8 KiB
Python
import shutil
|
||
import os
|
||
import time
|
||
from loguru import logger
|
||
import zipfile
|
||
|
||
|
||
def export_templates_folder(output_folder, stop_event, progress_callback=None):
|
||
"""
|
||
将指定文件夹压缩为 zip 包
|
||
:param output_folder: 压缩包存放的文件夹路径 (默认 'backup')
|
||
:param stop_event: 停止事件
|
||
:param progress_callback : 进度条回调
|
||
"""
|
||
source_folder = "data"
|
||
output_folder = output_folder if output_folder else "backup"
|
||
try:
|
||
# 1. 检查源文件夹是否存在
|
||
if not os.path.exists(source_folder):
|
||
logger.error(f"备份失败: 找不到源文件夹 '{source_folder}'")
|
||
return
|
||
|
||
# 2. 确保输出目录存在
|
||
if not os.path.exists(output_folder):
|
||
os.makedirs(output_folder)
|
||
logger.info(f"创建备份目录: {output_folder}")
|
||
|
||
# 3. 生成带时间戳的文件名 (例如: data_backup_20251211_103000)
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
# 注意: make_archive 不需要写后缀 .zip,它会自动加
|
||
base_name = os.path.join(output_folder, f"data_备份_{timestamp}")
|
||
|
||
logger.info(f"正在压缩 '{source_folder}' ...")
|
||
|
||
# 4. 执行压缩
|
||
# format="zip": 压缩格式
|
||
# root_dir: 要压缩的根目录的上级目录 (通常填 None)
|
||
# base_dir: 要压缩的具体目录
|
||
zip_path = shutil.make_archive(
|
||
base_name=base_name,
|
||
format="zip",
|
||
root_dir=os.path.dirname(
|
||
os.path.abspath(source_folder)
|
||
), # 这里为了安全,定位到父级
|
||
base_dir=os.path.basename(
|
||
os.path.abspath(source_folder)
|
||
), # 只压缩 data 文件夹本身
|
||
)
|
||
|
||
logger.success(f"✅ 备份成功! 文件已保存至:\n{zip_path}")
|
||
return zip_path
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ 备份出错: {str(e)}")
|
||
import traceback
|
||
|
||
logger.error(traceback.format_exc())
|
||
|
||
|
||
def export_data(save_dir, root_dir=".", progress_callback=None):
|
||
"""
|
||
导出 data 和 output 两个文件夹到同一个 zip 包中
|
||
:param save_dir: 用户在 GUI 弹窗中选择的保存目录 (例如: D:/Backup)
|
||
:param root_dir: 项目根目录 (用于找到 data 和 output)
|
||
:param progress_callback: 进度条回调函数,接收一个 float (0.0~1.0)
|
||
"""
|
||
|
||
# 1. 定义要打包的目标文件夹
|
||
targets = ["data", "output"]
|
||
|
||
# 2. 检查保存目录
|
||
if not os.path.exists(save_dir):
|
||
logger.error(f"保存目录不存在: {save_dir}")
|
||
return None
|
||
|
||
# --- 【新增步骤 A】预先计算文件总数 ---
|
||
total_files = 0
|
||
for target in targets:
|
||
target_abs_path = os.path.join(root_dir, target)
|
||
if os.path.exists(target_abs_path):
|
||
for _, _, files in os.walk(target_abs_path):
|
||
total_files += len(files)
|
||
|
||
logger.info(f"待压缩文件总数: {total_files}")
|
||
# ------------------------------------
|
||
|
||
# 3. 生成压缩包路径
|
||
timestamp = time.strftime("%Y%m%d_%H%M%S")
|
||
zip_filename = f"完整备份_{timestamp}.zip"
|
||
zip_path = os.path.join(save_dir, zip_filename)
|
||
|
||
logger.info(f"开始备份,目标文件: {zip_path}")
|
||
|
||
try:
|
||
# 4. 创建压缩包
|
||
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
|
||
|
||
processed_count = 0 # 当前处理的文件数
|
||
has_files = False # 标记是否真的压缩了文件
|
||
|
||
for target in targets:
|
||
target_abs_path = os.path.join(root_dir, target)
|
||
|
||
if not os.path.exists(target_abs_path):
|
||
logger.warning(f"⚠️ 跳过: 找不到文件夹 '{target}'")
|
||
continue
|
||
|
||
logger.info(f"正在压缩: {target} ...")
|
||
|
||
# 5. 遍历文件夹写入 ZIP
|
||
for root, dirs, files in os.walk(target_abs_path):
|
||
for file in files:
|
||
# 获取文件的绝对路径
|
||
file_abs_path = os.path.join(root, file)
|
||
|
||
# 计算相对路径
|
||
arcname = os.path.relpath(file_abs_path, root_dir)
|
||
|
||
# 写入压缩包
|
||
zf.write(file_abs_path, arcname)
|
||
has_files = True
|
||
# 更新进度条
|
||
if progress_callback:
|
||
progress_callback(processed_count + 1, total_files, "导出数据中...")
|
||
|
||
if has_files:
|
||
# 确保进度条最后能走到 100%
|
||
if progress_callback:
|
||
progress_callback(total_files, total_files, "导出数据成功")
|
||
|
||
logger.success(f"✅ 备份成功! 文件已保存至:\n{zip_path}")
|
||
return zip_path
|
||
else:
|
||
logger.error("❌ 备份失败: data 和 output 文件夹均为空或不存在。")
|
||
if os.path.exists(zip_path):
|
||
os.remove(zip_path)
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"导出过程出错: {str(e)}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
return None
|
||
|
||
|
||
def initialize_project(root_dir=".", progress_callback=None):
|
||
"""
|
||
初始化项目:清空 data,重建目录,复制模板
|
||
:param root_dir: 项目根目录
|
||
:param progress_callback : 进度条回调
|
||
"""
|
||
# 定义路径
|
||
data_dir = os.path.join(root_dir, "data")
|
||
images_dir = os.path.join(data_dir, "images")
|
||
output_dir = os.path.join(root_dir, "output")
|
||
|
||
# 模板源文件路径
|
||
template_dir = os.path.join(root_dir, "templates")
|
||
src_excel = os.path.join(template_dir, "names.xlsx")
|
||
dst_excel = os.path.join(data_dir, "names.xlsx")
|
||
|
||
logger.info("开始初始化/重置项目...")
|
||
|
||
# --- 1. 清空 data 文件夹 ---
|
||
if os.path.exists(data_dir):
|
||
try:
|
||
# 暴力删除整个 data 文件夹及其内容
|
||
shutil.rmtree(data_dir)
|
||
logger.info(f"已清理旧数据: {data_dir}")
|
||
except PermissionError:
|
||
logger.error(
|
||
f"❌ 删除失败!请检查 '{data_dir}' 下的文件(如 Excel)是否正在被打开。"
|
||
)
|
||
return
|
||
except Exception as e:
|
||
logger.error(f"❌ 删除出错: {e}")
|
||
return
|
||
# --- 2. 清空 output_dir 文件夹 ---
|
||
if os.path.exists(output_dir):
|
||
try:
|
||
# 暴力删除整个 output 文件夹及其内容
|
||
shutil.rmtree(output_dir)
|
||
logger.info(f"已清理旧数据: {output_dir}")
|
||
except PermissionError:
|
||
logger.error(f"❌ 删除失败!请检查 '{output_dir}' 下的文件是否正在被打开。")
|
||
return
|
||
except Exception as e:
|
||
logger.error(f"❌ 删除出错: {e}")
|
||
return
|
||
|
||
# --- 2. 新建目录结构 ---
|
||
try:
|
||
# makedirs 会自动创建父级目录 (data 和 data/images 都会被创建)
|
||
os.makedirs(images_dir, exist_ok=True)
|
||
os.makedirs(output_dir, exist_ok=True)
|
||
logger.info(f"✅ 已创建目录: {images_dir}")
|
||
except Exception as e:
|
||
logger.error(f"创建目录失败: {e}")
|
||
return
|
||
|
||
# --- 3. 复制模板文件 ---
|
||
if os.path.exists(src_excel):
|
||
try:
|
||
shutil.copy(src_excel, dst_excel)
|
||
logger.success(f"✅ 成功从模板恢复: {dst_excel}")
|
||
logger.success("🎉 初始化完成!一切已恢复出厂设置。")
|
||
except Exception as e:
|
||
logger.error(f"复制模板文件失败: {e}")
|
||
else:
|
||
logger.warning(
|
||
f"⚠️ 警告: 模板文件不存在 ({src_excel}),data 文件夹内将没有 Excel 文件。"
|
||
)
|
||
|
||
|
||
def check_file_exists(file_path):
|
||
"""
|
||
判断文件是否存在
|
||
"""
|
||
return file_path and isinstance(file_path, str) and os.path.exists(file_path)
|