pytrace/server_old/file_manager.py
Boshuang Zhao fb8458011a upload
2026-01-15 21:58:30 +08:00

332 lines
10 KiB
Python

"""
文件管理模块
提供多用户文件系统管理功能
"""
import os
import shutil
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, UploadFile, File, Query
from pydantic import BaseModel
# Storage root: the "cloud" directory located one level above the directory that contains this file
BASE_DIR = Path(__file__).parent.parent / "cloud"
# Router for the endpoints declared in this module; routes are prefixed with /api/files
router = APIRouter(prefix="/api/files", tags=["files"])
# ========== Pydantic 模型 ==========
class FileInfo(BaseModel):
    """Describes one file-system entry (a file or a folder)."""

    # Base name of the entry.
    name: str
    # Location of the entry, as a string.
    path: str
    # Entry kind: 'file' | 'folder'.
    type: str
    # Size of the entry; optional, defaults to None.
    size: Optional[int] = None
    # Last-modified time as a string; optional, defaults to None.
    modified: Optional[str] = None
    # File extension; optional, defaults to None.
    extension: Optional[str] = None
class FileAction(BaseModel):
    """Request body describing a single file operation."""

    # Operation to perform: 'move' | 'delete' | 'mkdir' | 'rename'.
    action: str
    # Path the operation applies to.
    path: str
    # Destination path for move/rename; optional.
    target: Optional[str] = None
    # New name; optional.
    new_name: Optional[str] = None
# ========== 工具函数 ==========
def init_base_structure():
    """Create the storage root and its fixed top-level directories if missing."""
    BASE_DIR.mkdir(exist_ok=True)
    # Parents are listed before their children because mkdir is called
    # without parents=True: shared area (with examples), scratch area, user homes.
    layout = (
        ("shared",),
        ("shared", "examples"),
        ("temp",),
        ("users",),
    )
    for parts in layout:
        BASE_DIR.joinpath(*parts).mkdir(exist_ok=True)
    print(f"✅ 文件系统初始化完成:{BASE_DIR}")
def ensure_user_dir(username: str) -> Path:
    """
    Ensure the per-user directory tree exists and return its root.

    Creates ``BASE_DIR/users/<username>`` together with its ``workflows``,
    ``data`` and ``results`` sub-directories when they are missing.

    Args:
        username: Name of the user; must be exactly one plain path component.

    Returns:
        Path of the user's root directory.

    Raises:
        HTTPException: 400 if ``username`` is empty, is "." or "..", contains
            a NUL byte, or contains a path separator / drive prefix.
    """
    # The endpoints pass the username straight from a query parameter, so it
    # is untrusted. Anything that is not a single path component (for example
    # "../../x" or "/etc") would make the mkdir calls below create
    # directories outside of users/.
    if (
        not username
        or username in (".", "..")
        or "\x00" in username
        or "\\" in username
        or Path(username).name != username
    ):
        raise HTTPException(status_code=400, detail="无效的用户名")

    user_dir = BASE_DIR / "users" / username
    user_dir.mkdir(parents=True, exist_ok=True)
    # Standard sub-directories every user gets.
    for sub_name in ("workflows", "data", "results"):
        (user_dir / sub_name).mkdir(exist_ok=True)
    return user_dir
def sanitize_path(path_str: str) -> Path:
    """
    Turn a client-supplied path into a resolved path confined to BASE_DIR.

    Guards against directory-traversal: leading slashes/backslashes are
    dropped so the input is treated as relative to BASE_DIR, the result is
    resolved, and anything that ends up outside BASE_DIR is rejected.

    Raises:
        HTTPException: 403 when the resolved path is not BASE_DIR or one of
            its descendants.
    """
    relative_part = path_str.lstrip("/\\")
    candidate = (BASE_DIR / relative_part).resolve()
    root = BASE_DIR.resolve()
    # Accept the root itself or anything that has the root among its ancestors.
    inside_root = candidate == root or root in candidate.parents
    if not inside_root:
        raise HTTPException(status_code=403, detail="无效的路径")
    return candidate
def get_file_info(path: Path) -> FileInfo:
    """
    Build a FileInfo record for a file or folder.

    Args:
        path: Path of an existing entry located under BASE_DIR.

    Returns:
        FileInfo whose ``path`` is relative to BASE_DIR using "/" separators.
        ``size`` and ``extension`` are set only for regular files.

    Raises:
        OSError: If the entry cannot be stat'ed (for example it vanished).
        ValueError: If ``path`` lies under neither BASE_DIR nor its resolved
            form.
    """
    info = path.stat()
    # sanitize_path() returns fully resolved paths while BASE_DIR itself is
    # not resolved. When BASE_DIR contains a symlink the two no longer share
    # a prefix, so fall back to the resolved root instead of failing.
    try:
        relative = path.relative_to(BASE_DIR)
    except ValueError:
        relative = path.relative_to(BASE_DIR.resolve())

    is_regular_file = path.is_file()
    return FileInfo(
        name=path.name,
        path=str(relative).replace("\\", "/"),
        type="folder" if path.is_dir() else "file",
        size=info.st_size if is_regular_file else None,
        modified=datetime.fromtimestamp(info.st_mtime).isoformat(),
        extension=path.suffix if is_regular_file else None,
    )
