Files
hospital_performance/backend/app/services/stats_service.py
2026-02-28 15:06:52 +08:00

300 lines
11 KiB
Python

"""
统计服务层 - BSC 维度分析、绩效统计
"""
from typing import Optional, List, Dict, Any
from datetime import datetime
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.models import (
Assessment, AssessmentDetail, Indicator, Department, Staff,
BSCDimension, AssessmentStatus
)
class StatsService:
    """Statistics service: BSC dimension analysis and performance statistics.

    Every public method is a static coroutine that runs one aggregate query
    on the supplied session and only considers assessments whose status is
    ``AssessmentStatus.FINALIZED``.
    """

    @staticmethod
    def _finalized_conditions(
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
    ) -> List[Any]:
        """Build the WHERE clauses shared by the statistics queries.

        Args:
            period_year: Restrict to this assessment year when truthy.
            period_month: Restrict to this assessment month when truthy.

        Returns:
            A list of SQL expressions; the first always restricts the query
            to finalized assessments.
        """
        conditions = [Assessment.status == AssessmentStatus.FINALIZED]
        if period_year:
            conditions.append(Assessment.period_year == period_year)
        if period_month:
            conditions.append(Assessment.period_month == period_month)
        return conditions

    @staticmethod
    def _summarize_department_rows(rows: List[Any]) -> List[Dict[str, Any]]:
        """Fold per-assessment rows into one summary dict per department.

        Args:
            rows: Objects exposing ``id``, ``name``, ``dept_type``,
                ``staff_id``, ``staff_name`` and ``weighted_score``
                attributes (one per assessment row).

        Returns:
            Department summaries sorted by ``avg_score`` descending. A
            missing (``None``) score counts as 0 towards the total and the
            average but is ignored for ``max_score`` / ``min_score``.
            ``staff_count`` is the number of rows seen for the department.
        """
        departments: Dict[Any, Dict[str, Any]] = {}
        for row in rows:
            dept = departments.get(row.id)
            if dept is None:
                dept = departments[row.id] = {
                    'department_id': row.id,
                    'department_name': row.name,
                    'dept_type': row.dept_type,
                    'staff_count': 0,
                    'total_score': 0,
                    'avg_score': 0,
                    'max_score': None,
                    'min_score': None,
                    'staff_list': [],
                }
            score = row.weighted_score
            dept['staff_count'] += 1
            dept['total_score'] += score or 0
            dept['staff_list'].append({
                'staff_id': row.staff_id,
                'staff_name': row.staff_name,
                'score': score,
            })
            # Compare against None, not truthiness: a score of 0 is a real
            # score and must be eligible as the minimum.
            if score is None:
                continue
            if dept['max_score'] is None or score > dept['max_score']:
                dept['max_score'] = score
            if dept['min_score'] is None or score < dept['min_score']:
                dept['min_score'] = score

        summaries = list(departments.values())
        for dept in summaries:
            # Every department here was created from at least one row.
            dept['avg_score'] = dept['total_score'] / dept['staff_count']
            if dept['max_score'] is None:
                # Keep the historical default when no score was recorded.
                dept['max_score'] = 0
        summaries.sort(key=lambda item: item['avg_score'], reverse=True)
        return summaries

    @staticmethod
    def _completion_entry(row: Any) -> Dict[str, Any]:
        """Convert one aggregated indicator row into its response dict.

        Args:
            row: Object exposing ``id``, ``name``, ``code``,
                ``target_value``, ``max_score``, ``avg_score``,
                ``highest_score``, ``lowest_score`` and ``detail_count``.

        Returns:
            The indicator summary. ``completion_rate`` is
            ``avg_score / target_value * 100`` capped at 100, or 0 when
            either value is missing or zero.
        """
        completion_rate = 0
        if row.target_value and row.avg_score:
            completion_rate = float(row.avg_score) / float(row.target_value) * 100
        return {
            'indicator_id': row.id,
            'indicator_name': row.name,
            'indicator_code': row.code,
            'target_value': float(row.target_value) if row.target_value else None,
            'max_score': float(row.max_score) if row.max_score else 0,
            'avg_score': float(row.avg_score) if row.avg_score else 0,
            'completion_rate': min(completion_rate, 100),  # capped at 100%
            'highest_score': (
                float(row.highest_score) if row.highest_score is not None else None
            ),
            'lowest_score': (
                float(row.lowest_score) if row.lowest_score is not None else None
            ),
            'count': row.detail_count,
        }

    @staticmethod
    async def get_bsc_dimension_stats(
        db: "AsyncSession",
        department_id: Optional[int] = None,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Return weighted score statistics per BSC dimension.

        Args:
            db: Async session used to run the query.
            department_id: Only include assessments of staff in this
                department when truthy.
            period_year: Only include this assessment year when truthy.
            period_month: Only include this assessment month when truthy.

        Returns:
            ``{'dimensions': {...}, 'period': str}``. Each of the four BSC
            dimensions maps to ``score`` (sum of score * weight), ``weight``
            (sum of weights), ``indicators`` (detail row count) and
            ``average`` (score / weight, 0 when there is no weight).
        """
        # Every dimension starts with the same keys the populated entries
        # carry, so consumers never hit a missing 'average'.
        dimensions = {
            dimension: {'score': 0, 'weight': 0, 'indicators': 0, 'average': 0}
            for dimension in (
                BSCDimension.FINANCIAL,
                BSCDimension.CUSTOMER,
                BSCDimension.INTERNAL_PROCESS,
                BSCDimension.LEARNING_GROWTH,
            )
        }

        conditions = StatsService._finalized_conditions(period_year, period_month)
        if department_id:
            # `staff_id` is joined against `Staff.id` as a plain column in
            # the other queries of this class, so the relationship-only
            # `.has()` operator cannot be used on it; filter with a subquery.
            conditions.append(
                Assessment.staff_id.in_(
                    select(Staff.id).where(Staff.department_id == department_id)
                )
            )

        result = await db.execute(
            select(
                Indicator.bs_dimension,
                func.sum(AssessmentDetail.score * Indicator.weight).label('total_score'),
                func.sum(Indicator.weight).label('total_weight'),
                func.count(AssessmentDetail.id).label('indicator_count'),
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(Indicator.bs_dimension)
        )

        for row in result.fetchall():
            if row.bs_dimension not in dimensions:
                continue
            total_score = float(row.total_score) if row.total_score else 0
            total_weight = float(row.total_weight) if row.total_weight else 0
            dimensions[row.bs_dimension] = {
                'score': total_score,
                'weight': total_weight,
                'indicators': row.indicator_count,
                'average': total_score / total_weight if total_weight else 0,
            }

        return {
            'dimensions': dimensions,
            'period': f"{period_year}{period_month}" if period_year and period_month else "全部"
        }

    @staticmethod
    async def get_department_stats(
        db: "AsyncSession",
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Return per-department performance statistics.

        Args:
            db: Async session used to run the query.
            period_year: Only include this assessment year when truthy.
            period_month: Only include this assessment month when truthy.

        Returns:
            One dict per department that has at least one matching
            assessment, sorted by average weighted score descending. Each
            dict holds the department identity, ``staff_count``,
            ``total_score``, ``avg_score``, ``max_score``, ``min_score`` and
            a ``staff_list`` with one entry per assessment row.
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)
        result = await db.execute(
            select(
                Department.id,
                Department.name,
                Department.dept_type,
                Staff.id.label('staff_id'),
                Staff.name.label('staff_name'),
                Assessment.total_score,
                Assessment.weighted_score,
            )
            .join(Staff, Staff.department_id == Department.id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            .order_by(Department.id, Assessment.weighted_score.desc())
        )
        return StatsService._summarize_department_rows(result.fetchall())

    @staticmethod
    async def get_trend_stats(
        db: "AsyncSession",
        department_id: Optional[int] = None,
        period_year: Optional[int] = None,
        months: int = 6,
    ) -> List[Dict[str, Any]]:
        """Return monthly average scores in chronological order.

        Args:
            db: Async session used to run the query.
            department_id: Only include staff of this department when truthy.
            period_year: When truthy, restrict to a window of ``months``
                months ending at the current calendar month (taken from
                ``datetime.now()``) of this year; the window may reach back
                into the previous year. When falsy, no period filter is
                applied and ``months`` is ignored.
            months: Window length in months.

        Returns:
            One dict per (year, month) with ``year``, ``month``,
            ``avg_score``, ``avg_weighted_score`` and ``count``.
        """
        conditions = StatsService._finalized_conditions()
        if period_year:
            current_month = datetime.now().month
            start_month = current_month - months + 1
            if start_month < 1:
                # The window starts in the previous year: take its tail
                # months plus this year's months up to the current one.
                conditions.append(
                    ((Assessment.period_year == period_year - 1)
                     & (Assessment.period_month >= start_month + 12))
                    | ((Assessment.period_year == period_year)
                       & (Assessment.period_month <= current_month))
                )
            else:
                conditions.append(Assessment.period_year == period_year)
                conditions.append(Assessment.period_month >= start_month)
                conditions.append(Assessment.period_month <= current_month)
        if department_id:
            # Staff is joined below, so filter on its column directly
            # (`.has()` only exists on relationship attributes).
            conditions.append(Staff.department_id == department_id)

        result = await db.execute(
            select(
                Assessment.period_year,
                Assessment.period_month,
                func.avg(Assessment.total_score).label('avg_score'),
                func.avg(Assessment.weighted_score).label('avg_weighted_score'),
                # Not labelled `count`: `row.count` would resolve to the
                # Row sequence method instead of the column value.
                func.count(Assessment.id).label('assessment_count'),
            )
            .join(Staff, Staff.id == Assessment.staff_id)
            .where(and_(*conditions))
            # Group and order by year as well, so a window that crosses a
            # year boundary stays chronological and months of different
            # years are never merged into one bucket.
            .group_by(Assessment.period_year, Assessment.period_month)
            .order_by(Assessment.period_year, Assessment.period_month)
        )
        return [
            {
                'year': row.period_year,
                'month': row.period_month,
                'avg_score': float(row.avg_score) if row.avg_score else 0,
                'avg_weighted_score': (
                    float(row.avg_weighted_score) if row.avg_weighted_score else 0
                ),
                'count': row.assessment_count,
            }
            for row in result.fetchall()
        ]

    @staticmethod
    async def get_ranking_stats(
        db: "AsyncSession",
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
        limit: int = 10,
    ) -> List[Dict[str, Any]]:
        """Return the top assessments ranked by weighted score.

        Args:
            db: Async session used to run the query.
            period_year: Only include this assessment year when truthy.
            period_month: Only include this assessment month when truthy.
            limit: Maximum number of rows to return.

        Returns:
            Dicts with staff identity, department name, ``total_score``,
            ``weighted_score`` and a 1-based ``rank`` following the query
            order.
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)
        result = await db.execute(
            select(
                Staff.id,
                Staff.name,
                Staff.employee_id,
                Department.name.label('dept_name'),
                Assessment.total_score,
                Assessment.weighted_score,
            )
            .join(Department, Department.id == Staff.department_id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            # Staff.id breaks ties so the LIMIT cut-off is deterministic.
            .order_by(Assessment.weighted_score.desc(), Staff.id)
            .limit(limit)
        )
        return [
            {
                'staff_id': row.id,
                'staff_name': row.name,
                'employee_id': row.employee_id,
                'department': row.dept_name,
                'total_score': float(row.total_score) if row.total_score else 0,
                'weighted_score': float(row.weighted_score) if row.weighted_score else 0,
                'rank': position,
            }
            for position, row in enumerate(result.fetchall(), start=1)
        ]

    @staticmethod
    async def get_completion_stats(
        db: "AsyncSession",
        indicator_id: Optional[int] = None,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Return completion statistics per indicator.

        Args:
            db: Async session used to run the query.
            indicator_id: Only include this indicator when truthy.
            period_year: Only include this assessment year when truthy.
            period_month: Only include this assessment month when truthy.

        Returns:
            ``{'indicators': [...]}`` with one entry per indicator that has
            detail rows in matching assessments: identity, ``target_value``,
            ``max_score`` (the indicator's configured value), ``avg_score``,
            ``completion_rate`` (capped at 100), ``highest_score``,
            ``lowest_score`` and ``count``.
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)
        if indicator_id:
            conditions.append(Indicator.id == indicator_id)

        query = (
            select(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score,
                func.avg(AssessmentDetail.score).label('avg_score'),
                # Distinct labels: `max_score` would collide with the
                # Indicator.max_score column selected above, and `count`
                # with the Row.count method.
                func.max(AssessmentDetail.score).label('highest_score'),
                func.min(AssessmentDetail.score).label('lowest_score'),
                func.count(AssessmentDetail.id).label('detail_count'),
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            # The conditions filter on Assessment, so it must be joined;
            # without this the filter applies to an unrelated cross product.
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score,
            )
        )
        result = await db.execute(query)
        return {
            'indicators': [
                StatsService._completion_entry(row) for row in result.fetchall()
            ]
        }