"""准备分发目录或归档大文件。 用法: python scripts/prepare_dist.py --mode copy # 将必需文件复制到 dist/ python scripts/prepare_dist.py --mode archive # 将大文件归档到 archive/ 脚本不会删除源文件,会生成操作日志和回滚脚本。 """ import argparse import os import shutil import json from pathlib import Path ROOT = Path(__file__).resolve().parents[1] DIST = ROOT / "dist" ARCHIVE = ROOT / "archive" LOG = ROOT / "scripts" / "prepare_dist.log" DEFAULT_INCLUDES = [ "server", "web", "cloud/custom_nodes", "README.md", "server/system_config.yaml", ] LARGE_EXT = {"*.csv", "*.utrace", "*.zip"} # Patterns to ignore when copying to dist IGNORE_PATTERNS = [ "__pycache__", "node_modules", "*.pyc", "*.pyo", ] def write_log(entry: dict): with open(LOG, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") def gather_large_files(root: Path): results = [] for p in root.rglob("*"): if p.is_file(): if any(p.match(pattern) for pattern in LARGE_EXT): results.append(p) return results def copy_includes(includes, dest: Path): dest.mkdir(parents=True, exist_ok=True) actions = [] for inc in includes: src = ROOT / inc if not src.exists(): continue target = dest / src.name if src.is_dir(): for root_dir, dirs, files in os.walk(src): # skip ignored directories rel = os.path.relpath(root_dir, src) parts = rel.split(os.sep) if rel != '.' else [] if any(pat in parts for pat in IGNORE_PATTERNS if not pat.startswith('*')): continue # copy files except ignored patterns for f in files: skip = False for pat in IGNORE_PATTERNS: if pat.startswith('*') and Path(f).match(pat): skip = True break if skip: continue src_file = Path(root_dir) / f relpath = src_file.relative_to(src) dest_file = target / relpath dest_file.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src_file, dest_file) actions.append((str(src), str(target), "copytree_filtered")) else: # single file copy shutil.copy2(src, target) actions.append((str(src), str(target), "copyfile")) write_log({"action": "copy_includes", "items": actions}) return actions def archive_files(files, dest: Path): dest.mkdir(parents=True, exist_ok=True) actions = [] for f in files: # skip ignored if any(part == '__pycache__' for part in f.parts): continue rel = f.relative_to(ROOT) target = dest / rel target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(f, target) actions.append((str(f), str(target))) write_log({"action": "archive_files", "items": actions}) return actions def build_rollback(actions, rollback_path: Path): rollback_path.parent.mkdir(parents=True, exist_ok=True) with open(rollback_path, "w", encoding="utf-8") as f: f.write("# 回滚脚本(手动检查并运行)\n") for src, dst, kind in actions: f.write(f"# 删除目标: {dst}\n") write_log({"action": "write_rollback", "path": str(rollback_path)}) def main(): parser = argparse.ArgumentParser() parser.add_argument("--mode", choices=["copy", "archive"], default="copy") parser.add_argument("--includes", nargs="*", default=DEFAULT_INCLUDES) args = parser.parse_args() if args.mode == "copy": actions = copy_includes(args.includes, DIST) build_rollback(actions, ROOT / "scripts" / "rollback_copy.sh") print("已将包含项复制到 dist/,查看 scripts/prepare_dist.log 获取详情。") else: files = gather_large_files(ROOT / "cloud") actions = archive_files(files, ARCHIVE) build_rollback(actions, ROOT / "scripts" / "rollback_archive.sh") print("已将大文件复制到 archive/,查看 scripts/prepare_dist.log 获取详情。") if __name__ == "__main__": main()