# ========== API 端点 ==========
@router.get("/list")
async def list_files(
    path: str = Query("", description="目录路径,空字符串表示根目录"),
    username: Optional[str] = Query(None, description="当前用户名")
) -> Dict[str, Any]:
    """
    List the files and folders inside a directory.

    Args:
        path: Directory to list; an empty string or "/" means the storage root.
        username: Optional current user; when given, that user's directory
            tree is created first if it does not exist yet.

    Returns:
        A dict with ``path`` (the listed directory relative to BASE_DIR, "/"
        for the root) and ``items`` (FileInfo dicts, folders first, then by
        case-insensitive name).

    Raises:
        HTTPException: 404 if the path does not exist, 400 if it is not a
            directory, 500 for any unexpected failure.
    """
    try:
        # Make sure the user's home exists before listing.
        if username:
            ensure_user_dir(username)

        if not path or path == "/":
            target_path = BASE_DIR
        else:
            target_path = sanitize_path(path)

        if not target_path.exists():
            raise HTTPException(status_code=404, detail="路径不存在")
        if not target_path.is_dir():
            raise HTTPException(status_code=400, detail="路径不是目录")

        # Folders first, then case-insensitive alphabetical order.
        entries = sorted(
            target_path.iterdir(),
            key=lambda entry: (not entry.is_dir(), entry.name.lower()),
        )
        items: List[FileInfo] = []
        for item in entries:
            # Skip entries whose name starts with a double underscore.
            if item.name.startswith("__"):
                continue
            try:
                items.append(get_file_info(item))
            except Exception as e:
                # Best effort: one unreadable entry must not fail the listing.
                print(f"警告:无法读取 {item}: {e}")

        # sanitize_path() returns resolved paths while BASE_DIR is unresolved;
        # fall back to the resolved root when they do not share a prefix.
        try:
            relative = target_path.relative_to(BASE_DIR)
        except ValueError:
            relative = target_path.relative_to(BASE_DIR.resolve())
        relative_str = str(relative).replace("\\", "/")
        # The root relative to itself renders as ".", which is truthy, so it
        # has to be mapped to "/" explicitly.
        if relative_str in ("", "."):
            relative_str = "/"

        return {
            "path": relative_str,
            "items": [item.dict() for item in items]
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"读取目录失败: {str(e)}")
@router.post("/upload")
async def upload_file(
    file: UploadFile = File(...),
    path: str = Query("", description="上传目录路径"),
    username: Optional[str] = Query(None, description="当前用户名")
) -> Dict[str, Any]:
    """
    Upload a file into a directory.

    Args:
        file: The uploaded file.
        path: Destination directory; an empty string or "/" means the storage
            root. A missing directory is created.
        username: Optional current user; when given, that user's directory
            tree is created first if it does not exist yet.

    Returns:
        A dict with ``success``, ``file`` (FileInfo dict of the stored file)
        and ``message``. If the name is already taken, ``_1``, ``_2``, ... is
        appended to the stem until a free name is found.

    Raises:
        HTTPException: 400 if the file name is unusable or the destination is
            not a directory, 500 for any unexpected failure.
    """
    try:
        if username:
            ensure_user_dir(username)

        # The file name is chosen by the client, so treat it as untrusted:
        # keep only the last path component so that names such as
        # "../../x" or "/etc/x" cannot place the file outside the target
        # directory. Backslashes are treated as separators as well.
        raw_name = file.filename or ""
        safe_name = Path(raw_name.replace("\\", "/")).name
        if not safe_name or safe_name in (".", "..") or "\x00" in safe_name:
            raise HTTPException(status_code=400, detail="无效的文件名")

        if not path or path == "/":
            target_dir = BASE_DIR
        else:
            target_dir = sanitize_path(path)

        if not target_dir.exists():
            target_dir.mkdir(parents=True)
        if not target_dir.is_dir():
            raise HTTPException(status_code=400, detail="目标路径不是目录")

        # Pick a free name by appending a numeric suffix to the stem.
        file_path = target_dir / safe_name
        stem, suffix = file_path.stem, file_path.suffix
        counter = 1
        while file_path.exists():
            file_path = target_dir / f"{stem}_{counter}{suffix}"
            counter += 1

        # Read first so a failed read does not leave an empty file behind.
        content = await file.read()
        with file_path.open("wb") as f:
            f.write(content)

        return {
            "success": True,
            "file": get_file_info(file_path).dict(),
            "message": f"文件上传成功: {file.filename}"
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"上传失败: {str(e)}")
def _resolve_inside_base(candidate: Path) -> Path:
    """Resolve *candidate* and return it; raise HTTP 403 if it escapes BASE_DIR."""
    resolved = candidate.resolve()
    base = BASE_DIR.resolve()
    if resolved != base and base not in resolved.parents:
        raise HTTPException(status_code=403, detail="无效的路径")
    return resolved


