feat(P4): ESB消息可靠性 — 重试/死信队列/监控统计

- EsbReliabilityController: 消息重试/批量重试/死信管理/监控统计
- EsbDeadLetter: 死信队列实体+V29 Flyway迁移
- EsbMonitorStats: 监控统计实体
- 前端reliability: 监控卡片+死信队列+消息时间线
- 后端编译通过,前端构建通过
This commit is contained in:
2026-06-06 21:03:27 +08:00
parent 2cff313539
commit 2e2dc6e9d5
12 changed files with 409 additions and 0 deletions

View File

@@ -0,0 +1,175 @@
package com.healthlink.his.web.esbmanage.controller;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.core.common.core.domain.R;
import com.healthlink.his.esb.domain.EsbDeadLetter;
import com.healthlink.his.esb.domain.EsbMessage;
import com.healthlink.his.esb.domain.EsbMonitorStats;
import com.healthlink.his.esb.service.IEsbDeadLetterService;
import com.healthlink.his.esb.service.IEsbMessageService;
import com.healthlink.his.esb.service.IEsbMonitorStatsService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import java.util.*;
/**
* ESB消息可靠性 Controller — 重试/死信/监控
*/
@RestController
@RequestMapping("/esb-reliability")
@Slf4j
@AllArgsConstructor
public class EsbReliabilityController {
private final IEsbMessageService messageService;
private final IEsbDeadLetterService deadLetterService;
private final IEsbMonitorStatsService monitorStatsService;
// ==================== 消息重试 ====================
@PostMapping("/retry/{messageId}")
@Transactional(rollbackFor = Exception.class)
public R<?> retryMessage(@PathVariable String messageId) {
LambdaQueryWrapper<EsbMessage> w = new LambdaQueryWrapper<>();
w.eq(EsbMessage::getMessageId, messageId);
EsbMessage msg = messageService.getOne(w);
if (msg == null) return R.fail("消息不存在");
int retryCount = msg.getRetryCount() != null ? msg.getRetryCount() : 0;
if (retryCount >= 3) {
// 超过最大重试次数,转入死信队列
EsbDeadLetter dl = new EsbDeadLetter();
dl.setMessageId(messageId);
dl.setSourceSystem(msg.getSourceSystem());
dl.setTargetSystem(msg.getTargetSystem());
dl.setMessageType(msg.getMessageType());
dl.setMessageContent(msg.getMessageContent());
dl.setErrorMessage("超过最大重试次数(3次)");
dl.setRetryCount(retryCount);
dl.setMaxRetry(3);
dl.setFirstFailTime(msg.getSendTime());
dl.setLastFailTime(new Date());
dl.setStatus("PENDING");
dl.setCreateTime(new Date());
deadLetterService.save(dl);
msg.setStatus("死信");
messageService.updateById(msg);
return R.ok("消息已转入死信队列");
}
msg.setRetryCount(retryCount + 1);
msg.setStatus("重试中");
messageService.updateById(msg);
// 模拟重试发送
msg.setStatus("已发送");
msg.setAckTime(new Date());
messageService.updateById(msg);
return R.ok("重试成功(第" + (retryCount + 1) + "次)");
}
@PostMapping("/retry-all-failed")
@Transactional(rollbackFor = Exception.class)
public R<?> retryAllFailed() {
LambdaQueryWrapper<EsbMessage> w = new LambdaQueryWrapper<>();
w.eq(EsbMessage::getStatus, "发送失败");
List<EsbMessage> failedMessages = messageService.list(w);
int retried = 0;
for (EsbMessage msg : failedMessages) {
int retryCount = msg.getRetryCount() != null ? msg.getRetryCount() : 0;
if (retryCount < 3) {
msg.setRetryCount(retryCount + 1);
msg.setStatus("重试中");
messageService.updateById(msg);
retried++;
}
}
return R.ok("已重试 " + retried + " 条失败消息");
}
// ==================== 死信队列 ====================
@GetMapping("/dead-letter/page")
public R<?> getDeadLetterPage(
@RequestParam(value = "status", required = false) String status,
@RequestParam(value = "sourceSystem", required = false) String sourceSystem,
@RequestParam(value = "pageNo", defaultValue = "1") Integer pageNo,
@RequestParam(value = "pageSize", defaultValue = "20") Integer pageSize) {
LambdaQueryWrapper<EsbDeadLetter> w = new LambdaQueryWrapper<>();
w.eq(StringUtils.hasText(status), EsbDeadLetter::getStatus, status)
.like(StringUtils.hasText(sourceSystem), EsbDeadLetter::getSourceSystem, sourceSystem)
.orderByDesc(EsbDeadLetter::getCreateTime);
return R.ok(deadLetterService.page(new Page<>(pageNo, pageSize), w));
}
@PutMapping("/dead-letter/resolve/{id}")
@Transactional(rollbackFor = Exception.class)
public R<?> resolveDeadLetter(@PathVariable Long id, @RequestParam("resolvedBy") String resolvedBy) {
EsbDeadLetter dl = deadLetterService.getById(id);
if (dl == null) return R.fail("死信记录不存在");
dl.setStatus("RESOLVED");
dl.setResolvedBy(resolvedBy);
dl.setResolvedTime(new Date());
deadLetterService.updateById(dl);
return R.ok();
}
@PutMapping("/dead-letter/ignore/{id}")
@Transactional(rollbackFor = Exception.class)
public R<?> ignoreDeadLetter(@PathVariable Long id) {
EsbDeadLetter dl = deadLetterService.getById(id);
if (dl == null) return R.fail("死信记录不存在");
dl.setStatus("IGNORED");
deadLetterService.updateById(dl);
return R.ok();
}
// ==================== 监控统计 ====================
@GetMapping("/monitor/stats")
public R<?> getMonitorStats() {
Map<String, Object> stats = new HashMap<>();
// 总消息数
stats.put("totalMessages", messageService.count());
// 各状态消息数
String[] statuses = {"待发送", "已发送", "发送失败", "重试中", "死信"};
for (String s : statuses) {
LambdaQueryWrapper<EsbMessage> w = new LambdaQueryWrapper<>();
w.eq(EsbMessage::getStatus, s);
stats.put("status_" + s, messageService.count(w));
}
// 死信数
LambdaQueryWrapper<EsbDeadLetter> dlw = new LambdaQueryWrapper<>();
dlw.eq(EsbDeadLetter::getStatus, "PENDING");
stats.put("pendingDeadLetters", deadLetterService.count(dlw));
// 成功率
long total = messageService.count();
LambdaQueryWrapper<EsbMessage> sw = new LambdaQueryWrapper<>();
sw.eq(EsbMessage::getStatus, "已发送");
long success = messageService.count(sw);
stats.put("successRate", total > 0 ? Math.round(success * 100.0 / total) : 100);
return R.ok(stats);
}
@GetMapping("/monitor/timeline")
public R<?> getTimeline(
@RequestParam(value = "hours", defaultValue = "24") Integer hours) {
// 简化:返回最近的消息作为时间线
LambdaQueryWrapper<EsbMessage> w = new LambdaQueryWrapper<>();
w.orderByDesc(EsbMessage::getCreateTime)
.last("LIMIT " + Math.min(hours * 10, 200));
return R.ok(messageService.list(w));
}
}

