第一次提交
Some checks failed
Java CI with Maven / build (11) (push) Has been cancelled
Java CI with Maven / build (17) (push) Has been cancelled
Java CI with Maven / build (8) (push) Has been cancelled
yudao-ui-admin CI / build (14.x) (push) Has been cancelled
yudao-ui-admin CI / build (16.x) (push) Has been cancelled

This commit is contained in:
2025-11-12 14:58:39 +08:00
commit 0cc7d05f55
6053 changed files with 615352 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
package cn.iocoder.yudao.module.iot.core.biz;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceGetReqDTO;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceRespDTO;
/**
* IoT 设备通用 API
*
* @author haohao
*/
public interface IotDeviceCommonApi {
/**
* 设备认证
*
* @param authReqDTO 认证请求
* @return 认证结果
*/
CommonResult<Boolean> authDevice(IotDeviceAuthReqDTO authReqDTO);
/**
* 获取设备信息
*
* @param infoReqDTO 设备信息请求
* @return 设备信息
*/
CommonResult<IotDeviceRespDTO> getDevice(IotDeviceGetReqDTO infoReqDTO);
}

View File

@@ -0,0 +1,33 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import lombok.Data;
import javax.validation.constraints.NotEmpty;
/**
* IoT 设备认证 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotDeviceAuthReqDTO {
/**
* 客户端 ID
*/
@NotEmpty(message = "客户端 ID 不能为空")
private String clientId;
/**
* 用户名
*/
@NotEmpty(message = "用户名不能为空")
private String username;
/**
* 密码
*/
@NotEmpty(message = "密码不能为空")
private String password;
}

View File

@@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import lombok.Data;
/**
* IoT 设备信息查询 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotDeviceGetReqDTO {
/**
* 设备编号
*/
private Long id;
/**
* 产品标识
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
}

View File

@@ -0,0 +1,41 @@
package cn.iocoder.yudao.module.iot.core.biz.dto;
import lombok.Data;
/**
* IoT 设备信息 Response DTO
*
* @author 芋道源码
*/
@Data
public class IotDeviceRespDTO {
/**
* 设备编号
*/
private Long id;
/**
* 产品标识
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 租户编号
*/
private Long tenantId;
// ========== 产品相关字段 ==========
/**
* 产品编号
*/
private Long productId;
/**
* 编解码器类型
*/
private String codecType;
}

View File

@@ -0,0 +1,86 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import cn.iocoder.yudao.framework.common.util.collection.SetUtils;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.Arrays;
import java.util.Set;
/**
* IoT 设备消息的方法枚举
*
* @author haohao
*/
@Getter
@AllArgsConstructor
public enum IotDeviceMessageMethodEnum implements ArrayValuable<String> {
// ========== 设备状态 ==========
STATE_UPDATE("thing.state.update", "设备状态更新", true),
// TODO 芋艿:要不要加个 ping 消息;
// ========== 设备属性 ==========
// 可参考https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
PROPERTY_POST("thing.property.post", "属性上报", true),
PROPERTY_SET("thing.property.set", "属性设置", false),
// ========== 设备事件 ==========
// 可参考https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
EVENT_POST("thing.event.post", "事件上报", true),
// ========== 设备服务调用 ==========
// 可参考https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services
SERVICE_INVOKE("thing.service.invoke", "服务调用", false),
// ========== 设备配置 ==========
// 可参考https://help.aliyun.com/zh/iot/user-guide/remote-configuration-1
CONFIG_PUSH("thing.config.push", "配置推送", false),
// ========== OTA 固件 ==========
// 可参考https://help.aliyun.com/zh/iot/user-guide/perform-ota-updates
OTA_UPGRADE("thing.ota.upgrade", "OTA 固定信息推送", false),
OTA_PROGRESS("thing.ota.progress", "OTA 升级进度上报", true),
;
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageMethodEnum::getMethod)
.toArray(String[]::new);
/**
* 不进行 reply 回复的方法集合
*/
public static final Set<String> REPLY_DISABLED = SetUtils.asSet(
STATE_UPDATE.getMethod(),
OTA_PROGRESS.getMethod() // 参考阿里云OTA 升级进度上报,不进行回复
);
private final String method;
private final String name;
private final Boolean upstream;
@Override
public String[] array() {
return ARRAYS;
}
public static IotDeviceMessageMethodEnum of(String method) {
return ArrayUtil.firstMatch(item -> item.getMethod().equals(method),
IotDeviceMessageMethodEnum.values());
}
public static boolean isReplyDisabled(String method) {
return REPLY_DISABLED.contains(method);
}
}

View File

