pytrace/tests/test_complex_remote_client.py

105 lines
3.8 KiB
Python
Raw Permalink Normal View History

2026-01-22 23:22:01 +08:00
import sys
import os
import time
# 确保 pytrace 在 Python 路径中
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from pytrace.api.node import TraceNode
from pytrace.api.decorators import register_class, register_node, input_port, output_port
from pytrace.runtime.context import TraceContext
from pytrace.remote.worker import TraceWorker, expose_node
# --- 1. 定义可被远程调用的节点 ---
# 使用 @expose_node 将此类暴露给服务器
@expose_node
@register_class(category="ComplexRemote", display_name="Complex Remote Node")
class ComplexRemoteNode(TraceNode):
"""
这个类包含一组用于远程调用的方法节点
每个方法都添加了详细的打印语句方便您在客户端控制台进行调试
"""
@register_node(
class_id="remote.process_text",
display_name="Process Text (Remote)",
description="在客户端处理文本,计算长度并添加前缀。"
)
@input_port(name="text_in", type="string")
@output_port(name="text_out", type="string")
@output_port(name="length", type="integer")
def process_text(self, ctx: TraceContext):
text = ctx.get_input("text_in", "")
print("\n--- [REMOTE] ---")
print(f"Node 'remote.process_text' is executing.")
print(f" - Received input 'text_in': '{text}'")
# 模拟一些耗时操作
time.sleep(0.5)
processed_text = f"processed_by_client:_{text}"
text_length = len(text)
ctx.set_output("text_out", processed_text)
ctx.set_output("length", text_length)
print(f" - Sent output 'text_out': '{processed_text}'")
print(f" - Sent output 'length': {text_length}")
print("--- [REMOTE] ---\
")
@register_node(
class_id="remote.calculate_ratio",
display_name="Calculate Ratio (Remote)",
description="在客户端根据两个数字计算比率。"
)
@input_port(name="numerator", type="float", label="分子")
@input_port(name="denominator", type="float", label="分母")
@output_port(name="ratio", type="float", label="比率")
def calculate_ratio(self, ctx: TraceContext):
numerator = ctx.get_input("numerator", 0.0)
denominator = ctx.get_input("denominator", 1.0)
print("\n--- [REMOTE] ---")
print(f"Node 'remote.calculate_ratio' is executing.")
print(f" - Received input 'numerator': {numerator}")
print(f" - Received input 'denominator': {denominator}")
if denominator == 0:
print(" - Error: Denominator is zero.")
# 在实际应用中可能需要更复杂的错误处理
ratio = 0.0
else:
ratio = numerator / denominator
ctx.set_output("ratio", ratio)
print(f" - Sent output 'ratio': {ratio}")
print("--- [REMOTE] ---\
")
# --- 2. 启动客户端 Worker ---
if __name__ == "__main__":
# 服务器的TCP RPC地址和端口
SERVER_HOST = 'localhost'
SERVER_PORT = 9999 # 这应该与 remote_test_server.py 中的端口一致
print(f"--- Complex Remote Test Client ---")
print(f"Attempting to connect to RPC Server at {SERVER_HOST}:{SERVER_PORT}...")
# 初始化并启动 Worker
# 它会自动查找所有被 @expose_node 装饰的类,并向服务器注册
worker = TraceWorker(host=SERVER_HOST, port=SERVER_PORT)
try:
# worker.start() 是一个阻塞操作,它会保持运行以监听服务器的调用
worker.start()
except KeyboardInterrupt:
print("\n[Client] Worker stopped by user.")
except Exception as e:
print(f"\n[Client] An error occurred: {e}")
print("[Client] Is the test server running?")