"""
Statistics service layer - BSC dimension analysis and performance statistics.
"""
from typing import Optional, List, Dict, Any
from datetime import datetime

from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.models import (
    Assessment, AssessmentDetail, Indicator, Department, Staff,
    BSCDimension, AssessmentStatus
)


class StatsService:
    """Aggregate statistics computed over finalized assessments."""

    @staticmethod
    def _base_conditions(
        period_year: Optional[int] = None,
        period_month: Optional[int] = None
    ) -> List[Any]:
        """Build the WHERE conditions shared by every statistics query.

        Only FINALIZED assessments are considered. The period filters are
        applied only when the corresponding argument is truthy, so ``None``
        (and ``0``) mean "do not filter on this field".
        """
        conditions = [Assessment.status == AssessmentStatus.FINALIZED]
        if period_year:
            conditions.append(Assessment.period_year == period_year)
        if period_month:
            conditions.append(Assessment.period_month == period_month)
        return conditions

    @staticmethod
    async def get_bsc_dimension_stats(
        db: AsyncSession,
        department_id: Optional[int] = None,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return weighted score statistics per BSC dimension.

        Args:
            db: Async database session used to run the query.
            department_id: When given, restrict to staff of this department.
            period_year: When given, restrict to this assessment year.
            period_month: When given, restrict to this assessment month.

        Returns:
            A dict with ``'dimensions'`` (one entry per BSC dimension holding
            ``score``, ``weight``, ``indicators`` and ``average``) and
            ``'period'`` (a display label for the requested period).
        """
        # Every dimension starts with the same keys a populated entry has,
        # so consumers can read 'average' even when there is no data.
        dimensions = {
            dimension: {'score': 0, 'weight': 0, 'indicators': 0, 'average': 0}
            for dimension in (
                BSCDimension.FINANCIAL,
                BSCDimension.CUSTOMER,
                BSCDimension.INTERNAL_PROCESS,
                BSCDimension.LEARNING_GROWTH,
            )
        }

        conditions = StatsService._base_conditions(period_year, period_month)
        if department_id:
            # Assessment.staff_id is a plain column (it is joined on as
            # Assessment.staff_id == Staff.id elsewhere), so the
            # relationship-only .has() cannot be used; filter through a
            # subquery on Staff instead.
            conditions.append(
                Assessment.staff_id.in_(
                    select(Staff.id).where(Staff.department_id == department_id)
                )
            )

        # Aggregate weighted scores per dimension.
        result = await db.execute(
            select(
                Indicator.bs_dimension,
                func.sum(AssessmentDetail.score * Indicator.weight).label('total_score'),
                func.sum(Indicator.weight).label('total_weight'),
                func.count(AssessmentDetail.id).label('indicator_count')
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(Indicator.bs_dimension)
        )

        for row in result.fetchall():
            dim = row.bs_dimension
            if dim not in dimensions:
                continue
            total_score = float(row.total_score) if row.total_score else 0
            total_weight = float(row.total_weight) if row.total_weight else 0
            dimensions[dim] = {
                'score': total_score,
                'weight': total_weight,
                'indicators': row.indicator_count,
                # Guard against a zero or NULL weight sum.
                'average': (total_score / total_weight) if total_weight else 0
            }

        return {
            'dimensions': dimensions,
            'period': f"{period_year}年{period_month}月" if period_year and period_month else "全部"
        }

    @staticmethod
    async def get_department_stats(
        db: AsyncSession,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """Return per-department performance statistics.

        Args:
            db: Async database session used to run the query.
            period_year: When given, restrict to this assessment year.
            period_month: When given, restrict to this assessment month.

        Returns:
            One dict per department (sorted by ``avg_score`` descending) with
            the number of assessment rows (``staff_count``), total, average,
            max and min weighted score, and the per-row ``staff_list``.
        """
        conditions = StatsService._base_conditions(period_year, period_month)

        result = await db.execute(
            select(
                Department.id,
                Department.name,
                Department.dept_type,
                Staff.id.label('staff_id'),
                Staff.name.label('staff_name'),
                Assessment.total_score,
                Assessment.weighted_score
            )
            .join(Staff, Staff.department_id == Department.id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            .order_by(Department.id, Assessment.weighted_score.desc())
        )

        # Fold the flat rows into one accumulator per department.
        dept_stats: Dict[Any, Dict[str, Any]] = {}
        for row in result.fetchall():
            dept_id = row.id
            if dept_id not in dept_stats:
                dept_stats[dept_id] = {
                    'department_id': dept_id,
                    'department_name': row.name,
                    'dept_type': row.dept_type,
                    'staff_count': 0,
                    'total_score': 0,
                    'avg_score': 0,
                    'max_score': 0,
                    'min_score': None,
                    'staff_list': []
                }
            entry = dept_stats[dept_id]
            score = row.weighted_score

            entry['staff_count'] += 1
            entry['total_score'] += score or 0
            entry['staff_list'].append({
                'staff_id': row.staff_id,
                'staff_name': row.staff_name,
                'score': score
            })

            # Compare against None explicitly: a score of 0 is a real value
            # and must be able to become the minimum.
            if score is not None:
                if score > entry['max_score']:
                    entry['max_score'] = score
                if entry['min_score'] is None or score < entry['min_score']:
                    entry['min_score'] = score

        # Every accumulator was created from at least one row, so
        # staff_count is never zero here.
        result_list = list(dept_stats.values())
        for entry in result_list:
            entry['avg_score'] = entry['total_score'] / entry['staff_count']

        # Best-performing departments first.
        result_list.sort(key=lambda item: item['avg_score'], reverse=True)
        return result_list

    @staticmethod
    async def get_trend_stats(
        db: AsyncSession,
        department_id: Optional[int] = None,
        period_year: Optional[int] = None,
        months: int = 6
    ) -> List[Dict[str, Any]]:
        """Return monthly average scores in chronological order.

        Args:
            db: Async database session used to run the query.
            department_id: When given, restrict to staff of this department.
            period_year: When given, only the ``months`` months ending at the
                current calendar month of this year are included. When
                omitted, no period window is applied.
            months: Size of the window in months (used only with
                ``period_year``).

        Returns:
            One dict per (year, month) with ``year``, ``month``,
            ``avg_score``, ``avg_weighted_score`` and ``count``.
        """
        conditions = [Assessment.status == AssessmentStatus.FINALIZED]

        if period_year:
            # Map (year, month) onto a single increasing integer so that a
            # window crossing one or more year boundaries is a plain range.
            end_index = period_year * 12 + datetime.now().month
            start_index = end_index - months + 1
            month_index = Assessment.period_year * 12 + Assessment.period_month
            conditions.append(month_index.between(start_index, end_index))

        if department_id:
            # Staff is joined below, so filter on it directly.
            conditions.append(Staff.department_id == department_id)

        # Group and order by year as well as month: otherwise a window such
        # as Nov-Feb comes back as 1, 2, 11, 12 and same-numbered months of
        # different years are merged.
        result = await db.execute(
            select(
                Assessment.period_year,
                Assessment.period_month,
                func.avg(Assessment.total_score).label('avg_score'),
                func.avg(Assessment.weighted_score).label('avg_weighted_score'),
                func.count(Assessment.id).label('count')
            )
            .join(Staff, Staff.id == Assessment.staff_id)
            .where(and_(*conditions))
            .group_by(Assessment.period_year, Assessment.period_month)
            .order_by(Assessment.period_year, Assessment.period_month)
        )

        return [
            {
                'year': row.period_year,
                'month': row.period_month,
                'avg_score': float(row.avg_score) if row.avg_score else 0,
                'avg_weighted_score': float(row.avg_weighted_score) if row.avg_weighted_score else 0,
                'count': row.count
            }
            for row in result.fetchall()
        ]

    @staticmethod
    async def get_ranking_stats(
        db: AsyncSession,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Return the top assessments ranked by weighted score.

        Args:
            db: Async database session used to run the query.
            period_year: When given, restrict to this assessment year.
            period_month: When given, restrict to this assessment month.
            limit: Maximum number of rows to return.

        Returns:
            A list of dicts with staff and department info, scores, and a
            1-based ``rank`` following the query order.
        """
        conditions = StatsService._base_conditions(period_year, period_month)

        result = await db.execute(
            select(
                Staff.id,
                Staff.name,
                Staff.employee_id,
                Department.name.label('dept_name'),
                Assessment.total_score,
                Assessment.weighted_score
            )
            .join(Department, Department.id == Staff.department_id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            # Staff.id breaks ties so equal scores rank in a stable order.
            .order_by(Assessment.weighted_score.desc(), Staff.id)
            .limit(limit)
        )

        return [
            {
                'staff_id': row.id,
                'staff_name': row.name,
                'employee_id': row.employee_id,
                'department': row.dept_name,
                'total_score': float(row.total_score) if row.total_score else 0,
                'weighted_score': float(row.weighted_score) if row.weighted_score else 0,
                'rank': position
            }
            for position, row in enumerate(result.fetchall(), start=1)
        ]

    @staticmethod
    async def get_completion_stats(
        db: AsyncSession,
        indicator_id: Optional[int] = None,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return completion statistics per indicator.

        Args:
            db: Async database session used to run the query.
            indicator_id: When given, restrict to this indicator.
            period_year: When given, restrict to this assessment year.
            period_month: When given, restrict to this assessment month.

        Returns:
            ``{'indicators': [...]}`` where each item holds the indicator's
            identity, ``target_value``, ``max_score`` (the indicator's
            configured maximum), the average / highest / lowest awarded
            score, ``completion_rate`` (average score as a percentage of
            the target, capped at 100) and ``count``.
        """
        conditions = StatsService._base_conditions(period_year, period_month)
        if indicator_id:
            conditions.append(Indicator.id == indicator_id)

        # Assessment must be joined explicitly: the conditions filter on its
        # columns, and without the join the query degenerates into a
        # cartesian product that ignores which assessment a detail belongs to.
        # The aggregate labels avoid 'max_score' so they do not collide with
        # the Indicator.max_score column in the result row.
        query = (
            select(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score,
                func.avg(AssessmentDetail.score).label('avg_score'),
                func.max(AssessmentDetail.score).label('highest_score'),
                func.min(AssessmentDetail.score).label('lowest_score'),
                func.count(AssessmentDetail.id).label('count')
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score
            )
        )
        result = await db.execute(query)

        indicators = []
        for row in result.fetchall():
            completion_rate = 0
            if row.target_value and row.avg_score:
                completion_rate = (float(row.avg_score) / float(row.target_value)) * 100

            indicators.append({
                'indicator_id': row.id,
                'indicator_name': row.name,
                'indicator_code': row.code,
                'target_value': float(row.target_value) if row.target_value else None,
                'max_score': float(row.max_score) if row.max_score else 0,
                'avg_score': float(row.avg_score) if row.avg_score else 0,
                'highest_score': float(row.highest_score) if row.highest_score else 0,
                'lowest_score': float(row.lowest_score) if row.lowest_score else 0,
                'completion_rate': min(completion_rate, 100),  # capped at 100%
                'count': row.count
            })

        return {'indicators': indicators}