64 lines
1.6 KiB
Python
64 lines
1.6 KiB
Python
import asyncio
|
|
import time
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Ensure server root is on sys.path so `app` package can be imported when running tests directly
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from app.core.workflow_executor import WorkflowExecutor
|
|
from app.core.node_registry import register_node, NodeRegistry
|
|
from app.core.node_base import TraceNode, NodeType, CachePolicy
|
|
|
|
|
|
# Simple test node
|
|
@register_node
|
|
class AddNode(TraceNode):
|
|
CATEGORY = "Math/Basic"
|
|
DISPLAY_NAME = "Add"
|
|
NODE_TYPE = NodeType.NORMAL
|
|
CACHE_POLICY = CachePolicy.MEMORY
|
|
|
|
InputSpec = {
|
|
"a": ("Number", {"required": True}),
|
|
"b": ("Number", {"required": True})
|
|
}
|
|
OutputSpec = {
|
|
"result": ("Number", {})
|
|
}
|
|
|
|
def process(self, inputs, context=None):
|
|
a = inputs.get("a", 0)
|
|
b = inputs.get("b", 0)
|
|
return {"result": a + b}
|
|
|
|
|
|
async def run_executor_test():
|
|
executor = WorkflowExecutor(user_id="test_user")
|
|
nodes = [
|
|
{"id": "n1", "type": "AddNode", "class_name": "AddNode", "params": {}},
|
|
]
|
|
edges = []
|
|
|
|
success, report = await executor.execute(nodes=nodes, edges=edges, global_context={})
|
|
print("success:", success)
|
|
print("report:", report)
|
|
|
|
assert success is True
|
|
assert "nodes" in report
|
|
assert "n1" in report["nodes"]
|
|
n1 = report["nodes"]["n1"]
|
|
assert n1["status"] == "success"
|
|
assert "outputs" in n1
|
|
assert "result" in n1["outputs"] or isinstance(n1["outputs"], dict)
|
|
|
|
|
|
def test_executor():
|
|
asyncio.run(run_executor_test())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(run_executor_test())
|