rocketmq 如何保证顺序消费

rocketMQ 为了保证 consumer 顺序消费,做了很多工作。

MQClientManager 在 jvm 进程中是单例,其内部维护一个 map,键是 clientId,值是 MQClientInstance,业务 producer 和 consumer 使用的是同一个 MQClientInstance,其对应的 clientId 是 ip@pid

在 MQClientInstance 内部维护 2 个 map,consumer 的信息存放在 consumerTable 中,即一个 group 只有一个 MQConsumerInner。

当消息按顺序存放在 queue 中后,consumer 拉取消息消费,如何保证顺序呢?
1. ConsumeMessageOrderlyService 通过 rebalance 获取分配到的 queue,向 broker 发起请求锁住这些 queue
2. 同时在消费时,保证 queue 的消息只有一个线程在消费
3. 如果消息消费失败了,不直接发回给 broker ,而是继续消费该条消息

定时任务锁住 broker 中的 queue

broker 处理 LOCK_BATCH_MQ 请求,如果 queue 没有其他客户端加锁,或者加锁过期,则分配给该当前客户端

ConsumeMessageOrderlyService 在关闭的时候,会 unlock 所有的 queue

在 MQClientInstance 内部获取 queue 的锁,确保 MQClientInstance 中只有一个线程消费当前 queue 的消息,如果当前 ProcessQueue 没有锁住,或者锁过期了,则等获取锁后再消费

为方便说明,假设 batchSize 为 1,当前线程锁住 ProcessQueue,从 msgTreeMap 取出一条消息,并放入 consumingMsgOrderlyTreeMap 中,
如果消费失败了,但是为了保证顺序性,会把这条消息从 consumingMsgOrderlyTreeMap 取出,重新放入 msgTreeMap 中,当超过了最大重试次数后,尝试发回 broker

Original: https://www.cnblogs.com/allenwas3/p/12905170.html
Author: 偶尔发呆
Title: rocketmq 如何保证顺序消费

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/540267/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mysql慢查询日志总结

    查看慢查询是否开启: mysql>show variables like ‘%slow_query_log%’; Variable_name Valu…

    Java 2023年6月5日
    067
  • SpringBoot 2.7.0 处理跨域的问题

    package com.clickpaas.config; import org.springframework.context.annotation.Bean; import o…

    Java 2023年6月8日
    0110
  • Spring 源码(11)Spring Bean 的创建过程(2)

    Spring Bean 的创建过程介绍了 FactoryBean 的创建方式,那么接下来介绍不是 FactoryBean的创建方式,在创建过程中,又会分为单例的Bean的创建,原型…

    Java 2023年6月14日
    081
  • JavaWeb-Tomcat阀

    Filter的功能之一就是预处理客户请求,而 Tomcat阀是对 Catalina容器接收到的 HTTP请求进行预处理. 过滤器实在 Servlet规范中提出来的,因此适用于所有的…

    Java 2023年5月29日
    086
  • Markdown随时记录

    Markdown学习 推荐文本编译器 Typora 标题(支持六级) 一级标题:# + 空格 + 内容 二级标题:## + 空格 + 内容 三级标题:### + 空格 + 内容 ….

    Java 2023年6月14日
    089
  • RenderX java的xml打印

    http://www.zdnet.com.cn/techupdate/apply/collaboration/story/0,3800030473,39347913,00.htm …

    Java 2023年5月29日
    0101
  • SpringCloudStream消息驱动的使用以及解决消息重复消费

    场景 SpringCloudConfig集成Bus消息总线实现动态刷新配置(全局广播和定点通知): https://blog.csdn.net/BADAO_LIUMANG_QIZH…

    Java 2023年5月30日
    084
  • Docker操作(二)容器

    Docker操作(二)容器 一、容器的特点: 1、容器是基于镜像来运行 2、容器具备沙箱机制,容器之间相互隔离 3、容器是运行在Docker内部,跟宿主机也是隔离的,但宿主机可以与…

    Java 2023年6月8日
    085
  • nginx配置解除上传文件413报错问题

    原因:nginx限制了文件上传大小 client_max_body_size 256M; 可以选择在 http{ }中设置: client_max_body_size 256m; …

    Java 2023年5月30日
    097
  • SpringCloudAlibaba 微服务讲解(四)Sentinel–服务容错(一)

    4.1 高并发带来的问题 在微服务中,我们将业务拆分成一个个的服务,服务与服务之间可以相互调用,但是由于网络原因或者自身的原因,服务并不能保证100%可用,如果单个服务出现问题,调…

    Java 2023年6月7日
    0101
  • SpringMVC&Maven进阶

    SpringMVC 3.1 了解SpringMVC 概述 SpringMVC技术与Servlet技术功能等同,均属于web层开发技术 学习路线 请求与响应 REST分割 SSM整合…

    Java 2023年6月6日
    098
  • 爽翻天!告别Java。一起来使用kotlin开发完整客户端

    必须写在前面:从Java转向Kotlin,只需要几天时间! 本篇是kotlin开发Android系列教程的最后一篇。前面几篇介绍了kotlin的基本语法、与java的不同之处等。在…

    Java 2023年5月29日
    0104
  • day12

    package com.oop.demo03;public class Pet { String name; int age; public void shout(){ Syste…

    Java 2023年6月5日
    089
  • JVM与GC调优《大厂学院》

    • JVM架构图? • class文件里是什么? • javac编译器编译步骤? • 什么是字节码指令? • 如何解读class文件? • 常量池是什么? • 常见字节码指令有哪些…

    Java 2023年6月7日
    084
  • 设计模式之组合模式

    本文通过老王和小王探讨书房、书架、各类书的管理问题,引出结构型设计模式家族中的一个重要成员——组合模式,本文会给予两种组合模式的典型代码实现,为了加深理解会在第三部分应用中介绍组合…

    Java 2023年6月8日
    097
  • mybatis-plus的BaseMapper入门使用

    入门使用BaseMapper完成增删改查 根据数据库表制作相应实体类 @TableName(value = "user") @Date public class…

    Java 2023年6月9日
    088
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球