""" 员工服务层 """ from typing import Optional, List from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.models.models import Staff, Department from app.schemas.schemas import StaffCreate, StaffUpdate class StaffService: """员工服务""" @staticmethod async def get_list( db: AsyncSession, department_id: Optional[int] = None, status: Optional[str] = None, keyword: Optional[str] = None, page: int = 1, page_size: int = 20 ) -> tuple[List[Staff], int]: """获取员工列表""" query = select(Staff).options(selectinload(Staff.department)) if department_id: query = query.where(Staff.department_id == department_id) if status: query = query.where(Staff.status == status) if keyword: query = query.where( (Staff.name.ilike(f"%{keyword}%")) | (Staff.employee_id.ilike(f"%{keyword}%")) ) # 统计总数 count_query = select(func.count()).select_from(query.subquery()) total = await db.scalar(count_query) # 分页 query = query.order_by(Staff.id.desc()) query = query.offset((page - 1) * page_size).limit(page_size) result = await db.execute(query) staff_list = result.scalars().all() return staff_list, total or 0 @staticmethod async def get_by_id(db: AsyncSession, staff_id: int) -> Optional[Staff]: """根据ID获取员工""" result = await db.execute( select(Staff) .options(selectinload(Staff.department)) .where(Staff.id == staff_id) ) return result.scalar_one_or_none() @staticmethod async def get_by_employee_id(db: AsyncSession, employee_id: str) -> Optional[Staff]: """根据工号获取员工""" result = await db.execute( select(Staff).where(Staff.employee_id == employee_id) ) return result.scalar_one_or_none() @staticmethod async def create(db: AsyncSession, staff_data: StaffCreate) -> Staff: """创建员工""" staff = Staff(**staff_data.model_dump()) db.add(staff) await db.flush() await db.refresh(staff) return staff @staticmethod async def update(db: AsyncSession, staff_id: int, staff_data: StaffUpdate) -> Optional[Staff]: """更新员工""" staff = await StaffService.get_by_id(db, staff_id) if not staff: return None update_data = staff_data.model_dump(exclude_unset=True) for key, value in update_data.items(): setattr(staff, key, value) await db.flush() await db.refresh(staff) return staff @staticmethod async def delete(db: AsyncSession, staff_id: int) -> bool: """删除员工""" staff = await StaffService.get_by_id(db, staff_id) if not staff: return False await db.delete(staff) return True @staticmethod async def get_by_department(db: AsyncSession, department_id: int) -> List[Staff]: """获取科室下所有员工""" result = await db.execute( select(Staff) .where(Staff.department_id == department_id, Staff.status == "active") .order_by(Staff.name) ) return list(result.scalars().all())