设备接入(自定义协议)

推荐阅读:

本文以 TCP 协议为蓝本,一步步讲解如何在 IoT 网关中扩展一种新协议。

1. 整体思路

新增一种协议,需要改动以下模块和类:

步骤 改动位置 说明
IotProtocolTypeEnum 新增协议类型枚举
IotXxxConfig + IotGatewayProperties.ProtocolProperties 协议配置类
IotXxxProtocol 实现 IotProtocol 接口 协议主类,管理生命周期
上行 Handler + 下行 Subscriber 消息收发处理
IotProtocolManager 注册协议到管理器

yudao-module-iot-gateway 模块的 protocol/ 目录下新建协议包,最终的包结构如下:

protocol/
└── xxx/                                    // 你的协议包
    ├── IotXxxProtocol.java                 // 协议主类
    ├── IotXxxConfig.java                   // 专属配置类
    ├── handler/
    │   ├── upstream/                       // 上行(设备 → 网关)
    │   │   ├── IotXxxAuthHandler.java      // 认证
    │   │   └── IotXxxUpstreamHandler.java  // 上行消息
    │   └── downstream/                     // 下行(网关 → 设备)
    │       ├── IotXxxDownstreamHandler.java     // 下行消息处理逻辑
    │       └── IotXxxDownstreamSubscriber.java  // 消息总线订阅者
    └── manager/                            // 长连接协议需要
        └── IotXxxConnectionManager.java    // 设备连接管理

提示

短连接协议(如 HTTP)通常不需要 manager/ 目录。

2. 实现步骤

2.1 新增协议类型

① 在 yudao-module-iot-core 模块的 IotProtocolTypeEnum 枚举中,新增一个枚举值:

public enum IotProtocolTypeEnum {

    // ... 已有协议 ...
    HTTP("http"),
    MQTT("mqtt"),
    TCP("tcp"),

    XXX("xxx"); // 新增:你的协议类型标识

}

注意

"xxx" 是协议的类型标识,必须与 application.yaml 配置中的 protocol 字段一致。

② 需要在管理后台的「系统管理 → 字典管理」中,找到字典类型 iot_product_protocol_type,新增一条字典数据(值与枚举标识一致,如 "xxx"),这样前端创建产品时才能选择该协议类型。

2.2 创建协议配置类

① 新建专属配置类 IotXxxConfig,存放该协议独有的配置参数:

@Data
public class IotXxxConfig {

    /**
     * 最大连接数
     */
    private Integer maxConnections = 1000;

    // ... 其他协议特有参数 ...
}

可参考 IotTcpConfig(含 codec、maxConnections 等字段)。

② 在 ProtocolProperties 中注册,编辑 IotGatewayProperties 的 ProtocolProperties 内部类:

@Data
public static class ProtocolProperties {

    // ... 已有字段 ...
    private IotHttpConfig http;
    private IotMqttConfig mqtt;

    private IotXxxConfig xxx; // 新增
}

2.3 实现 IotProtocol 接口

IotProtocol 是所有协议的核心接口,需要实现以下方法:

方法 说明
#getId() 协议实例 ID,对应配置中的 id 字段
#getServerId() 服务标识,用于下行消息路由
#getType() 协议类型枚举
#start() 启动协议服务
#stop() 停止协议服务
#isRunning() 是否正在运行

以 IotTcpProtocol 为参考,协议主类的实现结构如下:

@Slf4j
public class IotXxxProtocol implements IotProtocol {

    private final ProtocolProperties properties;
    @Getter
    private final String serverId;
    @Getter
    private volatile boolean running = false;

    // 协议资源(服务器、连接等)
    private YourServer server;
    private IotXxxDownstreamSubscriber downstreamSubscriber;

    public IotXxxProtocol(ProtocolProperties properties) {
        this.properties = properties;
        this.serverId = IotDeviceMessageUtils.generateServerId(properties.getPort());
    }

    @Override
    public String getId() {
        return properties.getId();
    }

    @Override
    public IotProtocolTypeEnum getType() {
        return IotProtocolTypeEnum.XXX;
    }

