128 lines
4.9 KiB
Python
128 lines
4.9 KiB
Python
|
|
"""Agent 客户端:通过 HTTP 调用宿主上的 Agent 服务,并提供路径映射工具。"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import os
|
|||
|
|
import urllib.request
|
|||
|
|
import urllib.error
|
|||
|
|
from pathlib import Path, PurePosixPath, PureWindowsPath
|
|||
|
|
from typing import Any, Dict, List, Optional, Union, Union
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AgentClient:
|
|||
|
|
def __init__(
|
|||
|
|
self,
|
|||
|
|
base_url: str = "http://localhost:8100",
|
|||
|
|
*,
|
|||
|
|
host_cloud_root: Optional[Path | str] = None,
|
|||
|
|
container_cloud_root: Optional[Path | str] = None,
|
|||
|
|
):
|
|||
|
|
self.base_url = base_url.rstrip("/")
|
|||
|
|
self.host_cloud_root = Path(host_cloud_root) if host_cloud_root else None
|
|||
|
|
self.container_cloud_root = Path(container_cloud_root) if container_cloud_root else None
|
|||
|
|
|
|||
|
|
@classmethod
|
|||
|
|
def from_env(cls, *, container_cloud_root: Optional[Path | str] = None) -> "AgentClient":
|
|||
|
|
base_url = os.environ.get("AGENT_BASE_URL", "http://host.docker.internal:8100")
|
|||
|
|
host_root = os.environ.get("AGENT_HOST_CLOUD_ROOT")
|
|||
|
|
return cls(base_url=base_url, host_cloud_root=host_root, container_cloud_root=container_cloud_root)
|
|||
|
|
|
|||
|
|
# ---------------------- 路径映射 (核心修正) ----------------------
|
|||
|
|
def map_cloud_to_host(self, path: Union[str, Path]) -> str:
|
|||
|
|
"""
|
|||
|
|
【Docker -> Host】
|
|||
|
|
将容器内的路径(/opt/...)转换为宿主机的路径(D:\...)。
|
|||
|
|
返回 str 以便直接传给 Agent API。
|
|||
|
|
"""
|
|||
|
|
# 输入可能是 pathlib.Path (Posix) 或 字符串
|
|||
|
|
p_obj = PurePosixPath(path)
|
|||
|
|
|
|||
|
|
# 如果没有配置映射,直接返回字符串
|
|||
|
|
if not self.container_cloud_root or not self.host_cloud_root:
|
|||
|
|
return str(p_obj)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 1. 计算相对路径 (Linux 逻辑)
|
|||
|
|
# 例如: /opt/tracestudio/cloud/traces/test.utrace -> traces/test.utrace
|
|||
|
|
rel = p_obj.relative_to(self.container_cloud_root)
|
|||
|
|
|
|||
|
|
# 2. 拼接到 Windows 根路径 (Windows 逻辑)
|
|||
|
|
# 例如: D:\XGame\...\cloud + traces/test.utrace -> D:\XGame\...\cloud\traces\test.utrace
|
|||
|
|
return str(self.host_cloud_root / rel)
|
|||
|
|
except ValueError:
|
|||
|
|
# 如果路径不在 container_cloud_root 下,原样返回(可能是绝对路径或其他挂载)
|
|||
|
|
return str(p_obj)
|
|||
|
|
|
|||
|
|
def map_host_to_container(self, path: Union[str, Path]) -> Path:
|
|||
|
|
"""
|
|||
|
|
【Host -> Docker】
|
|||
|
|
将 Agent 返回的 Windows 路径(D:\...)转换为容器内可读的路径(/opt/...)。
|
|||
|
|
返回 Path 对象以便 Python 打开文件。
|
|||
|
|
"""
|
|||
|
|
# 输入是 Windows 格式字符串
|
|||
|
|
p_obj = PureWindowsPath(path)
|
|||
|
|
|
|||
|
|
if not self.host_cloud_root or not self.container_cloud_root:
|
|||
|
|
# 无法映射,尝试直接转换(大概率读不到,但保持类型一致)
|
|||
|
|
return Path(str(p_obj))
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 1. 计算相对路径 (Windows 逻辑)
|
|||
|
|
# Windows 不区分大小写,但 relative_to 默认区分。
|
|||
|
|
# 如果路径来源可靠,通常没问题。
|
|||
|
|
rel = p_obj.relative_to(self.host_cloud_root)
|
|||
|
|
|
|||
|
|
# 2. 拼接到 Linux 容器路径
|
|||
|
|
# 这里的 Path 是具体平台的 Path (在 Docker 里就是 PosixPath)
|
|||
|
|
return Path(self.container_cloud_root / rel)
|
|||
|
|
except ValueError:
|
|||
|
|
return Path(str(p_obj))
|
|||
|
|
|
|||
|
|
# ---------------------- 调用 ----------------------
|
|||
|
|
def run(
|
|||
|
|
self,
|
|||
|
|
tool: str,
|
|||
|
|
args: Optional[List[str]] = None,
|
|||
|
|
*,
|
|||
|
|
workdir: Optional[str] = None,
|
|||
|
|
timeout: Optional[int] = None,
|
|||
|
|
env: Optional[Dict[str, str]] = None,
|
|||
|
|
capture_output: Optional[bool] = None,
|
|||
|
|
strip_output: Optional[bool] = None,
|
|||
|
|
) -> Dict[str, Any]:
|
|||
|
|
payload = {
|
|||
|
|
"tool": tool,
|
|||
|
|
"args": args or [],
|
|||
|
|
}
|
|||
|
|
if workdir:
|
|||
|
|
payload["workdir"] = workdir
|
|||
|
|
if timeout is not None:
|
|||
|
|
payload["timeout"] = timeout
|
|||
|
|
if env:
|
|||
|
|
payload["env"] = env
|
|||
|
|
if capture_output is not None:
|
|||
|
|
payload["capture_output"] = capture_output
|
|||
|
|
if strip_output is not None:
|
|||
|
|
payload["strip_output"] = strip_output
|
|||
|
|
|
|||
|
|
url = f"{self.base_url}/run"
|
|||
|
|
data = json.dumps(payload).encode("utf-8")
|
|||
|
|
req = urllib.request.Request(
|
|||
|
|
url,
|
|||
|
|
data=data,
|
|||
|
|
headers={"Content-Type": "application/json"},
|
|||
|
|
method="POST",
|
|||
|
|
)
|
|||
|
|
try:
|
|||
|
|
with urllib.request.urlopen(req, timeout=timeout or 30) as resp:
|
|||
|
|
body = resp.read().decode("utf-8")
|
|||
|
|
return json.loads(body)
|
|||
|
|
except urllib.error.HTTPError as e:
|
|||
|
|
detail = e.read().decode("utf-8") if e.fp else str(e)
|
|||
|
|
raise RuntimeError(f"Agent HTTP {e.code}: {detail}") from e
|
|||
|
|
except Exception as e:
|
|||
|
|
raise RuntimeError(f"Agent 调用失败: {e}") from e
|
|||
|
|
|
|||
|
|
|
|||
|
|
__all__ = ["AgentClient"]
|