这是一次“功能已经完成,可靠性才刚开始”的项目。
第一版很快实现了查询数据并发布到 MQTT。随后需求逐渐完整:实时数据要立即发,部分数据按周期发,静态信息每天定时发,设备状态变化也要触发;生产环境还要求客户端证书。
发布一条消息很容易,管理四种时间语义并不容易。
四种触发器,一条执行链
我没有为四种模式复制四套发送代码,而是把任务拆成两部分:
Task = What + Where
Trigger = When
Task 描述设备范围、点位映射、目标 Topic 和连接配置。Trigger 可以是 MQTT 实时事件、Ticker、Cron 或状态变化。触发后都进入同一条查询、转换、发送链路。
这使一些规则有了统一位置:
- 同一任务是否允许并发;
- 上一次尚未结束时怎么处理;
- 空点位是省略还是发送空值;
- 发送失败记录哪些上下文;
- 静态属性和动态属性如何合并。
连接管理器比 Publish 更重要
早期问题包括客户端 ID 重复、连接没有复用、任务结束时误删共享连接。连接管理器后来承担了创建、复用和释放职责,key 由目标地址、身份和证书配置共同决定。
mTLS 配置需要同时加载 CA、客户端证书和私钥。这里我宁愿启动失败,也不在证书缺失时偷偷降级为不安全连接。证书错误应该尽早暴露,并明确指出是读取、解析还是握手失败。
QoS 不是可靠性的全部
后续我们把部分数据的 QoS 调整为 1。QoS 1 保证“至少一次”,意味着重复是合法结果。告警类消息因此还要使用事件 ID 去重;普通状态数据则可以按设备和时间覆盖。
网络断开时,消息是否需要落盘?积压上限是多少?恢复后是否补发过期状态?这些都不是 MQTT 协议自动替业务决定的。
一次共享连接被提前删除
几个月后仍然出现过一个问题:多个任务共享客户端,其中一个任务释放资源时把连接关闭,其他任务随之失败。这个 Bug 说明引用关系没有被连接管理器真正掌握。
可靠系统很少靠一个“正确 API”获得。它来自对生命周期的持续修正:谁创建连接,谁拥有连接,失败由谁感知,最后一个使用者离开时又由谁关闭。