112 lines
3.5 KiB
Python
112 lines
3.5 KiB
Python
"""
|
|
员工服务层
|
|
"""
|
|
from typing import Optional, List
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from app.models.models import Staff, Department
|
|
from app.schemas.schemas import StaffCreate, StaffUpdate
|
|
|
|
|
|
class StaffService:
|
|
"""员工服务"""
|
|
|
|
@staticmethod
|
|
async def get_list(
|
|
db: AsyncSession,
|
|
department_id: Optional[int] = None,
|
|
status: Optional[str] = None,
|
|
keyword: Optional[str] = None,
|
|
page: int = 1,
|
|
page_size: int = 20
|
|
) -> tuple[List[Staff], int]:
|
|
"""获取员工列表"""
|
|
query = select(Staff).options(selectinload(Staff.department))
|
|
|
|
if department_id:
|
|
query = query.where(Staff.department_id == department_id)
|
|
if status:
|
|
query = query.where(Staff.status == status)
|
|
if keyword:
|
|
query = query.where(
|
|
(Staff.name.ilike(f"%{keyword}%")) |
|
|
(Staff.employee_id.ilike(f"%{keyword}%"))
|
|
)
|
|
|
|
# 统计总数
|
|
count_query = select(func.count()).select_from(query.subquery())
|
|
total = await db.scalar(count_query)
|
|
|
|
# 分页
|
|
query = query.order_by(Staff.id.desc())
|
|
query = query.offset((page - 1) * page_size).limit(page_size)
|
|
|
|
result = await db.execute(query)
|
|
staff_list = result.scalars().all()
|
|
|
|
return staff_list, total or 0
|
|
|
|
@staticmethod
|
|
async def get_by_id(db: AsyncSession, staff_id: int) -> Optional[Staff]:
|
|
"""根据ID获取员工"""
|
|
result = await db.execute(
|
|
select(Staff)
|
|
.options(selectinload(Staff.department))
|
|
.where(Staff.id == staff_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def get_by_employee_id(db: AsyncSession, employee_id: str) -> Optional[Staff]:
|
|
"""根据工号获取员工"""
|
|
result = await db.execute(
|
|
select(Staff).where(Staff.employee_id == employee_id)
|
|
)
|
|
return result.scalar_one_or_none()
|
|
|
|
@staticmethod
|
|
async def create(db: AsyncSession, staff_data: StaffCreate) -> Staff:
|
|
"""创建员工"""
|
|
staff = Staff(**staff_data.model_dump())
|
|
db.add(staff)
|
|
await db.flush()
|
|
await db.refresh(staff)
|
|
return staff
|
|
|
|
@staticmethod
|
|
async def update(db: AsyncSession, staff_id: int, staff_data: StaffUpdate) -> Optional[Staff]:
|
|
"""更新员工"""
|
|
staff = await StaffService.get_by_id(db, staff_id)
|
|
if not staff:
|
|
return None
|
|
|
|
update_data = staff_data.model_dump(exclude_unset=True)
|
|
for key, value in update_data.items():
|
|
setattr(staff, key, value)
|
|
|
|
await db.flush()
|
|
await db.refresh(staff)
|
|
return staff
|
|
|
|
@staticmethod
|
|
async def delete(db: AsyncSession, staff_id: int) -> bool:
|
|
"""删除员工"""
|
|
staff = await StaffService.get_by_id(db, staff_id)
|
|
if not staff:
|
|
return False
|
|
|
|
await db.delete(staff)
|
|
return True
|
|
|
|
@staticmethod
|
|
async def get_by_department(db: AsyncSession, department_id: int) -> List[Staff]:
|
|
"""获取科室下所有员工"""
|
|
result = await db.execute(
|
|
select(Staff)
|
|
.where(Staff.department_id == department_id, Staff.status == "active")
|
|
.order_by(Staff.name)
|
|
)
|
|
return list(result.scalars().all())
|