300 lines
11 KiB
Python
300 lines
11 KiB
Python
"""
|
|
统计服务层 - BSC 维度分析、绩效统计
|
|
"""
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime
|
|
from sqlalchemy import select, func, and_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.models.models import (
|
|
Assessment, AssessmentDetail, Indicator, Department, Staff,
|
|
BSCDimension, AssessmentStatus
|
|
)
|
|
|
|
|
|
class StatsService:
    """Statistics service: BSC dimension analysis and performance statistics.

    Every public method is a static coroutine that runs SELECT queries through
    the supplied ``AsyncSession`` and only considers assessments whose status
    is ``AssessmentStatus.FINALIZED``.
    """

    @staticmethod
    def _finalized_conditions(
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
    ) -> List[Any]:
        """Build the WHERE conditions shared by the statistics queries.

        Always restricts to FINALIZED assessments; the year / month filters
        are added only when the corresponding argument is truthy.
        """
        conditions: List[Any] = [Assessment.status == AssessmentStatus.FINALIZED]
        if period_year:
            conditions.append(Assessment.period_year == period_year)
        if period_month:
            conditions.append(Assessment.period_month == period_month)
        return conditions

    @staticmethod
    def _to_float(value: Any) -> Any:
        """Return ``float(value)``, or ``0`` when the value is falsy (None / zero)."""
        return float(value) if value else 0

    @staticmethod
    async def get_bsc_dimension_stats(
        db: AsyncSession,
        department_id: Optional[int] = None,
        period_year: int = None,
        period_month: int = None
    ) -> Dict[str, Any]:
        """Aggregate weighted scores per BSC dimension.

        Args:
            db: Async session used to execute the query.
            department_id: When given, only assessments of staff belonging to
                this department are included.
            period_year: Optional assessment year filter.
            period_month: Optional assessment month filter.

        Returns:
            ``{'dimensions': {...}, 'period': str}`` where ``dimensions`` maps
            each of the four ``BSCDimension`` members to a dict with the keys
            ``score`` (sum of score * weight), ``weight`` (sum of weights),
            ``indicators`` (number of detail rows) and ``average``
            (score / weight, or 0 when there is no weight).
        """
        # Every dimension starts with the same shape, so consumers can rely on
        # all four keys even when a dimension has no data.
        dimensions = {
            dimension: {'score': 0, 'weight': 0, 'indicators': 0, 'average': 0}
            for dimension in (
                BSCDimension.FINANCIAL,
                BSCDimension.CUSTOMER,
                BSCDimension.INTERNAL_PROCESS,
                BSCDimension.LEARNING_GROWTH,
            )
        }

        conditions = StatsService._finalized_conditions(period_year, period_month)
        if department_id:
            # staff_id is a plain foreign-key column (it is used as such in the
            # joins of the other queries), so relationship-only ``.has()`` is
            # not available; filter through an IN subquery instead.
            staff_in_department = select(Staff.id).where(
                Staff.department_id == department_id
            )
            conditions.append(Assessment.staff_id.in_(staff_in_department))

        stmt = (
            select(
                Indicator.bs_dimension,
                func.sum(AssessmentDetail.score * Indicator.weight).label('total_score'),
                func.sum(Indicator.weight).label('total_weight'),
                func.count(AssessmentDetail.id).label('indicator_count'),
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(Indicator.bs_dimension)
        )
        result = await db.execute(stmt)

        for row in result.fetchall():
            if row.bs_dimension not in dimensions:
                continue
            total_score = StatsService._to_float(row.total_score)
            total_weight = StatsService._to_float(row.total_weight)
            dimensions[row.bs_dimension] = {
                'score': total_score,
                'weight': total_weight,
                'indicators': row.indicator_count,
                # Guard against division by zero when no weight was recorded.
                'average': (total_score / total_weight) if total_weight else 0,
            }

        return {
            'dimensions': dimensions,
            'period': f"{period_year}年{period_month}月" if period_year and period_month else "全部"
        }

    @staticmethod
    async def get_department_stats(
        db: AsyncSession,
        period_year: int = None,
        period_month: int = None
    ) -> List[Dict[str, Any]]:
        """Summarise finalized assessment scores per department.

        Args:
            db: Async session used to execute the query.
            period_year: Optional assessment year filter.
            period_month: Optional assessment month filter.

        Returns:
            One dict per department that has at least one matching assessment,
            sorted by ``avg_score`` descending. ``staff_count`` counts matching
            assessment rows, ``avg_score`` is ``total_score / staff_count``,
            ``max_score`` is 0 and ``min_score`` is ``None`` when no row in the
            department carries a score. ``staff_list`` keeps the query order
            (weighted score descending within the department).
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)

        stmt = (
            select(
                Department.id,
                Department.name,
                Department.dept_type,
                Staff.id.label('staff_id'),
                Staff.name.label('staff_name'),
                Assessment.total_score,
                Assessment.weighted_score,
            )
            .join(Staff, Staff.department_id == Department.id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            .order_by(Department.id, Assessment.weighted_score.desc())
        )
        result = await db.execute(stmt)

        # Group rows by department.
        dept_stats: Dict[Any, Dict[str, Any]] = {}
        for row in result.fetchall():
            entry = dept_stats.get(row.id)
            if entry is None:
                entry = {
                    'department_id': row.id,
                    'department_name': row.name,
                    'dept_type': row.dept_type,
                    'staff_count': 0,
                    'total_score': 0,
                    'avg_score': 0,
                    'max_score': None,
                    'min_score': None,
                    'staff_list': [],
                }
                dept_stats[row.id] = entry

            score = row.weighted_score
            entry['staff_count'] += 1
            entry['total_score'] += score or 0
            entry['staff_list'].append({
                'staff_id': row.staff_id,
                'staff_name': row.staff_name,
                'score': score,
            })

            # Compare against None explicitly so that a legitimate score of 0
            # still participates in the min / max calculation.
            if score is not None:
                if entry['max_score'] is None or score > entry['max_score']:
                    entry['max_score'] = score
                if entry['min_score'] is None or score < entry['min_score']:
                    entry['min_score'] = score

        # Finalise the per-department figures.
        result_list = []
        for entry in dept_stats.values():
            if entry['max_score'] is None:
                # Keep the historical default of 0 when nothing was scored.
                entry['max_score'] = 0
            if entry['staff_count'] > 0:
                entry['avg_score'] = entry['total_score'] / entry['staff_count']
                result_list.append(entry)

        # Highest average first.
        result_list.sort(key=lambda item: item['avg_score'], reverse=True)
        return result_list

    @staticmethod
    async def get_trend_stats(
        db: AsyncSession,
        department_id: Optional[int] = None,
        period_year: int = None,
        months: int = 6
    ) -> List[Dict[str, Any]]:
        """Return average scores per month, in chronological order.

        Args:
            db: Async session used to execute the query.
            department_id: When given, only staff of this department count.
            period_year: When given, the result is limited to a window of
                ``months`` months that ends at the *current calendar month*
                (taken from ``datetime.now()``) of ``period_year``. When
                omitted, no time window is applied and ``months`` is ignored.
            months: Length of the window in months.

        Returns:
            A list of dicts with ``year``, ``month``, ``avg_score``,
            ``avg_weighted_score`` and ``count`` (number of assessments),
            ordered by year then month.
        """
        conditions = StatsService._finalized_conditions()

        if period_year:
            # Work on a linear month index (year * 12 + month - 1) so the
            # window may cross any number of year boundaries.
            # NOTE(review): the window end uses today's month even when
            # period_year is not the current year - confirm this is intended.
            end_index = period_year * 12 + datetime.now().month - 1
            start_index = end_index - months + 1
            month_index = Assessment.period_year * 12 + Assessment.period_month - 1
            conditions.append(month_index.between(start_index, end_index))

        if department_id:
            # Staff is joined below, so the department can be filtered directly.
            conditions.append(Staff.department_id == department_id)

        stmt = (
            select(
                Assessment.period_year,
                Assessment.period_month,
                func.avg(Assessment.total_score).label('avg_score'),
                func.avg(Assessment.weighted_score).label('avg_weighted_score'),
                # Not labelled "count": Row.count is the tuple-style method and
                # would shadow a column of that name on attribute access.
                func.count(Assessment.id).label('assessment_count'),
            )
            .join(Staff, Staff.id == Assessment.staff_id)
            .where(and_(*conditions))
            # Group by year as well, otherwise the same month of different
            # years would be merged into one bucket.
            .group_by(Assessment.period_year, Assessment.period_month)
            .order_by(Assessment.period_year, Assessment.period_month)
        )
        result = await db.execute(stmt)

        return [
            {
                'year': row.period_year,
                'month': row.period_month,
                'avg_score': StatsService._to_float(row.avg_score),
                'avg_weighted_score': StatsService._to_float(row.avg_weighted_score),
                'count': row.assessment_count,
            }
            for row in result.fetchall()
        ]

    @staticmethod
    async def get_ranking_stats(
        db: AsyncSession,
        period_year: int = None,
        period_month: int = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """Return the top ``limit`` assessments ranked by weighted score.

        Args:
            db: Async session used to execute the query.
            period_year: Optional assessment year filter.
            period_month: Optional assessment month filter.
            limit: Maximum number of rows to return.

        Returns:
            A list of dicts with ``staff_id``, ``staff_name``, ``employee_id``,
            ``department``, ``total_score``, ``weighted_score`` and a 1-based
            ``rank`` that follows the query order.
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)

        stmt = (
            select(
                Staff.id,
                Staff.name,
                Staff.employee_id,
                Department.name.label('dept_name'),
                Assessment.total_score,
                Assessment.weighted_score,
            )
            .join(Department, Department.id == Staff.department_id)
            .join(Assessment, Assessment.staff_id == Staff.id)
            .where(and_(*conditions))
            # Staff.id breaks ties so equal scores rank deterministically.
            # NOTE(review): where NULL scores sort under DESC depends on the
            # database backend - confirm finalized rows always carry a score.
            .order_by(Assessment.weighted_score.desc(), Staff.id)
            .limit(limit)
        )
        result = await db.execute(stmt)

        ranking = []
        for position, row in enumerate(result.fetchall(), start=1):
            ranking.append({
                'staff_id': row.id,
                'staff_name': row.name,
                'employee_id': row.employee_id,
                'department': row.dept_name,
                'total_score': StatsService._to_float(row.total_score),
                'weighted_score': StatsService._to_float(row.weighted_score),
                'rank': position,
            })
        return ranking

    @staticmethod
    async def get_completion_stats(
        db: AsyncSession,
        indicator_id: Optional[int] = None,
        period_year: int = None,
        period_month: int = None
    ) -> Dict[str, Any]:
        """Return per-indicator score statistics and completion rate.

        Args:
            db: Async session used to execute the query.
            indicator_id: When given, only this indicator is reported.
            period_year: Optional assessment year filter.
            period_month: Optional assessment month filter.

        Returns:
            ``{'indicators': [...]}`` with one dict per indicator containing
            ``indicator_id``, ``indicator_name``, ``indicator_code``,
            ``target_value``, ``max_score`` (the indicator's configured
            maximum), ``avg_score``, ``highest_score`` / ``lowest_score``
            (observed extremes, ``None`` when absent), ``completion_rate``
            (average score as a percentage of the target value, capped at 100)
            and ``count`` (number of detail rows).
        """
        conditions = StatsService._finalized_conditions(period_year, period_month)
        if indicator_id:
            conditions.append(Indicator.id == indicator_id)

        stmt = (
            select(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score,
                func.avg(AssessmentDetail.score).label('avg_score'),
                # Labels differ from Indicator.max_score so the row has no
                # ambiguous "max_score" column.
                func.max(AssessmentDetail.score).label('highest_score'),
                func.min(AssessmentDetail.score).label('lowest_score'),
                # Not labelled "count": Row.count is the tuple-style method.
                func.count(AssessmentDetail.id).label('detail_count'),
            )
            .join(AssessmentDetail, AssessmentDetail.indicator_id == Indicator.id)
            # The conditions reference Assessment, so it must be joined
            # explicitly; otherwise it is added to FROM as a cartesian product.
            .join(Assessment, Assessment.id == AssessmentDetail.assessment_id)
            .where(and_(*conditions))
            .group_by(
                Indicator.id,
                Indicator.name,
                Indicator.code,
                Indicator.target_value,
                Indicator.max_score,
            )
        )
        result = await db.execute(stmt)

        indicators = []
        for row in result.fetchall():
            completion_rate = 0
            if row.target_value and row.avg_score:
                completion_rate = (float(row.avg_score) / float(row.target_value)) * 100

            indicators.append({
                'indicator_id': row.id,
                'indicator_name': row.name,
                'indicator_code': row.code,
                'target_value': float(row.target_value) if row.target_value else None,
                'max_score': StatsService._to_float(row.max_score),
                'avg_score': StatsService._to_float(row.avg_score),
                'highest_score': (
                    float(row.highest_score) if row.highest_score is not None else None
                ),
                'lowest_score': (
                    float(row.lowest_score) if row.lowest_score is not None else None
                ),
                'completion_rate': min(completion_rate, 100),  # capped at 100%
                'count': row.detail_count,
            })

        return {'indicators': indicators}
|