feat(dataflow): 为所有Handler添加重试机制

This commit is contained in:
2026-06-20 21:36:47 +08:00
parent f6680122eb
commit 1c2bf43d42
12 changed files with 327 additions and 20 deletions

View File

@@ -0,0 +1,22 @@
package com.healthlink.his.web.dataflow.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class EventRetryConfig {
@Bean("eventRetryExecutor")
public Executor eventRetryExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("event-retry-");
executor.initialize();
return executor;
}
}

View File

@@ -0,0 +1,50 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.administration.domain.ChargeItem;
import com.healthlink.his.administration.service.IChargeItemService;
import com.healthlink.his.common.enums.ChargeItemStatus;
import com.healthlink.his.web.dataflow.event.MedicationDispensedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.util.Date;
/**
* Chain 3: 药品→发药自动计费 — 发药后自动创建收费项
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AutoBillingHandler {
private final IChargeItemService chargeItemService;
@Async
@EventListener
public void onMedicationDispensed(MedicationDispensedEvent event) {
log.info("Chain3 AutoBilling: encounterId={}, dispenseId={}, itemId={}",
event.getEncounterId(), event.getDispenseId(), event.getItemId());
EventRetryUtil.executeVoidWithRetry("3-AutoBilling", () -> {
BigDecimal amount = event.getQuantity().multiply(event.getUnitPrice());
ChargeItem chargeItem = new ChargeItem();
chargeItem.setEncounterId(event.getEncounterId());
chargeItem.setProductId(event.getItemId());
chargeItem.setQuantityValue(event.getQuantity());
chargeItem.setUnitPrice(event.getUnitPrice());
chargeItem.setTotalPrice(amount);
chargeItem.setStatusEnum(ChargeItemStatus.BILLABLE.getValue());
chargeItem.setOccurrenceTime(new Date());
chargeItem.setDispenseId(event.getDispenseId());
chargeItemService.save(chargeItem);
log.info("Chain3 AutoBilling: charge created, amount={}, encounterId={}",
amount, event.getEncounterId());
}, 3);
}
}

View File

@@ -0,0 +1,48 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.criticalvalue.domain.CriticalValue;
import com.healthlink.his.criticalvalue.service.ICriticalValueService;
import com.healthlink.his.web.dataflow.event.LabReportPublishedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* Chain 4: 检验→危急值推送 — 检验报告发布后推送危急值
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class CriticalValueHandler {
private final ICriticalValueService criticalValueService;
@Async
@EventListener
public void onLabReportPublished(LabReportPublishedEvent event) {
log.info("Chain4 CriticalValue: patientId={}, testItem={}, isCritical={}",
event.getPatientId(), event.getTestItem(), event.getIsCritical());
EventRetryUtil.executeVoidWithRetry("4-CriticalValue", () -> {
if (!Boolean.TRUE.equals(event.getIsCritical())) {
return;
}
CriticalValue criticalValue = new CriticalValue();
criticalValue.setPatientId(event.getPatientId());
criticalValue.setEncounterId(event.getEncounterId());
criticalValue.setItemName(event.getTestItem());
criticalValue.setResultValue(event.getResultValue());
criticalValue.setReportTime(new Date());
criticalValue.setStatus("PENDING_NOTIFY");
criticalValueService.save(criticalValue);
log.info("Chain4 CriticalValue: critical value pushed for patientId={}, testItem={}",
event.getPatientId(), event.getTestItem());
}, 3);
}
}

View File

@@ -0,0 +1,60 @@
package com.healthlink.his.web.dataflow.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.healthlink.his.administration.domain.EncounterDiagnosis;
import com.healthlink.his.administration.service.IEncounterDiagnosisService;
import com.healthlink.his.web.dataflow.event.AdmissionSavedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.List;
/**
* Chain 1: 门诊→住院诊断同步 — 入院时自动复制门诊诊断到住院
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DiagnosisSyncHandler {
private final IEncounterDiagnosisService encounterDiagnosisService;
@Async
@EventListener
public void onAdmissionSaved(AdmissionSavedEvent event) {
log.info("Chain1 DiagnosisSync: encounterId={}, patientId={}", event.getEncounterId(), event.getPatientId());
EventRetryUtil.executeVoidWithRetry("1-DiagnosisSync", () -> {
Long encounterId = event.getEncounterId();
LambdaQueryWrapper<EncounterDiagnosis> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EncounterDiagnosis::getEncounterId, encounterId)
.eq(EncounterDiagnosis::getMaindiseFlag, true)
.orderByDesc(EncounterDiagnosis::getDiagnosisTime)
.last("LIMIT 1");
List<EncounterDiagnosis> diagnoses = encounterDiagnosisService.list(wrapper);
if (diagnoses.isEmpty()) {
log.info("Chain1 DiagnosisSync: no outpatient diagnosis found for encounterId={}", encounterId);
return;
}
for (EncounterDiagnosis source : diagnoses) {
EncounterDiagnosis target = new EncounterDiagnosis();
target.setEncounterId(encounterId);
target.setDiagnosisDesc(source.getDiagnosisDesc());
target.setMaindiseFlag(source.getMaindiseFlag());
target.setDiagnosisTime(new Date());
target.setDoctor(source.getDoctor());
encounterDiagnosisService.save(target);
}
log.info("Chain1 DiagnosisSync: synced {} diagnoses for encounterId={}",
diagnoses.size(), encounterId);
}, 3);
}
}

View File

@@ -2,6 +2,7 @@ package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.DischargeEvent;
import com.healthlink.his.web.dataflow.service.DrgGroupingService;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
@@ -21,11 +22,9 @@ public class DrgGroupingHandler {
@EventListener
public void onDischarge(DischargeEvent event) {
log.info("Chain5 DrgGrouping: encounterId={}, patientId={}", event.getEncounterId(), event.getPatientId());
try {
EventRetryUtil.executeVoidWithRetry("5-DrgGrouping", () -> {
Map<String, Object> groupingResult = drgGroupingService.group(event.getEncounterId(), event.getPatientId());
log.info("Chain5 DrgGrouping: completed, result={}", groupingResult);
} catch (Exception e) {
log.error("Chain5 DrgGrouping failed: encounterId={}", event.getEncounterId(), e);
}
}, 3);
}
}

View File

@@ -1,6 +1,7 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.ExamReportPublishedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@@ -18,7 +19,7 @@ public class ExamReportFeedbackHandler {
public void onExamReportPublished(ExamReportPublishedEvent event) {
log.info("Chain9 ExamFeedback: encounterId={}, examType={}, reportId={}",
event.getEncounterId(), event.getExamType(), event.getReportId());
try {
EventRetryUtil.executeVoidWithRetry("9-ExamFeedback", () -> {
// 1. 将检查结果关联到医嘱
// TODO: 更新医嘱执行状态
@@ -26,8 +27,6 @@ public class ExamReportFeedbackHandler {
// TODO: WebSocket推送
log.info("Chain9 ExamFeedback: feedback recorded for reportId={}", event.getReportId());
} catch (Exception e) {
log.error("Chain9 ExamFeedback failed: reportId={}", event.getReportId(), e);
}
}, 3);
}
}

View File

@@ -1,6 +1,7 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.AdmissionAssessmentCompletedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@@ -21,7 +22,7 @@ public class NursingPlanAutoGenerateHandler {
public void onAssessmentCompleted(AdmissionAssessmentCompletedEvent event) {
log.info("Chain10 NursingPlan: encounterId={}, riskLevel={}",
event.getEncounterId(), event.getRiskLevel());
try {
EventRetryUtil.executeVoidWithRetry("10-NursingPlan", () -> {
Map<String, Object> nursingPlan = new HashMap<>();
nursingPlan.put("encounterId", event.getEncounterId());
nursingPlan.put("patientId", event.getPatientId());
@@ -32,8 +33,6 @@ public class NursingPlanAutoGenerateHandler {
// TODO: 根据风险等级生成具体护理措施
log.info("Chain10 NursingPlan: plan generated for encounterId={}", event.getEncounterId());
} catch (Exception e) {
log.error("Chain10 NursingPlan failed: encounterId={}", event.getEncounterId(), e);
}
}, 3);
}
}

View File

@@ -2,6 +2,7 @@ package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.NursingRecordSavedEvent;
import com.healthlink.his.web.dataflow.service.NursingQualityCheckService;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
@@ -25,12 +26,10 @@ public class NursingQualityHandler {
public void onNursingRecordSaved(NursingRecordSavedEvent event) {
log.info("Chain6 NursingQuality: encounterId={}, patientId={}, recordId={}",
event.getEncounterId(), event.getPatientId(), event.getRecordId());
try {
EventRetryUtil.executeVoidWithRetry("6-NursingQuality", () -> {
Map<String, Object> qualityResult = nursingQualityCheckService.check(
event.getEncounterId(), event.getPatientId(), event.getRecordId());
log.info("Chain6 NursingQuality: completed, result={}", qualityResult);
} catch (Exception e) {
log.error("Chain6 NursingQuality failed: recordId={}", event.getRecordId(), e);
}
}, 3);
}
}

View File

@@ -0,0 +1,53 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.common.enums.EncounterZyStatus;
import com.healthlink.his.document.service.IEmrService;
import com.healthlink.his.document.domain.Emr;
import com.healthlink.his.web.dataflow.event.OrderExecutedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* Chain 2: 医嘱→护理执行反馈 — 医嘱执行后通知医生
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderExecutionFeedbackHandler {
private final IEmrService emrService;
@Async
@EventListener
public void onOrderExecuted(OrderExecutedEvent event) {
log.info("Chain2 OrderFeedback: encounterId={}, orderType={}, orderId={}",
event.getEncounterId(), event.getOrderType(), event.getOrderId());
EventRetryUtil.executeVoidWithRetry("2-OrderFeedback", () -> {
Map<String, Object> feedbackContext = new HashMap<>();
feedbackContext.put("eventType", "ORDER_EXECUTED");
feedbackContext.put("orderType", event.getOrderType());
feedbackContext.put("orderId", event.getOrderId());
feedbackContext.put("executedBy", event.getPractitionerId());
feedbackContext.put("executeTime", new Date().toString());
Emr emr = new Emr();
emr.setEncounterId(event.getEncounterId());
emr.setPatientId(null);
emr.setRecordId(event.getPractitionerId());
emr.setRecordTime(new Date());
emr.setClassEnum(2);
emr.setContextJson(com.core.common.utils.JsonUtils.toJson(feedbackContext));
emrService.save(emr);
log.info("Chain2 OrderFeedback: feedback recorded for orderId={}", event.getOrderId());
}, 3);
}
}

View File

@@ -1,6 +1,7 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.SurgeryCompletedEvent;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
@@ -18,7 +19,7 @@ public class PostSurgeryRecoveryHandler {
public void onSurgeryCompleted(SurgeryCompletedEvent event) {
log.info("Chain8 PostSurgery: encounterId={}, surgeryId={}, type={}",
event.getEncounterId(), event.getSurgeryId(), event.getSurgeryType());
try {
EventRetryUtil.executeVoidWithRetry("8-PostSurgery", () -> {
Map<String, Object> recoveryPlan = new HashMap<>();
recoveryPlan.put("encounterId", event.getEncounterId());
recoveryPlan.put("surgeryId", event.getSurgeryId());
@@ -30,8 +31,6 @@ public class PostSurgeryRecoveryHandler {
// TODO: 根据手术类型生成术后医嘱
log.info("Chain8 PostSurgery: recovery plan created for encounterId={}", event.getEncounterId());
} catch (Exception e) {
log.error("Chain8 PostSurgery failed: surgeryId={}", event.getSurgeryId(), e);
}
}, 3);
}
}

View File

@@ -0,0 +1,42 @@
package com.healthlink.his.web.dataflow.handler;
import com.healthlink.his.web.dataflow.event.StatisticsPushEvent;
import com.healthlink.his.web.dataflow.config.WebSocketConfig;
import com.healthlink.his.web.dataflow.util.EventRetryUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Chain 7: 统计→实时推送 — 关键操作发生时推送到仪表盘
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class StatisticsPushHandler {
private final WebSocketConfig webSocketConfig;
@Async
@EventListener
public void onStatisticsPush(StatisticsPushEvent event) {
log.info("Chain7 StatisticsPush: eventType={}", event.getEventType());
EventRetryUtil.executeVoidWithRetry("7-StatisticsPush", () -> {
Map<String, Object> payload = event.getData();
payload.put("eventType", event.getEventType());
payload.put("timestamp", System.currentTimeMillis());
WebSocketConfig.SessionRegistry sessionRegistry = WebSocketConfig.getSessionRegistry();
if (sessionRegistry != null) {
sessionRegistry.broadcast("STATISTICS", payload);
log.info("Chain7 StatisticsPush: pushed to dashboard, eventType={}", event.getEventType());
} else {
log.warn("Chain7 StatisticsPush: WebSocket not available, event dropped");
}
}, 3);
}
}

View File

@@ -0,0 +1,37 @@
package com.healthlink.his.web.dataflow.util;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Supplier;
@Slf4j
public class EventRetryUtil {
public static <T> T executeWithRetry(String chainName, Supplier<T> action, int maxRetries) {
Exception lastException = null;
for (int i = 0; i <= maxRetries; i++) {
try {
return action.get();
} catch (Exception e) {
lastException = e;
log.warn("Chain{} attempt {} failed: {}", chainName, i + 1, e.getMessage());
if (i < maxRetries) {
try {
Thread.sleep(1000L * (i + 1));
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
}
}
}
throw new RuntimeException("Chain" + chainName + " failed after " + maxRetries + " retries", lastException);
}
public static void executeVoidWithRetry(String chainName, Runnable action, int maxRetries) {
executeWithRetry(chainName, () -> {
action.run();
return null;
}, maxRetries);
}
}