TraceStudio-dev/server/app/core/serialization.py

83 lines
2.7 KiB
Python
Raw Permalink Normal View History

2026-01-10 19:08:49 +08:00
"""集中序列化工具(用于将执行结果中的复杂对象转换为 JSON 友好的预览格式)。
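Design principles:
- Serialization of DataFrames and other large objects is kept in one place so it is easy to maintain and optimize.
- Within a single process, avoid serializing the same object repeatedly; results are cached by object id to reduce CPU and memory overhead.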
"""
import weakref
from typing import Any, Dict
# Process-wide cache of DataFrame previews, so that a frame which shows up in
# several outputs is converted only once.
#   key   -> (id(frame), preview_limit)
#   value -> (weak reference to the frame, serialized preview)
# Only DataFrames are cached. Plain containers are cheap to walk and are often
# mutated between calls, so caching them by identity would return stale data.
# Frames are referenced weakly so the cache never keeps one alive; an entry is
# removed as soon as its frame is garbage-collected.
_OBJ_SERIALIZE_CACHE: Dict[Any, Any] = {}


def _cached_preview(frame: Any, preview_limit: int) -> Any:
    """Return the cached preview for *frame* at *preview_limit*, or ``None``."""
    entry = _OBJ_SERIALIZE_CACHE.get((id(frame), preview_limit))
    # ids are recycled after an object dies, so make sure the entry still
    # refers to this exact object before trusting it.
    if entry is not None and entry[0]() is frame:
        return entry[1]
    return None


def _remember_preview(frame: Any, preview_limit: int, preview: Any) -> None:
    """Cache *preview* for *frame* without extending the frame's lifetime.

    Objects that cannot be weakly referenced are silently left uncached.
    """
    key = (id(frame), preview_limit)

    # The cache dict is bound as a default so the callback does not depend on
    # module globals still being populated when it fires.
    def _forget(dead_ref: Any, _cache: Dict[Any, Any] = _OBJ_SERIALIZE_CACHE) -> None:
        entry = _cache.get(key)
        # Only drop the entry if it is still the one created for this object.
        if entry is not None and entry[0] is dead_ref:
            _cache.pop(key, None)

    try:
        frame_ref = weakref.ref(frame, _forget)
    except TypeError:
        return
    _OBJ_SERIALIZE_CACHE[key] = (frame_ref, preview)


def smart_serialize(value: Any, preview_limit: int = 20) -> Any:
    """Convert *value* into a JSON-friendly, preview-first structure.

    Conversion rules:
    - Polars DataFrame -> ``{"__type": "DataFrame", "columns": [...],
      "preview": [...], "rows": n}`` where ``preview`` holds at most
      ``preview_limit`` rows. If building the preview fails, the same shape
      is returned with empty ``columns``/``preview`` and ``rows`` of 0.
    - list / tuple -> list with every element converted recursively.
    - dict -> dict with the same keys and recursively converted values.
    - ``str`` / ``int`` / ``float`` / ``bool`` / ``None`` -> returned unchanged.
    - NumPy scalar -> the result of its ``.item()`` method.
    - anything else -> ``str(value)``, or ``None`` if ``str()`` itself raises.

    DataFrame previews are cached per ``(object identity, preview_limit)``.
    A cached preview is returned as the same object on later calls, so callers
    should treat it as read-only. A frame modified in place after it was first
    serialized may still yield its earlier preview.

    Args:
        value: The object to convert.
        preview_limit: Maximum number of DataFrame rows placed in ``preview``.

    Returns:
        The converted structure as described above.
    """
    # Imported lazily so this module has no hard dependency on either library.
    # Any import failure simply disables the corresponding branch.
    try:
        import polars as pl
    except Exception:
        pl = None
    try:
        import numpy as np
    except Exception:
        np = None

    def _frame_preview(frame: Any) -> Any:
        cached = _cached_preview(frame, preview_limit)
        if cached is not None:
            return cached
        try:
            preview = {
                "__type": "DataFrame",
                "columns": list(frame.columns),
                "preview": frame.head(preview_limit).to_dicts(),
                "rows": frame.height,
            }
        except Exception:
            # Best-effort placeholder; deliberately not cached so that a later
            # call can try again.
            return {"__type": "DataFrame", "columns": [], "preview": [], "rows": 0}
        _remember_preview(frame, preview_limit, preview)
        return preview

    def _ser(v: Any) -> Any:
        if pl is not None and isinstance(v, pl.DataFrame):
            return _frame_preview(v)
        if isinstance(v, (list, tuple)):
            return [_ser(item) for item in v]
        if isinstance(v, dict):
            return {key: _ser(item) for key, item in v.items()}
        if v is None or isinstance(v, (str, int, float, bool)):
            return v
        if np is not None:
            try:
                if isinstance(v, np.generic):
                    return v.item()
            except Exception:
                # Fall through to the string representation below.
                pass
        try:
            return str(v)
        except Exception:
            return None

    return _ser(value)