Files
hospital_performance/backend/app/services/finance_service.py
2026-02-28 15:06:52 +08:00

368 lines
13 KiB
Python

"""
财务核算服务层
"""
from typing import Optional, List, Dict, Any
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from decimal import Decimal
from datetime import datetime
from app.models.finance import (
DepartmentFinance, RevenueCategory, ExpenseCategory, FinanceType
)
from app.models.models import Department
class FinanceService:
"""财务核算服务"""
# 收入类别标签映射
REVENUE_LABELS = {
RevenueCategory.EXAMINATION: "检查费",
RevenueCategory.LAB_TEST: "检验费",
RevenueCategory.RADIOLOGY: "放射费",
RevenueCategory.BED: "床位费",
RevenueCategory.NURSING: "护理费",
RevenueCategory.TREATMENT: "治疗费",
RevenueCategory.SURGERY: "手术费",
RevenueCategory.INJECTION: "注射费",
RevenueCategory.OXYGEN: "吸氧费",
RevenueCategory.OTHER: "其他",
}
# 支出类别标签映射
EXPENSE_LABELS = {
ExpenseCategory.MATERIAL: "材料费",
ExpenseCategory.PERSONNEL: "人员支出",
ExpenseCategory.MAINTENANCE: "维修费",
ExpenseCategory.UTILITY: "水电费",
ExpenseCategory.OTHER: "其他",
}
@staticmethod
async def get_department_revenue(
db: AsyncSession,
department_id: Optional[int] = None,
period_year: Optional[int] = None,
period_month: Optional[int] = None
) -> List[Dict[str, Any]]:
"""获取科室收入"""
query = select(DepartmentFinance).options(
selectinload(DepartmentFinance.department)
).where(DepartmentFinance.finance_type == FinanceType.REVENUE)
if department_id:
query = query.where(DepartmentFinance.department_id == department_id)
if period_year:
query = query.where(DepartmentFinance.period_year == period_year)
if period_month:
query = query.where(DepartmentFinance.period_month == period_month)
query = query.order_by(
DepartmentFinance.period_year.desc(),
DepartmentFinance.period_month.desc(),
DepartmentFinance.id.desc()
)
result = await db.execute(query)
records = result.scalars().all()
# 转换为字典列表并添加额外信息
data = []
for record in records:
record_dict = {
"id": record.id,
"department_id": record.department_id,
"department_name": record.department.name if record.department else None,
"period_year": record.period_year,
"period_month": record.period_month,
"category": record.category,
"category_label": FinanceService.REVENUE_LABELS.get(
RevenueCategory(record.category), record.category
),
"amount": float(record.amount),
"source": record.source,
"remark": record.remark,
"created_at": record.created_at,
}
data.append(record_dict)
return data
@staticmethod
async def get_department_expense(
db: AsyncSession,
department_id: Optional[int] = None,
period_year: Optional[int] = None,
period_month: Optional[int] = None
) -> List[Dict[str, Any]]:
"""获取科室支出"""
query = select(DepartmentFinance).options(
selectinload(DepartmentFinance.department)
).where(DepartmentFinance.finance_type == FinanceType.EXPENSE)
if department_id:
query = query.where(DepartmentFinance.department_id == department_id)
if period_year:
query = query.where(DepartmentFinance.period_year == period_year)
if period_month:
query = query.where(DepartmentFinance.period_month == period_month)
query = query.order_by(
DepartmentFinance.period_year.desc(),
DepartmentFinance.period_month.desc(),
DepartmentFinance.id.desc()
)
result = await db.execute(query)
records = result.scalars().all()
# 转换为字典列表并添加额外信息
data = []
for record in records:
record_dict = {
"id": record.id,
"department_id": record.department_id,
"department_name": record.department.name if record.department else None,
"period_year": record.period_year,
"period_month": record.period_month,
"category": record.category,
"category_label": FinanceService.EXPENSE_LABELS.get(
ExpenseCategory(record.category), record.category
),
"amount": float(record.amount),
"source": record.source,
"remark": record.remark,
"created_at": record.created_at,
}
data.append(record_dict)
return data
@staticmethod
async def get_department_balance(
db: AsyncSession,
department_id: Optional[int] = None,
period_year: Optional[int] = None,
period_month: Optional[int] = None
) -> Dict[str, Any]:
"""获取收支结余"""
# 构建基础查询条件
conditions = []
if department_id:
conditions.append(DepartmentFinance.department_id == department_id)
if period_year:
conditions.append(DepartmentFinance.period_year == period_year)
if period_month:
conditions.append(DepartmentFinance.period_month == period_month)
# 查询总收入
revenue_query = select(func.coalesce(func.sum(DepartmentFinance.amount), 0)).where(
and_(
DepartmentFinance.finance_type == FinanceType.REVENUE,
*conditions
)
)
total_revenue = await db.scalar(revenue_query) or 0
# 查询总支出
expense_query = select(func.coalesce(func.sum(DepartmentFinance.amount), 0)).where(
and_(
DepartmentFinance.finance_type == FinanceType.EXPENSE,
*conditions
)
)
total_expense = await db.scalar(expense_query) or 0
# 计算结余
balance = float(total_revenue) - float(total_expense)
# 获取科室名称
department_name = None
if department_id:
dept_result = await db.execute(
select(Department).where(Department.id == department_id)
)
dept = dept_result.scalar_one_or_none()
department_name = dept.name if dept else None
return {
"department_id": department_id,
"department_name": department_name,
"period_year": period_year,
"period_month": period_month,
"total_revenue": float(total_revenue),
"total_expense": float(total_expense),
"balance": balance,
}
@staticmethod
async def get_revenue_by_category(
db: AsyncSession,
department_id: Optional[int] = None,
period_year: Optional[int] = None,
period_month: Optional[int] = None
) -> List[Dict[str, Any]]:
"""按类别统计收入"""
conditions = [DepartmentFinance.finance_type == FinanceType.REVENUE]
if department_id:
conditions.append(DepartmentFinance.department_id == department_id)
if period_year:
conditions.append(DepartmentFinance.period_year == period_year)
if period_month:
conditions.append(DepartmentFinance.period_month == period_month)
query = select(
DepartmentFinance.category,
func.sum(DepartmentFinance.amount).label("total_amount")
).where(and_(*conditions)).group_by(DepartmentFinance.category)
result = await db.execute(query)
rows = result.all()
data = []
for row in rows:
category = row.category
data.append({
"category": category,
"category_label": FinanceService.REVENUE_LABELS.get(
RevenueCategory(category), category
),
"amount": float(row.total_amount),
})
return data
@staticmethod
async def get_expense_by_category(
db: AsyncSession,
department_id: Optional[int] = None,
period_year: Optional[int] = None,
period_month: Optional[int] = None
) -> List[Dict[str, Any]]:
"""按类别统计支出"""
conditions = [DepartmentFinance.finance_type == FinanceType.EXPENSE]
if department_id:
conditions.append(DepartmentFinance.department_id == department_id)
if period_year:
conditions.append(DepartmentFinance.period_year == period_year)
if period_month:
conditions.append(DepartmentFinance.period_month == period_month)
query = select(
DepartmentFinance.category,
func.sum(DepartmentFinance.amount).label("total_amount")
).where(and_(*conditions)).group_by(DepartmentFinance.category)
result = await db.execute(query)
rows = result.all()
data = []
for row in rows:
category = row.category
data.append({
"category": category,
"category_label": FinanceService.EXPENSE_LABELS.get(
ExpenseCategory(category), category
),
"amount": float(row.total_amount),
})
return data
@staticmethod
async def create_finance_record(
db: AsyncSession,
department_id: int,
finance_type: FinanceType,
category: str,
amount: float,
period_year: int,
period_month: int,
source: Optional[str] = None,
remark: Optional[str] = None
) -> DepartmentFinance:
"""创建财务记录"""
record = DepartmentFinance(
department_id=department_id,
finance_type=finance_type,
category=category,
amount=amount,
period_year=period_year,
period_month=period_month,
source=source,
remark=remark
)
db.add(record)
await db.flush()
await db.refresh(record)
return record
@staticmethod
async def get_by_id(db: AsyncSession, record_id: int) -> Optional[DepartmentFinance]:
"""根据ID获取财务记录"""
result = await db.execute(
select(DepartmentFinance)
.options(selectinload(DepartmentFinance.department))
.where(DepartmentFinance.id == record_id)
)
return result.scalar_one_or_none()
@staticmethod
async def update_finance_record(
db: AsyncSession,
record_id: int,
**kwargs
) -> Optional[DepartmentFinance]:
"""更新财务记录"""
record = await FinanceService.get_by_id(db, record_id)
if not record:
return None
for key, value in kwargs.items():
if value is not None and hasattr(record, key):
setattr(record, key, value)
await db.flush()
await db.refresh(record)
return record
@staticmethod
async def delete_finance_record(db: AsyncSession, record_id: int) -> bool:
"""删除财务记录"""
record = await FinanceService.get_by_id(db, record_id)
if not record:
return False
await db.delete(record)
return True
@staticmethod
async def get_department_summary(
db: AsyncSession,
period_year: int,
period_month: int
) -> List[Dict[str, Any]]:
"""获取所有科室的财务汇总"""
# 获取所有科室
dept_result = await db.execute(
select(Department).where(Department.is_active == True).order_by(Department.name)
)
departments = dept_result.scalars().all()
summaries = []
for dept in departments:
balance_data = await FinanceService.get_department_balance(
db, dept.id, period_year, period_month
)
summaries.append({
"department_id": dept.id,
"department_name": dept.name,
"total_revenue": balance_data["total_revenue"],
"total_expense": balance_data["total_expense"],
"balance": balance_data["balance"],
})
return summaries