TraceStudio-dev/server/app/api/endpoints_upload.py
Boshuang Zhao 44f87d3951 bugfix
2026-01-12 11:13:01 +08:00

237 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
文件上传和管理相关 API
复用 server/file_manager.py 的逻辑
"""
from fastapi import APIRouter, UploadFile, File, HTTPException, Query
from fastapi.responses import FileResponse
from pathlib import Path
from typing import Optional
import shutil
from ..core.user_manager import get_user_path, CLOUD_ROOT
from ..core.security import is_safe_path, validate_filename, sanitize_path
# Router on which the file endpoints defined below register themselves.
router = APIRouter()
@router.get("/files/list")
async def list_files(
    path: str = Query("", description="相对路径")
):
    """List the contents of a directory under the shared cloud root.

    Args:
        path: Directory path relative to ``CLOUD_ROOT``; empty means the
            root itself.

    Returns:
        A dict with ``path`` (the listed directory relative to the root,
        forward-slash separated, ``""`` when no path was given) and ``items``
        (one entry per child with ``name``, ``path``, ``type``, ``size`` and
        ``extension``), folders first, then sorted by name.

    Raises:
        HTTPException: 403 if the path fails the safety check or the
            directory cannot be read; 400 if the path exists but is not a
            directory.
    """
    path = sanitize_path(path)

    # Everything is resolved relative to the shared root.
    target_path = CLOUD_ROOT / path if path else CLOUD_ROOT

    if not is_safe_path(CLOUD_ROOT, target_path):
        raise HTTPException(status_code=403, detail="Access denied")

    if not target_path.exists():
        # A missing directory is created on demand rather than reported.
        target_path.mkdir(parents=True, exist_ok=True)
    elif not target_path.is_dir():
        # iterdir() on a regular file would raise NotADirectoryError and
        # surface as an unhandled 500; report a client error instead.
        raise HTTPException(status_code=400, detail="Not a directory")

    items = []
    try:
        for item in target_path.iterdir():
            # Entries whose name starts with a double underscore are hidden.
            if item.name.startswith("__"):
                continue
            is_file = item.is_file()
            items.append({
                "name": item.name,
                # Always report forward slashes, whatever the host OS uses.
                "path": str(item.relative_to(CLOUD_ROOT)).replace('\\', '/'),
                "type": "folder" if item.is_dir() else "file",
                "size": item.stat().st_size if is_file else None,
                "extension": item.suffix if is_file else None,
            })
    except PermissionError:
        raise HTTPException(status_code=403, detail="Permission denied")

    if path:
        current_path = str(target_path.relative_to(CLOUD_ROOT)).replace('\\', '/')
    else:
        current_path = ""

    return {
        "path": current_path,
        # The key sorts False (folders) before True (files), then by name.
        "items": sorted(items, key=lambda x: (x["type"] == "file", x["name"])),
    }
@router.post("/files/upload")
async def upload_file(
    file: UploadFile = File(...),
    path: str = Query("", description="目标目录"),
    username: Optional[str] = Query(None, description="用户名(可选,用于保存到用户目录)")
):
    """Save an uploaded file under the cloud root or a user's directory.

    Args:
        file: The uploaded file.
        path: Destination directory, relative to the chosen base directory.
        username: When given, the file is stored under that user's directory
            (as returned by ``get_user_path``) instead of ``CLOUD_ROOT``.

    Returns:
        A dict with ``success`` and a ``file`` entry holding the stored
        file's ``name``, ``path`` (relative to ``CLOUD_ROOT``, forward-slash
        separated) and ``size`` in bytes.

    Raises:
        HTTPException: 400 if ``validate_filename`` rejects the file name;
            403 if the destination fails the safety check.
    """
    is_valid, error_msg = validate_filename(file.filename or "")
    if not is_valid:
        raise HTTPException(status_code=400, detail=error_msg)

    path = sanitize_path(path)

    if username:
        # User-directory mode (used for workflows and similar features).
        target_dir = get_user_path(username, path)
        base_path = get_user_path(username)
    else:
        # Root mode (used by the file manager).
        target_dir = CLOUD_ROOT / path if path else CLOUD_ROOT
        base_path = CLOUD_ROOT

    target_file = target_dir / (file.filename or "unnamed")

    if not is_safe_path(base_path, target_file):
        raise HTTPException(status_code=403, detail="Access denied")

    target_dir.mkdir(parents=True, exist_ok=True)

    # Copy the upload in bounded chunks so that a large file is never held
    # in memory in one piece.
    chunk_size = 1024 * 1024
    with open(target_file, "wb") as f:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)

    # NOTE(review): this assumes the user directory lives under CLOUD_ROOT;
    # relative_to() raises ValueError otherwise - confirm against
    # get_user_path.
    file_path = str(target_file.relative_to(CLOUD_ROOT)).replace('\\', '/')

    return {
        "success": True,
        "file": {
            "name": target_file.name,
            "path": file_path,
            "size": target_file.stat().st_size,
        },
    }
@router.get("/files/download")
async def download_file(path: str = Query(..., description="文件路径")):
    """Return a file stored under the shared cloud root.

    Args:
        path: File path relative to ``CLOUD_ROOT``.

    Returns:
        A ``FileResponse`` for the file, using the file's own name as the
        download name and ``application/octet-stream`` as the media type.

    Raises:
        HTTPException: 403 if the path fails the safety check; 404 if it
            does not exist or is not a regular file.
    """
    requested = CLOUD_ROOT / sanitize_path(path)

    if not is_safe_path(CLOUD_ROOT, requested):
        raise HTTPException(status_code=403, detail="Access denied")

    # Directories and missing paths are both reported as "not found".
    downloadable = requested.exists() and requested.is_file()
    if not downloadable:
        raise HTTPException(status_code=404, detail="File not found")

    response = FileResponse(
        path=str(requested),
        filename=requested.name,
        media_type="application/octet-stream",
    )
    return response
@router.get("/files/info")
async def get_file_info(path: str = Query(..., description="文件路径")):
    """Describe a path under the shared cloud root.

    The front end uses this to check whether a file exists.

    Args:
        path: Path relative to ``CLOUD_ROOT``.

    Returns:
        ``{"exists": False, "path": ...}`` when nothing is there; otherwise a
        dict with ``exists``, ``name``, ``path``, ``type``, ``size``,
        ``modified`` (the stat mtime) and ``extension``. ``size`` and
        ``extension`` are ``None`` for anything that is not a regular file.

    Raises:
        HTTPException: 403 if the path fails the safety check.
    """
    path = sanitize_path(path)
    target = CLOUD_ROOT / path

    if not is_safe_path(CLOUD_ROOT, target):
        raise HTTPException(status_code=403, detail="Access denied")

    if not target.exists():
        # Echo the cleaned path back, forward-slash separated.
        return {"exists": False, "path": path.replace('\\', '/')}

    details = target.stat()
    relative = str(target.relative_to(CLOUD_ROOT)).replace('\\', '/')

    if target.is_dir():
        kind = "folder"
    else:
        kind = "file"

    if target.is_file():
        size = details.st_size
        extension = target.suffix
    else:
        size = None
        extension = None

    return {
        "exists": True,
        "name": target.name,
        "path": relative,
        "type": kind,
        "size": size,
        "modified": details.st_mtime,
        "extension": extension,
    }
@router.post("/files/action")
async def file_action(payload: dict):
    """Perform a file operation (delete, mkdir or rename) under the cloud root.

    Args:
        payload: Request body. ``action`` selects the operation and ``path``
            is the target relative to ``CLOUD_ROOT``. ``mkdir`` and
            ``rename`` also require ``new_name``: for ``mkdir`` it is the
            directory created inside ``path``; for ``rename`` it is the new
            name of ``path`` within the same parent directory.

    Returns:
        ``{"success": True, "message": "Deleted"}`` for ``delete``, or
        ``{"success": True, "path": ...}`` with the resulting forward-slash
        relative path for ``mkdir`` and ``rename``.

    Raises:
        HTTPException: 400 for a missing ``path``/``new_name``, an invalid
            ``new_name`` or an unknown action; 403 if a path fails the
            safety check or the delete target is the cloud root itself; 404
            if the rename source does not exist; 409 if the rename target
            already exists.
    """
    action = payload.get("action")
    path = sanitize_path(payload.get("path", ""))
    if not path:
        raise HTTPException(status_code=400, detail="Missing path")

    file_path = CLOUD_ROOT / path
    if not is_safe_path(CLOUD_ROOT, file_path):
        raise HTTPException(status_code=403, detail="Access denied")

    if action == "delete":
        if file_path.is_symlink():
            # shutil.rmtree refuses to operate on a symbolic link, so remove
            # the link itself and leave whatever it points at alone.
            file_path.unlink()
        elif file_path.is_dir():
            # Refuse to wipe the shared root, e.g. when path resolves to ".".
            if file_path.resolve() == CLOUD_ROOT.resolve():
                raise HTTPException(status_code=403, detail="Access denied")
            shutil.rmtree(file_path)
        elif file_path.exists():
            file_path.unlink()
        # Deleting something that is already gone still counts as success.
        return {"success": True, "message": "Deleted"}

    if action == "mkdir":
        new_name = payload.get("new_name")
        if not new_name:
            raise HTTPException(status_code=400, detail="Missing new_name")
        is_valid, error_msg = validate_filename(new_name)
        if not is_valid:
            raise HTTPException(status_code=400, detail=error_msg)

        new_dir = file_path / new_name
        if not is_safe_path(CLOUD_ROOT, new_dir):
            raise HTTPException(status_code=403, detail="Access denied")

        new_dir.mkdir(parents=True, exist_ok=True)
        new_dir_path = str(new_dir.relative_to(CLOUD_ROOT)).replace('\\', '/')
        return {"success": True, "path": new_dir_path}

    if action == "rename":
        new_name = payload.get("new_name")
        if not new_name:
            raise HTTPException(status_code=400, detail="Missing new_name")
        is_valid, error_msg = validate_filename(new_name)
        if not is_valid:
            raise HTTPException(status_code=400, detail=error_msg)

        new_path = file_path.parent / new_name
        if not is_safe_path(CLOUD_ROOT, new_path):
            raise HTTPException(status_code=403, detail="Access denied")
        if not file_path.exists():
            raise HTTPException(status_code=404, detail="File not found")
        # Path.rename silently replaces an existing file on POSIX. Reject the
        # request unless the target is the source itself (same name, or a
        # case-only change on a case-insensitive file system).
        if new_path.exists() and not new_path.samefile(file_path):
            raise HTTPException(status_code=409, detail="Target already exists")

        file_path.rename(new_path)
        new_path_str = str(new_path.relative_to(CLOUD_ROOT)).replace('\\', '/')
        return {"success": True, "path": new_path_str}

    raise HTTPException(status_code=400, detail=f"Unknown action: {action}")