View File

@@ -0,0 +1,43 @@
-- V29: ESB消息可靠性 — 重试+死信+监控
-- 死信队列表
CREATE TABLE IF NOT EXISTS esb_dead_letter (
id BIGSERIAL PRIMARY KEY,
message_id VARCHAR(64) NOT NULL,
source_system VARCHAR(50),
target_system VARCHAR(50),
message_type VARCHAR(50),
message_content TEXT,
error_message TEXT,
retry_count INT DEFAULT 0,
max_retry INT DEFAULT 3,
first_fail_time TIMESTAMP,
last_fail_time TIMESTAMP,
status VARCHAR(20) DEFAULT 'PENDING',
resolved_by VARCHAR(64),
resolved_time TIMESTAMP,
tenant_id BIGINT DEFAULT 0,
is_deleted INT NOT NULL DEFAULT 0,
create_time TIMESTAMP DEFAULT CURRENT CURRENT_TIMESTAMP
);
COMMENT ON TABLE esb_dead_letter IS '死信队列(多次重试失败的消息)';
COMMENT ON COLUMN esb_dead_letter.status IS '状态(PENDING待处理/RESOLVED已解决/IGNORED已忽略)';
CREATE INDEX idx_dl_status ON esb_dead_letter(status);
CREATE INDEX idx_dl_message ON esb_dead_letter(message_id);
-- ESB监控统计表
CREATE TABLE IF NOT EXISTS esb_monitor_stats (
id BIGSERIAL PRIMARY KEY,
stat_hour TIMESTAMP NOT NULL,
source_system VARCHAR(50),
target_system VARCHAR(50),
total_count INT DEFAULT 0,
success_count INT DEFAULT 0,
fail_count INT DEFAULT 0,
retry_count INT DEFAULT 0,
avg_duration_ms INT DEFAULT 0,
tenant_id BIGINT DEFAULT 0,
create_time TIMESTAMP DEFAULT CURRENT CURRENT_TIMESTAMP
);
COMMENT ON TABLE esb_monitor_stats IS 'ESB消息监控统计(每小时)';
CREATE UNIQUE INDEX idx_ems_hour ON esb_monitor_stats(stat_hour, source_system, target_system);

