""" 测试数据初始化脚本 包含: 1. 科室类型BSC维度权重配置初始化 2. 满意度调查问卷示例数据 3. 评分方法示例数据 """ import asyncio import sys import os # 添加项目路径 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from sqlalchemy import select from app.core.database import async_session_maker from app.models.models import ( DeptType, DeptTypeDimensionWeight, BSCDimension, Survey, SurveyQuestion, SurveyStatus, SurveyType, QuestionType, Indicator, IndicatorType, IndicatorTemplate, TemplateIndicator, TemplateType, Department, Staff ) from app.services.dimension_weight_service import DEFAULT_WEIGHTS import json async def init_dimension_weights(): """初始化科室类型BSC维度权重配置""" print("=" * 50) print("初始化科室类型BSC维度权重配置...") print("=" * 50) async with async_session_maker() as db: for dept_type, weights in DEFAULT_WEIGHTS.items(): # 检查是否已存在 result = await db.execute( select(DeptTypeDimensionWeight) .where(DeptTypeDimensionWeight.dept_type == dept_type) ) existing = result.scalar_one_or_none() if existing: print(f" {dept_type.value}: 已存在,跳过") continue config = DeptTypeDimensionWeight( dept_type=dept_type, financial_weight=weights["financial"], customer_weight=weights["customer"], internal_process_weight=weights["internal_process"], learning_growth_weight=weights["learning_growth"], description=weights.get("description", "") ) db.add(config) print(f" {dept_type.value}: 财务{weights['financial']*100}%, 客户{weights['customer']*100}%, 流程{weights['internal_process']*100}%, 学习{weights['learning_growth']*100}%") await db.commit() print("科室类型BSC维度权重配置初始化完成!\n") async def init_sample_surveys(): """初始化满意度调查问卷示例数据""" print("=" * 50) print("初始化满意度调查问卷示例数据...") print("=" * 50) async with async_session_maker() as db: # 检查是否已存在问卷 result = await db.execute(select(Survey)) if result.scalars().first(): print(" 已存在问卷数据,跳过初始化\n") return # 创建住院患者满意度问卷 survey1 = Survey( survey_name="住院患者满意度调查问卷", survey_code="INPATIENT_001", survey_type=SurveyType.INPATIENT, description="用于评估住院患者对医院服务的满意度", status=SurveyStatus.PUBLISHED, is_anonymous=True, total_questions=10 ) db.add(survey1) await db.flush() # 添加问卷题目 questions1 = [ { "question_text": "您对入院手续办理的便捷程度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 1 }, { "question_text": "您对病房环境的整洁舒适程度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 2 }, { "question_text": "您对医生的服务态度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 3 }, { "question_text": "您对护士的护理服务是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 4 }, { "question_text": "您对医生的技术水平是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 5 }, { "question_text": "您对医院餐饮服务质量是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 6 }, { "question_text": "您对检查检验等候时间是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 7 }, { "question_text": "您对出院结算流程是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 8 }, { "question_text": "您对医院整体服务是否满意?", "question_type": QuestionType.SINGLE_CHOICE, "options": json.dumps([ {"label": "非常满意", "value": "5", "score": 5}, {"label": "满意", "value": "4", "score": 4}, {"label": "一般", "value": "3", "score": 3}, {"label": "不满意", "value": "2", "score": 2}, {"label": "非常不满意", "value": "1", "score": 1} ], ensure_ascii=False), "score_max": 5, "sort_order": 9 }, { "question_text": "您对医院有什么意见或建议?", "question_type": QuestionType.TEXT, "is_required": False, "sort_order": 10 } ] for q in questions1: question = SurveyQuestion( survey_id=survey1.id, question_text=q["question_text"], question_type=q["question_type"], options=q.get("options"), score_max=q.get("score_max", 5), is_required=q.get("is_required", True), sort_order=q["sort_order"] ) db.add(question) print(f" 创建问卷: {survey1.survey_name}") # 创建门诊患者满意度问卷 survey2 = Survey( survey_name="门诊患者满意度调查问卷", survey_code="OUTPATIENT_001", survey_type=SurveyType.OUTPATIENT, description="用于评估门诊患者对医院服务的满意度", status=SurveyStatus.PUBLISHED, is_anonymous=True, total_questions=8 ) db.add(survey2) await db.flush() questions2 = [ { "question_text": "您对挂号流程的便捷程度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 1 }, { "question_text": "您对候诊时间是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 2 }, { "question_text": "您对医生的诊疗服务是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 3 }, { "question_text": "您对药房取药流程是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 4 }, { "question_text": "您对收费处服务是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 5 }, { "question_text": "您对门诊环境是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 6 }, { "question_text": "您对门诊整体服务是否满意?", "question_type": QuestionType.SINGLE_CHOICE, "options": json.dumps([ {"label": "非常满意", "value": "5", "score": 5}, {"label": "满意", "value": "4", "score": 4}, {"label": "一般", "value": "3", "score": 3}, {"label": "不满意", "value": "2", "score": 2}, {"label": "非常不满意", "value": "1", "score": 1} ], ensure_ascii=False), "score_max": 5, "sort_order": 7 }, { "question_text": "您对医院有什么意见或建议?", "question_type": QuestionType.TEXT, "is_required": False, "sort_order": 8 } ] for q in questions2: question = SurveyQuestion( survey_id=survey2.id, question_text=q["question_text"], question_type=q["question_type"], options=q.get("options"), score_max=q.get("score_max", 5), is_required=q.get("is_required", True), sort_order=q["sort_order"] ) db.add(question) print(f" 创建问卷: {survey2.survey_name}") # 创建科室间满意度问卷 survey3 = Survey( survey_name="科室间协作满意度调查", survey_code="INTERNAL_001", survey_type=SurveyType.DEPARTMENT, description="用于评估科室之间的协作配合满意度", status=SurveyStatus.DRAFT, is_anonymous=False, total_questions=6 ) db.add(survey3) await db.flush() questions3 = [ { "question_text": "您对该科室的工作响应速度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 1 }, { "question_text": "您对该科室的工作配合度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 2 }, { "question_text": "您对该科室的服务态度是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 3 }, { "question_text": "您对该科室的工作质量是否满意?", "question_type": QuestionType.SCORE, "score_max": 5, "sort_order": 4 }, { "question_text": "您对该科室整体协作是否满意?", "question_type": QuestionType.SINGLE_CHOICE, "options": json.dumps([ {"label": "非常满意", "value": "5", "score": 5}, {"label": "满意", "value": "4", "score": 4}, {"label": "一般", "value": "3", "score": 3}, {"label": "不满意", "value": "2", "score": 2}, {"label": "非常不满意", "value": "1", "score": 1} ], ensure_ascii=False), "score_max": 5, "sort_order": 5 }, { "question_text": "您对该科室有什么建议?", "question_type": QuestionType.TEXT, "is_required": False, "sort_order": 6 } ] for q in questions3: question = SurveyQuestion( survey_id=survey3.id, question_text=q["question_text"], question_type=q["question_type"], options=q.get("options"), score_max=q.get("score_max", 5), is_required=q.get("is_required", True), sort_order=q["sort_order"] ) db.add(question) print(f" 创建问卷: {survey3.survey_name}") await db.commit() print("满意度调查问卷示例数据初始化完成!\n") async def init_sample_survey_responses(): """初始化满意度调查回答示例数据""" print("=" * 50) print("初始化满意度调查回答示例数据...") print("=" * 50) async with async_session_maker() as db: # 获取问卷 result = await db.execute( select(Survey).where(Survey.survey_code == "INPATIENT_001") ) survey = result.scalar_one_or_none() if not survey: print(" 未找到住院患者满意度问卷,跳过\n") return # 获取科室列表 dept_result = await db.execute(select(Department)) departments = dept_result.scalars().all() if not departments: print(" 未找到科室数据,跳过\n") return # 获取问卷题目 q_result = await db.execute( select(SurveyQuestion) .where(SurveyQuestion.survey_id == survey.id) .order_by(SurveyQuestion.sort_order) ) questions = q_result.scalars().all() # 为每个科室生成示例回答 from app.models.models import SurveyResponse, SurveyAnswer import random for dept in departments[:5]: # 只为前5个科室生成 # 生成5-10条回答 num_responses = random.randint(5, 10) for _ in range(num_responses): total_score = 0 max_score = 0 response = SurveyResponse( survey_id=survey.id, department_id=dept.id, respondent_type="patient" ) db.add(response) await db.flush() # 为每个题目生成回答 for q in questions: if q.question_type == QuestionType.TEXT: continue # 随机生成评分(倾向高分) score = random.choices( [5, 4, 3, 2, 1], weights=[50, 30, 15, 3, 2] )[0] answer = SurveyAnswer( response_id=response.id, question_id=q.id, answer_value=str(score), score=score ) db.add(answer) total_score += score max_score += q.score_max # 更新回答记录得分 response.total_score = total_score response.max_score = max_score response.satisfaction_rate = (total_score / max_score * 100) if max_score > 0 else 0 await db.commit() print(f" 为{min(len(departments), 5)}个科室生成了满意度调查回答数据") print("满意度调查回答示例数据初始化完成!\n") async def init_scoring_method_indicators(): """初始化评分方法示例指标""" print("=" * 50) print("初始化评分方法示例指标...") print("=" * 50) async with async_session_maker() as db: # 检查是否已存在评分方法相关指标 result = await db.execute( select(Indicator).where(Indicator.code.in_(["SCORE001", "SCORE002", "SCORE003", "SCORE004"])) ) if result.scalars().first(): print(" 已存在评分方法示例指标,跳过初始化\n") return # 创建示例指标,展示各种评分方法 scoring_indicators = [ # 目标参照法示例 Indicator( name="业务收支结余率", code="SCORE001", indicator_type=IndicatorType.QUANTITY, bs_dimension=BSCDimension.FINANCIAL, weight=12.6, max_score=100, target_value=15.0, # 目标值15% target_unit="%", calculation_method="(业务收入 - 业务支出)/业务收入 × 100%", assessment_method="target_reference", # 目标参照法 data_source="财务系统", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]), is_active=True ), # 区间法-趋高指标示例 Indicator( name="人均收支结余", code="SCORE002", indicator_type=IndicatorType.QUANTITY, bs_dimension=BSCDimension.FINANCIAL, weight=16.8, max_score=100, target_value=5000, # 基准值 target_unit="元", calculation_method="业务收支结余/平均职工人数", assessment_method="interval_high", # 区间法-趋高 data_source="财务系统", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]), is_active=True ), # 区间法-趋低指标示例 Indicator( name="百元收入耗材率", code="SCORE003", indicator_type=IndicatorType.COST, bs_dimension=BSCDimension.FINANCIAL, weight=12.6, max_score=100, target_value=25.0, # 目标值≤25元 target_unit="元", calculation_method="耗材支出/业务收入 × 100", assessment_method="interval_low", # 区间法-趋低 data_source="物资系统", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]), is_active=True ), # 扣分法示例 Indicator( name="病历质量考核", code="SCORE004", indicator_type=IndicatorType.QUALITY, bs_dimension=BSCDimension.INTERNAL_PROCESS, weight=3.2, max_score=100, target_value=0, # 目标值0(无乙级病历) calculation_method="乙级病历每份扣5分,丙级病历不得分", assessment_method="deduction", # 扣分法 deduction_standard="乙级病历每份扣5分", data_source="病案质控", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]), is_active=True ), # 加分法示例 Indicator( name="科研教学工作", code="SCORE005", indicator_type=IndicatorType.QUANTITY, # 使用QUANTITY代替 bs_dimension=BSCDimension.LEARNING_GROWTH, weight=4.5, max_score=100, target_value=0, calculation_method="科研教学成果加分", assessment_method="bonus", # 加分法 deduction_standard="科研项目立项:市级加5分,省级加10分,国家级加20分\n论文发表:核心期刊加5分,SCI加10分", data_source="科教科", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward", "medical_tech"]), is_active=True ), # 区间法-趋中指标示例 Indicator( name="平均住院日", code="SCORE006", indicator_type=IndicatorType.EFFICIENCY, bs_dimension=BSCDimension.INTERNAL_PROCESS, weight=2.0, max_score=100, target_value=10.0, # 目标值10天 target_unit="天", calculation_method="出院患者平均住院天数", assessment_method="interval_center", # 区间法-趋中 data_source="病案统计", applicable_dept_types=json.dumps(["clinical_surgical", "clinical_nonsurgical_ward"]), is_active=True ), ] for indicator in scoring_indicators: db.add(indicator) print(f" 创建指标: {indicator.name} ({indicator.assessment_method})") await db.commit() print("评分方法示例指标初始化完成!\n") async def main(): """主函数""" print("\n" + "=" * 60) print("开始初始化测试数据...") print("=" * 60 + "\n") # 初始化科室类型BSC维度权重配置 await init_dimension_weights() # 初始化满意度调查问卷示例数据 await init_sample_surveys() # 初始化满意度调查回答示例数据 await init_sample_survey_responses() # 初始化评分方法示例指标 await init_scoring_method_indicators() print("\n" + "=" * 60) print("测试数据初始化完成!") print("=" * 60 + "\n") if __name__ == "__main__": asyncio.run(main())