有个接口消费 Kafka,测试发现消费一直在堆积没有减少。
问题背景
有个接口消费 Kafka,测试发现消费一直在堆积没有减少。在日志中打印出 partition,发现都是 partition:0, offset:203
,说明有大量重复消费的问题,把日志里面收到的同一条消息(同一个 partition 和 offset)的时间戳都筛选出来。
1 | $ cat tmp.txt | awk -F ' ' '{print $1" "$2}' | sort -k2 |
简单观察发现很多间隔时间为 70 秒,我怀疑是 Rebalance 时间至少要等待 70 秒,但服务并没有发生重启,不符合印象里消费者加入退出引起的 Rebalance 操作。
先去网上搜了一下,找到了 线上 Kafka 突发 rebalance 异常,如何快速解决? 了解 Rebalance 的步骤,通过 traceid 发现这个请求很久都没有返回,怀疑是接口处理时间太长没有 ACK,导致 Rebalance 之后重新消费。
去问了 AI 如何解决,AI 给出的解法在 golang 下是错误的。在 Java 中是通过 max.poll.interval.ms
来控制,在 Kafka 官方文档 中可以找到解释。
我使用的组件是sarama,查找相关 issue,在consume speed control and rebalance questions大概找到了配置是c.Consumer.Group.Rebalance.Timeout
,从描述里面说明如果出现超时,这个消费者会被移除消费组。
1 | // The maximum allowed time for each worker to join the group once a rebalance has begun. |
但是还有个 10 秒是哪儿来的呢?找了一下消费者的默认配置,发现这俩加起来刚好就是 70 秒,大概可以实锤就是这两个配置导致。
于是通过设置 Rebalance.Timeout 为 10 分钟解决问题。Kafka 的消息不适合执行太长时间,当出现消费者变动或者消息没有确认导致出现 Rebalance,消息就会被重复消费。
Kafka Rebalance 条件
找 DeepSeek 询问 Rebalance 发生的条件。
Kafka 消费者组的重平衡(Rebalance)在以下条件发生时触发:
- 消费者组成员变动
- 新消费者加入组:当新成员加入时,分区需重新分配。
- 消费者离开或崩溃:包括主动退出(如调用
unsubscribe()
)、崩溃(进程终止)、心跳超时(未在session.timeout.ms
内发送心跳)或被协调器踢出(如处理超时超过max.poll.interval.ms
)。
- 订阅主题的元数据变更
- 分区数增加:当已订阅的主题新增分区时,需重新分配新分区给消费者。
- 正则订阅匹配新主题:若消费者通过正则表达式(如
subscribe(Pattern.compile("topic.*"))
)订阅主题,新增符合规则的主题会触发重平衡。
- 消费者处理超时
- 若消费者处理消息的时间超过
max.poll.interval.ms
(两次调用poll()
的最大间隔),会被协调器移出组,触发重平衡。
- 会话超时
- 消费者在
session.timeout.ms
内未发送心跳,协调器判定其离线,触发重平衡。
- 协调器故障转移
- 若消费者组的协调器(Coordinator)所在 Broker 宕机,选举新协调器后可能触发重平衡。
- 消费者组初始化
- 首次启动消费者组时,所有成员加入后触发初始的 Rebalance。
DeepSeek 还给出相应的优化建议(减少 Rebalance 影响)
- 合理配置参数:调整
session.timeout.ms
(默认 10 秒)和max.poll.interval.ms
(默认 5 分钟),避免因网络波动或处理延迟误触发。- 避免频繁重启:批量操作消费者启停。
- 使用静态成员资格(Static Membership):通过
group.instance.id
配置,消费者短暂离线时保留分区分配,减少重平衡。- 优化处理逻辑:确保消息处理时间远低于
max.poll.interval.ms
,避免超时。
超时时间的思考
这次问题的本质就是我们的服务 context 默认超时时间为 10 分钟,而消息消费超时默认是 70 秒,导致不断处理同一条消息,启动的消息要等到 10 分钟才会被 context cancel
,进入了 offset 不会移动,消息永远无法确认消费完成的境地,导致程序无意义不断空跑。
正常接口并不会运行这么长时间,在规划接口功能时应该考虑业务场景,是不是可以拆分流水多次提交,让单条 Kafka 消息的消费时间变短。另外,统计类的功能应该走别的方式处理,例如通过大数据,定时任务的方式来触发。