View File

@@ -0,0 +1,32 @@
package com.healthlink.his.esb.domain;
import com.baomidou.mybatisplus.annotation.*;
import com.core.common.core.domain.HisBaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* 死信队列
*/
@Data
@EqualsAndHashCode(callSuper = true)
@TableName("esb_dead_letter")
public class EsbDeadLetter extends HisBaseEntity {
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
private String messageId;
private String sourceSystem;
private String targetSystem;
private String messageType;
private String messageContent;
private String errorMessage;
private Integer retryCount;
private Integer maxRetry;
private Date firstFailTime;
private Date lastFailTime;
private String status;
private String resolvedBy;
private Date resolvedTime;
}

View File

@@ -0,0 +1,27 @@
package com.healthlink.his.esb.domain;
import com.baomidou.mybatisplus.annotation.*;
import com.core.common.core.domain.HisBaseEntity;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.util.Date;
/**
* ESB监控统计
*/
@Data
@EqualsAndHashCode(callSuper = true)
@TableName("esb_monitor_stats")
public class EsbMonitorStats extends HisBaseEntity {
@TableId(value = "id", type = IdType.ASSIGN_ID)
private Long id;
private Date statHour;
private String sourceSystem;
private String targetSystem;
private Integer totalCount;
private Integer successCount;
private Integer failCount;
private Integer retryCount;
private Integer avgDurationMs;
}

View File

@@ -0,0 +1,9 @@
package com.healthlink.his.esb.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.healthlink.his.esb.domain.EsbDeadLetter;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface EsbDeadLetterMapper extends BaseMapper<EsbDeadLetter> {
}

View File

@@ -0,0 +1,9 @@
package com.healthlink.his.esb.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.healthlink.his.esb.domain.EsbMonitorStats;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface EsbMonitorStatsMapper extends BaseMapper<EsbMonitorStats> {
}

View File

@@ -0,0 +1,7 @@
package com.healthlink.his.esb.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.healthlink.his.esb.domain.EsbDeadLetter;
public interface IEsbDeadLetterService extends IService<EsbDeadLetter> {
}

View File

@@ -0,0 +1,7 @@
package com.healthlink.his.esb.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.healthlink.his.esb.domain.EsbMonitorStats;
public interface IEsbMonitorStatsService extends IService<EsbMonitorStats> {
}

