rocketmq 如何保证顺序消费

rocketMQ 为了保证 consumer 顺序消费,做了很多工作。

MQClientManager 在 jvm 进程中是单例,其内部维护一个 map,键是 clientId,值是 MQClientInstance,业务 producer 和 consumer 使用的是同一个 MQClientInstance,其对应的 clientId 是 ip@pid

在 MQClientInstance 内部维护 2 个 map,consumer 的信息存放在 consumerTable 中,即一个 group 只有一个 MQConsumerInner。

当消息按顺序存放在 queue 中后,consumer 拉取消息消费,如何保证顺序呢?
1. ConsumeMessageOrderlyService 通过 rebalance 获取分配到的 queue,向 broker 发起请求锁住这些 queue
2. 同时在消费时,保证 queue 的消息只有一个线程在消费
3. 如果消息消费失败了,不直接发回给 broker ,而是继续消费该条消息

定时任务锁住 broker 中的 queue

broker 处理 LOCK_BATCH_MQ 请求,如果 queue 没有其他客户端加锁,或者加锁过期,则分配给该当前客户端

ConsumeMessageOrderlyService 在关闭的时候,会 unlock 所有的 queue

在 MQClientInstance 内部获取 queue 的锁,确保 MQClientInstance 中只有一个线程消费当前 queue 的消息,如果当前 ProcessQueue 没有锁住,或者锁过期了,则等获取锁后再消费

为方便说明,假设 batchSize 为 1,当前线程锁住 ProcessQueue,从 msgTreeMap 取出一条消息,并放入 consumingMsgOrderlyTreeMap 中,
如果消费失败了,但是为了保证顺序性,会把这条消息从 consumingMsgOrderlyTreeMap 取出,重新放入 msgTreeMap 中,当超过了最大重试次数后,尝试发回 broker

Original: https://www.cnblogs.com/allenwas3/p/12905170.html
Author: 偶尔发呆
Title: rocketmq 如何保证顺序消费

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/540267/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mysql安装提示start service失败

    由于之前安装过mysql(win7、64位),卸载时没有删除干净mysql相关的东西,导致下次安装时提示start service,安装失败。 解决办法: 1、先卸载mysql,然…

    Java 2023年6月5日
    075
  • java中的一维数组

    数组(array):是一种用于存储多个相同数据类型的存储模型(可以理解为容器) 数组定义和静态初始化 数组的两种定义格式: 格式1: 数据类型[] 变量名; 范例: int[] a…

    Java 2023年6月15日
    078
  • Spring的RestTemplate功能举例

    import com.google.common.collect.Maps; import com.shein.dms.common.BasicCase; import lombo…

    Java 2023年5月30日
    052
  • Makfile总结

    Makefile基础以及小技巧 当我们在命令行当中输入 make的时候他的执行流程如下: make命令首先会在当前目录下面寻找makefile或者Makefile文件。 寻找到ma…

    Java 2023年6月8日
    0137
  • Effective Java 第三版——78. 同步访问共享的可变数据

    Tips书中的源代码地址:https://github.com/jbloch/effective-java-3e-source-code注意,书中的有些代码里方法是基于Java 9…

    Java 2023年5月29日
    081
  • java动态代理实现与原理详细分析

    关于Java中的动态代理,我们首先需要了解的是一种常用的设计模式–代理模式,而对于代理,根据创建代理类的时间点,又可以分为静态代理和动态代理。 一、代理模式 代理模式是…

    Java 2023年5月29日
    073
  • 解决idea中每次创建项目都要重复配置maven,来看全网仅需几步全局配置

    你是不是idea每次创建项目都要重新配置maven呢?为啥呢老憋屈了。因为你之前的每一次的maven配置都是在项目中去配置的 ,所以只会在本项目中生效。也就是说在项目中配置的是 &…

    Java 2023年6月5日
    085
  • 一文了解Flink State Backends

    当我们使用Flink进行流式计算时,通常会产生各种形式的中间结果,我们称之为State。有状态产生,就必然涉及到状态的存储,那么Flink中定义了哪些形式的状态存储呢,下面一一给大…

    Java 2023年6月6日
    063
  • Spring Cloud Config 配置中心

    对于一些简单的项目来说,我们一般都是直接把相关配置放在单独的配置文件中,以 properties 或者 yml 的格式出现,更省事儿的方式是直接放到 application.pro…

    Java 2023年5月30日
    080
  • 【设计模式】Java设计模式-组合模式

    Java设计模式 – 组合模式 😄 不断学习才是王道🔥 继续踏上学习之路,学之分享笔记👊 总有一天我也能像各位大佬一样🏆原创作品,更多关注我CSDN: 一个有梦有戏的人…

    Java 2023年6月16日
    091
  • 三个线程交替按顺序打印ABC之条件队列的理解

    如题。本文给出交替打印的代码示例,并解释了条件变量在代码实现中所起的作用。 使用三个线程,一个只负责打印A,另一个只负责打印B,最后一个只负责打印C 按顺序交替。即打印A后,才能打…

    Java 2023年5月30日
    070
  • 启动tomcat时 错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099的解决办法

    一.问题描述 今天一来公司,在IntelliJ IDEA 中启动Tomcat服务器时就出现了如下图所示的错误: 错误: 代理抛出异常错误: java.rmi.server.Expo…

    Java 2023年5月29日
    0102
  • 安装教程—条龙服务—搞定重装系统后的软件安装

    背景:因为电脑原因,导致我经常重装系统,然后就需要安装很多东西,经常忘了如何安装,总之就是很烦,然后就需要搜索某某东西如何安装以及配置,毕竟我记忆力不好,只能一个一个去搜,这个过程…

    Java 2023年6月10日
    071
  • Spring IoC 的实现机制

    Spring 中的 IoC 的实现原理就是工厂模式加反射机制。 <span>interface<span> <span>Fruit<spa…

    Java 2023年5月30日
    057
  • HIT软构博客4–lab1记录与总结

    ​ 完成一个实验或小的项目使用java在需要的时候去搜索和看书比直接看很厚的书有意义一些,体验更加良好。自己对java的掌握不是很好,大一结束的夏天认真学习了java看了核心技术的…

    Java 2023年6月5日
    094
  • ElasticSearch增加索引字段

    PUT xxx_index-000009/_mapping/_doc?include_type_name=true { “properties”:{ &#8…

    Java 2023年6月13日
    068
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球