TraceStudio-dist/server/app/api/endpoints_upload.py

237 lines
7.7 KiB
Python
Raw Permalink Normal View History

2026-01-13 16:41:31 +08:00
"""
文件上传和管理相关 API
复用 server/file_manager.py 的逻辑
"""
from fastapi import APIRouter, UploadFile, File, HTTPException, Query
from fastapi.responses import FileResponse
from pathlib import Path
from typing import Optional
import shutil
from ..core.user_manager import get_user_path, CLOUD_ROOT
from ..core.security import is_safe_path, validate_filename, sanitize_path
router = APIRouter()
@router.get("/files/list")
async def list_files(
path: str = Query("", description="相对路径")
):
"""列出目录内容(展示根目录,所有用户共享)"""
# 清理路径
path = sanitize_path(path)
# 构建目标路径(从根目录开始)
if path:
target_path = CLOUD_ROOT / path
else:
target_path = CLOUD_ROOT
# 安全检查
if not is_safe_path(CLOUD_ROOT, target_path):
raise HTTPException(status_code=403, detail="Access denied")
if not target_path.exists():
target_path.mkdir(parents=True, exist_ok=True)
# 列出目录内容
items = []
try:
for item in target_path.iterdir():
# 统一使用正斜杠格式
if item.name.startswith("__"):
continue # 跳过隐藏文件
relative_path = str(item.relative_to(CLOUD_ROOT)).replace('\\', '/')
items.append({
"name": item.name,
"path": relative_path,
"type": "folder" if item.is_dir() else "file",
"size": item.stat().st_size if item.is_file() else None,
"extension": item.suffix if item.is_file() else None
})
except PermissionError:
raise HTTPException(status_code=403, detail="Permission denied")
# 统一使用正斜杠格式
current_path = str(target_path.relative_to(CLOUD_ROOT)).replace('\\', '/') if path else ""
return {
"path": current_path,
"items": sorted(items, key=lambda x: (x["type"] == "file", x["name"]))
}
@router.post("/files/upload")
async def upload_file(
file: UploadFile = File(...),
path: str = Query("", description="目标目录"),
username: Optional[str] = Query(None, description="用户名(可选,用于保存到用户目录)")
):
"""上传文件默认到根目录如指定username则到用户目录"""
# 验证文件名
is_valid, error_msg = validate_filename(file.filename or "")
if not is_valid:
raise HTTPException(status_code=400, detail=error_msg)
# 清理路径
path = sanitize_path(path)
# 构建目标路径根据是否有username决定
if username:
# 用户目录模式(用于工作流等)
target_dir = get_user_path(username, path)
base_path = get_user_path(username)
else:
# 根目录模式(用于文件管理器)
target_dir = CLOUD_ROOT / path if path else CLOUD_ROOT
base_path = CLOUD_ROOT
target_file = target_dir / (file.filename or "unnamed")
# 安全检查
if not is_safe_path(base_path, target_file):
raise HTTPException(status_code=403, detail="Access denied")
# 确保目录存在
target_dir.mkdir(parents=True, exist_ok=True)
# 保存文件
with open(target_file, "wb") as f:
content = await file.read()
f.write(content)
# 统一使用正斜杠格式
file_path = str(target_file.relative_to(CLOUD_ROOT)).replace('\\', '/')
return {
"success": True,
"file": {
"name": target_file.name,
"path": file_path,
"size": target_file.stat().st_size
}
}
@router.get("/files/download")
async def download_file(path: str = Query(..., description="文件路径")):
"""下载文件"""
# 清理路径
path = sanitize_path(path)
# 构建绝对路径
file_path = CLOUD_ROOT / path
# 安全检查
if not is_safe_path(CLOUD_ROOT, file_path):
raise HTTPException(status_code=403, detail="Access denied")
if not file_path.exists() or not file_path.is_file():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(
path=str(file_path),
filename=file_path.name,
media_type="application/octet-stream"
)
@router.get("/files/info")
async def get_file_info(path: str = Query(..., description="文件路径")):
"""获取文件信息(用于前端检查文件是否存在)"""
# 清理路径
path = sanitize_path(path)
# 构建绝对路径
file_path = CLOUD_ROOT / path
# 安全检查
if not is_safe_path(CLOUD_ROOT, file_path):
raise HTTPException(status_code=403, detail="Access denied")
if not file_path.exists():
return {
"exists": False,
"path": path.replace('\\', '/')
}
stat = file_path.stat()
# 统一使用正斜杠格式
file_path_str = str(file_path.relative_to(CLOUD_ROOT)).replace('\\', '/')
return {
"exists": True,
"name": file_path.name,
"path": file_path_str,
"type": "folder" if file_path.is_dir() else "file",
"size": stat.st_size if file_path.is_file() else None,
"modified": stat.st_mtime,
"extension": file_path.suffix if file_path.is_file() else None
}
@router.post("/files/action")
async def file_action(payload: dict):
"""文件操作(删除、移动、创建目录、重命名)"""
action = payload.get("action")
path = sanitize_path(payload.get("path", ""))
if not path:
raise HTTPException(status_code=400, detail="Missing path")
file_path = CLOUD_ROOT / path
# 安全检查
if not is_safe_path(CLOUD_ROOT, file_path):
raise HTTPException(status_code=403, detail="Access denied")
if action == "delete":
if file_path.exists():
if file_path.is_dir():
shutil.rmtree(file_path)
else:
file_path.unlink()
return {"success": True, "message": "Deleted"}
elif action == "mkdir":
new_name = payload.get("new_name")
if not new_name:
raise HTTPException(status_code=400, detail="Missing new_name")
is_valid, error_msg = validate_filename(new_name)
if not is_valid:
raise HTTPException(status_code=400, detail=error_msg)
new_dir = file_path / new_name
if not is_safe_path(CLOUD_ROOT, new_dir):
raise HTTPException(status_code=403, detail="Access denied")
new_dir.mkdir(parents=True, exist_ok=True)
# 统一使用正斜杠格式
new_dir_path = str(new_dir.relative_to(CLOUD_ROOT)).replace('\\', '/')
return {"success": True, "path": new_dir_path}
elif action == "rename":
new_name = payload.get("new_name")
if not new_name:
raise HTTPException(status_code=400, detail="Missing new_name")
is_valid, error_msg = validate_filename(new_name)
if not is_valid:
raise HTTPException(status_code=400, detail=error_msg)
new_path = file_path.parent / new_name
if not is_safe_path(CLOUD_ROOT, new_path):
raise HTTPException(status_code=403, detail="Access denied")
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
file_path.rename(new_path)
# 统一使用正斜杠格式
new_path_str = str(new_path.relative_to(CLOUD_ROOT)).replace('\\', '/')
return {"success": True, "path": new_path_str}
else:
raise HTTPException(status_code=400, detail=f"Unknown action: {action}")