Files
hospital_performance/backend/app/services/assessment_service.py
2026-02-28 15:06:52 +08:00

263 lines
9.1 KiB
Python

"""
绩效考核服务层
"""
from datetime import datetime, timezone
from typing import Optional, List

from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from app.models.models import Assessment, AssessmentDetail, Staff, Indicator, AssessmentStatus
from app.schemas.schemas import AssessmentCreate, AssessmentUpdate
class AssessmentService:
    """Service layer for staff performance assessments.

    Every method is static and works on a caller-supplied ``AsyncSession``.
    Methods flush pending changes but never commit, so transaction control
    stays with the caller.
    """

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a timezone-naive ``datetime``.

        Produces the same value shape as the deprecated
        ``datetime.utcnow()`` so stored timestamps are unchanged.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    async def _calculate_scores(db: AsyncSession, details: List) -> tuple[float, float]:
        """Compute ``(total_score, weighted_score)`` for detail payloads.

        Args:
            db: Session used to look up indicator weights.
            details: Objects exposing ``score`` and ``indicator_id``.

        Returns:
            The plain sum of all scores, and the sum of ``score * weight``
            over details whose indicator exists. Details that reference an
            unknown indicator contribute nothing to the weighted score.
        """
        details = list(details)
        total_score = sum(item.score for item in details)

        # Fetch every referenced weight in one query instead of one query
        # per detail row.
        weights = {}
        indicator_ids = {item.indicator_id for item in details}
        if indicator_ids:
            rows = await db.execute(
                select(Indicator.id, Indicator.weight).where(
                    Indicator.id.in_(list(indicator_ids))
                )
            )
            weights = {ind_id: float(weight) for ind_id, weight in rows.all()}

        weighted_score = 0.0
        for item in details:
            weight = weights.get(item.indicator_id)
            if weight is not None:
                weighted_score += item.score * weight
        return total_score, weighted_score

    @staticmethod
    async def get_list(
        db: AsyncSession,
        staff_id: Optional[int] = None,
        department_id: Optional[int] = None,
        period_year: Optional[int] = None,
        period_month: Optional[int] = None,
        status: Optional[str] = None,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[List[Assessment], int]:
        """Return one page of assessments plus the total match count.

        Each filter is applied only when its argument is truthy. Results are
        ordered newest period first, then by descending id.

        Args:
            db: Active database session.
            staff_id: Restrict to assessments of this staff member.
            department_id: Restrict to staff belonging to this department.
            period_year: Restrict to this assessment year.
            period_month: Restrict to this assessment month.
            status: Restrict to this assessment status.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Rows per page; negative values are treated as 0.

        Returns:
            ``(assessments, total)`` where ``total`` counts all rows matching
            the filters, ignoring pagination.
        """
        query = select(Assessment).options(
            selectinload(Assessment.staff).selectinload(Staff.department)
        )
        if staff_id:
            query = query.where(Assessment.staff_id == staff_id)
        if department_id:
            # Join along the relationship so the ON clause is explicit and
            # does not rely on foreign-key inference between the two tables.
            query = query.join(Assessment.staff).where(
                Staff.department_id == department_id
            )
        if period_year:
            query = query.where(Assessment.period_year == period_year)
        if period_month:
            query = query.where(Assessment.period_month == period_month)
        if status:
            query = query.where(Assessment.status == status)

        # Count over the filtered (not yet paginated) query.
        total = await db.scalar(select(func.count()).select_from(query.subquery()))

        # Guard against a negative OFFSET/LIMIT from out-of-range paging input.
        page = max(page, 1)
        page_size = max(page_size, 0)
        query = query.order_by(
            Assessment.period_year.desc(),
            Assessment.period_month.desc(),
            Assessment.id.desc(),
        )
        query = query.offset((page - 1) * page_size).limit(page_size)

        result = await db.execute(query)
        return list(result.scalars().all()), total or 0

    @staticmethod
    async def get_by_id(db: AsyncSession, assessment_id: int) -> Optional[Assessment]:
        """Load one assessment with its staff, department, details and indicators.

        Returns:
            The matching ``Assessment``, or ``None`` when no row has that id.
        """
        result = await db.execute(
            select(Assessment)
            .options(
                selectinload(Assessment.staff).selectinload(Staff.department),
                selectinload(Assessment.details).selectinload(AssessmentDetail.indicator),
            )
            .where(Assessment.id == assessment_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def create(
        db: AsyncSession,
        assessment_data: AssessmentCreate,
        assessor_id: Optional[int] = None,
    ) -> Assessment:
        """Create an assessment together with its detail rows.

        Args:
            db: Active database session.
            assessment_data: Payload carrying the staff, period and details.
            assessor_id: Id recorded as the assessor, if any.

        Returns:
            The flushed and refreshed ``Assessment``.
        """
        total_score, weighted_score = await AssessmentService._calculate_scores(
            db, assessment_data.details
        )

        assessment = Assessment(
            staff_id=assessment_data.staff_id,
            period_year=assessment_data.period_year,
            period_month=assessment_data.period_month,
            period_type=assessment_data.period_type,
            total_score=total_score,
            weighted_score=weighted_score,
            assessor_id=assessor_id,
        )
        db.add(assessment)
        # Flush so the new primary key is available for the detail rows.
        await db.flush()

        for detail_data in assessment_data.details:
            db.add(
                AssessmentDetail(
                    assessment_id=assessment.id,
                    **detail_data.model_dump()
                )
            )
        await db.flush()
        await db.refresh(assessment)
        return assessment

    @staticmethod
    async def update(
        db: AsyncSession,
        assessment_id: int,
        assessment_data: AssessmentUpdate,
    ) -> Optional[Assessment]:
        """Update an assessment that is still in draft or rejected state.

        When ``assessment_data.details`` is provided, all existing detail
        rows are replaced and both scores are recomputed. ``remark`` is
        overwritten only when it is not ``None``.

        Returns:
            The refreshed ``Assessment``, or ``None`` when it does not exist
            or its status does not allow editing.
        """
        assessment = await AssessmentService.get_by_id(db, assessment_id)
        if not assessment:
            return None
        # Compare against the enum members, as the other workflow methods do.
        if assessment.status not in (AssessmentStatus.DRAFT, AssessmentStatus.REJECTED):
            return None

        if assessment_data.details is not None:
            # Derive the scores from the new payload before touching any rows.
            total_score, weighted_score = await AssessmentService._calculate_scores(
                db, assessment_data.details
            )

            # Replace the old detail rows with the new ones.
            for old_detail in list(assessment.details):
                await db.delete(old_detail)
            for detail_data in assessment_data.details:
                db.add(
                    AssessmentDetail(
                        assessment_id=assessment_id,
                        **detail_data.model_dump()
                    )
                )

            assessment.total_score = total_score
            assessment.weighted_score = weighted_score

        if assessment_data.remark is not None:
            assessment.remark = assessment_data.remark

        await db.flush()
        await db.refresh(assessment)
        return assessment

    @staticmethod
    async def submit(db: AsyncSession, assessment_id: int) -> Optional[Assessment]:
        """Move a draft assessment to the submitted state.

        Returns:
            The refreshed ``Assessment``, or ``None`` when it does not exist
            or is not currently a draft.
        """
        assessment = await AssessmentService.get_by_id(db, assessment_id)
        if not assessment or assessment.status != AssessmentStatus.DRAFT:
            return None

        assessment.status = AssessmentStatus.SUBMITTED
        assessment.submit_time = AssessmentService._utc_now()
        await db.flush()
        await db.refresh(assessment)
        return assessment

    @staticmethod
    async def review(
        db: AsyncSession,
        assessment_id: int,
        reviewer_id: int,
        approved: bool,
        remark: Optional[str] = None,
    ) -> Optional[Assessment]:
        """Approve or reject a submitted assessment.

        Args:
            db: Active database session.
            assessment_id: Assessment to review.
            reviewer_id: Id recorded as the reviewer.
            approved: ``True`` marks it reviewed, ``False`` marks it rejected.
            remark: Replaces the stored remark when non-empty.

        Returns:
            The refreshed ``Assessment``, or ``None`` when it does not exist
            or is not currently submitted.
        """
        assessment = await AssessmentService.get_by_id(db, assessment_id)
        if not assessment or assessment.status != AssessmentStatus.SUBMITTED:
            return None

        assessment.reviewer_id = reviewer_id
        assessment.review_time = AssessmentService._utc_now()
        assessment.status = (
            AssessmentStatus.REVIEWED if approved else AssessmentStatus.REJECTED
        )
        if remark:
            assessment.remark = remark

        await db.flush()
        await db.refresh(assessment)
        return assessment

    @staticmethod
    async def finalize(db: AsyncSession, assessment_id: int) -> Optional[Assessment]:
        """Move a reviewed assessment to the finalized state.

        Returns:
            The refreshed ``Assessment``, or ``None`` when it does not exist
            or is not currently reviewed.
        """
        assessment = await AssessmentService.get_by_id(db, assessment_id)
        if not assessment or assessment.status != AssessmentStatus.REVIEWED:
            return None

        assessment.status = AssessmentStatus.FINALIZED
        await db.flush()
        await db.refresh(assessment)
        return assessment

    @staticmethod
    async def batch_create_for_department(
        db: AsyncSession,
        department_id: int,
        period_year: int,
        period_month: int,
        indicators: List[int],
    ) -> List[Assessment]:
        """Create blank monthly assessments for a department's active staff.

        Staff who already have at least one assessment for the given year
        and month are skipped. Each new assessment gets one zero-scored
        detail row per id in ``indicators``.

        Returns:
            The newly created assessments (empty when nothing was created).
        """
        staff_result = await db.execute(
            select(Staff).where(
                Staff.department_id == department_id,
                Staff.status == "active",
            )
        )
        staff_list = staff_result.scalars().all()
        if not staff_list:
            return []

        # One query finds everyone already assessed for this period. It also
        # tolerates staff who have more than one matching row.
        existing_result = await db.execute(
            select(Assessment.staff_id).where(
                Assessment.staff_id.in_([staff.id for staff in staff_list]),
                Assessment.period_year == period_year,
                Assessment.period_month == period_month,
            )
        )
        already_assessed = set(existing_result.scalars().all())

        assessments = [
            Assessment(
                staff_id=staff.id,
                period_year=period_year,
                period_month=period_month,
                period_type="monthly",
                total_score=0,
                weighted_score=0,
            )
            for staff in staff_list
            if staff.id not in already_assessed
        ]
        db.add_all(assessments)
        # Flush once so every new assessment has its primary key assigned.
        await db.flush()

        for assessment in assessments:
            for indicator_id in indicators:
                db.add(
                    AssessmentDetail(
                        assessment_id=assessment.id,
                        indicator_id=indicator_id,
                        score=0,
                    )
                )
        await db.flush()
        return assessments