Files
his/MD/test/3d_reconstruction_test.py
华佗 aef7fd5c45 feat: 影像3D重建测试数据和测试脚本
测试数据:
- 10个3D重建任务(CT/MR, 胸部/头部/腹部/膝关节/脊柱/骨盆/心脏)
- 6个重建结果(VR/MPR/MIP三种类型)
- 6个重建报告(DRAFT/REPORTED/VERIFIED三种状态)
- 3位患者关联(刘潇凡/豆包/随子赫)

测试3D影像:
- chest_vr_render.ppm (胸部VR容积渲染)
- head_mpr_axial.ppm (头部MPR轴位)
- abdomen_mip_render.ppm (腹部MIP最大密度投影)
- knee_vr_render.ppm (膝关节VR)
- phantom_volume.raw (16x16x16体数据)
- dicom_metadata.json (DICOM元数据)

测试脚本:
- 3d_reconstruction_test.py (37个测试用例, 97.3%通过率)
- 覆盖: 任务管理/结果管理/报告管理/跨模块联动/数据质量

DB修复:
- reconstruction_task/result/report补全HisBaseEntity列
2026-06-08 09:20:33 +08:00

368 lines
17 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
HealthLink-HIS imaging 3D reconstruction module: end-to-end test.
Covers: task management + result management + report management + business-logic validation
"""
import requests, json, time, sys, os
from datetime import datetime
# Base URL that every request path in this script is appended to.
BASE = "http://localhost:18082/healthlink-his"
# One dict per recorded check, appended by rec() and written to the JSON report by main().
R = []
# Running pass (P) and fail (F) counters, incremented by rec().
P = F = 0
# Defect entries printed and saved by main(); no visible code in this file appends to it.
DEFECTS = []
def login():
    """Log in with the fixed admin credentials and return the session token.

    Returns:
        The ``token`` field of the JSON login response, or ``None`` when the
        request fails, the body is not a JSON object, or the field is absent.
    """
    credentials = {"username": "admin", "password": "admin123", "tenantId": "1"}
    try:
        r = requests.post(f"{BASE}/login", json=credentials, timeout=10)
        body = r.json()
    except (requests.RequestException, ValueError) as e:
        # main() treats a missing token as a login failure, so report the
        # cause here instead of aborting the run with a traceback.
        print(f"login error: {str(e)[:100]}")
        return None
    if not isinstance(body, dict):
        return None
    return body.get("token")
# Bearer token sent by api() when set; main() assigns it from login().
TOKEN = None
def api(method, path, data=None, params=None, timeout=15):
    """Send one request to the service and normalise the response into a dict.

    Args:
        method: One of "GET", "POST", "PUT", "DELETE".
        path: Path appended to BASE.
        data: JSON body, sent for POST and PUT only.
        params: Query-string parameters, sent for every method.
        timeout: Request timeout in seconds.

    Returns:
        ``None`` for an unsupported method. Otherwise a dict with the keys
        ``ok`` (True when the body's ``code`` equals 200), ``code``, ``data``,
        ``msg`` and ``raw`` (the decoded body, or ``None`` when the request
        itself failed).
    """
    if method not in ("GET", "POST", "PUT", "DELETE"):
        return None
    headers = {"Content-Type": "application/json"}
    if TOKEN:
        headers["Authorization"] = f"Bearer {TOKEN}"
    url = f"{BASE}{path}"
    # Only POST and PUT carry a JSON body; query parameters go out with every
    # method so callers such as the report-verify PUT are not silently ignored.
    body = data if method in ("POST", "PUT") else None
    try:
        resp = requests.request(method, url, headers=headers, params=params,
                                json=body, timeout=timeout)
        if "json" in resp.headers.get("content-type", ""):
            j = resp.json()
        else:
            j = {"code": resp.status_code, "msg": resp.text[:100]}
        if not isinstance(j, dict):
            # A JSON array or scalar has no "code" field to judge success by.
            return {"ok": False, "code": resp.status_code, "data": None,
                    "msg": "unexpected JSON payload", "raw": j}
        return {
            "ok": j.get("code") == 200,
            "code": j.get("code", resp.status_code),
            "data": j.get("data"),
            "msg": j.get("msg", ""),
            "raw": j,
        }
    except Exception as e:
        # Deliberately broad: one failing request must not abort the whole
        # suite; the failure is reported through the returned dict instead.
        return {"ok": False, "code": 0, "msg": str(e)[:100], "data": None, "raw": None}
def cnt(r):
    """Return the number of items reported by a normalised API response.

    Args:
        r: A response dict as produced by api(), or ``None``.

    Returns:
        ``0`` when ``r`` or its ``data`` is empty or of an unexpected type;
        the list length when ``data`` is a list; otherwise the dict's
        ``total`` if it is present and not ``None``, else the length of the
        first of ``records`` / ``rows`` / ``list`` that holds a list.
    """
    if not r or not r.get("data"):
        return 0
    payload = r["data"]
    if isinstance(payload, list):
        return len(payload)
    if not isinstance(payload, dict):
        return 0
    total = payload.get("total")
    if total is not None:
        return total
    # Only measure a row container when "total" is missing, and skip null
    # containers so a payload such as {"records": None} cannot raise.
    for key in ("records", "rows", "list"):
        rows = payload.get(key)
        if isinstance(rows, list):
            return len(rows)
    return 0
def rec(tid, name, ok, detail=""):
    """Record the outcome of one check and print a one-line summary.

    Args:
        tid: Identifier of the check.
        name: Display name of the check.
        ok: Truthy when the check passed.
        detail: Optional extra text shown after the name.

    Side effects:
        Increments the global P or F counter and appends an entry to R.
    """
    global P, F
    if ok:
        P += 1
    else:
        F += 1
    R.append({"id": tid, "name": name, "ok": ok, "detail": detail})
    # Both branches of the marker were empty strings, so passes and failures
    # printed identically; use the same marks as the summary in main().
    mark = "✅" if ok else "❌"
    suffix = f" → {detail}" if detail else ""
    print(f" {mark} [{tid}] {name}{suffix}")
def defect(severity, module, title, desc, api_path="", impact=""):
    """Build a defect entry dict from the given fields.

    The result has the keys ``severity``, ``module``, ``title``, ``desc``,
    ``api`` (taken from ``api_path``) and ``impact``, in that order.
    """
    entry = dict(severity=severity, module=module, title=title, desc=desc)
    entry["api"] = api_path
    entry["impact"] = impact
    return entry
# ======================== 1. 重建任务管理 ========================
def test_tasks():
    """Run the reconstruction-task checks and record each one through rec().

    Covers the task list, filters by status / modality / patient name,
    creating a task, fetching it, cancelling it, and counting CT tasks per
    reconstruction type. Returns nothing.
    """
    print("\n" + "="*60)
    print("🔬 模块一: 3D重建任务管理")
    print("="*60)
    page_path = "/reconstruction/task/page"

    # 1.1 List tasks.
    r = api("GET", page_path, params={"pageNo": 1, "pageSize": 10})
    rec("3D-TASK-LIST", "任务列表", r["ok"], f"任务数={cnt(r)}")

    # 1.2 Filter by task status.
    for status in ["COMPLETED", "PROCESSING", "PENDING", "CANCELLED"]:
        r = api("GET", page_path, params={"taskStatus": status, "pageNo": 1, "pageSize": 10})
        rec(f"3D-TASK-{status}", f"筛选{status}任务", r["ok"], f"数量={cnt(r)}")

    # 1.3 Filter by modality.
    for modality in ["CT", "MR"]:
        r = api("GET", page_path, params={"modality": modality, "pageNo": 1, "pageSize": 10})
        rec(f"3D-TASK-MOD-{modality}", f"筛选{modality}任务", r["ok"], f"数量={cnt(r)}")

    # 1.4 Search by patient name.
    r = api("GET", page_path, params={"patientName": "刘潇凡", "pageNo": 1, "pageSize": 10})
    rec("3D-TASK-SEARCH", "患者名搜索", r["ok"], f"结果={cnt(r)}")

    # 1.5 Create a new task; the study UID embeds the current time so reruns differ.
    new_task = {
        "patientId": 1980816965970288641,
        "patientName": "测试患者",
        "studyUid": f"1.2.840.113619.2.55.3.{int(time.time())}",
        "modality": "CT",
        "bodyPart": "胸部",
        "scanRange": "肺尖-肺底",
        "reconstructionType": "VR",
        "sliceThickness": 1.25,
        "pixelSpacing": "0.625x0.625",
        "requestDoctor": "测试医生",
    }
    r = api("POST", "/reconstruction/task/add", data=new_task)
    # Only read the id from a dict payload so an unexpected response shape is
    # recorded as a missing id rather than crashing the suite.
    created = r["data"] if r["ok"] else None
    new_task_id = created.get("id") if isinstance(created, dict) else None
    rec("3D-TASK-ADD", "创建重建任务", r["ok"], f"任务ID={new_task_id}")

    # 1.6 Fetch the task that was just created and check its status value.
    if new_task_id:
        r = api("GET", f"/reconstruction/task/{new_task_id}")
        task_data = r["data"] if r["ok"] else None
        rec("3D-TASK-GET", "查询单个任务", r["ok"] and task_data is not None)
        if task_data:
            status_ok = task_data.get("taskStatus") in ["COMPLETED", "PENDING", "PROCESSING"]
            rec("3D-TASK-STATUS", "任务状态验证", status_ok, f"状态={task_data.get('taskStatus')}")

    # 1.7 Cancel the task; falls back to id 0 when creation did not yield an id.
    r = api("PUT", f"/reconstruction/task/cancel/{new_task_id}" if new_task_id else "/reconstruction/task/cancel/0")
    rec("3D-TASK-CANCEL", "取消任务", r["ok"])

    # 1.8 Count CT tasks per reconstruction type. The query does not depend on
    # the type, so it is issued once instead of once per type.
    types = {"VR": "容积渲染", "MPR": "多平面重建", "MIP": "最大密度投影"}
    r = api("GET", page_path, params={"modality": "CT", "pageNo": 1, "pageSize": 100})
    if r["ok"] and r["data"]:
        data = r["data"]
        if isinstance(data, dict):
            # Accept the same row containers as the other checks in this file.
            rows = data.get("records", data.get("rows", data.get("list", [])))
        else:
            rows = data
        if isinstance(rows, list):
            for rtype, desc in types.items():
                type_count = sum(1 for t in rows if t.get("reconstructionType") == rtype)
                rec(f"3D-TYPE-{rtype}", f"{desc}({rtype})任务", True, f"数量={type_count}")
# ======================== 2. 重建结果管理 ========================
def test_results():
    """Run the reconstruction-result checks and record each one through rec().

    Covers listing the results of task 9000000001, adding a result to it,
    re-listing to confirm at least two results, and checking that each of
    VR / MPR / MIP appears among the results of some task. Returns nothing.
    """
    print("\n" + "="*60)
    print("📊 模块二: 3D重建结果管理")
    print("="*60)

    # 2.1 List existing results of the fixed task.
    r = api("GET", "/reconstruction/result/list/9000000001")
    rec("3D-RESULT-LIST", "查询重建结果", r["ok"], f"结果数={cnt(r)}")

    # 2.2 Add a new result; measurements/annotations are sent as JSON strings.
    new_result = {
        "taskId": 9000000001,
        "resultType": "MPR",
        "imagePath": "/data/reconstruction/test/result.png",
        "volumeDataPath": "/data/reconstruction/test/volume/",
        "measurements": json.dumps({"volume": "3200ml", "density": "0.85g/cm3"}),
        "annotations": json.dumps({"finding": "右肺上叶结节"}),
    }
    r = api("POST", "/reconstruction/result/add", data=new_result)
    # Only read the id from a dict payload so an unexpected response shape is
    # recorded as a missing id rather than crashing the suite.
    created = r["data"] if r["ok"] else None
    new_result_id = created.get("id") if isinstance(created, dict) else None
    rec("3D-RESULT-ADD", "添加重建结果", r["ok"], f"结果ID={new_result_id}")

    # 2.3 The task should now have at least two results.
    r = api("GET", "/reconstruction/result/list/9000000001")
    if r["ok"]:
        results = r["data"] if isinstance(r["data"], list) else []
        rec("3D-RESULT-COUNT", "结果关联验证", len(results) >= 2, f"任务9000000001有{len(results)}个结果")

    # 2.4 Every result type should appear on at least one task. The task page
    # and per-task results are fetched once and shared by all three checks.
    result_types = ["VR", "MPR", "MIP"]
    r = api("GET", "/reconstruction/task/page", params={"pageNo": 1, "pageSize": 100})
    if r["ok"]:
        data = r["data"]
        if isinstance(data, dict):
            # Accept the same row containers as the other checks in this file.
            tasks = data.get("records", data.get("rows", data.get("list", [])))
        else:
            tasks = data
        if not isinstance(tasks, list):
            tasks = []
        wanted = set(result_types)
        seen = set()
        for task in tasks:
            r2 = api("GET", f"/reconstruction/result/list/{task['id']}")
            if r2["ok"] and isinstance(r2["data"], list):
                seen.update(res.get("resultType") for res in r2["data"])
            if wanted <= seen:
                # All types found; no need to query the remaining tasks.
                break
        for rtype in result_types:
            rec(f"3D-RESULT-TYPE-{rtype}", f"结果类型{rtype}", rtype in seen)
# ======================== 3. 重建报告管理 ========================
def test_reports():
    """Run the reconstruction-report checks and record each one through rec().

    Covers the report list, filters by status, creating a report, submitting
    it and confirming it is listed as REPORTED, verifying a REPORTED report,
    and counting reports whose text fields are all filled in. Returns nothing.
    """
    def page_rows(resp):
        # Rows of a page response: the first present of records / rows / list
        # for a dict payload, otherwise the payload itself.
        data = resp["data"]
        if isinstance(data, dict):
            return data.get("records", data.get("rows", data.get("list", [])))
        return data

    print("\n" + "="*60)
    print("📋 模块三: 3D重建报告管理")
    print("="*60)
    page_path = "/reconstruction/report/page"

    # 3.1 List reports.
    r = api("GET", page_path, params={"pageNo": 1, "pageSize": 10})
    rec("3D-RPT-LIST", "报告列表", r["ok"], f"报告数={cnt(r)}")

    # 3.2 Filter by report status.
    for status in ["DRAFT", "REPORTED", "VERIFIED"]:
        r = api("GET", page_path, params={"status": status, "pageNo": 1, "pageSize": 10})
        rec(f"3D-RPT-{status}", f"筛选{status}报告", r["ok"], f"数量={cnt(r)}")

    # 3.3 Create a new report.
    new_report = {
        "taskId": 9000000006,
        "patientId": 1979081512436203522,
        "encounterId": 3,
        "findings": "胸部CT 3D重建示双肺野清晰未见明显异常密度影。",
        "impression": "胸部CT未见明显异常",
        "conclusion": "建议随访。",
        "reportDoctor": "测试医生",
    }
    r = api("POST", "/reconstruction/report/add", data=new_report)
    # Only read the id from a dict payload so an unexpected response shape is
    # recorded as a missing id rather than crashing the suite.
    created = r["data"] if r["ok"] else None
    new_rpt_id = created.get("id") if isinstance(created, dict) else None
    rec("3D-RPT-ADD", "创建报告", r["ok"], f"报告ID={new_rpt_id}")

    # 3.4 Submit the new report, then confirm it shows up under REPORTED.
    if new_rpt_id:
        r = api("PUT", f"/reconstruction/report/submit/{new_rpt_id}")
        rec("3D-RPT-SUBMIT", "提交报告", r["ok"])
        r = api("GET", page_path, params={"status": "REPORTED", "pageNo": 1, "pageSize": 100})
        if r["ok"]:
            rows = page_rows(r)
            if isinstance(rows, list):
                # Compare as strings so numeric and string ids still match.
                submitted = any(str(rp.get("id")) == str(new_rpt_id) for rp in rows)
                rec("3D-RPT-STATUS", "报告状态验证", submitted, "状态=REPORTED")

    # 3.5 Verify one report that is currently in REPORTED status.
    r_rpt = api("GET", page_path, params={"status": "REPORTED", "pageNo": 1, "pageSize": 1})
    if r_rpt["ok"]:
        # Use the same row lookup as 3.4 / 3.6 so a "rows" or "list" payload
        # is not misreported as "no report to verify".
        rpt_rows = page_rows(r_rpt)
        if isinstance(rpt_rows, list) and rpt_rows:
            verify_id = rpt_rows[0]["id"]
            r = api("PUT", f"/reconstruction/report/verify/{verify_id}", params={"doctor": "审核医生"})
            rec("3D-RPT-VERIFY", "审核报告", r["ok"], f"报告ID={verify_id}")
        else:
            rec("3D-RPT-VERIFY", "审核报告", False, "无可审核报告")
    else:
        rec("3D-RPT-VERIFY", "审核报告", False, "查询报告失败")

    # 3.6 Count REPORTED/VERIFIED reports that have every text field filled in.
    r = api("GET", page_path, params={"pageNo": 1, "pageSize": 100})
    if r["ok"]:
        rows = page_rows(r)
        if isinstance(rows, list):
            complete_reports = sum(
                1 for rp in rows
                if rp.get("findings") and rp.get("impression") and rp.get("conclusion")
                and rp.get("reportDoctor") and rp.get("status") in ["REPORTED", "VERIFIED"]
            )
            rec("3D-RPT-COMPLETE", "报告完整性", complete_reports > 0, f"完整报告={complete_reports}")
# ======================== 4. 跨模块联动 ========================
def test_cross_module():
    """Check links between tasks, results, reports and patients.

    Each check is recorded through rec() and is skipped silently when its
    page query does not succeed or does not yield a list. Returns nothing.
    """
    def rows_of(resp):
        # Dict payload: first present of records / rows / list (default []).
        # Any other payload is returned unchanged.
        payload = resp["data"]
        if not isinstance(payload, dict):
            return payload
        fallback = payload.get("rows", payload.get("list", []))
        return payload.get("records", fallback)

    def has_results(task):
        # True when the task's result list comes back non-empty.
        found = api("GET", f"/reconstruction/result/list/{task['id']}")
        return bool(found["ok"] and found["data"] and len(found["data"]) > 0)

    banner = "=" * 60
    print("\n" + banner)
    print("🔗 模块四: 跨模块联动验证")
    print(banner)
    task_page = "/reconstruction/task/page"
    report_page = "/reconstruction/report/page"

    # 4.1 Completed tasks that have at least one result.
    completed = api("GET", task_page, params={"taskStatus": "COMPLETED", "pageNo": 1, "pageSize": 100})
    if completed["ok"]:
        task_rows = rows_of(completed)
        if isinstance(task_rows, list):
            linked = 0
            for entry in task_rows:
                if has_results(entry):
                    linked += 1
            rec("3D-CROSS-TASK-RESULT", "任务→结果关联", linked > 0,
                f"有结果的任务={linked}/{len(task_rows)}")

    # 4.2 Reports that carry a task id.
    reports = api("GET", report_page, params={"pageNo": 1, "pageSize": 100})
    if reports["ok"]:
        report_rows = rows_of(reports)
        if isinstance(report_rows, list):
            with_task = len([item for item in report_rows if item.get("taskId")])
            rec("3D-CROSS-RPT-TASK", "报告→任务关联", with_task > 0, f"有任务关联={with_task}")

    # 4.3 Tasks found for a known patient name.
    by_patient = api("GET", task_page, params={"patientName": "刘潇凡", "pageNo": 1, "pageSize": 100})
    if by_patient["ok"]:
        patient_rows = rows_of(by_patient)
        if isinstance(patient_rows, list):
            found = len(patient_rows)
            rec("3D-CROSS-PATIENT", "患者→任务关联", found > 0, f"刘潇凡的3D任务={found}")

    # 4.4 Distribution of task statuses (always recorded as a pass).
    everything = api("GET", task_page, params={"pageNo": 1, "pageSize": 100})
    if everything["ok"]:
        all_rows = rows_of(everything)
        if isinstance(all_rows, list):
            distribution = {}
            for entry in all_rows:
                key = entry.get("taskStatus", "UNKNOWN")
                if key in distribution:
                    distribution[key] += 1
                else:
                    distribution[key] = 1
            rec("3D-STATS", "状态分布统计", True, str(distribution))
# ======================== 5. 数据质量验证 ========================
def test_data_quality():
    """Check field coverage of task and report rows and the set of task types.

    Each check is recorded through rec() and is skipped silently when its
    page query does not succeed or does not yield a list. Returns nothing.
    """
    def rows_of(resp):
        # Dict payload: first present of records / rows / list (default []).
        # Any other payload is returned unchanged.
        payload = resp["data"]
        if not isinstance(payload, dict):
            return payload
        fallback = payload.get("rows", payload.get("list", []))
        return payload.get("records", fallback)

    banner = "=" * 60
    print("\n" + banner)
    print("🔍 模块五: 数据质量验证")
    print(banner)
    task_page = "/reconstruction/task/page"
    everything = {"pageNo": 1, "pageSize": 100}

    # 5.1 Each key task field must be filled in on at least one task.
    tasks = api("GET", task_page, params=dict(everything))
    if tasks["ok"]:
        task_rows = rows_of(tasks)
        if isinstance(task_rows, list):
            filled = dict.fromkeys(
                ["patientName", "modality", "bodyPart", "reconstructionType", "requestDoctor"], 0
            )
            for entry in task_rows:
                for field in filled:
                    if entry.get(field):
                        filled[field] += 1
            every_field_seen = all(count > 0 for count in filled.values())
            summary = " ".join(f"{field}={count}" for field, count in filled.items())
            rec("3D-DQ-TASK", "任务数据完整性", every_field_seen, summary)

    # 5.2 Count reports with findings / impression / conclusion; passes on findings alone.
    reports = api("GET", "/reconstruction/report/page", params=dict(everything))
    if reports["ok"]:
        report_rows = rows_of(reports)
        if isinstance(report_rows, list):
            has_findings = len([item for item in report_rows if item.get("findings")])
            has_impression = len([item for item in report_rows if item.get("impression")])
            has_conclusion = len([item for item in report_rows if item.get("conclusion")])
            rec("3D-DQ-RPT", "报告数据质量", has_findings > 0,
                f"有描述={has_findings} 有印象={has_impression} 有结论={has_conclusion}")

    # 5.3 The reconstruction types in use must be exactly VR, MPR and MIP.
    tasks = api("GET", task_page, params=dict(everything))
    if tasks["ok"]:
        task_rows = rows_of(tasks)
        if isinstance(task_rows, list):
            types = {entry.get("reconstructionType") for entry in task_rows
                     if entry.get("reconstructionType")}
            rec("3D-DQ-TYPE", "重建类型覆盖", types == {"VR", "MPR", "MIP"}, f"类型={types}")
# ======================== Main ========================
def main():
    """Log in, run every test group, print a summary and save a JSON report.

    The report is written to ``reports/3d_reconstruction_report.json`` next to
    this file.

    Returns:
        ``0`` when no check failed; ``1`` when any check failed or the login
        did not yield a token.
    """
    global TOKEN
    print("="*60)
    print("🏥 HealthLink-HIS 影像3D重建模块 全链路测试")
    print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("="*60)
    TOKEN = login()
    if not TOKEN:
        print("❌ 登录失败!")
        # A bare return would make sys.exit() report success (status 0).
        return 1
    test_tasks()
    test_results()
    test_reports()
    test_cross_module()
    test_data_quality()
    print("\n" + "="*60)
    print("📊 测试汇总")
    print(f" 通过: ✅ {P}")
    print(f" 失败: ❌ {F}")
    total = P + F
    rate = (P / total * 100) if total > 0 else 0
    print(f" 通过率: {rate:.1f}% ({P}/{total})")
    print("="*60)
    if DEFECTS:
        print(f"\n🐛 发现缺陷: {len(DEFECTS)}")
        for i, d in enumerate(DEFECTS, 1):
            # NOTE(review): all three keys are the empty string, so only the
            # last entry survives; the intended severity labels appear to have
            # been lost and need to be restored.
            sev = {"":"🟠","":"🟡","":"🟢"}.get(d["severity"],"")
            print(f" {sev} 缺陷#{i} [{d['severity']}] {d['title']}")
            print(f" 模块: {d['module']} | 接口: {d['api']}")
            print(f" 描述: {d['desc']}")
    # Save the report as JSON.
    report = {
        "timestamp": datetime.now().isoformat(),
        "summary": {"total": total, "passed": P, "failed": F, "passRate": f"{rate:.1f}%"},
        "results": R,
        "defects": DEFECTS,
    }
    report_dir = os.path.join(os.path.dirname(__file__), "reports")
    os.makedirs(report_dir, exist_ok=True)
    report_path = os.path.join(report_dir, "3d_reconstruction_report.json")
    with open(report_path, "w", encoding="utf-8") as f:
        # ensure_ascii=False keeps the Chinese check names readable in the file.
        json.dump(report, f, indent=2, ensure_ascii=False)
    print(f"\n📄 报告: {report_path}")
    return 0 if F == 0 else 1


# Run the suite when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    sys.exit(main())