MQTT API 很简单:连接、订阅、发布。麻烦的地方集中在网络闪断、重复消息、客户端重启和消费速度不一致时。
这里不展开 MQTT 协议的全部细节,只记边缘网关和数据转发服务里最容易出问题的地方。
一、先定义“可靠”是什么意思
不同消息需要不同保证:
| 消息类型 | 可以丢失吗 | 可以重复吗 | 是否需要保序 |
|---|---|---|---|
| 高频实时值 | 少量可接受 | 通常可以 | 设备内尽量保序 |
| 告警事件 | 不应丢失 | 可去重 | 需要 |
| 设备状态 | 可被新状态覆盖 | 可以 | 只关心最新值 |
| 配置指令 | 不应丢失 | 不可重复执行 | 需要 |
不先区分业务语义,就只能机械地把所有消息设成高 QoS,结果增加延迟和存储,却仍未解决业务重复。
二、QoS 不等于端到端可靠
QoS 解决的是客户端与 Broker 之间的交付等级:
QoS 0: 最多一次
QoS 1: 至少一次
QoS 2: 恰好一次的协议流程
它不保证订阅者处理成功,也不保证数据库事务已经提交。QoS 1 可能产生重复消息,消费端仍然需要幂等。
业务消息最好携带稳定标识:
{
"messageId": "device-guid:event-type:sequence",
"deviceGuid": "...",
"occurredAt": "...",
"payload": {}
}
消费者按 messageId 去重,或让数据库唯一约束做兜底。
三、Client ID 与会话
Client ID 决定 Broker 如何识别客户端。多个实例误用同一个 Client ID,可能互相踢下线,表面看起来像网络不稳定。
需要明确:
- Client ID 是否稳定;
- 同一服务是否允许多实例;
- 是否需要持久会话;
- 离线期间是否保留订阅消息;
- 订阅关系由谁恢复。
随机 Client ID 适合临时消费者,不适合依赖离线消息和会话恢复的长期服务。
四、连接状态必须由事件维护
仅在发布前调用一次 IsConnected() 不足以证明消息一定能发出。检查之后连接仍可能断开。
连接模块应监听:
- 建连成功;
- 连接丢失;
- 自动重连;
- 重连完成;
- 订阅恢复;
- 发布失败。
业务层获取到的不是一个永远可用的客户端,而是一个可能变化的资源。发布接口需要返回明确结果,并决定失败后进入重试队列还是直接丢弃。
五、重连后的订阅恢复
重新连接成功不代表服务恢复。若使用干净会话,所有订阅都需要重新注册。
推荐把订阅定义集中管理:
topic
qos
handler
每次连接建立后统一恢复,并记录每个订阅是否成功。不要把订阅散落在各业务模块的启动代码里,否则重连流程很难完整重放。
六、发布队列与背压
上游产生消息的速度高于网络发送速度时,必须做选择。
生产者 -> 有界队列 -> 发布 Worker -> Broker
队列满后的策略应由消息类型决定:
- 实时状态可以丢弃旧值,只保留最新值;
- 告警写入本地持久队列;
- 配置指令拒绝新请求并返回错误;
- 批量历史数据降低生产速度。
无限增长的内存队列只是推迟故障。
七、离线缓存
需要跨进程重启保留的数据,应落到本地数据库或日志队列,而不是只放内存。
缓存记录至少包含:
- 消息 ID;
- Topic;
- QoS;
- Payload;
- 创建时间;
- 重试次数;
- 下次重试时间;
- 业务优先级。
恢复发送时要限制速率,避免网络恢复瞬间,大量历史消息挤占实时消息。
八、重试与死信
重试应采用退避,并设置上限:
1s -> 5s -> 30s -> 2m -> 10m
永久失败的消息不能无限循环。超过次数后进入死信区,保留错误原因,等待人工处理或专项补偿。
需要区分可重试错误和永久错误。例如网络超时可以重试,序列化失败通常应直接进入死信。
九、Topic 是接口契约
Topic 一旦被多个系统依赖,就与 HTTP API 一样需要版本和规范。
v1/{tenant}/{deviceGuid}/telemetry
v1/{tenant}/{deviceGuid}/event
v1/{tenant}/{deviceGuid}/status
避免把易变名称作为关键路径,避免同一层级有时表示设备类型、有时表示设备 ID。权限规则、通配符订阅和后续迁移都会受到 Topic 设计影响。
十、安全
生产环境至少要考虑:
- TLS;
- 双向证书认证;
- 用户名密码或 Token;
- Topic 级 ACL;
- 证书到期监控;
- 私钥文件权限;
- 日志脱敏。
证书存在不代表证书有效。到期时间应该进入监控,而不是等连接失败后才发现。
十一、可观测性
建议记录以下指标:
mqtt_connected
mqtt_reconnect_total
mqtt_publish_total
mqtt_publish_failed_total
mqtt_receive_total
mqtt_handler_failed_total
mqtt_queue_size
mqtt_offline_message_total
mqtt_dead_letter_total
日志中应包含 Client ID、Topic、消息 ID 和错误环节,但避免完整打印敏感 Payload。
十二、故障演练
上线前主动测试:
- [ ] Broker 重启;
- [ ] 网络中断五分钟后恢复;
- [ ] 证书过期或错误;
- [ ] 两个实例使用相同 Client ID;
- [ ] 消费处理速度突然下降;
- [ ] 同一消息重复到达;
- [ ] 本地缓存写满;
- [ ] 服务在发布过程中退出;
- [ ] 重连成功但订阅失败;
- [ ] 历史补传与实时消息同时出现。
MQTT 的可靠性不来自某一个 QoS 数字,而来自连接、会话、队列、幂等、缓存、安全和观测共同形成的闭环。