@@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT 设备消息类型枚举
*/
@Getter
@RequiredArgsConstructor
public enum IotDeviceMessageTypeEnum implements ArrayValuable<String> {
STATE("state"), // 设备状态
// PROPERTY("property"), // 设备属性:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
EVENT("event"), // 设备事件:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
SERVICE("service"), // 设备服务:可参考 https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services 设备属性、事件、服务
CONFIG("config"), // 设备配置:可参考 https://help.aliyun.com/zh/iot/user-guide/remote-configuration-1 远程配置
OTA("ota"), // 设备 OTA可参考 https://help.aliyun.com/zh/iot/user-guide/ota-update OTA 升级
REGISTER("register"), // 设备注册:可参考 https://help.aliyun.com/zh/iot/user-guide/register-devices 设备身份注册
TOPOLOGY("topology"),; // 设备拓扑:可参考 https://help.aliyun.com/zh/iot/user-guide/manage-topological-relationships 设备拓扑
public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageTypeEnum::getType).toArray(String[]::new);
/**
* 属性
*/
private final String type;
@Override
public String[] array() {
return ARRAYS;
}
}

View File

@@ -0,0 +1,46 @@
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import java.util.Arrays;
/**
* IoT 设备状态枚举
*
* @author haohao
*/
@RequiredArgsConstructor
@Getter
public enum IotDeviceStateEnum implements ArrayValuable<Integer> {
INACTIVE(0, "未激活"),
ONLINE(1, "在线"),
OFFLINE(2, "离线");
public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDeviceStateEnum::getState).toArray(Integer[]::new);
/**
* 状态
*/
private final Integer state;
/**
* 状态名
*/
private final String name;
@Override
public Integer[] array() {
return ARRAYS;
}
public static boolean isOnline(Integer state) {
return ONLINE.getState().equals(state);
}
public static boolean isNotOnline(Integer state) {
return !isOnline(state);
}
}

View File

@@ -0,0 +1,129 @@
package cn.iocoder.yudao.module.iot.core.messagebus.config;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisStreamMessageCleanupJob;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.local.IotLocalMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.redis.IotRedisMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq.IotRocketMQMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import java.util.List;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
/**
* IoT 消息总线自动配置
*
* @author 芋道源码
*/
@AutoConfiguration
@EnableConfigurationProperties(IotMessageBusProperties.class)
@Slf4j
public class IotMessageBusAutoConfiguration {
@Bean
public IotDeviceMessageProducer deviceMessageProducer(IotMessageBus messageBus) {
return new IotDeviceMessageProducer(messageBus);
}
// ==================== Local 实现 ====================
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "local", matchIfMissing = true)
public static class IotLocalMessageBusConfiguration {
@Bean
public IotLocalMessageBus iotLocalMessageBus(ApplicationContext applicationContext) {
log.info("[iotLocalMessageBus][创建 IoT Local 消息总线]");
return new IotLocalMessageBus(applicationContext);
}
}
// ==================== RocketMQ 实现 ====================
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq")
@ConditionalOnClass(RocketMQTemplate.class)
public static class IotRocketMQMessageBusConfiguration {
@Bean
public IotRocketMQMessageBus iotRocketMQMessageBus(RocketMQProperties rocketMQProperties,
RocketMQTemplate rocketMQTemplate) {
log.info("[iotRocketMQMessageBus][创建 IoT RocketMQ 消息总线]");
return new IotRocketMQMessageBus(rocketMQProperties, rocketMQTemplate);
}
}
// ==================== Redis 实现 ====================
/**
* 特殊:由于 YudaoRedisMQConsumerAutoConfiguration 关于 Redis stream 的消费是动态注册,所以这里只能拷贝相关的逻辑!!!
*
* @see cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration
*/
@Configuration
@ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "redis")
@ConditionalOnClass(RedisTemplate.class)
public static class IotRedisMessageBusConfiguration {
@Bean
public IotRedisMessageBus iotRedisMessageBus(StringRedisTemplate redisTemplate) {
log.info("[iotRedisMessageBus][创建 IoT Redis 消息总线]");
return new IotRedisMessageBus(redisTemplate);
}
/**
* 创建 Redis Stream 重新消费的任务
*/
@Bean
public RedisPendingMessageResendJob iotRedisPendingMessageResendJob(IotRedisMessageBus messageBus,
RedisMQTemplate redisTemplate,
RedissonClient redissonClient) {
List<AbstractRedisStreamMessageListener<?>> listeners = getListeners(messageBus);
return new RedisPendingMessageResendJob(listeners, redisTemplate, redissonClient);
}
/**
* 创建 Redis Stream 消息清理任务
*/
@Bean
public RedisStreamMessageCleanupJob iotRedisStreamMessageCleanupJob(IotRedisMessageBus messageBus,
RedisMQTemplate redisTemplate,
RedissonClient redissonClient) {
List<AbstractRedisStreamMessageListener<?>> listeners = getListeners(messageBus);
return new RedisStreamMessageCleanupJob(listeners, redisTemplate, redissonClient);
}
private List<AbstractRedisStreamMessageListener<?>> getListeners(IotRedisMessageBus messageBus) {
return convertList(messageBus.getSubscribers(), subscriber ->
new AbstractRedisStreamMessageListener<AbstractRedisStreamMessage>(subscriber.getTopic(), subscriber.getGroup()) {
@Override
public void onMessage(AbstractRedisStreamMessage message) {
throw new UnsupportedOperationException("不应该调用!!!");
}
});
}
}
}

View File