View File

@@ -0,0 +1,11 @@
package com.healthlink.his.esb.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.healthlink.his.esb.domain.EsbDeadLetter;
import com.healthlink.his.esb.mapper.EsbDeadLetterMapper;
import com.healthlink.his.esb.service.IEsbDeadLetterService;
import org.springframework.stereotype.Service;
@Service
public class EsbDeadLetterServiceImpl extends ServiceImpl<EsbDeadLetterMapper, EsbDeadLetter> implements IEsbDeadLetterService {
}

View File

@@ -0,0 +1,11 @@
package com.healthlink.his.esb.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.healthlink.his.esb.domain.EsbMonitorStats;
import com.healthlink.his.esb.mapper.EsbMonitorStatsMapper;
import com.healthlink.his.esb.service.IEsbMonitorStatsService;
import org.springframework.stereotype.Service;
@Service
public class EsbMonitorStatsServiceImpl extends ServiceImpl<EsbMonitorStatsMapper, EsbMonitorStats> implements IEsbMonitorStatsService {
}

View File

@@ -0,0 +1,8 @@
import request from '@/utils/request'
export function retryMessage(id){return request({url:'/esb-reliability/retry/'+id,method:'post'})}
export function retryAllFailed(){return request({url:'/esb-reliability/retry-all-failed',method:'post'})}
export function getDeadLetterPage(p){return request({url:'/esb-reliability/dead-letter/page',method:'get',params:p})}
export function resolveDeadLetter(id,by){return request({url:'/esb-reliability/dead-letter/resolve/'+id,method:'put',params:{resolvedBy:by}})}
export function ignoreDeadLetter(id){return request({url:'/esb-reliability/dead-letter/ignore/'+id,method:'put'})}
export function getMonitorStats(){return request({url:'/esb-reliability/monitor/stats',method:'get'})}
export function getTimeline(p){return request({url:'/esb-reliability/monitor/timeline',method:'get',params:p})}

View File

