Go 很容易启动并发:

go work()

困难的是回答另外几个问题:它什么时候结束,失败由谁处理,重复启动怎么办,服务退出时谁等待它。

在采集、MQTT 转发、定时任务和 WebSocket 服务中,我遇到的大部分并发问题并不是算法复杂,而是生命周期没有被设计。

一、每个 goroutine 都应该有归属

启动 goroutine 的代码,应当知道:

  • 为什么启动;
  • 如何停止;
  • 错误发给谁;
  • 是否允许重启;
  • 退出时是否需要等待。

最危险的是“顺手异步”:

go func() {
client.Publish(...)
}()

调用方不知道发布是否成功,也无法限制并发数量。流量升高后,goroutine 和内存可能一起增长。

二、用 context 传播取消信号

长期运行的工作循环应监听 context.Context

func run(ctx context.Context) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := collect(); err != nil {
return err
}
}
}
}

context 适合传递取消、截止时间和请求范围信息,不适合装载任意业务参数。

三、Channel 表达事件,Mutex 保护状态

二者不是互斥选择。

适合 Channel 的场景:

  • 通知初始化完成;
  • 传递任务;
  • 汇聚错误;
  • 广播退出;
  • 实现有界队列。

适合 Mutex 的场景:

  • 保护共享 map;
  • 原子地读取和更新一组字段;
  • 管理客户端连接状态;
  • 维护设备级锁映射。

如果只是为了读取一个 map,却强行让所有操作经过 Channel,代码会变复杂;如果用布尔值轮询等待初始化完成,则是在用共享状态模拟事件。

四、初始化只能成功一次吗

sync.Once 适合“进程生命周期内只尝试一次”的初始化,但不适合可能失败并需要重试的连接。

MQTT 客户端初始化更像一个小状态机:

IDLE -> CONNECTING -> READY
└-> FAILED -> IDLE

多个调用者同时请求客户端时:

  1. 第一个调用者负责连接;
  2. 其他调用者等待本轮结果;
  3. 成功后共享客户端;
  4. 失败后允许下一轮重试。

这比一个全局布尔值更能表达真实语义。

五、限制并发,而不是相信流量不会变大

批量采集或转发时,可以使用有界 worker pool:

jobs := make(chan Job, 100)

for i := 0; i < workerCount; i++ {
go worker(ctx, jobs)
}

队列满以后要明确策略:阻塞、丢弃、覆盖旧数据,还是返回错误。没有策略的无界并发,本质上是把流量压力转成内存压力。

六、锁的粒度与顺序

全局锁实现简单,但一台慢设备可能阻塞全部设备。按设备 GUID 建锁可以缩小临界区:

map[string]*sync.Mutex

锁映射本身也要受保护,并考虑设备删除后的清理。

如果一段逻辑需要获取多把锁,必须规定固定顺序,例如始终按 GUID 排序后加锁,否则不同 goroutine 可能形成死锁。

临界区内不要执行不可控的网络请求。锁应该保护状态转换,而不是保护整个业务流程。

七、错误不能消失在 goroutine 里

后台 goroutine 的错误可以通过错误通道或 errgroup 返回:

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error { return serveHTTP(ctx) })
g.Go(func() error { return runCollector(ctx) })
g.Go(func() error { return runForwarder(ctx) })

return g.Wait()

任一核心组件失败时,取消其他组件并统一退出,比留下半个服务继续运行更容易维护。

非核心任务则可以独立重试,但必须记录失败次数和最终状态。

八、优雅退出的顺序

一个常见顺序是:

  1. 停止接收新请求;
  2. 发出取消信号;
  3. 停止定时器和消费者;
  4. 等待正在处理的任务;
  5. 刷新缓冲区;
  6. 关闭 MQTT、数据库等连接;
  7. 超时后强制退出。

退出也要有最大等待时间,否则一个无法返回的外部调用会让进程永远停不下来。

九、并发问题如何测试

除了普通单元测试,还应使用:

go test -race ./...

重点覆盖:

  • 多个调用者同时初始化;
  • 读写共享 map;
  • 取消发生在任务处理中;
  • Channel 关闭时仍有发送者;
  • 重连与关闭同时发生;
  • 任务队列满载;
  • 服务连续启动和停止。

测试不一定证明没有并发问题,但可以把最常见的竞态提前暴露。

十、代码审查清单

  • [ ] goroutine 是否有退出路径;
  • [ ] ticker、连接和 Channel 由谁关闭;
  • [ ] 是否存在无界 goroutine 或无界队列;
  • [ ] 共享 map 是否受保护;
  • [ ] 锁内是否执行网络或磁盘操作;
  • [ ] 初始化失败后能否恢复;
  • [ ] 后台错误是否可见;
  • [ ] 取消信号是否能传到最底层;
  • [ ] 服务退出是否等待必要任务;
  • [ ] 是否运行过 race detector。

并发安全不只是“没有 data race”。更完整的标准是:任务能够被限制、观察、取消和回收,服务在成功与失败时都有确定的生命周期。