fix(security): 添加VITE_PAYMENT_URL环境变量配置

This commit is contained in:
2026-06-18 21:29:41 +08:00
parent 3d977d0a2d
commit 8afeb2e4d9
160 changed files with 21893 additions and 0 deletions

View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
"""
禅道开源版 22.x API 客户端 — HealthLink-HIS 集成
基址: https://zentao.gentronhealth.com/api.php/v2
认证: POST /users/login (account/password) → token (session id)
后续请求通过 Cookie `zentaosid=<token>` 携带
铁律(硬编码):
A. 人类提的 Bug → 只加备注,不改状态/分配
B. 智能体提的 Bug → 可改分配 + 加备注
C. 已关闭/已解决 → 只读
D. 修改前先 GET 一次
用法:
python zentao_client.py get <id>
python zentao_client.py comment <id> "content"
python zentao_client.py create --title X --steps Y [--severity 2] [--pri 2] [--assignedTo user]
python zentao_client.py update <id> [--status X] [--resolution Y] [--assignedTo Z]
python zentao_client.py list [--status active] [--assignedTo user] [--limit 20]
"""
import argparse
import http.cookiejar
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
try:
sys.stdout.reconfigure(encoding="utf-8")
sys.stderr.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
pass
SKILL_DIR = Path(__file__).resolve().parent.parent
ENV_FILE = SKILL_DIR / ".env"
AUDIT_LOG = SKILL_DIR / "audit.log"
SESSION_FILE = SKILL_DIR / ".session"
def load_env() -> dict:
if not ENV_FILE.exists():
fail(f"缺少配置文件: {ENV_FILE}。请参考 SKILL.md 创建 .env")
env = {}
for line in ENV_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
env[k.strip()] = v.strip().strip('"').strip("'")
required = ["ZENTAO_URL", "ZENTAO_ACCOUNT", "ZENTAO_PASSWORD", "ZENTAO_PRODUCT_ID"]
missing = [k for k in required if not env.get(k)]
if missing:
fail(f"缺少必填配置: {', '.join(missing)}")
return env
HUMAN_ACCOUNTS = {"admin", "chenqi", "root", "administrator"}
AGENT_ACCOUNTS = {"qodercn", "agent-bot", "codex", "claude"}
CLOSED_STATUSES = {"closed", "resolved"}
def fail(msg: str, code: int = 1):
print(json.dumps({"ok": False, "error": msg}, ensure_ascii=False))
sys.exit(code)
def ok(data: dict):
print(json.dumps({"ok": True, **data}, ensure_ascii=False, indent=2))
def audit(action: str, bug_id, result: str, summary: str = "", user: str = "agent"):
ts = datetime.now().isoformat(timespec="seconds")
line = f"[{ts}] [{user}] [{action}] [bug:{bug_id}] [{result}] {summary}\n"
with AUDIT_LOG.open("a", encoding="utf-8") as f:
f.write(line)
class ZentaoClient:
def __init__(self, env: dict):
self.base_url = env["ZENTAO_URL"].rstrip("/")
self.api = f"{self.base_url}/api.php/v2"
self.account = env["ZENTAO_ACCOUNT"]
self.password = env["ZENTAO_PASSWORD"]
self.product = int(env["ZENTAO_PRODUCT_ID"])
self.execution = int(env.get("ZENTAO_DEFAULT_EXECUTION", "0") or 0)
self.agent_account = env.get("ZENTAO_AGENT_ACCOUNT", "qodercn")
self.token = self._load_or_login()
# ---------- 认证 ----------
def _load_or_login(self) -> str:
if SESSION_FILE.exists():
try:
data = json.loads(SESSION_FILE.read_text(encoding="utf-8"))
if data.get("account") == self.account and data.get("token"):
return data["token"]
except Exception:
pass
return self._login()
def _login(self) -> str:
payload = {"account": self.account, "password": self.password}
body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
req = urllib.request.Request(
f"{self.api}/users/login", data=body, method="POST"
)
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as r:
resp = json.loads(r.read().decode("utf-8"))
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="ignore")
fail(f"登录失败 HTTP {e.code}: {detail[:300]}")
except urllib.error.URLError as e:
fail(f"登录失败 网络错误: {e.reason}")
if resp.get("status") != "success" or not resp.get("token"):
fail(f"登录失败: {resp}")
token = resp["token"]
SESSION_FILE.write_text(
json.dumps({"account": self.account, "token": token,
"ts": datetime.now().isoformat(timespec="seconds")}),
encoding="utf-8",
)
try:
SESSION_FILE.chmod(0o600)
except OSError:
pass
return token
def _invalidate(self):
if SESSION_FILE.exists():
SESSION_FILE.unlink()
# ---------- 请求 ----------
def _req(self, method: str, path: str, payload: dict | None = None, retry: bool = True):
url = f"{self.api}/{path.lstrip('/')}"
data = json.dumps(payload, ensure_ascii=False).encode("utf-8") if payload else None
req = urllib.request.Request(url, data=data, method=method)
req.add_header("Content-Type", "application/json")
# 开源版 22.x: token 作为 session id 通过 cookie 传递
req.add_header("Cookie", f"zentaosid={self.token}")
# 兼容部分部署同时用 Token header
req.add_header("Token", self.token)
try:
with urllib.request.urlopen(req, timeout=30) as r:
body = r.read().decode("utf-8")
return json.loads(body) if body else {}
except urllib.error.HTTPError as e:
detail = e.read().decode("utf-8", errors="ignore")
if e.code == 401 and retry:
self._invalidate()
self.token = self._login()
return self._req(method, path, payload, retry=False)
fail(f"HTTP {e.code}: {detail[:300]}")
except urllib.error.URLError as e:
fail(f"网络错误: {e.reason}")
def get_bug(self, bug_id: int) -> dict:
r = self._req("GET", f"/bugs/{bug_id}")
if isinstance(r, dict) and "bug" in r and isinstance(r["bug"], dict):
return r["bug"]
return r
def create_bug(self, payload: dict) -> dict:
payload.setdefault("product", self.product)
if self.execution:
payload.setdefault("execution", self.execution)
payload.setdefault("openedBy", self.agent_account)
return self._req("POST", "/bugs", payload)
def update_bug(self, bug_id: int, payload: dict) -> dict:
return self._req("PUT", f"/bugs/{bug_id}", payload)
def add_comment(self, bug_id: int, comment: str) -> dict:
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
text = f"[QoderCN @ {ts}] {comment}"
# 开源版 22.x 备注走 /bugs/{id}/comment
return self._req("POST", f"/bugs/{bug_id}/comment",
{"comment": text, "action": "comment"})
def list_bugs(self, params: dict) -> dict:
qs = urllib.parse.urlencode({k: v for k, v in params.items() if v is not None})
return self._req("GET", f"/products/{self.product}/bugs?{qs}")
def download_images(self, bug_id: int, out_dir: Path) -> list[Path]:
"""从 Bug 的 steps 字段提取 {fileID} 并下载图片到本地"""
import re
bug = self.get_bug(bug_id)
steps = bug.get("steps", "") or ""
seen = set()
file_ids = []
for m in re.findall(r"fileID=(\d+)", steps):
fid = int(m)
if fid not in seen:
seen.add(fid)
file_ids.append(fid)
for m in re.findall(r"\{(\d+)\}", steps):
fid = int(m)
if fid not in seen:
seen.add(fid)
file_ids.append(fid)
out_dir.mkdir(parents=True, exist_ok=True)
saved = []
for fid in file_ids:
url = f"{self.api}/files/{fid}/download"
req = urllib.request.Request(url, method="GET")
req.add_header("Cookie", f"zentaosid={self.token}")
req.add_header("Token", self.token)
try:
with urllib.request.urlopen(req, timeout=60) as r:
ctype = r.headers.get("Content-Type", "")
data = r.read()
except (urllib.error.HTTPError, urllib.error.URLError) as e:
audit("download", bug_id, "fail", f"fileID={fid} err={e}")
continue
ext = "png"
if "jpeg" in ctype or "jpg" in ctype:
ext = "jpg"
elif "gif" in ctype:
ext = "gif"
elif "webp" in ctype:
ext = "webp"
elif "html" in ctype or data[:15].lower().startswith(b"<!doctype"):
audit("download", bug_id, "fail", f"fileID={fid} got HTML instead of image")
continue
path = out_dir / f"bug{bug_id}_file{fid}.{ext}"
path.write_bytes(data)
saved.append(path)
audit("download", bug_id, "ok", f"fileID={fid} -> {path.name} ({len(data)}B)")
return saved
def classify(self, bug: dict) -> str:
status = (bug.get("status") or "").lower()
if status in CLOSED_STATUSES:
return "closed"
opened = (bug.get("openedBy") or "").lower()
if opened in HUMAN_ACCOUNTS:
return "human"
if opened in AGENT_ACCOUNTS or opened == self.agent_account.lower():
return "agent"
return "human"
def cmd_get(c: ZentaoClient, args):
bug = c.get_bug(args.id)
cls = c.classify(bug)
ok({"bug": bug, "policy": cls, "id": args.id})
audit("get", args.id, "ok", f"policy={cls}")
def cmd_comment(c: ZentaoClient, args):
c.get_bug(args.id)
c.add_comment(args.id, args.content)
ok({"id": args.id, "action": "comment"})
audit("comment", args.id, "ok", args.content[:80])
def cmd_create(c: ZentaoClient, args):
payload = {
"title": args.title,
"steps": args.steps,
"severity": args.severity,
"pri": args.pri,
"type": "codeerror",
}
if args.assignedTo:
payload["assignedTo"] = args.assignedTo
if args.module:
payload["module"] = args.module
r = c.create_bug(payload)
new_id = r.get("id") or r.get("bugID")
ok({"id": new_id, "action": "create", "raw": r})
audit("create", new_id, "ok", args.title[:60])
def cmd_update(c: ZentaoClient, args):
bug = c.get_bug(args.id)
cls = c.classify(bug)
payload = {}
if args.status:
payload["status"] = args.status
if args.resolution:
payload["resolution"] = args.resolution
if args.assignedTo:
payload["assignedTo"] = args.assignedTo
if cls == "closed":
fail(f"铁律C: Bug #{args.id} 状态为 {bug.get('status')},禁止修改")
if cls == "human" and (payload.get("status") or payload.get("assignedTo")):
fail(f"铁律A: Bug #{args.id} 由人类提出,仅允许 comment拒绝修改 status/assignedTo")
c.update_bug(args.id, payload)
ok({"id": args.id, "action": "update", "policy": cls, "fields": list(payload.keys())})
audit("update", args.id, "ok", f"policy={cls} fields={list(payload.keys())}")
def cmd_list(c: ZentaoClient, args):
params = {
"status": args.status,
"assignedTo": args.assignedTo,
"limit": args.limit or 20,
"page": 1,
}
r = c.list_bugs(params)
ok({"bugs": r.get("bugs", r), "total": r.get("total", "?")})
audit("list", "-", "ok", f"status={args.status}")
def cmd_describe(c: ZentaoClient, args):
bug = c.get_bug(args.id)
img_dir = SKILL_DIR / "images" / str(args.id)
saved = c.download_images(args.id, img_dir)
ok({
"id": args.id,
"title": bug.get("title", ""),
"status": bug.get("status", ""),
"openedBy": bug.get("openedBy", ""),
"policy": c.classify(bug),
"image_count": len(saved),
"images": [str(p) for p in saved],
"steps_plain": (bug.get("steps", "") or "")[:500],
"hint": "Use QoderCN Read tool on each image path above to get visual analysis",
})
audit("describe", args.id, "ok", f"images={len(saved)}")
def main():
p = argparse.ArgumentParser()
sub = p.add_subparsers(dest="cmd", required=True)
sp = sub.add_parser("get"); sp.add_argument("id", type=int)
sp = sub.add_parser("comment"); sp.add_argument("id", type=int); sp.add_argument("content")
sp = sub.add_parser("create")
sp.add_argument("--title", required=True); sp.add_argument("--steps", required=True)
sp.add_argument("--severity", type=int, default=2)
sp.add_argument("--pri", type=int, default=2)
sp.add_argument("--assignedTo"); sp.add_argument("--module", type=int, default=0)
sp = sub.add_parser("update"); sp.add_argument("id", type=int)
sp.add_argument("--status"); sp.add_argument("--resolution"); sp.add_argument("--assignedTo")
sp = sub.add_parser("list")
sp.add_argument("--status", default="active")
sp.add_argument("--assignedTo"); sp.add_argument("--limit", type=int, default=20)
sp = sub.add_parser("describe", help="下载 Bug 截图到本地,供 QoderCN Read 工具识图")
sp.add_argument("id", type=int)
args = p.parse_args()
env = load_env()
c = ZentaoClient(env)
{"get": cmd_get, "comment": cmd_comment, "create": cmd_create,
"update": cmd_update, "list": cmd_list, "describe": cmd_describe}[args.cmd](c, args)
if __name__ == "__main__":
main()