    @Override
    public void start() {
        if (running) {
            return;
        }
        try {
            // ① 创建并启动协议服务器
            this.server = createAndStartServer();

            running = true;
            log.info("[start][协议 {} 启动成功,端口:{}]", getId(), properties.getPort());

            // ② 启动下行消息订阅者
            IotMessageBus messageBus = SpringUtil.getBean(IotMessageBus.class);
            this.downstreamSubscriber = new IotXxxDownstreamSubscriber(this, messageBus);
            this.downstreamSubscriber.start();
        } catch (Exception e) {
            log.error("[start][协议 {} 启动失败]", getId(), e);
            stop0();
            throw e;
        }
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }
        stop0();
    }

    private void stop0() {
        // ① 停止下行订阅者
        if (downstreamSubscriber != null) {
            downstreamSubscriber.stop();
            downstreamSubscriber = null;
        }
        // ② 关闭服务器
        if (server != null) {
            server.close();
            server = null;
        }
        running = false;
    }
}

注意

stop0() 方法在 start() 异常时也会被调用(用于清理已初始化的资源),因此每个资源释放都应做 null 判断,避免 NPE。

2.4 实现上行 Handler

上行 Handler 负责处理设备发送到网关的请求,通常包括 认证消息上报 两部分。

2.4.1 认证

认证方式取决于协议的连接特性:

连接类型 认证方式 参考实现 说明
短连接 JWT Token IotHttpAuthHandler 认证成功返回 Token,后续请求携带 Token
长连接 连接时认证 IotTcpUpstreamHandler(handleAuth() 方法) 认证成功后在 ConnectionManager 中记录设备信息,连接期间无需重复认证

两种方式都通过 IotDeviceCommonApi 的 #authDevice(...) 方法完成认证校验。认证成功后,需调用 IotDeviceMessageService 的 #sendDeviceMessage(...) 发送上线消息:

IotDeviceMessage message = IotDeviceMessage.buildStateUpdateOnline();
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

2.4.2 消息上报

处理设备上报的属性、事件等业务消息。核心流程:

  1. 从请求中解析出 productKeydeviceName
  2. 反序列化消息体为 IotDeviceMessage
  3. 调用 IotDeviceMessageService 的 #sendDeviceMessage(message, productKey, deviceName, serverId) 发布到消息总线
// 以 TCP 为例(IotTcpUpstreamHandler)
String productKey = connectionInfo.getProductKey();
String deviceName = connectionInfo.getDeviceName();
IotDeviceMessage message = deserializeMessage(data);
deviceMessageService.sendDeviceMessage(message, productKey, deviceName, serverId);

提示

如果需要自定义消息序列化格式,可参考附录「C. 自定义序列化」。

2.4.3 动态注册(可选)

如果协议需要支持设备动态注册(一型一密),参考 IotHttpRegisterHandler 和 IotHttpRegisterSubHandler,调用 IotDeviceCommonApi 的 #registerDevice(...)#registerSubDevices(...) 方法。

2.5 实现下行 Subscriber

① 情况一:下行 Subscriber 负责接收平台发送给设备的消息(如属性设置、服务调用),继承 AbstractIotProtocolDownstreamSubscriber 即可:

@Slf4j
public class IotXxxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {

    public IotXxxDownstreamSubscriber(IotProtocol protocol, IotMessageBus messageBus) {
        super(protocol, messageBus);
    }

    @Override
    protected void handleMessage(IotDeviceMessage message) {
        // 短连接协议(如 HTTP)不支持下行推送,直接忽略
        log.info("[handleMessage][协议不支持下行推送,忽略消息:{}]", message.getId());
    }
}

② 情况二:如果是长连接协议,需要通过 ConnectionManager 查找设备连接并推送消息。可参考 IotTcpDownstreamSubscriber + IotTcpDownstreamHandler,完整示例如下:

IotXxxDownstreamSubscriber 将消息委托给 Handler 处理:

@Slf4j
public class IotXxxDownstreamSubscriber extends AbstractIotProtocolDownstreamSubscriber {

    private final IotXxxDownstreamHandler downstreamHandler;

    public IotXxxDownstreamSubscriber(IotProtocol protocol,
                                      IotXxxDownstreamHandler downstreamHandler,
                                      IotMessageBus messageBus) {
        super(protocol, messageBus);
        this.downstreamHandler = downstreamHandler;
    }