@@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.core.messagebus.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
import javax.validation.constraints.NotNull;
/**
* IoT 消息总线配置属性
*
* @author 芋道源码
*/
@ConfigurationProperties("yudao.iot.message-bus")
@Data
@Validated
public class IotMessageBusProperties {
/**
* 消息总线类型
*
* 可选值local、redis、rocketmq、rabbitmq
*/
@NotNull(message = "IoT 消息总线类型不能为空")
private String type = "local";
}

View File

@@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core;
/**
* IoT 消息总线接口
*
* 用于在 IoT 系统中发布和订阅消息,支持多种消息中间件实现
*
* @author 芋道源码
*/
public interface IotMessageBus {
/**
* 发布消息到消息总线
*
* @param topic 主题
* @param message 消息内容
*/
void post(String topic, Object message);
/**
* 注册消息订阅者
*
* @param subscriber 订阅者
*/
void register(IotMessageSubscriber<?> subscriber);
}

View File

@@ -0,0 +1,29 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core;
/**
* IoT 消息总线订阅者接口
*
* 用于处理从消息总线接收到的消息
*
* @author 芋道源码
*/
public interface IotMessageSubscriber<T> {
/**
* @return 主题
*/
String getTopic();
/**
* @return 分组
*/
String getGroup();
/**
* 处理接收到的消息
*
* @param message 消息内容
*/
void onMessage(T message);
}

View File

@@ -0,0 +1,14 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
import lombok.AllArgsConstructor;
import lombok.Data;
@Data
@AllArgsConstructor
public class IotLocalMessage {
private String topic;
private Object message;
}

View File

@@ -0,0 +1,67 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 本地的 {@link IotMessageBus} 实现类
*
* 注意:仅适用于单机场景!!!
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotLocalMessageBus implements IotMessageBus {
private final ApplicationContext applicationContext;
/**
* 订阅者映射表
* Key: topic
*/
private final Map<String, List<IotMessageSubscriber<?>>> subscribers = new HashMap<>();
@Override
public void post(String topic, Object message) {
applicationContext.publishEvent(new IotLocalMessage(topic, message));
}
@Override
public void register(IotMessageSubscriber<?> subscriber) {
String topic = subscriber.getTopic();
List<IotMessageSubscriber<?>> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new ArrayList<>());
topicSubscribers.add(subscriber);
log.info("[register][topic({}/{}) 注册消费者({})成功]",
topic, subscriber.getGroup(), subscriber.getClass().getName());
}
@EventListener
@SuppressWarnings({"unchecked", "rawtypes"})
public void onMessage(IotLocalMessage message) {
String topic = message.getTopic();
List<IotMessageSubscriber<?>> topicSubscribers = subscribers.get(topic);
if (CollUtil.isEmpty(topicSubscribers)) {
return;
}
for (IotMessageSubscriber subscriber : topicSubscribers) {
try {
subscriber.onMessage(message.getMessage());
} catch (Exception ex) {
log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
subscriber.getTopic(), subscriber.getGroup(), message.getMessage(), subscriber.getClass().getName(), ex);
}
}
}
}

View File

@@ -0,0 +1,99 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.redis;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.buildConsumerName;
import static cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration.checkRedisVersion;
/**
* Redis 的 {@link IotMessageBus} 实现类
*
* @author 芋道源码
*/
@Slf4j
public class IotRedisMessageBus implements IotMessageBus {
private final RedisTemplate<String, ?> redisTemplate;
private final StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer;
@Getter
private final List<IotMessageSubscriber<?>> subscribers = new ArrayList<>();
public IotRedisMessageBus(RedisTemplate<String, ?> redisTemplate) {
this.redisTemplate = redisTemplate;
checkRedisVersion(redisTemplate);
// 创建 options 配置
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
.batchSize(10) // 一次性最多拉取多少条消息
.targetType(String.class) // 目标类型。统一使用 String通过自己封装的 AbstractStreamMessageListener 去反序列化
.build();
// 创建 container 对象
this.redisStreamMessageListenerContainer =
StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
}
@PostConstruct
public void init() {
this.redisStreamMessageListenerContainer.start();
}
@PreDestroy
public void destroy() {
this.redisStreamMessageListenerContainer.stop();
}
@Override
public void post(String topic, Object message) {
redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(topic)); // 设置 stream key
}
@Override
public void register(IotMessageSubscriber<?> subscriber) {
Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
// 创建 listener 对应的消费者分组
try {
redisTemplate.opsForStream().createGroup(subscriber.getTopic(), subscriber.getGroup());
} catch (Exception ignore) {
}
// 创建 Consumer 对象
String consumerName = buildConsumerName();
Consumer consumer = Consumer.from(subscriber.getGroup(), consumerName);
// 设置 Consumer 消费进度,以最小消费进度为准
StreamOffset<String> streamOffset = StreamOffset.create(subscriber.getTopic(), ReadOffset.lastConsumed());
// 设置 Consumer 监听
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
.builder(streamOffset).consumer(consumer)
.autoAcknowledge(false) // 不自动 ack
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
redisStreamMessageListenerContainer.register(builder.build(), message -> {
// 消费消息
subscriber.onMessage(JsonUtils.parseObject(message.getValue(), type));
// ack 消息消费完成
redisTemplate.opsForStream().acknowledge(subscriber.getGroup(), message);
});
this.subscribers.add(subscriber);
}
}