@router.post("/action")
async def file_action(action: FileAction) -> Dict[str, Any]:
    """
    Perform a file operation: move, delete, create directory, or rename.

    Args:
        action: The request. ``action.action`` selects the operation;
            ``action.path`` is the entry to act on (for ``mkdir`` it is the
            parent directory); ``action.new_name`` is used by ``mkdir`` and
            ``rename``; ``action.target`` is the destination directory for
            ``move``.

    Returns:
        A dict with ``success`` and ``message``; ``mkdir`` adds ``folder`` and
        ``rename``/``move`` add ``file`` (FileInfo dicts of the result).

    Raises:
        HTTPException: 400 for missing/invalid arguments, unknown actions or
            name clashes; 403 for paths outside the storage root or an
            attempt to delete the root; 404 if the source does not exist;
            500 for any unexpected failure.
    """
    try:
        source_path = sanitize_path(action.path)

        if action.action == "delete":
            if not source_path.exists():
                raise HTTPException(status_code=404, detail="文件不存在")
            # An empty path resolves to the storage root; never wipe it.
            if source_path == BASE_DIR.resolve():
                raise HTTPException(status_code=403, detail="不能操作根目录")
            if source_path.is_dir():
                shutil.rmtree(source_path)
            else:
                source_path.unlink()
            return {"success": True, "message": f"已删除: {source_path.name}"}

        elif action.action == "mkdir":
            # new_name is client-controlled; make sure ".." segments cannot
            # take the new directory outside the storage root.
            new_dir = _resolve_inside_base(
                source_path / (action.new_name or "新文件夹")
            )
            # Append a numeric suffix until the name is free.
            counter = 1
            original_name = new_dir.name
            while new_dir.exists():
                new_dir = source_path / f"{original_name}_{counter}"
                counter += 1
            new_dir.mkdir(parents=True)
            return {
                "success": True,
                "folder": get_file_info(new_dir).dict(),
                "message": f"已创建文件夹: {new_dir.name}"
            }

        elif action.action == "rename":
            if not action.new_name:
                raise HTTPException(status_code=400, detail="缺少新名称")
            if not source_path.exists():
                raise HTTPException(status_code=404, detail="文件不存在")
            # Containment check on the destination; this also rejects
            # renaming the storage root, whose sibling lies outside it.
            new_path = _resolve_inside_base(source_path.parent / action.new_name)
            if new_path.exists():
                raise HTTPException(status_code=400, detail="目标名称已存在")
            source_path.rename(new_path)
            return {
                "success": True,
                "file": get_file_info(new_path).dict(),
                "message": f"已重命名为: {action.new_name}"
            }

        elif action.action == "move":
            if not action.target:
                raise HTTPException(status_code=400, detail="缺少目标路径")
            if not source_path.exists():
                raise HTTPException(status_code=404, detail="源文件不存在")
            target_path = sanitize_path(action.target)
            if not target_path.is_dir():
                raise HTTPException(status_code=400, detail="目标路径不是目录")
            # A directory cannot be moved into itself or one of its own
            # descendants (this also covers moving the storage root).
            if target_path == source_path or source_path in target_path.parents:
                raise HTTPException(status_code=400, detail="不能移动到自身或其子目录")
            new_path = target_path / source_path.name
            # shutil.move would overwrite an existing file, or nest the source
            # inside an existing directory of the same name; refuse instead.
            if new_path.exists():
                raise HTTPException(status_code=400, detail="目标名称已存在")
            shutil.move(str(source_path), str(new_path))
            return {
                "success": True,
                "file": get_file_info(new_path).dict(),
                "message": f"已移动到: {action.target}"
            }

        else:
            raise HTTPException(status_code=400, detail=f"未知操作: {action.action}")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"操作失败: {str(e)}")
@router.get("/download")
async def download_file(path: str = Query(..., description="文件路径")):
    """
    Send a file to the client as a download.

    Responds with 404 when the path does not exist, 400 when it is not a
    regular file, and 500 on any unexpected failure.
    """
    from fastapi.responses import FileResponse

    try:
        requested = sanitize_path(path)

        if not requested.exists():
            raise HTTPException(status_code=404, detail="文件不存在")
        if not requested.is_file():
            raise HTTPException(status_code=400, detail="路径不是文件")

        # Served as a generic binary stream under the file's own name.
        response_args = {
            "path": str(requested),
            "filename": requested.name,
            "media_type": "application/octet-stream",
        }
        return FileResponse(**response_args)
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"下载失败: {exc!s}")
@router.get("/info")
async def get_file_info_api(path: str = Query(..., description="文件路径")) -> FileInfo:
    """
    Return the FileInfo record for a single path.

    Responds with 404 when the path does not exist and 500 on any unexpected
    failure.
    """
    try:
        requested = sanitize_path(path)
        if requested.exists():
            return get_file_info(requested)
        raise HTTPException(status_code=404, detail="文件不存在")
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"获取信息失败: {exc!s}")