197 lines
7.6 KiB
Markdown
197 lines
7.6 KiB
Markdown
|
|
---
|
|||
|
|
title: "Workflow Execution Results Structure - Redundancy & Complexity"
|
|||
|
|
created: 2026-01-10
|
|||
|
|
author: copilot
|
|||
|
|
labels: [bug, api, design, discussion]
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 问题报告:工作流执行结果的数据结构冗余与复杂性
|
|||
|
|
|
|||
|
|
摘要:
|
|||
|
|
- 后端在生成并返回工作流执行报告时保留了多套数据(`node_infos`、`node_results`、并在 API 层做额外合并与序列化),导致返回体中字段重复、语义混淆、序列化逻辑分散,增加前端解析复杂度与潜在性能问题。
|
|||
|
|
|
|||
|
|
## 1) 核心矛盾
|
|||
|
|
- 后端为了兼容历史实现同时保留新的执行上下文,当前返回给前端的执行报告既包含 `node_infos` 又包含 `node_results`(并在 API 层再次合并/序列化),导致字段冗余、语义重复、序列化逻辑被多处重复实现。
|
|||
|
|
|
|||
|
|
## 2) 关键代码片段
|
|||
|
|
- 执行器在 `_get_execution_report` 中返回:
|
|||
|
|
|
|||
|
|
```py
|
|||
|
|
"node_infos": { node_id: info.to_dict() for node_id, info in self.context.node_infos.items() },
|
|||
|
|
"node_results": self.context.node_results
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
- API 层(`endpoints_graph.execute`)对 report 做两种兼容解析并合并后序列化:
|
|||
|
|
|
|||
|
|
```py
|
|||
|
|
if 'node_infos' in report or 'node_results' in report:
|
|||
|
|
node_infos = report.get('node_infos', {}) or {}
|
|||
|
|
node_results = report.get('node_results', {}) or {}
|
|||
|
|
elif isinstance(report, dict):
|
|||
|
|
node_infos = report
|
|||
|
|
node_results = {}
|
|||
|
|
|
|||
|
|
for nid in all_node_ids:
|
|||
|
|
info = node_infos.get(nid)
|
|||
|
|
outputs = node_results.get(nid)
|
|||
|
|
ser_outputs = serialize(outputs) # polars.DataFrame -> preview
|
|||
|
|
results_out[nid] = {
|
|||
|
|
'status': _get_status(info),
|
|||
|
|
'outputs': ser_outputs,
|
|||
|
|
'error': _get_error(info)
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 3) 技术限制与风险
|
|||
|
|
- 冗余存储:`NodeExecutionInfo.result`、`context.node_results[node_id]`、以及 API 层 `results_out` 三处可含相同数据。
|
|||
|
|
- 性能与带宽:DataFrame/大对象可能被多次序列化,产生额外 CPU 与内存消耗。
|
|||
|
|
- 语义不一致:前端需同时解析多套字段,提升错误率。
|
|||
|
|
|
|||
|
|
## 4) 可决策方案
|
|||
|
|
|
|||
|
|
### 方案 A(推荐)—— 单一标准响应(canonical schema)
|
|||
|
|
- 描述:强制执行一个统一的响应结构:顶层 `nodes` 映射,每个 node 包含 `status,duration,cache_hit,error,outputs,meta`。
|
|||
|
|
- 优点:清晰、便于文档化与版本化、降低重复序列化。
|
|||
|
|
- 缺点:需要一次性后端变更与前端适配窗口。
|
|||
|
|
|
|||
|
|
### 方案 B(渐进兼容)—— 分层输出(summary/results)
|
|||
|
|
- 描述:API 层导出 `summary`(轻量)与 `results`(可选详细),通过参数控制是否返回详细 outputs(如 `?full=true`)。
|
|||
|
|
- 优点:低破坏、可渐进迁移。
|
|||
|
|
- 缺点:仍需前端适配 `full` 策略。
|
|||
|
|
|
|||
|
|
### 方案 C(重构)—— 大数据 artifact 存储与引用
|
|||
|
|
- 描述:将完整的大对象(DataFrame)写入临时 artifact 存储,响应仅包含 `preview` 与 `artifact_id`。
|
|||
|
|
- 优点:解决大 payload 问题,支持按需拉取。
|
|||
|
|
- 缺点:实现复杂(存储、TTL、权限)。
|
|||
|
|
|
|||
|
|
## 5) 立刻可执行的短期修复
|
|||
|
|
- 把 DataFrame 序列化抽成单一函数并确保只执行一次。
|
|||
|
|
- 在响应中避免重复包含相同数据(检测并合并重复字段)。
|
|||
|
|
- 添加 `api_schema_version` 字段并在返回 header 中标注,便于兼容管理。
|
|||
|
|
|
|||
|
|
## 6) 建议与决策请求给 Gemini
|
|||
|
|
1. 是否接受 breaking-change(方案 A)并安排前端迁移窗口?
|
|||
|
|
2. 是否采用渐进方案(方案 B)为过渡,并设定过渡期(例如 2 个版本)?
|
|||
|
|
3. 是否在短期内实现 artifact 存储(方案 C 的关键部分)?
|
|||
|
|
|
|||
|
|
## 7) 建议的 canonical schema(示例)
|
|||
|
|
```json
|
|||
|
|
{
|
|||
|
|
"status": "success",
|
|||
|
|
"execution_id": "uuid",
|
|||
|
|
"execution_time": 0.123,
|
|||
|
|
"nodes": {
|
|||
|
|
"node1": {
|
|||
|
|
"status": "success",
|
|||
|
|
"duration": 0.032,
|
|||
|
|
"cache_hit": false,
|
|||
|
|
"error": null,
|
|||
|
|
"outputs": {
|
|||
|
|
"out": {
|
|||
|
|
"__type": "DataFrame",
|
|||
|
|
"columns": ["a","b"],
|
|||
|
|
"preview": [{"a":1,"b":2}],
|
|||
|
|
"rows": 1234
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
"meta": { "class_name": "SomeNode", "node_type": "normal" }
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
```
|
|||
|
|
|
|||
|
|
## 8) 后续动作(会议后)
|
|||
|
|
- 如果选 A:制定迁移计划(API version、前端适配、回退策略)。
|
|||
|
|
- 如果选 B:实现 `summary/results` 与 `?full` 控制,并优化序列化集中点。
|
|||
|
|
- 如果选 C:实现 artifact 存储与权限管理。
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
title: "Refactor: Unified Execution Protocol & Compatibility Cleanup"
|
|||
|
|
created: 2026-01-10
|
|||
|
|
author: gemini
|
|||
|
|
labels: [architecture, breaking-change, cleanup, protocol]
|
|||
|
|
---
|
|||
|
|
|
|||
|
|
# 🧠 《Copilot 开发指导手册》:执行引擎标准化重构
|
|||
|
|
|
|||
|
|
## 1. 功能摘要
|
|||
|
|
废除 `node_infos` 与 `node_results` 的冗余双轨数据流。强制推行以节点为中心的 **Canonical Schema** 作为唯一真相来源,彻底清理开发期遗留的兼容性逻辑(如旧版 Input/Output 节点分支),确保执行引擎职责单一化。
|
|||
|
|
|
|||
|
|
## 2. 核心矛盾
|
|||
|
|
后端为了兼容历史实现,当前返回的报告中字段重复且语义混淆。API 层(`endpoints_graph.execute`)被迫进行繁琐的合并与二次序列化,这增加了前端解析难度并导致大型对象(如 DataFrame)的性能损耗。
|
|||
|
|
|
|||
|
|
## 3. 逻辑契约 (Canonical Schema)
|
|||
|
|
`WorkflowExecutor._get_execution_report` 必须输出如下扁平化结构:
|
|||
|
|
|
|||
|
|
{
|
|||
|
|
"success": "boolean",
|
|||
|
|
"execution_id": "string (uuid)",
|
|||
|
|
"total_duration": "float",
|
|||
|
|
"nodes": {
|
|||
|
|
"{node_id}": {
|
|||
|
|
"status": "success | error | pending",
|
|||
|
|
"metrics": {
|
|||
|
|
"duration": "float",
|
|||
|
|
"cache_hit": "boolean"
|
|||
|
|
},
|
|||
|
|
"outputs": {
|
|||
|
|
"{port_name}": {
|
|||
|
|
"__type": "DataFrame | any",
|
|||
|
|
"preview": "array",
|
|||
|
|
"shape": "tuple"
|
|||
|
|
}
|
|||
|
|
},
|
|||
|
|
"error": "string | null",
|
|||
|
|
"meta": {
|
|||
|
|
"class_name": "string",
|
|||
|
|
"node_type": "string"
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
## 4. 实现提示 (Implementation Hints)
|
|||
|
|
* **彻底移除兼容性方法**:删除已在 `workflow_executor.py` 头部声明弃用的 `_execute_input_node` 和 `_execute_output_node` 内部方法。
|
|||
|
|
* **清理 API 层冗余**:重构 `endpoints_graph.py`,直接透传执行器结果,禁止使用 `if 'node_infos' in report` 等逻辑做降级解析。
|
|||
|
|
* **数据精炼原则**:在执行器内部实现 `_smart_serialize`,针对 Polars DataFrame 等对象强制执行采样预览(如 `.head(5)`),贯彻“采样预览,全量生产”的核心原则。
|
|||
|
|
|
|||
|
|
## 5. 核心函数重构示例 (WorkflowExecutor)
|
|||
|
|
|
|||
|
|
def _get_execution_report(self) -> Dict:
|
|||
|
|
"""
|
|||
|
|
[utrace 架构决策]:废除双轨制,执行数据精炼采样。
|
|||
|
|
"""
|
|||
|
|
if not self.context:
|
|||
|
|
return {"success": False, "error": "Context not initialized"}
|
|||
|
|
|
|||
|
|
nodes_map = {}
|
|||
|
|
for nid, info in self.context.node_infos.items():
|
|||
|
|
# 合并执行上下文与结果
|
|||
|
|
raw_res = self.context.node_results.get(nid, {})
|
|||
|
|
raw_outputs = raw_res.get("outputs", {})
|
|||
|
|
|
|||
|
|
nodes_map[nid] = {
|
|||
|
|
"status": info.status.value,
|
|||
|
|
"metrics": {
|
|||
|
|
"duration": round(info.duration, 4),
|
|||
|
|
"cache_hit": info.cache_hit
|
|||
|
|
},
|
|||
|
|
"outputs": self._smart_serialize(raw_outputs),
|
|||
|
|
"error": info.error,
|
|||
|
|
"meta": {
|
|||
|
|
"class_name": self.graph.nodes[nid].get("class_name"),
|
|||
|
|
"node_type": info.node_type
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"success": all(n["status"] == "success" for n in nodes_map.values()),
|
|||
|
|
"execution_id": self.context.execution_id,
|
|||
|
|
"nodes": nodes_map
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
---
|
|||
|
|
**utrace 架构审计批注**:
|
|||
|
|
TraceStudio 坚持“逻辑即文档”。本次重构是消除系统熵增的关键一步。所有不符合此 Schema 的前端调用均视为待修复 Bug,而非后端需要兼容的对象。
|