""" 工资核算服务层 """ from typing import Optional, List from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from decimal import Decimal from app.models.models import SalaryRecord, Staff, Assessment from app.schemas.schemas import SalaryRecordCreate, SalaryRecordUpdate class SalaryService: """工资核算服务""" # 绩效奖金基数(可根据医院实际情况调整) PERFORMANCE_BASE = 3000.0 @staticmethod async def get_list( db: AsyncSession, staff_id: Optional[int] = None, department_id: Optional[int] = None, period_year: Optional[int] = None, period_month: Optional[int] = None, status: Optional[str] = None, page: int = 1, page_size: int = 20 ) -> tuple[List[SalaryRecord], int]: """获取工资记录列表""" query = select(SalaryRecord).options( selectinload(SalaryRecord.staff).selectinload(Staff.department) ) if staff_id: query = query.where(SalaryRecord.staff_id == staff_id) if department_id: query = query.join(Staff).where(Staff.department_id == department_id) if period_year: query = query.where(SalaryRecord.period_year == period_year) if period_month: query = query.where(SalaryRecord.period_month == period_month) if status: query = query.where(SalaryRecord.status == status) # 统计总数 count_query = select(func.count()).select_from(query.subquery()) total = await db.scalar(count_query) # 分页 query = query.order_by(SalaryRecord.period_year.desc(), SalaryRecord.period_month.desc(), SalaryRecord.id.desc()) query = query.offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) records = result.scalars().all() return records, total or 0 @staticmethod async def get_by_id(db: AsyncSession, record_id: int) -> Optional[SalaryRecord]: """根据ID获取工资记录""" result = await db.execute( select(SalaryRecord) .options(selectinload(SalaryRecord.staff).selectinload(Staff.department)) .where(SalaryRecord.id == record_id) ) return result.scalar_one_or_none() @staticmethod async def calculate_performance_bonus(performance_score: float, performance_ratio: float) -> float: """计算绩效奖金""" # 绩效奖金 = 绩效基数 × (绩效得分/100) × 绩效系数 return SalaryService.PERFORMANCE_BASE * (performance_score / 100) * performance_ratio @staticmethod async def create(db: AsyncSession, record_data: SalaryRecordCreate) -> SalaryRecord: """创建工资记录""" # 获取员工信息 staff_result = await db.execute( select(Staff).where(Staff.id == record_data.staff_id) ) staff = staff_result.scalar_one_or_none() # 计算总工资 total_salary = ( record_data.base_salary + record_data.performance_bonus + record_data.allowance - record_data.deduction ) record = SalaryRecord( **record_data.model_dump(), total_salary=total_salary, status="pending" ) db.add(record) await db.flush() await db.refresh(record) return record @staticmethod async def update(db: AsyncSession, record_id: int, record_data: SalaryRecordUpdate) -> Optional[SalaryRecord]: """更新工资记录""" record = await SalaryService.get_by_id(db, record_id) if not record or record.status != "pending": return None update_data = record_data.model_dump(exclude_unset=True) for key, value in update_data.items(): setattr(record, key, value) # 重新计算总工资 record.total_salary = ( record.base_salary + record.performance_bonus + record.allowance - record.deduction ) await db.flush() await db.refresh(record) return record @staticmethod async def generate_from_assessment( db: AsyncSession, staff_id: int, period_year: int, period_month: int ) -> Optional[SalaryRecord]: """根据考核记录生成工资记录""" # 获取员工信息 staff_result = await db.execute( select(Staff).where(Staff.id == staff_id) ) staff = staff_result.scalar_one_or_none() if not staff: return None # 获取考核记录 assessment_result = await db.execute( select(Assessment).where( Assessment.staff_id == staff_id, Assessment.period_year == period_year, Assessment.period_month == period_month, Assessment.status == "finalized" ) ) assessment = assessment_result.scalar_one_or_none() if not assessment: return None # 检查是否已存在工资记录 existing = await db.execute( select(SalaryRecord).where( SalaryRecord.staff_id == staff_id, SalaryRecord.period_year == period_year, SalaryRecord.period_month == period_month ) ) if existing.scalar_one_or_none(): return None # 计算绩效奖金 performance_bonus = await SalaryService.calculate_performance_bonus( float(assessment.weighted_score), float(staff.performance_ratio) ) # 创建工资记录 total_salary = float(staff.base_salary) + performance_bonus record = SalaryRecord( staff_id=staff_id, period_year=period_year, period_month=period_month, base_salary=float(staff.base_salary), performance_score=float(assessment.weighted_score), performance_bonus=performance_bonus, deduction=0, allowance=0, total_salary=total_salary, status="pending" ) db.add(record) await db.flush() await db.refresh(record) return record @staticmethod async def batch_generate_for_department( db: AsyncSession, department_id: int, period_year: int, period_month: int ) -> List[SalaryRecord]: """为科室批量生成工资记录""" # 获取科室所有已确认考核的员工 result = await db.execute( select(Assessment).join(Staff).where( Staff.department_id == department_id, Assessment.period_year == period_year, Assessment.period_month == period_month, Assessment.status == "finalized" ) ) assessments = result.scalars().all() records = [] for assessment in assessments: record = await SalaryService.generate_from_assessment( db, assessment.staff_id, period_year, period_month ) if record: records.append(record) return records @staticmethod async def confirm(db: AsyncSession, record_id: int) -> Optional[SalaryRecord]: """确认工资""" record = await SalaryService.get_by_id(db, record_id) if not record or record.status != "pending": return None record.status = "confirmed" await db.flush() await db.refresh(record) return record @staticmethod async def batch_confirm( db: AsyncSession, period_year: int, period_month: int, department_id: Optional[int] = None ) -> int: """批量确认工资""" query = select(SalaryRecord).where( SalaryRecord.period_year == period_year, SalaryRecord.period_month == period_month, SalaryRecord.status == "pending" ) if department_id: query = query.join(Staff).where(Staff.department_id == department_id) result = await db.execute(query) records = result.scalars().all() count = 0 for record in records: record.status = "confirmed" count += 1 await db.flush() return count