Files
his/.qoder/skills/zentao/scripts/zentao_client.py

361 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
禅道开源版 22.x API 客户端 — HealthLink-HIS 集成
基址: https://zentao.gentronhealth.com/api.php/v2
认证: POST /users/login (account/password) → token (session id)
后续请求通过 Cookie `zentaosid=<token>` 携带
铁律(硬编码):
A. 人类提的 Bug → 只加备注,不改状态/分配
B. 智能体提的 Bug → 可改分配 + 加备注
C. 已关闭/已解决 → 只读
D. 修改前先 GET 一次
用法:
python zentao_client.py get <id>
python zentao_client.py comment <id> "content"
python zentao_client.py create --title X --steps Y [--severity 2] [--pri 2] [--assignedTo user]
python zentao_client.py update <id> [--status X] [--resolution Y] [--assignedTo Z]
python zentao_client.py list [--status active] [--assignedTo user] [--limit 20]
python zentao_client.py describe <id>
"""
import argparse
import http.cookiejar
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
# Force UTF-8 on the standard streams so the non-ASCII JSON this script prints
# is emitted intact. Streams without a usable ``reconfigure`` are left as-is.
try:
    for _stream in (sys.stdout, sys.stderr):
        _stream.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass

# Skill root: two directory levels above this script.
SKILL_DIR = Path(__file__).resolve().parent.parent
# Files kept in the skill root: configuration, audit trail, cached session.
ENV_FILE = SKILL_DIR.joinpath(".env")
AUDIT_LOG = SKILL_DIR.joinpath("audit.log")
SESSION_FILE = SKILL_DIR.joinpath(".session")
def load_env() -> dict:
    """Parse ``ENV_FILE`` into a dict of settings.

    Each useful line has the form ``KEY=VALUE``. Blank lines, lines starting
    with ``#`` and lines without ``=`` are skipped. Keys and values are
    whitespace-trimmed, and surrounding double then single quotes are
    stripped from values.

    Returns:
        The parsed settings.

    Exits (via ``fail``) when the file does not exist or when any of the
    required keys is missing or empty.
    """
    if not ENV_FILE.exists():
        fail(f"缺少配置文件: {ENV_FILE}。请参考 SKILL.md 创建 .env")

    env = {}
    for raw in ENV_FILE.read_text(encoding="utf-8").splitlines():
        entry = raw.strip()
        if entry and not entry.startswith("#") and "=" in entry:
            # Split on the first "=" only, so values may themselves contain "=".
            key, _, value = entry.partition("=")
            env[key.strip()] = value.strip().strip('"').strip("'")

    required = ["ZENTAO_URL", "ZENTAO_ACCOUNT", "ZENTAO_PASSWORD", "ZENTAO_PRODUCT_ID"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        fail(f"缺少必填配置: {', '.join(missing)}")
    return env
# Reporter accounts treated as human (compared lower-cased by the classifier).
HUMAN_ACCOUNTS = {
    "admin",
    "chenqi",
    "root",
    "administrator",
}
# Reporter accounts treated as automated agents.
AGENT_ACCOUNTS = {
    "qodercn",
    "agent-bot",
    "codex",
    "claude",
}
# Bug statuses that make a bug read-only.
CLOSED_STATUSES = {
    "closed",
    "resolved",
}
def fail(msg: str, code: int = 1):
    """Print a one-line JSON error envelope to stdout and terminate.

    Args:
        msg: Human-readable error text, stored under the ``error`` key.
        code: Process exit status.

    Raises:
        SystemExit: Always, carrying ``code``.
    """
    envelope = {"ok": False, "error": msg}
    print(json.dumps(envelope, ensure_ascii=False))
    raise SystemExit(code)
def ok(data: dict):
    """Print ``data`` as an indented JSON success envelope on stdout.

    The envelope starts with ``"ok": true``; keys from ``data`` follow and
    override ``ok`` if ``data`` happens to contain that key.
    """
    envelope = {"ok": True}
    envelope.update(data)
    print(json.dumps(envelope, ensure_ascii=False, indent=2))
def audit(action: str, bug_id, result: str, summary: str = "", user: str = "agent"):
    """Append one line to ``AUDIT_LOG`` describing an operation.

    Line format: ``[timestamp] [user] [action] [bug:<id>] [result] summary``,
    where the timestamp is local time in ISO format with second precision.

    Args:
        action: Operation name (e.g. ``"get"``, ``"update"``).
        bug_id: Bug identifier, or a placeholder when no single bug applies.
        result: Outcome tag (e.g. ``"ok"``, ``"fail"``).
        summary: Free-form trailing text.
        user: Actor recorded in the line.
    """
    stamp = datetime.now().isoformat(timespec="seconds")
    tags = (stamp, user, action, f"bug:{bug_id}", result)
    prefix = "".join(f"[{tag}] " for tag in tags)
    with AUDIT_LOG.open("a", encoding="utf-8") as log:
        log.write(f"{prefix}{summary}\n")
class ZentaoClient:
def __init__(self, env: dict):
self.base_url = env["ZENTAO_URL"].rstrip("/")
self.api = f"{self.base_url}/api.php/v2"
self.account = env["ZENTAO_ACCOUNT"]
self.password = env["ZENTAO_PASSWORD"]
self.product = int(env["ZENTAO_PRODUCT_ID"])
self.execution = int(env.get("ZENTAO_DEFAULT_EXECUTION", "0") or 0)
self.agent_account = env.get("ZENTAO_AGENT_ACCOUNT", "qodercn")
self.token = self._load_or_login()
# ---------- 认证 ----------
def _load_or_login(self) -> str:
if SESSION_FILE.exists():
try:
data = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
if data.get("account") == self.account and data.get("token"):
return data["token"]
except Exception:
pass
return self._login()
def _login(self) -> str:
payload = {"account": self.account, "password": self.password}
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
f"{self.api}/users/login", data=body, method="POST"
)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
resp = json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="ignore")
fail(f"登录失败 HTTP {e.code}: {detail[:300]}")
except urllib.error.URLError as e:
fail(f"登录失败 网络错误: {e.reason}")
if resp.get("status") != "success" or not resp.get("token"):
fail(f"登录失败: {resp}")
token = resp["token"]
SESSION_FILE.write_text(
json.dumps({"account": self.account, "token": token,
"ts": datetime.now().isoformat(timespec="seconds")}),
encoding="utf-8",
)
try:
SESSION_FILE.chmod(0o600)
except OSError:
pass
return token
def _invalidate(self):
if SESSION_FILE.exists():
SESSION_FILE.unlink()
# ---------- 请求 ----------
def _req(self, method: str, path: str, payload: dict | None = None, retry: bool = True):
url = f"{self.api}/{path.lstrip('/')}"
data = json.dumps(payload, ensure_ascii=False).encode("utf-8") if payload else None
req = urllib.request.Request(url, data=data, method=method)
req.add_header("Content-Type", "application/json")
# 开源版 22.x: token 作为 session id 通过 cookie 传递
req.add_header("Cookie", f"zentaosid={self.token}")
# 兼容部分部署同时用 Token header
req.add_header("Token", self.token)
try:
with urllib.request.urlopen(req, timeout=30) as r:
body = r.read().decode("utf-8")
return json.loads(body) if body else {}
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="ignore")
if e.code == 401 and retry:
self._invalidate()
self.token = self._login()
return self._req(method, path, payload, retry=False)
fail(f"HTTP {e.code}: {detail[:300]}")
except urllib.error.URLError as e:
fail(f"网络错误: {e.reason}")
def get_bug(self, bug_id: int) -> dict:
r = self._req("GET", f"/bugs/{bug_id}")
if isinstance(r, dict) and "bug" in r and isinstance(r["bug"], dict):
return r["bug"]
return r
def create_bug(self, payload: dict) -> dict:
payload.setdefault("product", self.product)
if self.execution:
payload.setdefault("execution", self.execution)
payload.setdefault("openedBy", self.agent_account)
return self._req("POST", "/bugs", payload)
def update_bug(self, bug_id: int, payload: dict) -> dict:
return self._req("PUT", f"/bugs/{bug_id}", payload)
def add_comment(self, bug_id: int, comment: str) -> dict:
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
text = f"[QoderCN @ {ts}] {comment}"
# 开源版 22.x 备注走 /bugs/{id}/comment
return self._req("POST", f"/bugs/{bug_id}/comment",
{"comment": text, "action": "comment"})
def list_bugs(self, params: dict) -> dict:
qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None})
return self._req("GET", f"/products/{self.product}/bugs?{qs}")
def download_images(self, bug_id: int, out_dir: Path) -> list[Path]:
"""从 Bug 的 steps 字段提取 {fileID} 并下载图片到本地"""
import re
bug = self.get_bug(bug_id)
steps = bug.get("steps", "") or ""
seen = set()
file_ids = []
for m in re.findall(r"fileID=(\d+)", steps):
fid = int(m)
if fid not in seen:
seen.add(fid)
file_ids.append(fid)
for m in re.findall(r"\{(\d+)\}", steps):
fid = int(m)
if fid not in seen:
seen.add(fid)
file_ids.append(fid)
out_dir.mkdir(parents=True, exist_ok=True)
saved = []
for fid in file_ids:
url = f"{self.api}/files/{fid}/download"
req = urllib.request.Request(url, method="GET")
req.add_header("Cookie", f"zentaosid={self.token}")
req.add_header("Token", self.token)
try:
with urllib.request.urlopen(req, timeout=60) as r:
ctype = r.headers.get("Content-Type", "")
data = r.read()
except (urllib.error.HTTPError, urllib.error.URLError) as e:
audit("download", bug_id, "fail", f"fileID={fid} err={e}")
continue
ext = "png"
if "jpeg" in ctype or "jpg" in ctype:
ext = "jpg"
elif "gif" in ctype:
ext = "gif"
elif "webp" in ctype:
ext = "webp"
elif "html" in ctype or data[:15].lower().startswith(b"<!doctype"):
audit("download", bug_id, "fail", f"fileID={fid} got HTML instead of image")
continue
path = out_dir / f"bug{bug_id}_file{fid}.{ext}"
path.write_bytes(data)
saved.append(path)
audit("download", bug_id, "ok", f"fileID={fid} -> {path.name} ({len(data)}B)")
return saved
def classify(self, bug: dict) -> str:
status = (bug.get("status") or "").lower()
if status in CLOSED_STATUSES:
return "closed"
opened = (bug.get("openedBy") or "").lower()
if opened in HUMAN_ACCOUNTS:
return "human"
if opened in AGENT_ACCOUNTS or opened == self.agent_account.lower():
return "agent"
return "human"
def cmd_get(c: ZentaoClient, args):
    """Handle ``get``: print the bug with its policy class, then audit the read."""
    bug_id = args.id
    bug = c.get_bug(bug_id)
    policy = c.classify(bug)
    ok({"bug": bug, "policy": policy, "id": bug_id})
    audit("get", bug_id, "ok", f"policy={policy}")
def cmd_comment(c: ZentaoClient, args):
    """Handle ``comment``: add a remark to a bug and audit it.

    The audit summary keeps only the first 80 characters of the comment.
    """
    bug_id = args.id
    text = args.content
    # Rule D from the module header: GET the bug before writing to it.
    # The fetched bug itself is not inspected here.
    # NOTE(review): rule C calls closed/resolved bugs read-only, yet no status
    # check is made before commenting. Confirm whether that is intended.
    c.get_bug(bug_id)
    c.add_comment(bug_id, text)
    ok({"id": bug_id, "action": "comment"})
    audit("comment", bug_id, "ok", text[:80])
def cmd_create(c: ZentaoClient, args):
    """Handle ``create``: file a new bug of type ``codeerror`` and audit it.

    ``assignedTo`` and ``module`` are sent only when given a truthy value.
    The new id is read from ``id`` or, failing that, ``bugID`` in the
    response.
    """
    payload = dict(
        title=args.title,
        steps=args.steps,
        severity=args.severity,
        pri=args.pri,
        type="codeerror",
    )
    for optional in ("assignedTo", "module"):
        value = getattr(args, optional)
        if value:
            payload[optional] = value

    response = c.create_bug(payload)
    new_id = response.get("id") or response.get("bugID")
    ok({"id": new_id, "action": "create", "raw": response})
    audit("create", new_id, "ok", args.title[:60])
def cmd_update(c: ZentaoClient, args):
    """Handle ``update``: change status/resolution/assignee under the policy rules.

    The bug is fetched first (rule D) and classified. Closed/resolved bugs
    are never modified (rule C). Human-reported bugs reject changes to
    ``status`` or ``assignedTo`` (rule A). A call that names no field to
    change is rejected, instead of issuing an empty PUT and logging a
    successful no-op update.

    Exits via ``fail`` whenever the update is refused.
    """
    bug = c.get_bug(args.id)
    cls = c.classify(bug)

    # Only options that were actually supplied (truthy) are sent.
    payload = {
        field: value
        for field in ("status", "resolution", "assignedTo")
        if (value := getattr(args, field))
    }

    if cls == "closed":
        fail(f"铁律C: Bug #{args.id} 状态为 {bug.get('status')},禁止修改")
    # NOTE(review): a resolution-only change on a human-reported bug passes this
    # check. Confirm whether rule A is meant to cover ``resolution`` too.
    if cls == "human" and (payload.get("status") or payload.get("assignedTo")):
        fail(f"铁律A: Bug #{args.id} 由人类提出,仅允许 comment拒绝修改 status/assignedTo")
    if not payload:
        fail("未指定任何修改字段: 请至少提供 --status / --resolution / --assignedTo 之一")

    c.update_bug(args.id, payload)
    fields = list(payload.keys())
    ok({"id": args.id, "action": "update", "policy": cls, "fields": fields})
    audit("update", args.id, "ok", f"policy={cls} fields={fields}")
def cmd_list(c: ZentaoClient, args):
    """Handle ``list``: print the first page of the product's bugs and audit it.

    Filters by ``args.status`` and ``args.assignedTo``. A falsy
    ``args.limit`` falls back to 20.
    """
    status = args.status
    page_size = args.limit if args.limit else 20
    response = c.list_bugs({
        "status": status,
        "assignedTo": args.assignedTo,
        "limit": page_size,
        "page": 1,
    })
    # When there is no "bugs" key, the whole response is passed through.
    bugs = response.get("bugs", response)
    total = response.get("total", "?")
    ok({"bugs": bugs, "total": total})
    audit("list", "-", "ok", f"status={status}")
def cmd_describe(c: ZentaoClient, args):
    """Handle ``describe``: download a bug's images and print a summary.

    Images are stored under ``SKILL_DIR/images/<id>``. The summary holds the
    bug's title, status, reporter, policy class, the saved image paths and
    the first 500 characters of its steps.
    """
    bug_id = args.id
    bug = c.get_bug(bug_id)
    target_dir = SKILL_DIR / "images" / str(bug_id)
    images = c.download_images(bug_id, target_dir)
    count = len(images)

    summary = {
        "id": bug_id,
        "title": bug.get("title", ""),
        "status": bug.get("status", ""),
        "openedBy": bug.get("openedBy", ""),
        "policy": c.classify(bug),
        "image_count": count,
        "images": [str(image) for image in images],
        "steps_plain": (bug.get("steps", "") or "")[:500],
        "hint": "Use QoderCN Read tool on each image path above to get visual analysis",
    }
    ok(summary)
    audit("describe", bug_id, "ok", f"images={count}")
def main():
    """CLI entry point: parse the subcommand, build the client, dispatch."""
    parser = argparse.ArgumentParser()
    commands = parser.add_subparsers(dest="cmd", required=True)

    get_p = commands.add_parser("get")
    get_p.add_argument("id", type=int)

    comment_p = commands.add_parser("comment")
    comment_p.add_argument("id", type=int)
    comment_p.add_argument("content")

    create_p = commands.add_parser("create")
    create_p.add_argument("--title", required=True)
    create_p.add_argument("--steps", required=True)
    create_p.add_argument("--severity", type=int, default=2)
    create_p.add_argument("--pri", type=int, default=2)
    create_p.add_argument("--assignedTo")
    create_p.add_argument("--module", type=int, default=0)

    update_p = commands.add_parser("update")
    update_p.add_argument("id", type=int)
    update_p.add_argument("--status")
    update_p.add_argument("--resolution")
    update_p.add_argument("--assignedTo")

    list_p = commands.add_parser("list")
    list_p.add_argument("--status", default="active")
    list_p.add_argument("--assignedTo")
    list_p.add_argument("--limit", type=int, default=20)

    describe_p = commands.add_parser(
        "describe", help="下载 Bug 截图到本地,供 QoderCN Read 工具识图"
    )
    describe_p.add_argument("id", type=int)

    # Subcommand name -> handler taking (client, parsed args).
    handlers = {
        "get": cmd_get,
        "comment": cmd_comment,
        "create": cmd_create,
        "update": cmd_update,
        "list": cmd_list,
        "describe": cmd_describe,
    }

    args = parser.parse_args()
    # Arguments are parsed before the config is loaded or any login happens.
    env = load_env()
    client = ZentaoClient(env)
    handlers[args.cmd](client, args)


if __name__ == "__main__":
    main()