MQTT API 很简单:连接、订阅、发布。麻烦的地方集中在网络闪断、重复消息、客户端重启和消费速度不一致时。

这里不展开 MQTT 协议的全部细节,只记边缘网关和数据转发服务里最容易出问题的地方。

一、先定义“可靠”是什么意思

不同消息需要不同保证:

消息类型 可以丢失吗 可以重复吗 是否需要保序
高频实时值 少量可接受 通常可以 设备内尽量保序
告警事件 不应丢失 可去重 需要
设备状态 可被新状态覆盖 可以 只关心最新值
配置指令 不应丢失 不可重复执行 需要

不先区分业务语义,就只能机械地把所有消息设成高 QoS,结果增加延迟和存储,却仍未解决业务重复。

二、QoS 不等于端到端可靠

QoS 解决的是客户端与 Broker 之间的交付等级:

QoS 0: 最多一次
QoS 1: 至少一次
QoS 2: 恰好一次的协议流程

它不保证订阅者处理成功,也不保证数据库事务已经提交。QoS 1 可能产生重复消息,消费端仍然需要幂等。

业务消息最好携带稳定标识:

{
"messageId": "device-guid:event-type:sequence",
"deviceGuid": "...",
"occurredAt": "...",
"payload": {}
}

消费者按 messageId 去重,或让数据库唯一约束做兜底。

三、Client ID 与会话

Client ID 决定 Broker 如何识别客户端。多个实例误用同一个 Client ID,可能互相踢下线,表面看起来像网络不稳定。

需要明确:

  • Client ID 是否稳定;
  • 同一服务是否允许多实例;
  • 是否需要持久会话;
  • 离线期间是否保留订阅消息;
  • 订阅关系由谁恢复。

随机 Client ID 适合临时消费者,不适合依赖离线消息和会话恢复的长期服务。

四、连接状态必须由事件维护

仅在发布前调用一次 IsConnected() 不足以证明消息一定能发出。检查之后连接仍可能断开。

连接模块应监听:

  • 建连成功;
  • 连接丢失;
  • 自动重连;
  • 重连完成;
  • 订阅恢复;
  • 发布失败。

业务层获取到的不是一个永远可用的客户端,而是一个可能变化的资源。发布接口需要返回明确结果,并决定失败后进入重试队列还是直接丢弃。

五、重连后的订阅恢复

重新连接成功不代表服务恢复。若使用干净会话,所有订阅都需要重新注册。

推荐把订阅定义集中管理:

topic
qos
handler

每次连接建立后统一恢复,并记录每个订阅是否成功。不要把订阅散落在各业务模块的启动代码里,否则重连流程很难完整重放。

六、发布队列与背压

上游产生消息的速度高于网络发送速度时,必须做选择。

生产者 -> 有界队列 -> 发布 Worker -> Broker

队列满后的策略应由消息类型决定:

  • 实时状态可以丢弃旧值,只保留最新值;
  • 告警写入本地持久队列;
  • 配置指令拒绝新请求并返回错误;
  • 批量历史数据降低生产速度。

无限增长的内存队列只是推迟故障。

七、离线缓存

需要跨进程重启保留的数据,应落到本地数据库或日志队列,而不是只放内存。

缓存记录至少包含:

  • 消息 ID;
  • Topic;
  • QoS;
  • Payload;
  • 创建时间;
  • 重试次数;
  • 下次重试时间;
  • 业务优先级。

恢复发送时要限制速率,避免网络恢复瞬间,大量历史消息挤占实时消息。

八、重试与死信

重试应采用退避,并设置上限:

1s -> 5s -> 30s -> 2m -> 10m

永久失败的消息不能无限循环。超过次数后进入死信区,保留错误原因,等待人工处理或专项补偿。

需要区分可重试错误和永久错误。例如网络超时可以重试,序列化失败通常应直接进入死信。

九、Topic 是接口契约

Topic 一旦被多个系统依赖,就与 HTTP API 一样需要版本和规范。

v1/{tenant}/{deviceGuid}/telemetry
v1/{tenant}/{deviceGuid}/event
v1/{tenant}/{deviceGuid}/status

避免把易变名称作为关键路径,避免同一层级有时表示设备类型、有时表示设备 ID。权限规则、通配符订阅和后续迁移都会受到 Topic 设计影响。

十、安全

生产环境至少要考虑:

  • TLS;
  • 双向证书认证;
  • 用户名密码或 Token;
  • Topic 级 ACL;
  • 证书到期监控;
  • 私钥文件权限;
  • 日志脱敏。

证书存在不代表证书有效。到期时间应该进入监控,而不是等连接失败后才发现。

十一、可观测性

建议记录以下指标:

mqtt_connected
mqtt_reconnect_total
mqtt_publish_total
mqtt_publish_failed_total
mqtt_receive_total
mqtt_handler_failed_total
mqtt_queue_size
mqtt_offline_message_total
mqtt_dead_letter_total

日志中应包含 Client ID、Topic、消息 ID 和错误环节,但避免完整打印敏感 Payload。

十二、故障演练

上线前主动测试:

  • [ ] Broker 重启;
  • [ ] 网络中断五分钟后恢复;
  • [ ] 证书过期或错误;
  • [ ] 两个实例使用相同 Client ID;
  • [ ] 消费处理速度突然下降;
  • [ ] 同一消息重复到达;
  • [ ] 本地缓存写满;
  • [ ] 服务在发布过程中退出;
  • [ ] 重连成功但订阅失败;
  • [ ] 历史补传与实时消息同时出现。

MQTT 的可靠性不来自某一个 QoS 数字,而来自连接、会话、队列、幂等、缓存、安全和观测共同形成的闭环。