Files
hospital_performance/backend/init_test_data.py
2026-02-28 15:06:52 +08:00

565 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
测试数据初始化脚本
包含:
1. 科室类型BSC维度权重配置初始化
2. 满意度调查问卷示例数据
3. 评分方法示例数据
"""
import asyncio
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from sqlalchemy import select
from app.core.database import async_session_maker
from app.models.models import (
DeptType, DeptTypeDimensionWeight, BSCDimension,
Survey, SurveyQuestion, SurveyStatus, SurveyType, QuestionType,
Indicator, IndicatorType, IndicatorTemplate, TemplateIndicator, TemplateType,
Department, Staff
)
from app.services.dimension_weight_service import DEFAULT_WEIGHTS
import json
async def init_dimension_weights():
"""初始化科室类型BSC维度权重配置"""
print("=" * 50)
print("初始化科室类型BSC维度权重配置...")
print("=" * 50)
async with async_session_maker() as db:
for dept_type, weights in DEFAULT_WEIGHTS.items():
# 检查是否已存在
result = await db.execute(
select(DeptTypeDimensionWeight)
.where(DeptTypeDimensionWeight.dept_type == dept_type)
)
existing = result.scalar_one_or_none()
if existing:
print(f" {dept_type.value}: 已存在,跳过")
continue
config = DeptTypeDimensionWeight(
dept_type=dept_type,
financial_weight=weights["financial"],
customer_weight=weights["customer"],
internal_process_weight=weights["internal_process"],
learning_growth_weight=weights["learning_growth"],
description=weights.get("description", "")
)
db.add(config)
print(f" {dept_type.value}: 财务{weights['financial']*100}%, 客户{weights['customer']*100}%, 流程{weights['internal_process']*100}%, 学习{weights['learning_growth']*100}%")
await db.commit()
print("科室类型BSC维度权重配置初始化完成\n")
async def init_sample_surveys():
"""初始化满意度调查问卷示例数据"""
print("=" * 50)
print("初始化满意度调查问卷示例数据...")
print("=" * 50)
async with async_session_maker() as db:
# 检查是否已存在问卷
result = await db.execute(select(Survey))
if result.scalars().first():
print(" 已存在问卷数据,跳过初始化\n")
return
# 创建住院患者满意度问卷
survey1 = Survey(
survey_name="住院患者满意度调查问卷",
survey_code="INPATIENT_001",
survey_type=SurveyType.INPATIENT,
description="用于评估住院患者对医院服务的满意度",
status=SurveyStatus.PUBLISHED,
is_anonymous=True,
total_questions=10
)
db.add(survey1)
await db.flush()
# 添加问卷题目
questions1 = [
{
"question_text": "您对入院手续办理的便捷程度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 1
},
{
"question_text": "您对病房环境的整洁舒适程度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 2
},
{
"question_text": "您对医生的服务态度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 3
},
{
"question_text": "您对护士的护理服务是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 4
},
{
"question_text": "您对医生的技术水平是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 5
},
{
"question_text": "您对医院餐饮服务质量是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 6
},
{
"question_text": "您对检查检验等候时间是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 7
},
{
"question_text": "您对出院结算流程是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 8
},
{
"question_text": "您对医院整体服务是否满意?",
"question_type": QuestionType.SINGLE_CHOICE,
"options": json.dumps([
{"label": "非常满意", "value": "5", "score": 5},
{"label": "满意", "value": "4", "score": 4},
{"label": "一般", "value": "3", "score": 3},
{"label": "不满意", "value": "2", "score": 2},
{"label": "非常不满意", "value": "1", "score": 1}
], ensure_ascii=False),
"score_max": 5,
"sort_order": 9
},
{
"question_text": "您对医院有什么意见或建议?",
"question_type": QuestionType.TEXT,
"is_required": False,
"sort_order": 10
}
]
for q in questions1:
question = SurveyQuestion(
survey_id=survey1.id,
question_text=q["question_text"],
question_type=q["question_type"],
options=q.get("options"),
score_max=q.get("score_max", 5),
is_required=q.get("is_required", True),
sort_order=q["sort_order"]
)
db.add(question)
print(f" 创建问卷: {survey1.survey_name}")
# 创建门诊患者满意度问卷
survey2 = Survey(
survey_name="门诊患者满意度调查问卷",
survey_code="OUTPATIENT_001",
survey_type=SurveyType.OUTPATIENT,
description="用于评估门诊患者对医院服务的满意度",
status=SurveyStatus.PUBLISHED,
is_anonymous=True,
total_questions=8
)
db.add(survey2)
await db.flush()
questions2 = [
{
"question_text": "您对挂号流程的便捷程度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 1
},
{
"question_text": "您对候诊时间是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 2
},
{
"question_text": "您对医生的诊疗服务是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 3
},
{
"question_text": "您对药房取药流程是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 4
},
{
"question_text": "您对收费处服务是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 5
},
{
"question_text": "您对门诊环境是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 6
},
{
"question_text": "您对门诊整体服务是否满意?",
"question_type": QuestionType.SINGLE_CHOICE,
"options": json.dumps([
{"label": "非常满意", "value": "5", "score": 5},
{"label": "满意", "value": "4", "score": 4},
{"label": "一般", "value": "3", "score": 3},
{"label": "不满意", "value": "2", "score": 2},
{"label": "非常不满意", "value": "1", "score": 1}
], ensure_ascii=False),
"score_max": 5,
"sort_order": 7
},
{
"question_text": "您对医院有什么意见或建议?",
"question_type": QuestionType.TEXT,
"is_required": False,
"sort_order": 8
}
]
for q in questions2:
question = SurveyQuestion(
survey_id=survey2.id,
question_text=q["question_text"],
question_type=q["question_type"],
options=q.get("options"),
score_max=q.get("score_max", 5),
is_required=q.get("is_required", True),
sort_order=q["sort_order"]
)
db.add(question)
print(f" 创建问卷: {survey2.survey_name}")
# 创建科室间满意度问卷
survey3 = Survey(
survey_name="科室间协作满意度调查",
survey_code="INTERNAL_001",
survey_type=SurveyType.DEPARTMENT,
description="用于评估科室之间的协作配合满意度",
status=SurveyStatus.DRAFT,
is_anonymous=False,
total_questions=6
)
db.add(survey3)
await db.flush()
questions3 = [
{
"question_text": "您对该科室的工作响应速度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 1
},
{
"question_text": "您对该科室的工作配合度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 2
},
{
"question_text": "您对该科室的服务态度是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 3
},
{
"question_text": "您对该科室的工作质量是否满意?",
"question_type": QuestionType.SCORE,
"score_max": 5,
"sort_order": 4
},
{
"question_text": "您对该科室整体协作是否满意?",
"question_type": QuestionType.SINGLE_CHOICE,
"options": json.dumps([
{"label": "非常满意", "value": "5", "score": 5},
{"label": "满意", "value": "4", "score": 4},
{"label": "一般", "value": "3", "score": 3},
{"label": "不满意", "value": "2", "score": 2},
{"label": "非常不满意", "value": "1", "score": 1}
], ensure_ascii=False),
"score_max": 5,
"sort_order": 5
},
{
"question_text": "您对该科室有什么建议?",
"question_type": QuestionType.TEXT,
"is_required": False,
"sort_order": 6
}
]
for q in questions3:
question = SurveyQuestion(
survey_id=survey3.id,
question_text=q["question_text"],
question_type=q["question_type"],
options=q.get("options"),
score_max=q.get("score_max", 5),
is_required=q.get("is_required", True),
sort_order=q["sort_order"]
)
db.add(question)
print(f" 创建问卷: {survey3.survey_name}")
await db.commit()
print("满意度调查问卷示例数据初始化完成!\n")
async def init_sample_survey_responses():
"""初始化满意度调查回答示例数据"""
print("=" * 50)
print("初始化满意度调查回答示例数据...")
print("=" * 50)
async with async_session_maker() as db:
# 获取问卷
result = await db.execute(
select(Survey).where(Survey.survey_code == "INPATIENT_001")
)
survey = result.scalar_one_or_none()
if not survey:
print(" 未找到住院患者满意度问卷,跳过\n")
return
# 获取科室列表
dept_result = await db.execute(select(Department))
departments = dept_result.scalars().all()
if not departments:
print(" 未找到科室数据,跳过\n")
return
# 获取问卷题目
q_result = await db.execute(
select(SurveyQuestion)
.where(SurveyQuestion.survey_id == survey.id)
.order_by(SurveyQuestion.sort_order)
)
questions = q_result.scalars().all()
# 为每个科室生成示例回答
from app.models.models import SurveyResponse, SurveyAnswer
import random
for dept in departments[:5]: # 只为前5个科室生成
# 生成5-10条回答
num_responses = random.randint(5, 10)
for _ in range(num_responses):
total_score = 0
max_score = 0
response = SurveyResponse(
survey_id=survey.id,
department_id=dept.id,
respondent_type="patient"
)
db.add(response)
await db.flush()
# 为每个题目生成回答
for q in questions:
if q.question_type == QuestionType.TEXT:
continue
# 随机生成评分(倾向高分)
score = random.choices(
[5, 4, 3, 2, 1],
weights=[50, 30, 15, 3, 2]
)[0]
answer = SurveyAnswer(
response_id=response.id,
question_id=q.id,
answer_value=str(score),
score=score
)
db.add(answer)
total_score += score
max_score += q.score_max
# 更新回答记录得分
response.total_score = total_score
response.max_score = max_score
response.satisfaction_rate = (total_score / max_score * 100) if max_score > 0 else 0
await db.commit()
print(f"{min(len(departments), 5)}个科室生成了满意度调查回答数据")
print("满意度调查回答示例数据初始化完成!\n")
async def init_scoring_method_indicators():
"""初始化评分方法示例指标"""
print("=" * 50)
print("初始化评分方法示例指标...")
print("=" * 50)
async with async_session_maker() as db:
# 检查是否已存在评分方法相关指标
result = await db.execute(
select(Indicator).where(Indicator.code.in_(["SCORE001", "SCORE002", "SCORE003", "SCORE004"]))
)
if result.scalars().first():
print(" 已存在评分方法示例指标,跳过初始化\n")
return
# 创建示例指标,展示各种评分方法
scoring_indicators = [
# 目标参照法示例
Indicator(
name="业务收支结余率",
code="SCORE001",
indicator_type=IndicatorType.QUANTITY,
bs_dimension=BSCDimension.FINANCIAL,
weight=12.6,
max_score=100,
target_value=15.0, # 目标值15%
target_unit="%",
calculation_method="(业务收入 - 业务支出)/业务收入 × 100%",
assessment_method="target_reference", # 目标参照法
data_source="财务系统",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]),
is_active=True
),
# 区间法-趋高指标示例
Indicator(
name="人均收支结余",
code="SCORE002",
indicator_type=IndicatorType.QUANTITY,
bs_dimension=BSCDimension.FINANCIAL,
weight=16.8,
max_score=100,
target_value=5000, # 基准值
target_unit="",
calculation_method="业务收支结余/平均职工人数",
assessment_method="interval_high", # 区间法-趋高
data_source="财务系统",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]),
is_active=True
),
# 区间法-趋低指标示例
Indicator(
name="百元收入耗材率",
code="SCORE003",
indicator_type=IndicatorType.COST,
bs_dimension=BSCDimension.FINANCIAL,
weight=12.6,
max_score=100,
target_value=25.0, # 目标值≤25元
target_unit="",
calculation_method="耗材支出/业务收入 × 100",
assessment_method="interval_low", # 区间法-趋低
data_source="物资系统",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]),
is_active=True
),
# 扣分法示例
Indicator(
name="病历质量考核",
code="SCORE004",
indicator_type=IndicatorType.QUALITY,
bs_dimension=BSCDimension.INTERNAL_PROCESS,
weight=3.2,
max_score=100,
target_value=0, # 目标值0无乙级病历
calculation_method="乙级病历每份扣5分丙级病历不得分",
assessment_method="deduction", # 扣分法
deduction_standard="乙级病历每份扣5分",
data_source="病案质控",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]),
is_active=True
),
# 加分法示例
Indicator(
name="科研教学工作",
code="SCORE005",
indicator_type=IndicatorType.QUANTITY, # 使用QUANTITY代替
bs_dimension=BSCDimension.LEARNING_GROWTH,
weight=4.5,
max_score=100,
target_value=0,
calculation_method="科研教学成果加分",
assessment_method="bonus", # 加分法
deduction_standard="科研项目立项市级加5分省级加10分国家级加20分\n论文发表核心期刊加5分SCI加10分",
data_source="科教科",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward", "medical_tech"]),
is_active=True
),
# 区间法-趋中指标示例
Indicator(
name="平均住院日",
code="SCORE006",
indicator_type=IndicatorType.EFFICIENCY,
bs_dimension=BSCDimension.INTERNAL_PROCESS,
weight=2.0,
max_score=100,
target_value=10.0, # 目标值10天
target_unit="",
calculation_method="出院患者平均住院天数",
assessment_method="interval_center", # 区间法-趋中
data_source="病案统计",
applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]),
is_active=True
),
]
for indicator in scoring_indicators:
db.add(indicator)
print(f" 创建指标: {indicator.name} ({indicator.assessment_method})")
await db.commit()
print("评分方法示例指标初始化完成!\n")
async def main():
"""主函数"""
print("\n" + "=" * 60)
print("开始初始化测试数据...")
print("=" * 60 + "\n")
# 初始化科室类型BSC维度权重配置
await init_dimension_weights()
# 初始化满意度调查问卷示例数据
await init_sample_surveys()
# 初始化满意度调查回答示例数据
await init_sample_survey_responses()
# 初始化评分方法示例指标
await init_scoring_method_indicators()
print("\n" + "=" * 60)
print("测试数据初始化完成!")
print("=" * 60 + "\n")
if __name__ == "__main__":
asyncio.run(main())