""" 满意度调查服务 功能: 1. 调查问卷管理 - 问卷CRUD、题目管理 2. 调查响应处理 - 提交回答、计算得分 3. 满意度统计 - 科室满意度、趋势分析 """ from typing import Optional, List, Dict, Any from datetime import datetime from sqlalchemy import select, func, and_, or_ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload import json from app.models.models import ( Survey, SurveyQuestion, SurveyResponse, SurveyAnswer, SurveyStatus, SurveyType, QuestionType, Department ) class SurveyService: """满意度调查服务""" # ==================== 问卷管理 ==================== @staticmethod async def get_survey_list( db: AsyncSession, survey_type: Optional[SurveyType] = None, status: Optional[SurveyStatus] = None, page: int = 1, page_size: int = 20 ) -> tuple[List[Survey], int]: """获取问卷列表""" query = select(Survey).options( selectinload(Survey.questions) ) if survey_type: query = query.where(Survey.survey_type == survey_type) if status: query = query.where(Survey.status == status) # 统计总数 count_query = select(func.count()).select_from(query.subquery()) total = await db.scalar(count_query) # 分页 query = query.order_by(Survey.created_at.desc()) query = query.offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) surveys = result.scalars().all() return list(surveys), total or 0 @staticmethod async def get_survey_by_id(db: AsyncSession, survey_id: int) -> Optional[Survey]: """获取问卷详情""" result = await db.execute( select(Survey) .options( selectinload(Survey.questions) ) .where(Survey.id == survey_id) ) return result.scalar_one_or_none() @staticmethod async def create_survey( db: AsyncSession, survey_name: str, survey_code: str, survey_type: SurveyType, description: Optional[str] = None, target_departments: Optional[List[int]] = None, is_anonymous: bool = True, created_by: Optional[int] = None ) -> Survey: """创建问卷""" survey = Survey( survey_name=survey_name, survey_code=survey_code, survey_type=survey_type, description=description, target_departments=json.dumps(target_departments) if target_departments else None, is_anonymous=is_anonymous, created_by=created_by, status=SurveyStatus.DRAFT ) db.add(survey) await db.flush() await db.refresh(survey) return survey @staticmethod async def update_survey( db: AsyncSession, survey_id: int, **kwargs ) -> Optional[Survey]: """更新问卷""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: return None # 只有草稿状态可以修改 if survey.status != SurveyStatus.DRAFT: raise ValueError("只有草稿状态的问卷可以修改") for key, value in kwargs.items(): if hasattr(survey, key): if key == "target_departments" and isinstance(value, list): value = json.dumps(value) setattr(survey, key, value) await db.flush() await db.refresh(survey) return survey @staticmethod async def publish_survey(db: AsyncSession, survey_id: int) -> Optional[Survey]: """发布问卷""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: return None if survey.status != SurveyStatus.DRAFT: raise ValueError("只有草稿状态的问卷可以发布") # 检查是否有题目 if survey.total_questions == 0: raise ValueError("问卷没有题目,无法发布") survey.status = SurveyStatus.PUBLISHED survey.start_date = datetime.utcnow() await db.flush() await db.refresh(survey) return survey @staticmethod async def close_survey(db: AsyncSession, survey_id: int) -> Optional[Survey]: """结束问卷""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: return None if survey.status != SurveyStatus.PUBLISHED: raise ValueError("只有已发布的问卷可以结束") survey.status = SurveyStatus.CLOSED survey.end_date = datetime.utcnow() await db.flush() await db.refresh(survey) return survey @staticmethod async def delete_survey(db: AsyncSession, survey_id: int) -> bool: """删除问卷""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: return False if survey.status == SurveyStatus.PUBLISHED: raise ValueError("发布中的问卷无法删除") await db.delete(survey) await db.flush() return True # ==================== 题目管理 ==================== @staticmethod async def add_question( db: AsyncSession, survey_id: int, question_text: str, question_type: QuestionType, options: Optional[List[Dict]] = None, score_max: int = 5, is_required: bool = True ) -> SurveyQuestion: """添加题目""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: raise ValueError("问卷不存在") if survey.status != SurveyStatus.DRAFT: raise ValueError("只有草稿状态的问卷可以添加题目") # 获取最大排序号 result = await db.execute( select(func.max(SurveyQuestion.sort_order)) .where(SurveyQuestion.survey_id == survey_id) ) max_order = result.scalar() or 0 question = SurveyQuestion( survey_id=survey_id, question_text=question_text, question_type=question_type, options=json.dumps(options, ensure_ascii=False) if options else None, score_max=score_max, is_required=is_required, sort_order=max_order + 1 ) db.add(question) # 更新问卷题目数 survey.total_questions += 1 await db.flush() await db.refresh(question) return question @staticmethod async def update_question( db: AsyncSession, question_id: int, **kwargs ) -> Optional[SurveyQuestion]: """更新题目""" result = await db.execute( select(SurveyQuestion).where(SurveyQuestion.id == question_id) ) question = result.scalar_one_or_none() if not question: return None # 检查问卷状态 survey = await SurveyService.get_survey_by_id(db, question.survey_id) if survey and survey.status != SurveyStatus.DRAFT: raise ValueError("只有草稿状态的问卷题目可以修改") for key, value in kwargs.items(): if hasattr(question, key): if key == "options" and isinstance(value, list): value = json.dumps(value, ensure_ascii=False) setattr(question, key, value) await db.flush() await db.refresh(question) return question @staticmethod async def delete_question(db: AsyncSession, question_id: int) -> bool: """删除题目""" result = await db.execute( select(SurveyQuestion).where(SurveyQuestion.id == question_id) ) question = result.scalar_one_or_none() if not question: return False # 检查问卷状态 survey = await SurveyService.get_survey_by_id(db, question.survey_id) if survey and survey.status != SurveyStatus.DRAFT: raise ValueError("只有草稿状态的问卷题目可以删除") # 更新问卷题目数 if survey: survey.total_questions -= 1 await db.delete(question) await db.flush() return True # ==================== 提交回答 ==================== @staticmethod async def submit_response( db: AsyncSession, survey_id: int, department_id: Optional[int], answers: List[Dict[str, Any]], respondent_type: str = "patient", respondent_id: Optional[int] = None, respondent_phone: Optional[str] = None, ip_address: Optional[str] = None, user_agent: Optional[str] = None ) -> SurveyResponse: """提交问卷回答""" survey = await SurveyService.get_survey_by_id(db, survey_id) if not survey: raise ValueError("问卷不存在") if survey.status != SurveyStatus.PUBLISHED: raise ValueError("问卷未发布或已结束") # 计算得分 total_score = 0.0 max_score = 0.0 # 创建回答记录 response = SurveyResponse( survey_id=survey_id, department_id=department_id, respondent_type=respondent_type, respondent_id=respondent_id, respondent_phone=respondent_phone, ip_address=ip_address, user_agent=user_agent ) db.add(response) await db.flush() # 处理每个回答 for answer_data in answers: question_id = answer_data.get("question_id") answer_value = answer_data.get("answer_value") # 获取题目 q_result = await db.execute( select(SurveyQuestion).where(SurveyQuestion.id == question_id) ) question = q_result.scalar_one_or_none() if not question: continue # 计算得分 score = 0.0 if question.question_type == QuestionType.SCORE: # 评分题:直接取分值 try: score = float(answer_value) except (ValueError, TypeError): score = 0 max_score += question.score_max elif question.question_type == QuestionType.SINGLE_CHOICE: # 单选题:根据选项得分 if question.options: options = json.loads(question.options) for opt in options: if opt.get("value") == answer_value: score = opt.get("score", 0) break max_score += 5 # 假设单选题最高5分 elif question.question_type == QuestionType.MULTIPLE_CHOICE: # 多选题:累加选中选项得分 if question.options: options = json.loads(question.options) selected = answer_value.split(",") if answer_value else [] for opt in options: if opt.get("value") in selected: score += opt.get("score", 0) max_score += 5 total_score += score # 创建回答明细 answer = SurveyAnswer( response_id=response.id, question_id=question_id, answer_value=str(answer_value) if answer_value else None, score=score ) db.add(answer) # 更新回答记录得分 response.total_score = total_score response.max_score = max_score if max_score > 0 else 1 response.satisfaction_rate = (total_score / response.max_score * 100) if response.max_score > 0 else 0 await db.flush() await db.refresh(response) return response # ==================== 满意度统计 ==================== @staticmethod async def get_department_satisfaction( db: AsyncSession, survey_id: Optional[int] = None, department_id: Optional[int] = None, period_year: Optional[int] = None, period_month: Optional[int] = None ) -> List[Dict[str, Any]]: """获取科室满意度统计""" query = select( Department.id.label("department_id"), Department.name.label("department_name"), func.count(SurveyResponse.id).label("response_count"), func.avg(SurveyResponse.satisfaction_rate).label("avg_satisfaction"), func.sum(SurveyResponse.total_score).label("total_score"), func.sum(SurveyResponse.max_score).label("max_score") ).join( SurveyResponse, SurveyResponse.department_id == Department.id ) conditions = [] if survey_id: conditions.append(SurveyResponse.survey_id == survey_id) if department_id: conditions.append(Department.id == department_id) if period_year and period_month: from sqlalchemy import extract conditions.append(extract('year', SurveyResponse.submitted_at) == period_year) conditions.append(extract('month', SurveyResponse.submitted_at) == period_month) if conditions: query = query.where(and_(*conditions)) query = query.group_by(Department.id, Department.name) query = query.order_by(func.avg(SurveyResponse.satisfaction_rate).desc()) result = await db.execute(query) return [ { "department_id": row.department_id, "department_name": row.department_name, "response_count": row.response_count, "avg_satisfaction": round(float(row.avg_satisfaction), 2) if row.avg_satisfaction else 0, "total_score": float(row.total_score) if row.total_score else 0, "max_score": float(row.max_score) if row.max_score else 0 } for row in result.fetchall() ] @staticmethod async def get_satisfaction_trend( db: AsyncSession, department_id: int, months: int = 6 ) -> List[Dict[str, Any]]: """获取满意度趋势""" from sqlalchemy import extract from datetime import datetime current_date = datetime.now() current_year = current_date.year current_month = current_date.month query = select( extract('year', SurveyResponse.submitted_at).label("year"), extract('month', SurveyResponse.submitted_at).label("month"), func.count(SurveyResponse.id).label("response_count"), func.avg(SurveyResponse.satisfaction_rate).label("avg_satisfaction") ).where( SurveyResponse.department_id == department_id ).group_by( extract('year', SurveyResponse.submitted_at), extract('month', SurveyResponse.submitted_at) ).order_by( extract('year', SurveyResponse.submitted_at), extract('month', SurveyResponse.submitted_at) ) result = await db.execute(query) return [ { "year": int(row.year), "month": int(row.month), "period": f"{int(row.year)}年{int(row.month)}月", "response_count": row.response_count, "avg_satisfaction": round(float(row.avg_satisfaction), 2) if row.avg_satisfaction else 0 } for row in result.fetchall() ] @staticmethod async def get_question_stats(db: AsyncSession, survey_id: int) -> List[Dict[str, Any]]: """获取问卷各题目统计""" # 获取问卷题目 questions_result = await db.execute( select(SurveyQuestion) .where(SurveyQuestion.survey_id == survey_id) .order_by(SurveyQuestion.sort_order) ) questions = questions_result.scalars().all() stats = [] for question in questions: # 统计该题目的回答 answer_result = await db.execute( select( func.count(SurveyAnswer.id).label("count"), func.avg(SurveyAnswer.score).label("avg_score"), func.sum(SurveyAnswer.score).label("total_score") ).where(SurveyAnswer.question_id == question.id) ) row = answer_result.fetchone() question_stat = { "question_id": question.id, "question_text": question.question_text, "question_type": question.question_type.value, "response_count": row.count if row else 0, "avg_score": round(float(row.avg_score), 2) if row and row.avg_score else 0, "total_score": float(row.total_score) if row and row.total_score else 0, "max_possible_score": question.score_max } # 如果是选择题,统计各选项占比 if question.question_type in [QuestionType.SINGLE_CHOICE, QuestionType.MULTIPLE_CHOICE]: if question.options: options = json.loads(question.options) option_stats = [] for opt in options: count_result = await db.execute( select(func.count(SurveyAnswer.id)) .where(SurveyAnswer.question_id == question.id) .where(SurveyAnswer.answer_value.contains(opt.get("value"))) ) opt_count = count_result.scalar() or 0 option_stats.append({ "option": opt.get("label"), "value": opt.get("value"), "count": opt_count }) question_stat["option_stats"] = option_stats stats.append(question_stat) return stats