sse实时开发

This commit is contained in:
weixin_45799331
2026-01-27 13:31:03 +08:00
parent c5db404290
commit b0f2eabf6b
5 changed files with 96 additions and 72 deletions

View File

@@ -103,9 +103,6 @@ public class SecurityConfig {
// 静态资源,可匿名访问 // 静态资源,可匿名访问
.antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**") .antMatchers(HttpMethod.GET, "/", "/*.html", "/**/*.html", "/**/*.css", "/**/*.js", "/profile/**")
.permitAll() .permitAll()
// WebSocket 握手请求允许匿名访问
.antMatchers("/ws/**", "/test-ws")
.permitAll()
.antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**") .antMatchers("/swagger-ui.html", "/swagger-resources/**", "/webjars/**", "/*/api-docs", "/druid/**")
.permitAll() .permitAll()
.antMatchers("/patientmanage/information/**") .antMatchers("/patientmanage/information/**")

View File

@@ -70,12 +70,6 @@
<artifactId>velocity-engine-core</artifactId> <artifactId>velocity-engine-core</artifactId>
</dependency> </dependency>
<!-- WebSocket 支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- rabbitMQ --> <!-- rabbitMQ -->
<!-- <dependency> <!-- <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@@ -15,7 +15,7 @@ import com.openhis.web.triageandqueuemanage.dto.TriageQueueActionReq;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueAddReq; import com.openhis.web.triageandqueuemanage.dto.TriageQueueAddReq;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueAdjustReq; import com.openhis.web.triageandqueuemanage.dto.TriageQueueAdjustReq;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueEncounterItem; import com.openhis.web.triageandqueuemanage.dto.TriageQueueEncounterItem;
import com.openhis.web.triageandqueuemanage.websocket.CallNumberWebSocket; import com.openhis.web.triageandqueuemanage.sse.CallNumberSseManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -36,6 +36,9 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
@Resource @Resource
private TriageQueueItemService triageQueueItemService; private TriageQueueItemService triageQueueItemService;
@Resource
private CallNumberSseManager callNumberSseManager;
@Resource @Resource
private TriageCandidateExclusionService triageCandidateExclusionService; private TriageCandidateExclusionService triageCandidateExclusionService;
@@ -242,7 +245,7 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
selected.setStatus(STATUS_CALLING).setUpdateTime(LocalDateTime.now()); selected.setStatus(STATUS_CALLING).setUpdateTime(LocalDateTime.now());
triageQueueItemService.updateById(selected); triageQueueItemService.updateById(selected);
// ✅ 叫号后推送 WebSocket 消息 // ✅ 叫号后推送 SSE 消息(实时通知显示屏刷新)
pushDisplayUpdate(selected.getOrganizationId(), selected.getQueueDate(), selected.getTenantId()); pushDisplayUpdate(selected.getOrganizationId(), selected.getQueueDate(), selected.getTenantId());
return R.ok(true); return R.ok(true);
@@ -329,7 +332,7 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
recalcOrders(actualOrgId, null); recalcOrders(actualOrgId, null);
// ✅ 完成后推送 WebSocket 消息 // ✅ 完成后推送 SSE 消息(实时通知显示屏刷新)
pushDisplayUpdate(actualOrgId, calling.getQueueDate(), tenantId); pushDisplayUpdate(actualOrgId, calling.getQueueDate(), tenantId);
return R.ok(true); return R.ok(true);
@@ -425,7 +428,7 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
recalcOrders(actualOrgId, null); recalcOrders(actualOrgId, null);
// ✅ 过号重排后推送 WebSocket 消息 // ✅ 过号重排后推送 SSE 消息(实时通知显示屏刷新)
pushDisplayUpdate(actualOrgId, calling.getQueueDate(), tenantId); pushDisplayUpdate(actualOrgId, calling.getQueueDate(), tenantId);
return R.ok(true); return R.ok(true);
@@ -714,7 +717,7 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
} }
/** /**
* 推送显示屏更新消息到 WebSocket * 推送显示屏更新消息到 SSE
* @param organizationId 科室ID * @param organizationId 科室ID
* @param queueDate 队列日期 * @param queueDate 队列日期
* @param tenantId 租户ID * @param tenantId 租户ID
@@ -731,11 +734,11 @@ public class TriageQueueAppServiceImpl implements TriageQueueAppService {
message.put("data", displayData); message.put("data", displayData);
message.put("timestamp", System.currentTimeMillis()); message.put("timestamp", System.currentTimeMillis());
// 推送到该科室的所有 WebSocket 连接 // 推送到该科室的所有 SSE 连接
CallNumberWebSocket.pushToOrganization(organizationId, message); callNumberSseManager.pushToOrganization(organizationId, message);
} catch (Exception e) { } catch (Exception e) {
// WebSocket 推送失败不应该影响业务逻辑 // SSE 推送失败不应该影响业务逻辑
System.err.println("推送显示屏更新失败:" + e.getMessage()); System.err.println("推送显示屏更新失败:" + e.getMessage());
} }
} }

View File

@@ -8,13 +8,18 @@ import com.openhis.web.triageandqueuemanage.dto.CallNumberDisplayResp;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueActionReq; import com.openhis.web.triageandqueuemanage.dto.TriageQueueActionReq;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueAddReq; import com.openhis.web.triageandqueuemanage.dto.TriageQueueAddReq;
import com.openhis.web.triageandqueuemanage.dto.TriageQueueAdjustReq; import com.openhis.web.triageandqueuemanage.dto.TriageQueueAdjustReq;
import com.openhis.web.triageandqueuemanage.sse.CallNumberSseManager;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.format.annotation.DateTimeFormat; import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.LocalDate; import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
@RestController @RestController
@Slf4j @Slf4j
@@ -23,6 +28,9 @@ public class TriageQueueController {
@Resource @Resource
private TriageQueueAppService triageQueueAppService; private TriageQueueAppService triageQueueAppService;
@Resource
private CallNumberSseManager callNumberSseManager;
@GetMapping("/list") @GetMapping("/list")
public R<?> list(@RequestParam(value = "organizationId", required = false) Long organizationId, public R<?> list(@RequestParam(value = "organizationId", required = false) Long organizationId,
@@ -97,6 +105,44 @@ public class TriageQueueController {
return R.fail("获取显示屏数据失败:" + e.getMessage()); return R.fail("获取显示屏数据失败:" + e.getMessage());
} }
} }
/**
* 叫号显示屏SSE 实时推送
*/
@Anonymous
@GetMapping(value = "/display/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamDisplayData(
@RequestParam(required = false) String organizationId,
@RequestParam(required = false) @DateTimeFormat(pattern = "yyyy-MM-dd") LocalDate date,
@RequestParam(required = false) Integer tenantId
) {
// 1) 解析科室与租户SSE 连接根据科室分组管理)
Long orgId = resolveOrganizationId(organizationId);
if (orgId == null) {
SseEmitter emitter = new SseEmitter(0L);
Map<String, Object> error = new HashMap<>();
error.put("type", "error");
error.put("message", "organizationId参数不合法或未获取到登录用户科室");
callNumberSseManager.sendToEmitter(emitter, error);
emitter.complete();
return emitter;
}
Integer actualTenantId = resolveTenantId(tenantId);
// 2) 创建并注册 SSE 连接
SseEmitter emitter = callNumberSseManager.addEmitter(orgId);
try {
// 3) 连接建立后,先推送一次初始化数据
CallNumberDisplayResp data = triageQueueAppService.getDisplayData(orgId, date, actualTenantId);
Map<String, Object> init = new HashMap<>();
init.put("type", "init");
init.put("data", data);
init.put("timestamp", System.currentTimeMillis());
callNumberSseManager.sendToEmitter(emitter, init);
} catch (Exception e) {
log.error("SSE初始化数据发送失败", e);
}
return emitter;
}
private Long resolveOrganizationId(String organizationId) { private Long resolveOrganizationId(String organizationId) {
if (!StringUtils.hasText(organizationId)) { if (!StringUtils.hasText(organizationId)) {

View File

@@ -107,10 +107,13 @@ const { orgId: userOrgId, tenantId: userTenantId } = storeToRefs(userStore)
const ORGANIZATION_ID = computed(() => (userOrgId.value ? String(userOrgId.value) : '')) const ORGANIZATION_ID = computed(() => (userOrgId.value ? String(userOrgId.value) : ''))
const TENANT_ID = computed(() => (userTenantId.value ? Number(userTenantId.value) : 1)) const TENANT_ID = computed(() => (userTenantId.value ? Number(userTenantId.value) : 1))
const API_BASE_URL = '/triage/queue' const API_BASE_URL = '/triage/queue'
// WebSocket 地址(通过 Nginx 代理,路径需要加 /openhis 前缀 // SSE 地址(走后端 API 代理
const WS_URL = computed( const SSE_URL = computed(() => {
() => `ws://${window.location.hostname}:18080/openhis/ws/call-number-display/${ORGANIZATION_ID.value}` const baseApi = import.meta.env.VITE_APP_BASE_API || ''
) const orgId = ORGANIZATION_ID.value
const tenantId = TENANT_ID.value
return `${baseApi}${API_BASE_URL}/display/stream?organizationId=${encodeURIComponent(orgId)}&tenantId=${tenantId}`
})
// 响应式数据 // 响应式数据
const currentTime = ref('') const currentTime = ref('')
@@ -121,7 +124,7 @@ const currentPage = ref(1)
const patientsPerPage = 5 const patientsPerPage = 5
const autoScrollInterval = ref(null) const autoScrollInterval = ref(null)
const scrollInterval = 5000 // 5秒自动翻页 const scrollInterval = 5000 // 5秒自动翻页
const wsConnection = ref(null) // WebSocket 连接 const sseConnection = ref(null) // SSE 连接
const timeInterval = ref(null) const timeInterval = ref(null)
const isFullscreen = ref(false) const isFullscreen = ref(false)
const screenContainer = ref(null) const screenContainer = ref(null)
@@ -347,72 +350,53 @@ const stopAutoScroll = () => {
} }
/** /**
* 初始化 WebSocket 连接 * 初始化 SSE 连接
*/ */
const initWebSocket = () => { const initSse = () => {
try { try {
if (!ORGANIZATION_ID.value) { if (!ORGANIZATION_ID.value) {
console.warn('未获取到科室ID跳过 WebSocket 连接') console.warn('未获取到科室ID跳过 SSE 连接')
return return
} }
console.log('正在连接 WebSocket:', WS_URL.value) if (sseConnection.value) {
wsConnection.value = new WebSocket(WS_URL.value) sseConnection.value.close()
}
console.log('正在连接 SSE:', SSE_URL.value)
sseConnection.value = new EventSource(SSE_URL.value)
wsConnection.value.onopen = () => { sseConnection.value.onopen = () => {
console.log('WebSocket 连接成功') console.log('SSE 连接成功')
ElMessage.success('实时连接已建立') ElMessage.success('实时连接已建立')
} }
wsConnection.value.onmessage = (event) => { sseConnection.value.onmessage = (event) => {
try { try {
const message = JSON.parse(event.data) const message = JSON.parse(event.data)
console.log('收到 WebSocket 消息:', message) console.log('收到 SSE 消息:', message)
if (message.type === 'connected') { if (message.type === 'init') {
console.log('WebSocket 连接确认:', message.message) handleSseUpdate(message.data)
} else if (message.type === 'update') { } else if (message.type === 'update') {
// 收到更新消息,刷新显示屏数据 handleSseUpdate(message.data)
console.log('收到更新通知,刷新显示屏数据')
handleWebSocketUpdate(message.data)
} else if (message.type === 'pong') {
// 心跳响应
console.log('心跳响应')
} }
} catch (error) { } catch (error) {
console.error('解析 WebSocket 消息失败:', error) console.error('解析 SSE 消息失败:', error)
} }
} }
wsConnection.value.onerror = (error) => { sseConnection.value.onerror = (error) => {
console.error('WebSocket 错误:', error) console.error('SSE 错误:', error)
ElMessage.error('实时连接出现错误') ElMessage.error('实时连接出现错误')
} }
wsConnection.value.onclose = () => {
console.log('WebSocket 连接关闭5秒后重连')
setTimeout(() => {
if (!wsConnection.value || wsConnection.value.readyState === WebSocket.CLOSED) {
initWebSocket()
}
}, 5000)
}
// 心跳检测每30秒发送一次 ping
setInterval(() => {
if (wsConnection.value && wsConnection.value.readyState === WebSocket.OPEN) {
wsConnection.value.send('ping')
}
}, 30000)
} catch (error) { } catch (error) {
console.error('初始化 WebSocket 失败:', error) console.error('初始化 SSE 失败:', error)
} }
} }
/** /**
* 处理 WebSocket 推送的更新数据 * 处理 SSE 推送的更新数据
*/ */
const handleWebSocketUpdate = (data) => { const handleSseUpdate = (data) => {
if (!data) return if (!data) return
// 更新科室名称 // 更新科室名称
@@ -435,7 +419,7 @@ const handleWebSocketUpdate = (data) => {
waitingCount.value = data.waitingCount waitingCount.value = data.waitingCount
} }
console.log('显示屏数据已更新(来自 WebSocket') console.log('显示屏数据已更新(来自 SSE')
// 播放语音(如果有新的叫号) // 播放语音(如果有新的叫号)
if (data.currentCall && data.currentCall.number) { if (data.currentCall && data.currentCall.number) {
@@ -466,13 +450,13 @@ const playVoiceNotification = (callInfo) => {
} }
/** /**
* 关闭 WebSocket 连接 * 关闭 SSE 连接
*/ */
const closeWebSocket = () => { const closeSse = () => {
if (wsConnection.value) { if (sseConnection.value) {
wsConnection.value.close() sseConnection.value.close()
wsConnection.value = null sseConnection.value = null
console.log('WebSocket 连接已关闭') console.log('SSE 连接已关闭')
} }
} }
@@ -488,8 +472,8 @@ onMounted(async () => {
// ✅ 获取初始数据(从后端 API // ✅ 获取初始数据(从后端 API
await fetchDisplayData() await fetchDisplayData()
// ✅ 初始化 WebSocket 连接(实时推送) // ✅ 初始化 SSE 连接(实时推送)
initWebSocket() initSse()
// 启动自动滚动 // 启动自动滚动
startAutoScroll() startAutoScroll()
@@ -509,7 +493,7 @@ onUnmounted(() => {
timeInterval.value = null timeInterval.value = null
} }
stopAutoScroll() stopAutoScroll()
closeWebSocket() // ✅ 关闭 WebSocket 连接 closeSse() // ✅ 关闭 SSE 连接
if (tableContainer) { if (tableContainer) {
tableContainer.removeEventListener('mouseenter', stopAutoScroll) tableContainer.removeEventListener('mouseenter', stopAutoScroll)
tableContainer.removeEventListener('mouseleave', startAutoScroll) tableContainer.removeEventListener('mouseleave', startAutoScroll)