    @Override
    protected void handleMessage(IotDeviceMessage message) {
        downstreamHandler.handle(message);
    }
}

IotXxxDownstreamHandler 查找连接、序列化、发送:

@Slf4j
@RequiredArgsConstructor
public class IotXxxDownstreamHandler {

    private final IotXxxConnectionManager connectionManager;
    private final IotMessageSerializer serializer;

    public void handle(IotDeviceMessage message) {
        // ① 检查设备连接
        IotXxxConnectionManager.ConnectionInfo connectionInfo =
                connectionManager.getConnectionInfoByDeviceId(message.getDeviceId());
        if (connectionInfo == null) {
            log.warn("[handle][设备 {} 不在线]", message.getDeviceId());
            return;
        }
        // ② 序列化消息
        byte[] payload = serializer.serialize(message);
        // ③ 发送给设备
        connectionManager.sendToDevice(message.getDeviceId(), payload);
        log.info("[handle][下行消息发送成功, 设备 ID: {}, 方法: {}]",
                message.getDeviceId(), message.getMethod());
    }
}

2.6 注册到 IotProtocolManager

在 IotProtocolManager 中完成两处改动:

① 在 #createProtocol(config)switch 中增加分支:

switch (protocolType) {
    // ... 已有 case ...
    case XXX:
        return createXxxProtocol(config);
}

② 新增工厂方法:

private IotXxxProtocol createXxxProtocol(ProtocolProperties config) {
    return new IotXxxProtocol(config);
}

2.7 添加 YAML 配置

网关application.yaml 中添加协议实例配置:

yudao:
  iot:
    gateway:
      protocols:
        - id: xxx-json
          enabled: true
          protocol: xxx        # 对应 IotProtocolTypeEnum 的标识
          port: 9000
          serialize: json      # 可选,短连接协议通常不需要
          xxx:                 # 对应 IotXxxConfig 的字段
            max-connections: 1000

注意

enabled 默认为 false,需要显式设置为 true 才会启动。

附录

A. 关键服务一览

协议实现中常用的服务,均可通过 SpringUtil.getBean(...) 获取:

服务 说明
IotDeviceMessageService 消息发送、序列化/反序列化(按设备配置)
IotDeviceTokenService JWT Token 创建/校验(短连接认证)
IotDeviceCommonApi 设备认证 #authDevice(...)、动态注册 #registerDevice(...) 等 RPC 接口
IotDeviceService 设备缓存查询 #getDeviceFromCache(...)
IotMessageBus 消息总线,注册/取消订阅者
IotMessageSerializerManager 获取指定类型的消息序列化器(JSON / Binary)

B. 短连接 vs 长连接

根据协议的连接特性,实现方式有所不同:

特性 短连接(HTTP、CoAP) 长连接(MQTT、TCP、UDP、WebSocket)
认证方式 Token(JWT 无状态) 连接时认证,ConnectionManager 记录
下行推送 不支持(DownstreamSubscriber 忽略) 支持(通过 ConnectionManager 查找连接并推送)
序列化 固定 JSON(请求体即 JSON) 通过 serialize 配置 或 设备 serializeType 字段
连接管理 不需要 需要 ConnectionManager 管理连接和设备映射
离线检测 不涉及(Token 过期即视为离线) 连接断开时发送离线消息
参考实现 IotHttpProtocol IotTcpProtocol、IotMqttProtocol

C. 自定义序列化

系统内置 JSON 和 Binary 两种消息序列化方式(对应 IotSerializeTypeEnum 枚举)。如需自定义序列化格式,步骤如下:

  1. 在 IotSerializeTypeEnum 中新增枚举值(如 MY_FORMAT("my_format")
  2. 实现 IotMessageSerializer 接口,包含三个方法:
    • #serialize(IotDeviceMessage) — 将消息编码为 byte[]
    • #deserialize(byte[]) — 将 byte[] 解码为 IotDeviceMessage
    • #getType() — 返回对应的 IotSerializeTypeEnum 枚举值
  3. IotMessageSerializerManager 会自动加载所有枚举对应的序列化器,无需手动注册

可参考内置实现:IotJsonSerializer(JSON 格式)、IotBinarySerializer(自定义二进制协议,含魔数、版本号、消息类型等帧头结构)。