TraceStudio/server/app/nodes/trace_loader_nodes.py
2026-01-13 00:29:18 +08:00

265 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Trace 文件加载节点
用于从 Unreal Insights 导出和加载 Trace 数据
"""
import time
import polars as pl
from server.app.core.user_manager import CLOUD_ROOT
import server.app.utils.io as io
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional
from server.app.core.node_base import (
TraceNode,
input_port,
output_port,
param,
context_var,
NodeType,
CachePolicy
)
from server.app.core.node_registry import register_node
from server.app.services.agent_client import AgentClient
import yaml
def load_system_config():
    """Load the system configuration from ``system_config.yaml``.

    The file is looked up at
    ``Path(__file__).parent.parent.parent / "system_config.yaml"``, i.e. two
    directory levels above the directory that contains this module.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist or contains no document.
    """
    config_path = Path(__file__).parent.parent.parent / "system_config.yaml"
    if not config_path.exists():
        return {}
    with open(config_path, 'r', encoding='utf-8') as f:
        # yaml.safe_load returns None for an empty file; callers chain
        # .get() on the result, so normalise that case to an empty dict.
        return yaml.safe_load(f) or {}
def _agent_client() -> AgentClient:
    """Create an AgentClient from the environment.

    CLOUD_ROOT is passed as the ``container_cloud_root`` argument of
    ``AgentClient.from_env``.
    """
    client = AgentClient.from_env(container_cloud_root=CLOUD_ROOT)
    return client
@register_node
class TraceLoader_Metadata(TraceNode):
    """
    Trace metadata loader.

    What it does:
    1. Invokes UnrealInsights (through the Agent) to open a .utrace file.
    2. Runs the ExportMetadata command to export a CSV.
    3. Reads the CSV file and loads it into memory.
    4. Returns the data table as a DataFrame.

    Typical uses:
    - Analysing the metadata of a trace file.
    - Serving as the starting point of a data-processing workflow.
    """
    CATEGORY = "Loader"
    DISPLAY_NAME = "Trace 元数据加载器"
    DESCRIPTION = "从 .utrace 文件导出并加载元数据"
    ICON = "📥"
    NODE_TYPE = NodeType.INPUT  # input node (no input ports)
    CACHE_POLICY = CachePolicy.DISK  # disk cache (metadata can be large)

    @output_port("metadata", "DataTable", description="导出的元数据表")
    @output_port("file_path", "String", description="导出的 CSV 文件路径")
    @param("utrace_file", "String", default="", description="Utrace 文件路径", widget="file", required=True)
    @param("tokens", "String", description="Token事件列表以 | 进行分割")
    def process(self, inputs: Dict[str, Any], context: Optional[Dict] = None) -> Dict[str, Any]:
        """Export the metadata of a .utrace file to CSV and load it.

        Args:
            inputs: Upstream inputs; not read by this node.
            context: Execution context; not read by this node.

        Returns:
            A dict with ``"metadata"`` (the polars DataFrame read from the
            exported CSV) and ``"file_path"`` (the CSV path as a string).

        Raises:
            ValueError: If the ``utrace_file`` parameter is empty.
            FileNotFoundError: If the configured ``insights_path`` is unset
                or missing, the trace file does not exist, or the CSV was
                not produced.
            RuntimeError: If the Agent reports an unsuccessful run.
        """
        # Read parameters. The "tokens" param declares no default, so guard
        # against a None value ending up as "-tokens=None" in the command.
        tokens = self.get_param("tokens", "") or ""
        utrace_file = self.get_param("utrace_file", "")
        output_csv = self.get_param("output_csv", "temp/metadata.csv")
        print("TraceLoader_Metadata: process called with utrace_file =", utrace_file, ", tokens =", tokens)
        if not utrace_file:
            raise ValueError("必须指定 utrace_file 参数")

        # Load the system configuration.
        config = load_system_config()
        insights_path = config.get("unreal", {}).get("insights_path", "")
        if not insights_path or not Path(insights_path).exists():
            raise FileNotFoundError(
                f"UnrealInsights.exe 未找到:{insights_path}\n"
                f"请在 system_config.yaml 中配置正确的 insights_path"
            )

        # The trace file path is resolved relative to CLOUD_ROOT.
        utrace_full_path = CLOUD_ROOT / utrace_file
        if not utrace_full_path.exists():
            raise FileNotFoundError(f"Utrace 文件不存在:{utrace_full_path}")

        # Build the output CSV path twice: the container-side path used
        # here, and the host-side path handed to the Agent.
        csv_full_path = CLOUD_ROOT / output_csv
        agent = _agent_client()
        host_csv_full_path = agent.map_cloud_to_host(output_csv)

        # Make sure the output directory exists inside the container (per
        # the original author, the Agent also creates it when writing to
        # the host path), and drop any CSV left over from an earlier run so
        # the existence check below reflects this run only.
        csv_full_path.parent.mkdir(parents=True, exist_ok=True)
        io.safe_delete(csv_full_path)

        # Run UnrealInsights on the host through the Agent.
        host_utrace_full_path = agent.map_cloud_to_host(utrace_full_path)
        cmd_args = [
            f'-OpenTraceFile="{host_utrace_full_path}"',
            f'-ExecOnAnalysisCompleteCmd="TimingInsights.ExportMetadata {host_csv_full_path} -tokens={tokens}"'
        ]
        print("TraceLoader_Metadata: call agent with", cmd_args)
        resp = agent.run("unreal_insights", args=cmd_args, timeout=None)
        if not resp.get("success", False):
            raise RuntimeError(
                f"UnrealInsights 通过 Agent 执行失败 (code={resp.get('return_code')})\n"
                f"stdout: {resp.get('stdout','')}\n"
                f"stderr: {resp.get('stderr','')}"
            )
        if not csv_full_path.exists():
            raise FileNotFoundError(
                f"CSV 文件未生成:{csv_full_path}\n"
                f"Agent stdout: {resp.get('stdout','')}\n"
                f"Agent stderr: {resp.get('stderr','')}"
            )

        # Read the exported CSV and assemble the outputs. The CSV is left
        # on disk because its path is part of the result.
        df = pl.read_csv(csv_full_path)
        outputs = {
            "metadata": df,
            "file_path": str(csv_full_path)
        }
        return outputs
@register_node
class TraceLoader_Events(TraceNode):
    """
    Trace event loader.

    Exports the event data (Timing Events) of a trace file and loads it.
    """
    CATEGORY = "Loader"
    DISPLAY_NAME = "Trace 事件加载器"
    DESCRIPTION = "从 .utrace 文件导出并加载事件数据"
    ICON = "⏱️"
    NODE_TYPE = NodeType.INPUT
    CACHE_POLICY = CachePolicy.DISK

    @output_port("events", "DataTable", description="导出的事件数据表")
    @output_port("file_path", "String", description="导出的 CSV 文件路径")
    @param("utrace_file", "String", default="", description="Utrace 文件路径", widget="file", required=True)
    @param("output_csv", "String", default="temp/events.csv", description="输出 CSV 路径", required=True)
    @param("timeout", "Number", default=120, description="命令执行超时时间(秒)", min=10, max=600, step=10)
    @param("auto_cleanup", "Boolean", default=False, description="执行后自动删除 CSV 文件")
    @context_var("row_count", "Integer", description="事件数量")
    @context_var("column_count", "Integer", description="列数")
    @context_var("execution_time", "Number", description="执行时间(秒)")
    def process(self, inputs: Dict[str, Any], context: Optional[Dict] = None) -> Dict[str, Any]:
        """Export the timing events of a .utrace file to CSV and load them.

        Args:
            inputs: Upstream inputs; not read by this node.
            context: Execution context; only ``"user_id"`` is read
                (defaults to ``"guest"``), to resolve a relative trace path.

        Returns:
            A dict with ``"events"`` (the polars DataFrame read from the
            exported CSV) and ``"file_path"`` (the CSV path as a string).

        Raises:
            ValueError: If the ``utrace_file`` parameter is empty.
            FileNotFoundError: If the configured ``insights_path`` is unset
                or missing, the trace file does not exist, or the CSV was
                not produced.
            RuntimeError: If the Agent reports an unsuccessful run.
            TimeoutError: If ``subprocess.TimeoutExpired`` is raised while
                running the command.
        """
        start_time = time.time()

        # Read parameters.
        utrace_file = self.get_param("utrace_file", "")
        output_csv = self.get_param("output_csv", "temp/events.csv")
        timeout = self.get_param("timeout", 120)
        auto_cleanup = self.get_param("auto_cleanup", False)
        if not utrace_file:
            raise ValueError("必须指定 utrace_file 参数")

        # Load the system configuration.
        config = load_system_config()
        insights_path = config.get("unreal", {}).get("insights_path", "")
        cloud_root = Path(config.get("storage", {}).get("cloud_root", "./cloud"))
        user_id = (context or {}).get("user_id", "guest")

        # Resolve paths.
        if not insights_path or not Path(insights_path).exists():
            raise FileNotFoundError(f"UnrealInsights.exe 未找到:{insights_path}")
        if Path(utrace_file).is_absolute():
            utrace_full_path = Path(utrace_file)
        else:
            # Relative trace paths live under the user's directory.
            utrace_full_path = cloud_root / "users" / user_id / utrace_file
        if not utrace_full_path.exists():
            raise FileNotFoundError(f"Utrace 文件不存在:{utrace_full_path}")

        agent = _agent_client()
        if Path(output_csv).is_absolute():
            csv_full_path = Path(output_csv)
        else:
            csv_full_path = cloud_root / output_csv
        # The host-side path is derived from the raw parameter either way.
        host_csv_full_path = agent.map_cloud_to_host(output_csv)
        csv_full_path.parent.mkdir(parents=True, exist_ok=True)
        # Remove any CSV left over from an earlier run so that the
        # existence check below cannot be satisfied by stale data (same
        # approach as TraceLoader_Metadata).
        io.safe_delete(csv_full_path)

        # Run the export through the Agent.
        host_utrace_full_path = agent.map_cloud_to_host(utrace_full_path)
        cmd_args = [
            f'-OpenTraceFile="{host_utrace_full_path}"',
            f'-ExecOnAnalysisCompleteCmd="TimingInsights.ExportTimingEvents {host_csv_full_path}"'
        ]
        try:
            resp = agent.run("unreal_insights", args=cmd_args, timeout=timeout)
            if not resp.get("success", False):
                raise RuntimeError(
                    f"UnrealInsights 通过 Agent 执行失败 (code={resp.get('return_code')})\n"
                    f"stdout: {resp.get('stdout','')}\n"
                    f"stderr: {resp.get('stderr','')}"
                )

            # Give the file up to 5 seconds to appear, polling every 0.5 s.
            wait_time = 0
            while not csv_full_path.exists() and wait_time < 5:
                time.sleep(0.5)
                wait_time += 0.5
            if not csv_full_path.exists():
                raise FileNotFoundError(
                    f"CSV 文件未生成:{csv_full_path}\n"
                    f"Agent stdout: {resp.get('stdout','')}\n"
                    f"Agent stderr: {resp.get('stderr','')}"
                )

            # Read the CSV with polars (keeps the backend consistent).
            df = pl.read_csv(csv_full_path)
            execution_time = time.time() - start_time
            outputs = {
                "events": df,
                "file_path": str(csv_full_path)
            }
            # NOTE(review): these values match the declared context_vars but
            # are not returned or otherwise published by this method —
            # confirm how the framework is expected to receive them.
            result_context = {
                "row_count": len(df),
                "column_count": len(df.columns),
                "execution_time": round(execution_time, 2)
            }
            if auto_cleanup:
                # Best-effort removal; a failure is recorded, not raised.
                try:
                    csv_full_path.unlink()
                    result_context["cleaned_up"] = True
                except OSError as e:
                    result_context["cleanup_error"] = str(e)
            return outputs
        except subprocess.TimeoutExpired as exc:
            raise TimeoutError(f"命令执行超时(超过 {timeout} 秒)") from exc
        except Exception:
            # Do not leave a partial or unusable CSV behind on failure.
            # Cleanup is best-effort: the original error is what matters,
            # so only filesystem errors from unlink are suppressed.
            if csv_full_path.exists():
                try:
                    csv_full_path.unlink()
                except OSError:
                    pass
            raise