""" 用户目录管理模块 管理用户工作空间和文件系统结构 """ from pathlib import Path from typing import List, Optional, Dict import os import json import yaml # 全局配置 _config = None CLOUD_ROOT = None USERS_DB_FILE = None def load_config(): """加载系统配置""" global _config, CLOUD_ROOT, USERS_DB_FILE if _config is None: config_path = Path(__file__).parent.parent.parent / "system_config.yaml" with open(config_path, 'r', encoding='utf-8') as f: _config = yaml.safe_load(f) # 优先尝试从环境变量获取 (Local Dev / Docker Compose Environment) env_cloud_root = os.getenv("CLOUD_ROOT") if env_cloud_root: # 这里的路径就是 D:\XGame\TraceStudio\cloud CLOUD_ROOT = Path(env_cloud_root).resolve() print(f"[Config] 使用环境变量 CLOUD_ROOT: {CLOUD_ROOT}") else: # 回退方案:使用 yaml 配置的相对路径 (旧逻辑) project_root = Path(__file__).parent.parent.parent.parent cloud_root_rel = _config.get('storage', {}).get('cloud_root', './cloud') CLOUD_ROOT = (project_root / cloud_root_rel).resolve() print(f"[Config] 使用相对路径 CLOUD_ROOT: {CLOUD_ROOT}") # ========================================================= # 用户数据库路径 # ========================================================= # 同样建议支持环境变量,方便灵活切换位置 env_users_db = os.getenv("USERS_DB_FILE") if env_users_db: USERS_DB_FILE = Path(env_users_db).resolve() else: # 默认放在 server 目录下 USERS_DB_FILE = Path(__file__).parent.parent.parent / "users.json" print(f"✅ 系统配置加载完成,CLOUD_ROOT: {CLOUD_ROOT}") return _config # 初始化配置 load_config() def load_users_db() -> Dict: """加载用户数据库""" if not USERS_DB_FILE.exists(): # 创建默认数据库 default_db = { "users": [ { "username": "guest", "display_name": "访客", "created_at": "2026-01-07T00:00:00Z", "active": True } ] } save_users_db(default_db) return default_db try: with open(USERS_DB_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: print(f"⚠️ 加载用户数据库失败: {e}") return {"users": []} def save_users_db(db: Dict) -> bool: """保存用户数据库""" try: USERS_DB_FILE.parent.mkdir(parents=True, exist_ok=True) with open(USERS_DB_FILE, 'w', encoding='utf-8') as f: json.dump(db, f, indent=2, ensure_ascii=False) return True except Exception as e: print(f"⚠️ 保存用户数据库失败: {e}") return False def get_active_users() -> List[str]: """获取所有活跃用户名列表""" db = load_users_db() return [ user["username"] for user in db.get("users", []) if user.get("active", True) ] def user_exists(username: str) -> bool: """检查用户是否存在""" return username in get_active_users() def init_base_structure(): """初始化基础文件系统结构""" # 创建云存储根目录 CLOUD_ROOT.mkdir(exist_ok=True) # 创建基础目录 base_dirs = [ CLOUD_ROOT / "users", CLOUD_ROOT / "shared", CLOUD_ROOT / "temp", ] for dir_path in base_dirs: dir_path.mkdir(exist_ok=True) # 从数据库创建用户目录 active_users = get_active_users() for user in active_users: create_user_workspace(user) print(f"✅ 文件系统初始化完成: {CLOUD_ROOT}") print(f"✅ 已激活用户: {', '.join(active_users)}") def create_user_workspace(username: str) -> Path: """ 创建用户工作空间 Args: username: 用户名 Returns: 用户根目录路径 """ user_root = CLOUD_ROOT / "users" / username user_root.mkdir(parents=True, exist_ok=True) # 创建用户子目录 subdirs = ["data", "workflows", "results", "cache"] for subdir in subdirs: (user_root / subdir).mkdir(exist_ok=True) return user_root def get_user_root(username: str) -> Path: """获取用户根目录""" user_root = CLOUD_ROOT / "users" / username if not user_root.exists(): return create_user_workspace(username) return user_root def get_user_path(username: str, relative_path: str = "") -> Path: """ 获取用户文件的绝对路径 Args: username: 用户名 relative_path: 相对路径(相对于用户根目录) Returns: 绝对路径 """ user_root = get_user_root(username) if relative_path: return user_root / relative_path return user_root def list_users() -> List[str]: """列出所有活跃用户(从数据库)""" return get_active_users() class UserContext: """用户上下文管理""" def __init__(self, username: str): self.username = username self.root = get_user_root(username) def get_path(self, relative_path: str = "") -> Path: """获取相对于用户根目录的路径""" if relative_path: return self.root / relative_path return self.root def ensure_dir(self, relative_path: str): """确保目录存在""" path = self.get_path(relative_path) path.mkdir(parents=True, exist_ok=True) return path def list_dir(self, relative_path: str = ""): """列出目录内容""" path = self.get_path(relative_path) if not path.exists(): return [] return list(path.iterdir())