View File

@@ -0,0 +1,98 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import javax.annotation.PreDestroy;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
/**
* 基于 RocketMQ 的 {@link IotMessageBus} 实现类
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotRocketMQMessageBus implements IotMessageBus {
private final RocketMQProperties rocketMQProperties;
private final RocketMQTemplate rocketMQTemplate;
/**
* 主题对应的消费者映射
*/
private final List<DefaultMQPushConsumer> topicConsumers = new ArrayList<>();
/**
* 销毁时关闭所有消费者
*/
@PreDestroy
public void destroy() {
for (DefaultMQPushConsumer consumer : topicConsumers) {
try {
consumer.shutdown();
log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup());
} catch (Exception e) {
log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e);
}
}
}
@Override
public void post(String topic, Object message) {
// TODO @芋艿:需要 orderly
SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message));
log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result);
}
@Override
@SneakyThrows
public void register(IotMessageSubscriber<?> subscriber) {
Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0);
if (type == null) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
// 1.1 创建 DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
consumer.setConsumerGroup(subscriber.getGroup());
// 1.2 订阅主题
consumer.subscribe(subscriber.getTopic(), "*");
// 1.3 设置消息监听器
consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (MessageExt messageExt : messages) {
try {
byte[] body = messageExt.getBody();
subscriber.onMessage(JsonUtils.parseObject(body, type));
} catch (Exception ex) {
log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]",
subscriber.getTopic(), subscriber.getGroup(), messageExt, subscriber.getClass().getName(), ex);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 1.4 启动消费者
consumer.start();
// 2. 保存消费者引用
topicConsumers.add(consumer);
}
}

View File

@@ -0,0 +1,151 @@
package cn.iocoder.yudao.module.iot.core.mq.message;
import cn.hutool.core.map.MapUtil;
import cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
/**
* IoT 设备消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class IotDeviceMessage {
/**
* 【消息总线】应用的设备消息 Topic由 iot-gateway 发给 iot-biz 进行消费
*/
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 【消息总线】设备消息 Topic由 iot-biz 发送给 iot-gateway 的某个 "server"(protocol) 进行消费
*
* 其中,%s 就是该"server"(protocol) 的标识
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
/**
* 消息编号
*
* 由后端生成,通过 {@link IotDeviceMessageUtils#generateMessageId()}
*/
private String id;
/**
* 上报时间
*
* 由后端生成,当前时间
*/
private LocalDateTime reportTime;
/**
* 设备编号
*/
private Long deviceId;
/**
* 租户编号
*/
private Long tenantId;
/**
* 服务编号,该消息由哪个 server 发送
*/
private String serverId;
// ========== codec编解码字段 ==========
/**
* 请求编号
*
* 由设备生成,对应阿里云 IoT 的 Alink 协议中的 id、华为云 IoTDA 协议的 request_id
*/
private String requestId;
/**
* 请求方法
*
* 枚举 {@link IotDeviceMessageMethodEnum}
* 例如说thing.property.report 属性上报
*/
private String method;
/**
* 请求参数
*
* 例如说:属性上报的 properties、事件上报的 params
*/
private Object params;
/**
* 响应结果
*/
private Object data;
/**
* 响应错误码
*/
private Integer code;
/**
* 返回结果信息
*/
private String msg;
// ========== 基础方法:只传递"codec编解码字段" ==========
public static IotDeviceMessage requestOf(String method) {
return requestOf(null, method, null);
}
public static IotDeviceMessage requestOf(String method, Object params) {
return requestOf(null, method, params);
}
public static IotDeviceMessage requestOf(String requestId, String method, Object params) {
return of(requestId, method, params, null, null, null);
}
public static IotDeviceMessage replyOf(String requestId, String method,
Object data, Integer code, String msg) {
if (code == null) {
code = GlobalErrorCodeConstants.SUCCESS.getCode();
msg = GlobalErrorCodeConstants.SUCCESS.getMsg();
}
return of(requestId, method, null, data, code, msg);
}
public static IotDeviceMessage of(String requestId, String method,
Object params, Object data, Integer code, String msg) {
// 通用参数
IotDeviceMessage message = new IotDeviceMessage()
.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now());
// 当前参数
message.setRequestId(requestId).setMethod(method).setParams(params)
.setData(data).setCode(code).setMsg(msg);
return message;
}
// ========== 核心方法:在 of 基础方法之上,添加对应 method ==========
public static IotDeviceMessage buildStateUpdateOnline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.ONLINE.getState()));
}
public static IotDeviceMessage buildStateOffline() {
return requestOf(IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod(),
MapUtil.of("state", IotDeviceStateEnum.OFFLINE.getState()));
}
public static IotDeviceMessage buildOtaUpgrade(String version, String fileUrl, Long fileSize,
String fileDigestAlgorithm, String fileDigestValue) {
return requestOf(IotDeviceMessageMethodEnum.OTA_UPGRADE.getMethod(), MapUtil.builder()
.put("version", version).put("fileUrl", fileUrl).put("fileSize", fileSize)
.put("fileDigestAlgorithm", fileDigestAlgorithm).put("fileDigestValue", fileDigestValue)
.build());
}
}

