Files
hospital_performance/backend/app/services/template_service.py
2026-02-28 15:06:52 +08:00

293 lines
9.7 KiB
Python

"""
指标模板服务层
"""
import json
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.models.models import (
IndicatorTemplate, TemplateIndicator, Indicator,
TemplateType, BSCDimension
)
from app.schemas.schemas import (
IndicatorTemplateCreate, IndicatorTemplateUpdate,
TemplateIndicatorCreate, TemplateIndicatorUpdate
)
class TemplateService:
    """Service layer for indicator templates and their indicator links.

    Every method is static; database work goes through the ``AsyncSession``
    supplied by the caller.
    """

    # Display labels keyed by template type value.
    _TEMPLATE_TYPE_LABELS = {
        "general": "通用模板",
        "surgical": "手术临床科室",
        "nonsurgical_ward": "非手术有病房科室",
        "nonsurgical_noward": "非手术无病房科室",
        "medical_tech": "医技科室",
        "nursing": "护理单元",
        "admin": "行政科室",
        "logistics": "后勤科室"
    }

    # Display labels keyed by dimension value.
    _DIMENSION_LABELS = {
        "financial": "财务管理",
        "customer": "顾客服务",
        "internal_process": "内部流程",
        "learning_growth": "学习与成长"
    }

    @staticmethod
    def _build_template_indicator(template_id: int, data: Any, sort_order: Any) -> TemplateIndicator:
        """Build (without persisting) a TemplateIndicator from a create payload."""
        return TemplateIndicator(
            template_id=template_id,
            indicator_id=data.indicator_id,
            category=data.category,
            target_value=data.target_value,
            target_unit=data.target_unit,
            weight=data.weight,
            scoring_method=data.scoring_method,
            scoring_params=data.scoring_params,
            sort_order=sort_order,
            remark=data.remark
        )

    @staticmethod
    async def _find_template_indicator(
        db: AsyncSession,
        template_id: int,
        indicator_id: int
    ) -> Optional[TemplateIndicator]:
        """Return the link row for (template_id, indicator_id), or None."""
        result = await db.execute(
            select(TemplateIndicator).where(
                TemplateIndicator.template_id == template_id,
                TemplateIndicator.indicator_id == indicator_id
            )
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_list(
        db: AsyncSession,
        template_type: Optional[str] = None,
        is_active: Optional[bool] = None,
        page: int = 1,
        page_size: int = 20
    ) -> tuple[List[Dict], int]:
        """Return one page of templates together with the total match count.

        Args:
            db: Async database session.
            template_type: When truthy, keep only templates of this type.
            is_active: When not None, keep only templates with this flag.
            page: 1-based page number; values below 1 are treated as 1.
            page_size: Maximum number of templates on the page.

        Returns:
            ``(templates, total)``. Each template is a dict of its column
            values (``template_type`` is emitted via ``.value``) plus
            ``indicator_count``; ``total`` counts all rows matching the
            filters, ignoring paging.
        """
        query = select(IndicatorTemplate)
        if template_type:
            query = query.where(IndicatorTemplate.template_type == template_type)
        if is_active is not None:
            query = query.where(IndicatorTemplate.is_active == is_active)

        # Count before ordering/paging so the total covers every match.
        total = await db.scalar(select(func.count()).select_from(query.subquery()))

        # Clamp the page so a page below 1 cannot yield a negative OFFSET.
        offset = (max(page, 1) - 1) * page_size
        page_query = (
            query.order_by(IndicatorTemplate.template_type, IndicatorTemplate.id)
            .offset(offset)
            .limit(page_size)
        )
        templates = (await db.execute(page_query)).scalars().all()

        # Fetch the indicator counts for the whole page with one grouped
        # query instead of issuing one COUNT per template.
        counts: Dict[int, int] = {}
        if templates:
            count_rows = await db.execute(
                select(TemplateIndicator.template_id, func.count())
                .where(TemplateIndicator.template_id.in_([t.id for t in templates]))
                .group_by(TemplateIndicator.template_id)
            )
            counts = {tpl_id: count for tpl_id, count in count_rows.all()}

        template_list = [
            {
                "id": t.id,
                "template_name": t.template_name,
                "template_code": t.template_code,
                "template_type": t.template_type.value,
                "description": t.description,
                "dimension_weights": t.dimension_weights,
                "assessment_cycle": t.assessment_cycle,
                "is_active": t.is_active,
                # Templates without any link row are absent from the grouped result.
                "indicator_count": counts.get(t.id, 0),
                "created_at": t.created_at,
                "updated_at": t.updated_at
            }
            for t in templates
        ]
        return template_list, total or 0

    @staticmethod
    async def get_by_id(db: AsyncSession, template_id: int) -> Optional[IndicatorTemplate]:
        """Return the template with this id, or None.

        The template's ``indicators`` and each link's ``indicator`` are
        eagerly loaded via ``selectinload``.
        """
        result = await db.execute(
            select(IndicatorTemplate)
            .options(selectinload(IndicatorTemplate.indicators).selectinload(TemplateIndicator.indicator))
            .where(IndicatorTemplate.id == template_id)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def get_by_code(db: AsyncSession, template_code: str) -> Optional[IndicatorTemplate]:
        """Return the template with this code, or None (no eager loading)."""
        result = await db.execute(
            select(IndicatorTemplate).where(IndicatorTemplate.template_code == template_code)
        )
        return result.scalar_one_or_none()

    @staticmethod
    async def create(
        db: AsyncSession,
        template_data: IndicatorTemplateCreate
    ) -> IndicatorTemplate:
        """Create a template and any indicator links in the payload, then commit.

        Returns:
            The newly created template, refreshed after the commit.
        """
        template = IndicatorTemplate(
            template_name=template_data.template_name,
            template_code=template_data.template_code,
            template_type=template_data.template_type,
            description=template_data.description,
            dimension_weights=template_data.dimension_weights,
            assessment_cycle=template_data.assessment_cycle
        )
        db.add(template)
        # Flush so template.id is available for the link rows below.
        await db.flush()

        if template_data.indicators:
            for idx, ind_data in enumerate(template_data.indicators):
                # A falsy sort_order (None or 0) falls back to the list position.
                # NOTE(review): an explicit 0 is therefore not preserved - confirm
                # against the schema default before changing this.
                db.add(
                    TemplateService._build_template_indicator(
                        template.id, ind_data, ind_data.sort_order or idx
                    )
                )

        await db.commit()
        await db.refresh(template)
        return template

    @staticmethod
    async def update(
        db: AsyncSession,
        template_id: int,
        template_data: IndicatorTemplateUpdate
    ) -> Optional[IndicatorTemplate]:
        """Apply the explicitly-set fields of ``template_data`` and commit.

        Returns:
            The refreshed template, or None when no template has this id.
        """
        template = await TemplateService.get_by_id(db, template_id)
        if not template:
            return None

        # exclude_unset: only fields the caller actually provided are written.
        for key, value in template_data.model_dump(exclude_unset=True).items():
            setattr(template, key, value)

        await db.commit()
        await db.refresh(template)
        return template

    @staticmethod
    async def delete(db: AsyncSession, template_id: int) -> bool:
        """Delete the template and commit.

        Returns:
            True when a template was deleted, False when none has this id.
        """
        # Loaded through get_by_id, i.e. with its indicator links eagerly loaded.
        template = await TemplateService.get_by_id(db, template_id)
        if not template:
            return False

        await db.delete(template)
        await db.commit()
        return True

    @staticmethod
    async def add_indicator(
        db: AsyncSession,
        template_id: int,
        indicator_data: TemplateIndicatorCreate
    ) -> Optional[TemplateIndicator]:
        """Link an indicator to a template and commit.

        Returns:
            The new link row, or None when the template does not exist or the
            indicator is already linked to it.
        """
        found = await db.execute(
            select(IndicatorTemplate).where(IndicatorTemplate.id == template_id)
        )
        if not found.scalar_one_or_none():
            return None

        # Refuse to create a second link for the same indicator.
        duplicate = await TemplateService._find_template_indicator(
            db, template_id, indicator_data.indicator_id
        )
        if duplicate:
            return None

        max_order = await db.scalar(
            select(func.max(TemplateIndicator.sort_order)).where(
                TemplateIndicator.template_id == template_id
            )
        )
        # A falsy sort_order (None or 0) appends after the current maximum;
        # an empty template starts at 1.
        sort_order = indicator_data.sort_order or ((max_order or 0) + 1)

        ti = TemplateService._build_template_indicator(template_id, indicator_data, sort_order)
        db.add(ti)
        await db.commit()
        await db.refresh(ti)
        return ti

    @staticmethod
    async def update_indicator(
        db: AsyncSession,
        template_id: int,
        indicator_id: int,
        indicator_data: TemplateIndicatorUpdate
    ) -> Optional[TemplateIndicator]:
        """Apply the explicitly-set fields of ``indicator_data`` to a link row.

        Returns:
            The refreshed link row, or None when the link does not exist.
        """
        ti = await TemplateService._find_template_indicator(db, template_id, indicator_id)
        if not ti:
            return None

        for key, value in indicator_data.model_dump(exclude_unset=True).items():
            setattr(ti, key, value)

        await db.commit()
        await db.refresh(ti)
        return ti

    @staticmethod
    async def remove_indicator(
        db: AsyncSession,
        template_id: int,
        indicator_id: int
    ) -> bool:
        """Delete the link between a template and an indicator and commit.

        Returns:
            True when a link was removed, False when none existed.
        """
        ti = await TemplateService._find_template_indicator(db, template_id, indicator_id)
        if not ti:
            return False

        await db.delete(ti)
        await db.commit()
        return True

    @staticmethod
    async def get_template_indicators(
        db: AsyncSession,
        template_id: int
    ) -> List[TemplateIndicator]:
        """Return the template's link rows ordered by ``sort_order``.

        Each row's ``indicator`` relationship is eagerly loaded.
        """
        result = await db.execute(
            select(TemplateIndicator)
            .options(selectinload(TemplateIndicator.indicator))
            .where(TemplateIndicator.template_id == template_id)
            .order_by(TemplateIndicator.sort_order)
        )
        # Materialise as a list so the value matches the annotated return type.
        return list(result.scalars().all())

    @staticmethod
    def get_template_type_label(template_type: str) -> str:
        """Return the display label for a template type.

        Unknown values are returned unchanged.
        """
        return TemplateService._TEMPLATE_TYPE_LABELS.get(template_type, template_type)

    @staticmethod
    def get_dimension_label(dimension: str) -> str:
        """Return the display label for a dimension.

        Unknown values are returned unchanged.
        """
        return TemplateService._DIMENSION_LABELS.get(dimension, dimension)