rocketmq 如何保证顺序消费

rocketMQ 为了保证 consumer 顺序消费,做了很多工作。

MQClientManager 在 jvm 进程中是单例,其内部维护一个 map,键是 clientId,值是 MQClientInstance,业务 producer 和 consumer 使用的是同一个 MQClientInstance,其对应的 clientId 是 ip@pid

在 MQClientInstance 内部维护 2 个 map,consumer 的信息存放在 consumerTable 中,即一个 group 只有一个 MQConsumerInner。

当消息按顺序存放在 queue 中后,consumer 拉取消息消费,如何保证顺序呢?
1. ConsumeMessageOrderlyService 通过 rebalance 获取分配到的 queue,向 broker 发起请求锁住这些 queue
2. 同时在消费时,保证 queue 的消息只有一个线程在消费
3. 如果消息消费失败了,不直接发回给 broker ,而是继续消费该条消息

定时任务锁住 broker 中的 queue

broker 处理 LOCK_BATCH_MQ 请求,如果 queue 没有其他客户端加锁,或者加锁过期,则分配给该当前客户端

ConsumeMessageOrderlyService 在关闭的时候,会 unlock 所有的 queue

在 MQClientInstance 内部获取 queue 的锁,确保 MQClientInstance 中只有一个线程消费当前 queue 的消息,如果当前 ProcessQueue 没有锁住,或者锁过期了,则等获取锁后再消费

为方便说明,假设 batchSize 为 1,当前线程锁住 ProcessQueue,从 msgTreeMap 取出一条消息,并放入 consumingMsgOrderlyTreeMap 中,
如果消费失败了,但是为了保证顺序性,会把这条消息从 consumingMsgOrderlyTreeMap 取出,重新放入 msgTreeMap 中,当超过了最大重试次数后,尝试发回 broker

Original: https://www.cnblogs.com/allenwas3/p/12905170.html
Author: 偶尔发呆
Title: rocketmq 如何保证顺序消费

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/540267/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球