View File

@@ -0,0 +1,37 @@
package cn.iocoder.yudao.module.iot.core.mq.producer;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.RequiredArgsConstructor;
/**
* IoT 设备消息生产者
*
* @author 芋道源码
*/
@RequiredArgsConstructor
public class IotDeviceMessageProducer {
private final IotMessageBus messageBus;
/**
* 发送设备消息
*
* @param message 设备消息
*/
public void sendDeviceMessage(IotDeviceMessage message) {
messageBus.post(IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
}
/**
* 发送网关设备消息
*
* @param serverId 网关的 serverId 标识
* @param message 设备消息
*/
public void sendDeviceMessageToGateway(String serverId, IotDeviceMessage message) {
messageBus.post(IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(serverId), message);
}
}

View File

@@ -0,0 +1,85 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.crypto.digest.DigestUtil;
import cn.hutool.crypto.digest.HmacAlgorithm;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* IoT 设备【认证】的工具类,参考阿里云
*
* @see <a href="https://help.aliyun.com/zh/iot/user-guide/how-do-i-obtain-mqtt-parameters-for-authentication">如何计算 MQTT 签名参数</a>
*/
public class IotDeviceAuthUtils {
/**
* 认证信息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class AuthInfo {
/**
* 客户端 ID
*/
private String clientId;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
}
/**
* 设备信息
*/
@Data
public static class DeviceInfo {
private String productKey;
private String deviceName;
}
public static AuthInfo getAuthInfo(String productKey, String deviceName, String deviceSecret) {
String clientId = buildClientId(productKey, deviceName);
String username = buildUsername(productKey, deviceName);
String content = "clientId" + clientId +
"deviceName" + deviceName +
"deviceSecret" + deviceSecret +
"productKey" + productKey;
String password = buildPassword(deviceSecret, content);
return new AuthInfo(clientId, username, password);
}
private static String buildClientId(String productKey, String deviceName) {
return String.format("%s.%s", productKey, deviceName);
}
private static String buildUsername(String productKey, String deviceName) {
return String.format("%s&%s", deviceName, productKey);
}
private static String buildPassword(String deviceSecret, String content) {
return DigestUtil.hmac(HmacAlgorithm.HmacSHA256, deviceSecret.getBytes())
.digestHex(content);
}
public static DeviceInfo parseUsername(String username) {
String[] usernameParts = username.split("&");
if (usernameParts.length != 2) {
return null;
}
return new DeviceInfo().setProductKey(usernameParts[1]).setDeviceName(usernameParts[0]);
}
}

View File

@@ -0,0 +1,167 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import java.util.Map;
/**
* IoT 设备【消息】的工具类
*
* @author 芋道源码
*/
public class IotDeviceMessageUtils {
// ========== Message 相关 ==========
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}
/**
* 是否是上行消息:由设备发送
*
* @param message 消息
* @return 是否
*/
@SuppressWarnings("SimplifiableConditionalExpression")
public static boolean isUpstreamMessage(IotDeviceMessage message) {
IotDeviceMessageMethodEnum methodEnum = IotDeviceMessageMethodEnum.of(message.getMethod());
Assert.notNull(methodEnum, "无法识别的消息方法:" + message.getMethod());
// 注意:回复消息时,需要取反
return !isReplyMessage(message) ? methodEnum.getUpstream() : !methodEnum.getUpstream();
}
/**
* 是否是回复消息,通过 {@link IotDeviceMessage#getCode()} 非空进行识别
*
* @param message 消息
* @return 是否
*/
public static boolean isReplyMessage(IotDeviceMessage message) {
return message.getCode() != null;
}
/**
* 提取消息中的标识符
*
* @param message 消息
* @return 标识符
*/
@SuppressWarnings("unchecked")
public static String getIdentifier(IotDeviceMessage message) {
if (message.getParams() == null) {
return null;
}
if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.EVENT_POST.getMethod(),
IotDeviceMessageMethodEnum.SERVICE_INVOKE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "identifier");
} else if (StrUtil.equalsAny(message.getMethod(), IotDeviceMessageMethodEnum.STATE_UPDATE.getMethod())) {
Map<String, Object> params = (Map<String, Object>) message.getParams();
return MapUtil.getStr(params, "state");
}
return null;
}
/**
* 从设备消息中提取指定标识符的属性值
* - 支持多种消息格式和属性值提取策略
* - 兼容现有的消息结构
* - 提供统一的属性值提取接口
* <p>
* 支持的提取策略(按优先级顺序):
* 1. 直接值:如果 params 不是 Map直接返回该值适用于简单消息
* 2. 标识符字段:从 params[identifier] 获取
* 3. properties 结构:从 params.properties[identifier] 获取(标准属性上报)
* 4. data 结构:从 params.data[identifier] 获取
* 5. value 字段:从 params.value 获取(单值消息)
* 6. 单值 Map如果 Map 只包含 identifier 和一个值,返回该值
*
* @param message 设备消息
* @param identifier 属性标识符
* @return 属性值,如果未找到则返回 null
*/
@SuppressWarnings("unchecked")
public static Object extractPropertyValue(IotDeviceMessage message, String identifier) {
Object params = message.getParams();
if (params == null) {
return null;
}
// 策略1如果 params 不是 Map直接返回该值适用于简单的单属性消息
if (!(params instanceof Map)) {
return params;
}
Map<String, Object> paramsMap = (Map<String, Object>) params;
// 策略2直接通过标识符获取属性值
Object directValue = paramsMap.get(identifier);
if (directValue != null) {
return directValue;
}
// 策略3从 properties 字段中获取(适用于标准属性上报消息)
Object properties = paramsMap.get("properties");
if (properties instanceof Map) {
Map<String, Object> propertiesMap = (Map<String, Object>) properties;
Object propertyValue = propertiesMap.get(identifier);
if (propertyValue != null) {
return propertyValue;
}
}
// 策略4从 data 字段中获取(适用于某些消息格式)
Object data = paramsMap.get("data");
if (data instanceof Map) {
Map<String, Object> dataMap = (Map<String, Object>) data;
Object dataValue = dataMap.get(identifier);
if (dataValue != null) {
return dataValue;
}
}
// 策略5从 value 字段中获取(适用于单值消息)
Object value = paramsMap.get("value");
if (value != null) {
return value;
}
// 策略6如果 Map 只有两个字段且包含 identifier返回另一个字段的值
if (paramsMap.size() == 2 && paramsMap.containsKey("identifier")) {
for (Map.Entry<String, Object> entry : paramsMap.entrySet()) {
if (!"identifier".equals(entry.getKey())) {
return entry.getValue();
}
}
}
// 未找到对应的属性值
return null;
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
/**
* 生成服务器编号
*
* @param serverPort 服务器端口
* @return 服务器编号
*/
public static String generateServerId(Integer serverPort) {
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
// 避免一些场景无法使用 . 符号,例如说 RocketMQ Topic
return serverId.replaceAll("\\.", "_");
}
}

