#!/usr/bin/env python3 """ 禅道开源版 22.x API 客户端 — HealthLink-HIS 集成 基址: https://zentao.gentronhealth.com/api.php/v2 认证: POST /users/login (account/password) → token (session id) 后续请求通过 Cookie `zentaosid=` 携带 铁律(硬编码): A. 人类提的 Bug → 只加备注,不改状态/分配 B. 智能体提的 Bug → 可改分配 + 加备注 C. 已关闭/已解决 → 只读 D. 修改前先 GET 一次 用法: python zentao_client.py get python zentao_client.py comment "content" python zentao_client.py create --title X --steps Y [--severity 2] [--pri 2] [--assignedTo user] python zentao_client.py update [--status X] [--resolution Y] [--assignedTo Z] python zentao_client.py list [--status active] [--assignedTo user] [--limit 20] """ import argparse import http.cookiejar import json import os import sys import urllib.error import urllib.parse import urllib.request from datetime import datetime from pathlib import Path try: sys.stdout.reconfigure(encoding="utf-8") sys.stderr.reconfigure(encoding="utf-8") except (AttributeError, OSError): pass SKILL_DIR = Path(__file__).resolve().parent.parent ENV_FILE = SKILL_DIR / ".env" AUDIT_LOG = SKILL_DIR / "audit.log" SESSION_FILE = SKILL_DIR / ".session" def load_env() -> dict: if not ENV_FILE.exists(): fail(f"缺少配置文件: {ENV_FILE}。请参考 SKILL.md 创建 .env") env = {} for line in ENV_FILE.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, v = line.split("=", 1) env[k.strip()] = v.strip().strip('"').strip("'") required = ["ZENTAO_URL", "ZENTAO_ACCOUNT", "ZENTAO_PASSWORD", "ZENTAO_PRODUCT_ID"] missing = [k for k in required if not env.get(k)] if missing: fail(f"缺少必填配置: {', '.join(missing)}") return env HUMAN_ACCOUNTS = {"admin", "chenqi", "root", "administrator"} AGENT_ACCOUNTS = {"qodercn", "agent-bot", "codex", "claude"} CLOSED_STATUSES = {"closed", "resolved"} def fail(msg: str, code: int = 1): print(json.dumps({"ok": False, "error": msg}, ensure_ascii=False)) sys.exit(code) def ok(data: dict): print(json.dumps({"ok": True, **data}, ensure_ascii=False, indent=2)) def audit(action: str, bug_id, result: str, summary: str = "", user: str = "agent"): ts = datetime.now().isoformat(timespec="seconds") line = f"[{ts}] [{user}] [{action}] [bug:{bug_id}] [{result}] {summary}\n" with AUDIT_LOG.open("a", encoding="utf-8") as f: f.write(line) class ZentaoClient: def __init__(self, env: dict): self.base_url = env["ZENTAO_URL"].rstrip("/") self.api = f"{self.base_url}/api.php/v2" self.account = env["ZENTAO_ACCOUNT"] self.password = env["ZENTAO_PASSWORD"] self.product = int(env["ZENTAO_PRODUCT_ID"]) self.execution = int(env.get("ZENTAO_DEFAULT_EXECUTION", "0") or 0) self.agent_account = env.get("ZENTAO_AGENT_ACCOUNT", "qodercn") self.token = self._load_or_login() # ---------- 认证 ---------- def _load_or_login(self) -> str: if SESSION_FILE.exists(): try: data = json.loads(SESSION_FILE.read_text(encoding="utf-8")) if data.get("account") == self.account and data.get("token"): return data["token"] except Exception: pass return self._login() def _login(self) -> str: payload = {"account": self.account, "password": self.password} body = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( f"{self.api}/users/login", data=body, method="POST" ) req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=30) as r: resp = json.loads(r.read().decode("utf-8")) except urllib.error.HTTPError as e: detail = e.read().decode("utf-8", errors="ignore") fail(f"登录失败 HTTP {e.code}: {detail[:300]}") except urllib.error.URLError as e: fail(f"登录失败 网络错误: {e.reason}") if resp.get("status") != "success" or not resp.get("token"): fail(f"登录失败: {resp}") token = resp["token"] SESSION_FILE.write_text( json.dumps({"account": self.account, "token": token, "ts": datetime.now().isoformat(timespec="seconds")}), encoding="utf-8", ) try: SESSION_FILE.chmod(0o600) except OSError: pass return token def _invalidate(self): if SESSION_FILE.exists(): SESSION_FILE.unlink() # ---------- 请求 ---------- def _req(self, method: str, path: str, payload: dict | None = None, retry: bool = True): url = f"{self.api}/{path.lstrip('/')}" data = json.dumps(payload, ensure_ascii=False).encode("utf-8") if payload else None req = urllib.request.Request(url, data=data, method=method) req.add_header("Content-Type", "application/json") # 开源版 22.x: token 作为 session id 通过 cookie 传递 req.add_header("Cookie", f"zentaosid={self.token}") # 兼容部分部署同时用 Token header req.add_header("Token", self.token) try: with urllib.request.urlopen(req, timeout=30) as r: body = r.read().decode("utf-8") return json.loads(body) if body else {} except urllib.error.HTTPError as e: detail = e.read().decode("utf-8", errors="ignore") if e.code == 401 and retry: self._invalidate() self.token = self._login() return self._req(method, path, payload, retry=False) fail(f"HTTP {e.code}: {detail[:300]}") except urllib.error.URLError as e: fail(f"网络错误: {e.reason}") def get_bug(self, bug_id: int) -> dict: r = self._req("GET", f"/bugs/{bug_id}") if isinstance(r, dict) and "bug" in r and isinstance(r["bug"], dict): return r["bug"] return r def create_bug(self, payload: dict) -> dict: payload.setdefault("product", self.product) if self.execution: payload.setdefault("execution", self.execution) payload.setdefault("openedBy", self.agent_account) return self._req("POST", "/bugs", payload) def update_bug(self, bug_id: int, payload: dict) -> dict: return self._req("PUT", f"/bugs/{bug_id}", payload) def add_comment(self, bug_id: int, comment: str) -> dict: ts = datetime.now().strftime("%Y-%m-%d %H:%M") text = f"[QoderCN @ {ts}] {comment}" # 开源版 22.x 备注走 /bugs/{id}/comment return self._req("POST", f"/bugs/{bug_id}/comment", {"comment": text, "action": "comment"}) def list_bugs(self, params: dict) -> dict: qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None}) return self._req("GET", f"/products/{self.product}/bugs?{qs}") def download_images(self, bug_id: int, out_dir: Path) -> list[Path]: """从 Bug 的 steps 字段提取 {fileID} 并下载图片到本地""" import re bug = self.get_bug(bug_id) steps = bug.get("steps", "") or "" seen = set() file_ids = [] for m in re.findall(r"fileID=(\d+)", steps): fid = int(m) if fid not in seen: seen.add(fid) file_ids.append(fid) for m in re.findall(r"\{(\d+)\}", steps): fid = int(m) if fid not in seen: seen.add(fid) file_ids.append(fid) out_dir.mkdir(parents=True, exist_ok=True) saved = [] for fid in file_ids: url = f"{self.api}/files/{fid}/download" req = urllib.request.Request(url, method="GET") req.add_header("Cookie", f"zentaosid={self.token}") req.add_header("Token", self.token) try: with urllib.request.urlopen(req, timeout=60) as r: ctype = r.headers.get("Content-Type", "") data = r.read() except (urllib.error.HTTPError, urllib.error.URLError) as e: audit("download", bug_id, "fail", f"fileID={fid} err={e}") continue ext = "png" if "jpeg" in ctype or "jpg" in ctype: ext = "jpg" elif "gif" in ctype: ext = "gif" elif "webp" in ctype: ext = "webp" elif "html" in ctype or data[:15].lower().startswith(b" {path.name} ({len(data)}B)") return saved def classify(self, bug: dict) -> str: status = (bug.get("status") or "").lower() if status in CLOSED_STATUSES: return "closed" opened = (bug.get("openedBy") or "").lower() if opened in HUMAN_ACCOUNTS: return "human" if opened in AGENT_ACCOUNTS or opened == self.agent_account.lower(): return "agent" return "human" def cmd_get(c: ZentaoClient, args): bug = c.get_bug(args.id) cls = c.classify(bug) ok({"bug": bug, "policy": cls, "id": args.id}) audit("get", args.id, "ok", f"policy={cls}") def cmd_comment(c: ZentaoClient, args): c.get_bug(args.id) c.add_comment(args.id, args.content) ok({"id": args.id, "action": "comment"}) audit("comment", args.id, "ok", args.content[:80]) def cmd_create(c: ZentaoClient, args): payload = { "title": args.title, "steps": args.steps, "severity": args.severity, "pri": args.pri, "type": "codeerror", } if args.assignedTo: payload["assignedTo"] = args.assignedTo if args.module: payload["module"] = args.module r = c.create_bug(payload) new_id = r.get("id") or r.get("bugID") ok({"id": new_id, "action": "create", "raw": r}) audit("create", new_id, "ok", args.title[:60]) def cmd_update(c: ZentaoClient, args): bug = c.get_bug(args.id) cls = c.classify(bug) payload = {} if args.status: payload["status"] = args.status if args.resolution: payload["resolution"] = args.resolution if args.assignedTo: payload["assignedTo"] = args.assignedTo if cls == "closed": fail(f"铁律C: Bug #{args.id} 状态为 {bug.get('status')},禁止修改") if cls == "human" and (payload.get("status") or payload.get("assignedTo")): fail(f"铁律A: Bug #{args.id} 由人类提出,仅允许 comment;拒绝修改 status/assignedTo") c.update_bug(args.id, payload) ok({"id": args.id, "action": "update", "policy": cls, "fields": list(payload.keys())}) audit("update", args.id, "ok", f"policy={cls} fields={list(payload.keys())}") def cmd_list(c: ZentaoClient, args): params = { "status": args.status, "assignedTo": args.assignedTo, "limit": args.limit or 20, "page": 1, } r = c.list_bugs(params) ok({"bugs": r.get("bugs", r), "total": r.get("total", "?")}) audit("list", "-", "ok", f"status={args.status}") def cmd_describe(c: ZentaoClient, args): bug = c.get_bug(args.id) img_dir = SKILL_DIR / "images" / str(args.id) saved = c.download_images(args.id, img_dir) ok({ "id": args.id, "title": bug.get("title", ""), "status": bug.get("status", ""), "openedBy": bug.get("openedBy", ""), "policy": c.classify(bug), "image_count": len(saved), "images": [str(p) for p in saved], "steps_plain": (bug.get("steps", "") or "")[:500], "hint": "Use QoderCN Read tool on each image path above to get visual analysis", }) audit("describe", args.id, "ok", f"images={len(saved)}") def main(): p = argparse.ArgumentParser() sub = p.add_subparsers(dest="cmd", required=True) sp = sub.add_parser("get"); sp.add_argument("id", type=int) sp = sub.add_parser("comment"); sp.add_argument("id", type=int); sp.add_argument("content") sp = sub.add_parser("create") sp.add_argument("--title", required=True); sp.add_argument("--steps", required=True) sp.add_argument("--severity", type=int, default=2) sp.add_argument("--pri", type=int, default=2) sp.add_argument("--assignedTo"); sp.add_argument("--module", type=int, default=0) sp = sub.add_parser("update"); sp.add_argument("id", type=int) sp.add_argument("--status"); sp.add_argument("--resolution"); sp.add_argument("--assignedTo") sp = sub.add_parser("list") sp.add_argument("--status", default="active") sp.add_argument("--assignedTo"); sp.add_argument("--limit", type=int, default=20) sp = sub.add_parser("describe", help="下载 Bug 截图到本地,供 QoderCN Read 工具识图") sp.add_argument("id", type=int) args = p.parse_args() env = load_env() c = ZentaoClient(env) {"get": cmd_get, "comment": cmd_comment, "create": cmd_create, "update": cmd_update, "list": cmd_list, "describe": cmd_describe}[args.cmd](c, args) if __name__ == "__main__": main()