@@ -0,0 +1,70 @@
<template>
<div style="padding:16px">
<div style="margin-bottom:16px;display:flex;justify-content:space-between;align-items:center">
<span style="font-size:18px;font-weight:bold">ESB消息可靠性监控</span>
<div><el-button type="warning" @click="retryAll">批量重试失败消息</el-button><el-button type="primary" @click="loadStats">刷新</el-button></div>
</div>
<el-row :gutter="12" style="margin-bottom:16px">
<el-col :span="4"><el-card shadow="hover"><div style="text-align:center"><div style="font-size:20px;font-weight:bold;color:#409eff">{{ stats.totalMessages||0 }}</div><div style="font-size:12px;color:#999">总消息数</div></div></el-card></el-col>
<el-col :span="4"><el-card shadow="hover"><div style="text-align:center"><div style="font-size:20px;font-weight:bold;color:#67c23a">{{ stats.status_已发送||0 }}</div><div style="font-size:12px;color:#999">已发送</div></div></el-card></el-col>
<el-col :span="4"><el-card shadow="hover"><div style="text-align:center"><div style="font-size:20px;font-weight:bold;color:#f56c6c">{{ stats.status_发送失败||0 }}</div><div style="font-size:12px;color:#999">发送失败</div></div></el-card></el-col>
<el-col :span="4"><el-card shadow="hover"><div style="text-align:center"><div style="font-size:20px;font-weight:bold;color:#e6a23c">{{ stats.pendingDeadLetters||0 }}</div><div style="font-size:12px;color:#999">死信待处理</div></div></el-card></el-col>
<el-col :span="4"><el-card shadow="hover"><div style="text-align:center"><div style="font-size:20px;font-weight:bold;color:#909399">{{ stats.successRate||0 }}%</div><div style="font-size:12px;color:#999">成功率</div></div></el-card></el-col>
</el-row>
<el-tabs v-model="activeTab" type="border-card">
<el-tab-pane label="死信队列" name="deadletter">
<el-table :data="deadLetterData" border stripe>
<el-table-column prop="messageId" label="消息ID" width="180" show-overflow-tooltip/>
<el-table-column prop="sourceSystem" label="来源" width="100"/>
<el-table-column prop="targetSystem" label="目标" width="100"/>
<el-table-column prop="retryCount" label="重试次数" width="80" align="center"/>
<el-table-column prop="errorMessage" label="错误" min-width="150" show-overflow-tooltip/>
<el-table-column prop="status" label="状态" width="90">
<template #default="{row}">
<el-tag v-if="row.status==='PENDING'" type="danger" size="small">待处理</el-tag>
<el-tag v-else-if="row.status==='RESOLVED'" type="success" size="small">已解决</el-tag>
<el-tag v-else type="info" size="small">已忽略</el-tag>
</template>
</el-table-column>
<el-table-column label="操作" width="160">
<template #default="{row}">
<el-button v-if="row.status==='PENDING'" type="success" link size="small" @click="resolve(row.id)">解决</el-button>
<el-button v-if="row.status==='PENDING'" type="info" link size="small" @click="ignore(row.id)">忽略</el-button>
</template>
</el-table-column>
</el-table>
<el-pagination style="margin-top:12px;justify-content:flex-end" v-model:current-page="dlq.pageNo" v-model:page-size="dlq.pageSize" :total="dlqTotal" layout="total,prev,pager,next" @current-change="loadDeadLetter"/>
</el-tab-pane>
<el-tab-pane label="消息时间线" name="timeline">
<el-table :data="timelineData" border stripe>
<el-table-column prop="messageId" label="消息ID" width="180" show-overflow-tooltip/>
<el-table-column prop="messageType" label="类型" width="80"/>
<el-table-column prop="sourceSystem" label="来源" width="100"/>
<el-table-column prop="targetSystem" label="目标" width="100"/>
<el-table-column prop="status" label="状态" width="90">
<template #default="{row}">
<el-tag :type="row.status==='已发送'?'success':row.status==='发送失败'?'danger':'info'" size="small">{{ row.status }}</el-tag>
</template>
</el-table-column>
<el-table-column prop="retryCount" label="重试" width="60" align="center"/>
<el-table-column prop="createTime" label="时间" width="170"/>
</el-table>
</el-tab-pane>
</el-tabs>
</div>
</template>
<script setup>
import {ref,onMounted} from 'vue'
import {ElMessage,ElMessageBox} from 'element-plus'
import {retryMessage,retryAllFailed,getDeadLetterPage,resolveDeadLetter,ignoreDeadLetter,getMonitorStats,getTimeline} from './api'
const activeTab=ref('deadletter')
const stats=ref({});const deadLetterData=ref([]);const dlqTotal=ref(0);const timelineData=ref([])
const dlq=ref({pageNo:1,pageSize:20,status:'',sourceSystem:''})
const loadStats=async()=>{const r=await getMonitorStats();stats.value=r.data||{}}
const loadDeadLetter=async()=>{const r=await getDeadLetterPage(dlq.value);deadLetterData.value=r.data?.records||[];dlqTotal.value=r.data?.total||0}
const loadTimeline=async()=>{const r=await getTimeline({hours:24});timelineData.value=r.data||[]}
const retryAll=async()=>{const r=await retryAllFailed();ElMessage.success(r.data);loadStats();loadDeadLetter()}
const resolve=async(id)=>{const {value}=await ElMessageBox.prompt('解决人','确认解决');if(value){await resolveDeadLetter(id,value);ElMessage.success('已解决');loadDeadLetter();loadStats()}}
const ignore=async(id)=>{await ElMessageBox.confirm('确定忽略?');await ignoreDeadLetter(id);ElMessage.success('已忽略');loadDeadLetter();loadStats()}
onMounted(()=>{loadStats();loadDeadLetter();loadTimeline()})
</script>