View File

@@ -0,0 +1 @@
cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration

View File

@@ -0,0 +1,12 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core;
import lombok.Data;
@Data
public class TestMessage {
private String nickname;
private Integer age;
}

View File

@@ -0,0 +1,178 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.local;
import cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* {@link IotLocalMessageBus} 集成测试
*
* @author 芋道源码
*/
@SpringBootTest(classes = LocalIotMessageBusIntegrationTest.class)
@Import(IotMessageBusAutoConfiguration.class)
@TestPropertySource(properties = {
"yudao.iot.message-bus.type=local"
})
@Slf4j
public class LocalIotMessageBusIntegrationTest {
@Resource
private IotMessageBus messageBus;
/**
* 1 topic 2 subscriber
*/
@Test
public void testSendMessageWithTwoSubscribers() throws InterruptedException {
// 准备
String topic = "test-topic";
String testMessage = "Hello IoT Message Bus!";
// 用于等待消息处理完成
CountDownLatch latch = new CountDownLatch(2);
// 用于记录接收到的消息
AtomicInteger subscriber1Count = new AtomicInteger(0);
AtomicInteger subscriber2Count = new AtomicInteger(0);
// 创建第一个订阅者
IotMessageSubscriber<String> subscriber1 = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic;
}
@Override
public String getGroup() {
return "group1";
}
@Override
public void onMessage(String message) {
log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber1Count.incrementAndGet();
assertEquals(testMessage, message);
latch.countDown();
}
};
// 创建第二个订阅者
IotMessageSubscriber<String> subscriber2 = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic;
}
@Override
public String getGroup() {
return "group2";
}
@Override
public void onMessage(String message) {
log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber2Count.incrementAndGet();
assertEquals(testMessage, message);
latch.countDown();
}
};
// 注册订阅者
messageBus.register(subscriber1);
messageBus.register(subscriber2);
// 发送消息
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
messageBus.post(topic, testMessage);
// 等待消息处理完成(最多等待 10 秒)
boolean completed = latch.await(10, TimeUnit.SECONDS);
// 验证结果
assertTrue(completed, "消息处理超时");
assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者 2 收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
}
/**
* 2 topic 2 subscriber
*/
@Test
public void testMultipleTopics() throws InterruptedException {
// 准备
String topic1 = "device-status";
String topic2 = "device-data";
String message1 = "设备在线";
String message2 = "温度:25°C";
CountDownLatch latch = new CountDownLatch(2);
// 创建订阅者 1 - 只订阅设备状态
IotMessageSubscriber<String> statusSubscriber = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic1;
}
@Override
public String getGroup() {
return "status-group";
}
@Override
public void onMessage(String message) {
log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
assertEquals(message1, message);
latch.countDown();
}
};
// 创建订阅者 2 - 只订阅设备数据
IotMessageSubscriber<String> dataSubscriber = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic2;
}
@Override
public String getGroup() {
return "data-group";
}
@Override
public void onMessage(String message) {
log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
assertEquals(message2, message);
latch.countDown();
}
};
// 注册订阅者到不同主题
messageBus.register(statusSubscriber);
messageBus.register(dataSubscriber);
// 发送消息到不同主题
messageBus.post(topic1, message1);
messageBus.post(topic2, message2);
// 等待消息处理完成
boolean completed = latch.await(10, TimeUnit.SECONDS);
assertTrue(completed, "消息处理超时");
log.info("[测试] 多主题测试完成");
}
}

View File

@@ -0,0 +1,269 @@
package cn.iocoder.yudao.module.iot.core.messagebus.core.rocketmq;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.messagebus.core.TestMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.TestPropertySource;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* {@link IotRocketMQMessageBus} 集成测试
*
* @author 芋道源码
*/
@SpringBootTest(classes = RocketMQIotMessageBusTest.class)
@Import({RocketMQAutoConfiguration.class, IotMessageBusAutoConfiguration.class})
@TestPropertySource(properties = {
"yudao.iot.message-bus.type=rocketmq",
"rocketmq.name-server=127.0.0.1:9876",
"rocketmq.producer.group=test-rocketmq-group",
"rocketmq.producer.send-message-timeout=10000"
})
@Slf4j
public class RocketMQIotMessageBusTest {
@Resource
private IotMessageBus messageBus;
/**
* 1 topic 1 subscriberstring
*/
@Test
public void testSendMessageWithOneSubscriber() throws InterruptedException {
// 准备
String topic = "test-topic-" + IdUtil.simpleUUID();
// String topic = "test-topic-pojo";
String testMessage = "Hello IoT Message Bus!";
// 用于等待消息处理完成
CountDownLatch latch = new CountDownLatch(1);
// 用于记录接收到的消息
AtomicInteger subscriberCount = new AtomicInteger(0);
AtomicReference<String> subscriberMessageRef = new AtomicReference<>();
// 发送消息(需要提前发,保证 RocketMQ 路由的创建)
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
messageBus.post(topic, testMessage);
// 创建订阅者
IotMessageSubscriber<String> subscriber1 = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic;
}
@Override
public String getGroup() {
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
// return "test-topic-consumer-01";
}
@Override
public void onMessage(String message) {
log.info("[订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriberCount.incrementAndGet();
subscriberMessageRef.set(message);
assertEquals(testMessage, message);
latch.countDown();
}
};
// 注册订阅者
messageBus.register(subscriber1);
// 等待消息处理完成(最多等待 5 秒)
boolean completed = latch.await(10, TimeUnit.SECONDS);
// 验证结果
assertTrue(completed, "消息处理超时");
assertEquals(1, subscriberCount.get(), "订阅者应该收到 1 条消息");
log.info("[测试] 测试完成 - 订阅者收到{}条消息", subscriberCount.get());
assertEquals(testMessage, subscriberMessageRef.get(), "接收到的消息内容不匹配");
}
/**
* 1 topic 2 subscriberpojo
*/
@Test
public void testSendMessageWithTwoSubscribers() throws InterruptedException {
// 准备
String topic = "test-topic-" + IdUtil.simpleUUID();
// String topic = "test-topic-pojo";
TestMessage testMessage = new TestMessage().setNickname("yunai").setAge(18);
// 用于等待消息处理完成
CountDownLatch latch = new CountDownLatch(2);
// 用于记录接收到的消息
AtomicInteger subscriber1Count = new AtomicInteger(0);
AtomicReference<TestMessage> subscriber1MessageRef = new AtomicReference<>();
AtomicInteger subscriber2Count = new AtomicInteger(0);
AtomicReference<TestMessage> subscriber2MessageRef = new AtomicReference<>();
// 发送消息(需要提前发,保证 RocketMQ 路由的创建)
log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage);
messageBus.post(topic, testMessage);
// 创建第一个订阅者
IotMessageSubscriber<TestMessage> subscriber1 = new IotMessageSubscriber<TestMessage>() {
@Override
public String getTopic() {
return topic;
}
@Override
public String getGroup() {
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
// return "test-topic-consumer-01";
}
@Override
public void onMessage(TestMessage message) {
log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber1Count.incrementAndGet();
subscriber1MessageRef.set(message);
assertEquals(testMessage, message);
latch.countDown();
}
};
// 创建第二个订阅者
IotMessageSubscriber<TestMessage> subscriber2 = new IotMessageSubscriber<TestMessage>() {
@Override
public String getTopic() {
return topic;
}
@Override
public String getGroup() {
return "test-topic-" + IdUtil.simpleUUID() + "-consumer";
// return "test-topic-consumer-02";
}
@Override
public void onMessage(TestMessage message) {
log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber2Count.incrementAndGet();
subscriber2MessageRef.set(message);
assertEquals(testMessage, message);
latch.countDown();
}
};
// 注册订阅者
messageBus.register(subscriber1);
messageBus.register(subscriber2);
// 等待消息处理完成(最多等待 5 秒)
boolean completed = latch.await(10, TimeUnit.SECONDS);
// 验证结果
assertTrue(completed, "消息处理超时");
assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息");
assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息");
log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
assertEquals(testMessage, subscriber1MessageRef.get(), "接收到的消息内容不匹配");
assertEquals(testMessage, subscriber2MessageRef.get(), "接收到的消息内容不匹配");
}
/**
* 2 topic 2 subscriber
*/
@Test
public void testMultipleTopics() throws InterruptedException {
// 准备
String topic1 = "device-status-" + IdUtil.simpleUUID();
String topic2 = "device-data-" + IdUtil.simpleUUID();
String message1 = "设备在线";
String message2 = "温度:25°C";
CountDownLatch latch = new CountDownLatch(2);
AtomicInteger subscriber1Count = new AtomicInteger(0);
AtomicReference<String> subscriber1MessageRef = new AtomicReference<>();
AtomicInteger subscriber2Count = new AtomicInteger(0);
AtomicReference<String> subscriber2MessageRef = new AtomicReference<>();
// 发送消息到不同主题(需要提前发,保证 RocketMQ 路由的创建)
log.info("[测试] 发送消息 - Topic1: {}, Message1: {}", topic1, message1);
messageBus.post(topic1, message1);
log.info("[测试] 发送消息 - Topic2: {}, Message2: {}", topic2, message2);
messageBus.post(topic2, message2);
// 创建订阅者 1 - 只订阅设备状态
IotMessageSubscriber<String> statusSubscriber = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic1;
}
@Override
public String getGroup() {
return "status-group-" + IdUtil.simpleUUID();
}
@Override
public void onMessage(String message) {
log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber1Count.incrementAndGet();
subscriber1MessageRef.set(message);
assertEquals(message1, message);
latch.countDown();
}
};
// 创建订阅者 2 - 只订阅设备数据
IotMessageSubscriber<String> dataSubscriber = new IotMessageSubscriber<String>() {
@Override
public String getTopic() {
return topic2;
}
@Override
public String getGroup() {
return "data-group-" + IdUtil.simpleUUID();
}
@Override
public void onMessage(String message) {
log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message);
subscriber2Count.incrementAndGet();
subscriber2MessageRef.set(message);
assertEquals(message2, message);
latch.countDown();
}
};
// 注册订阅者到不同主题
messageBus.register(statusSubscriber);
messageBus.register(dataSubscriber);
// 等待消息处理完成
boolean completed = latch.await(10, TimeUnit.SECONDS);
// 验证结果
assertTrue(completed, "消息处理超时");
assertEquals(1, subscriber1Count.get(), "状态订阅者应该收到 1 条消息");
assertEquals(message1, subscriber1MessageRef.get(), "状态订阅者接收到的消息内容不匹配");
assertEquals(1, subscriber2Count.get(), "数据订阅者应该收到 1 条消息");
assertEquals(message2, subscriber2MessageRef.get(), "数据订阅者接收到的消息内容不匹配");
log.info("[测试] 多主题测试完成 - 状态订阅者收到{}条消息,数据订阅者收到{}条消息", subscriber1Count.get(), subscriber2Count.get());
}
}

View File

@@ -0,0 +1,141 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
/**
* {@link IotDeviceMessageUtils} 的单元测试
*
* @author HUIHUI
*/
public class IotDeviceMessageUtilsTest {
@Test
public void testExtractPropertyValue_directValue() {
// 测试直接值(非 Map
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(25.5);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_directIdentifier() {
// 测试直接通过标识符获取
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("temperature", 25.5);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_propertiesStructure() {
// 测试 properties 结构
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> properties = new HashMap<>();
properties.put("temperature", 25.5);
properties.put("humidity", 60);
Map<String, Object> params = new HashMap<>();
params.put("properties", properties);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_dataStructure() {
// 测试 data 结构
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> data = new HashMap<>();
data.put("temperature", 25.5);
Map<String, Object> params = new HashMap<>();
params.put("data", data);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_valueField() {
// 测试 value 字段
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "temperature");
params.put("value", 25.5);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_singleValueMap() {
// 测试单值 Map包含 identifier 和一个值)
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("identifier", "temperature");
params.put("actualValue", 25.5);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result);
}
@Test
public void testExtractPropertyValue_notFound() {
// 测试未找到属性值
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> params = new HashMap<>();
params.put("humidity", 60);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertNull(result);
}
@Test
public void testExtractPropertyValue_nullParams() {
// 测试 params 为 null
IotDeviceMessage message = new IotDeviceMessage();
message.setParams(null);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertNull(result);
}
@Test
public void testExtractPropertyValue_priorityOrder() {
// 测试优先级顺序:直接标识符 > properties > data > value
IotDeviceMessage message = new IotDeviceMessage();
Map<String, Object> properties = new HashMap<>();
properties.put("temperature", 20.0);
Map<String, Object> data = new HashMap<>();
data.put("temperature", 30.0);
Map<String, Object> params = new HashMap<>();
params.put("temperature", 25.5); // 最高优先级
params.put("properties", properties);
params.put("data", data);
params.put("value", 40.0);
message.setParams(params);
Object result = IotDeviceMessageUtils.extractPropertyValue(message, "temperature");
assertEquals(25.5, result); // 应该返回直接标识符的值
}
}