RocketMQ的push消费方式实现的太聪明了

大家好,我是三友,我又来了~~

最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,来掰扯一下,防止过两天就忘得一干二净了。

MQ消费方式

消费方式就是指消费者如何从MQ中获取到消息,分为两种方式,push(推方式)和pull(拉方式)。

1、push(推方式)

push,顾名思义,就是推的意思。就是当MQ收到生产者产生的消息的时候,会主动将消息推送到消费者进行消费,这种模式就叫push,也就是MQ将消息推给到消费者的意思。

RocketMQ的push消费方式实现的太聪明了

push模式

push这种模式的好处就是响应快,消息的实时性比较高,一旦消息MQ收到消息,那么就能立马将消息推送给消费者,消费者也就能立马收到消息进行消费。

但是这种push的模式,有个缺点就是一旦消息量比较大时,对消费者性能要求比较高,因为是消费者无法控制MQ消息的推送速度,一旦消息量大,那么消费者消费消息的压力就比较大。

2、pull(拉方式)

push是MQ主动给消费者推消息,那么pull呢?刚好跟push相反,就是消费者主动去MQ中拉取消息。

RocketMQ的push消费方式实现的太聪明了

pull模式

那么pull的优缺点自然也就跟push刚好相反。因为是消费者主动去MQ中拉取消息,那么消费者可根据自身消费的情况,决定何时去拉取消息,主动权在自己手上,这样消费者的压力就会相对小点;但是缺点也很明显,那么就会实时性相对于push方式会低一些,因为你得决定拉的时间间隔。

其实想想,消费方式就跟拿快递一样,快递就是一个消息,我自己就是消费者,快递要么快递小哥主动送(push)到家,要么我自己去快递站拿(pull)。

RocketMQ对于消费方式的实现

上一节说了消费消息的两种方式push和pull,或者说算一种理念。尚大的周阳老师有一句经常说的话我比较赞同,那就是”天上飞的理念,必然有落地的实现”。所以push或者pull到底如何落地,得看具体的MQ的产品了。

而RocketMQ作为阿里开源的一款高性能、功能丰富的MQ,自然同时实现了push和pull的两种消费方式,用户可以选择在项目中使用push还是pull。

RocketMQ的push消费方式实现的太聪明了

push模式的实现

RocketMQ的push消费方式实现的太聪明了

pull模式的实现

但是一般情况下,项目中都是使用push的方式来消费,因为pull除了时实性差外,pull方式还得让开发人员主动去维护消息消费进度,增加额外的操作。

所以接下来就着重讲一下RocketMQ是如何实现push的逻辑。

RocketMQ聪明地实现push的原因

上文说到push模式的优点是时实性好,但是缺点就是消费者压力会比较大,所以,难道实现push模式,只能舍弃压力的控制么?

就在这时,RocketMQ大喊了一声

RocketMQ的push消费方式实现的太聪明了

是的,RocketMQ对于push模式做到了实时和压力的平衡,这主要是因为RocketMQ的push模式其实算是一个”伪push”模式,真正底层的实现还是基于pull。

到这里可能有的小伙伴比较迷糊,怎么push变成”伪push”了,还是用pull实现的,到底是push还是pull?

RocketMQ的push消费方式实现的太聪明了

前面我说过,push和pull只是一种理论,具体的实现看MQ。

所以RocketMQ为了兼顾两者,就选择通过消费者主动拉消息来实现push的效果,这也是为什么我称为”伪push”的原因,RocketMQ都给封装好了,让你用起来感觉是MQ主动push消息给你的。

既然底层是pull,那么RokcetMQ在实现消费者的逻辑的时候,就可以很容易实现控制压力的效果,毕竟这是”拉”方式天然自带的buff;但是如何通过pull实现push的时实的优点呢?毕竟鱼和熊掌我RokcetMQ偏要兼得。

这时这就不得不提到一种叫”长轮询”的机制。

轮询与长轮询

轮询与长轮询都属于pull的实现,都是由客户端主动给服务端发送请求,拉取数据。套到MQ中,就是都是消费者主动去MQ拉消息。

轮询

轮询是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

RocketMQ的push消费方式实现的太聪明了

再拿快递举例子,轮询就好比,小明买的iphone 13 pro max快递到了,显示正在派送中,但是小明等不及了,于是就去快递站拿,但是快递还没放到快递站,但是小明的心里急啊,他忍受不了相思之苦,于是小明每隔5分钟就往快递站跑一次,问一下快递到了没,到了就拿回来。这就是轮询的意思,也就是不论有没有数据,客户端都会每隔一定时间去请求一次服务端。

来分析一下拿快递的例子的问题:

  • 每隔5分钟就往快递站跑,那不是累死个小明么。
  • 还有一个问题,假设刚跑到快递站,快递没到,就回去了,但是刚到家的时候,快递到了,于是又等了5分钟,再去快递站终于拿到快递了,但是其实快递都到了几分钟了,你还是没有第一时间拿到快递,这就造成了延迟。

从而对应到程序中,就是会产生如下问题

  • 对于消息而言,会一直产生,这就要求消费者不停地间隔一定时间去拉取消息,即使没有消息也需要去请求,就会造成大量无用的请求,白白浪费大量耗费服务器内存和宽带资源。
  • 可能造成数据的延迟

长轮询

说长轮询概念之前,先来救救小明吧,毕竟小明可不想狗带。

既然原先小明每隔5分钟跑一次,那么是不是可以换种思路,当快递还没到的时候,让小明不要回来,直接在快递站待着,当快递到的时候,才让小明拿着快递回家。这下小明就喜死了,既可以有时间刷刷某音,逛逛某东,还可以在第一时间拿到13 pro max。

RocketMQ的push消费方式实现的太聪明了

所以这种可以在快递站等待的机制,就叫长轮询。

长轮询也是客户端请求服务端,如果服务端有数据,那么就立马返回,客户端再次请求;当服务端不存在数据的时候,服务端并不会给客户端响应,而是将请求给hold住,当服务端有数据的时候才会给客户端响应,返回数据。

所以长轮询可以解决如下问题

  • 解决轮询带来的频繁请求服务端但是没有的问题
  • 一旦新的数据到了,那么消费者能立马就可以获取到新的数据,所以从效果上,有点像是push的感觉。

但是长轮询也会带来服务端代码实现逻辑复杂的问题,当然相比于优点来说,都不太重要。

push消费方式源码探究

理论都讲完了,接下来就到了show me the code的时间了,来看看RocketMQ的是如何通过长轮询机制来实现压力和时实的平衡。

这里我画了一张push模式下消费者消费流程图。

RocketMQ的push消费方式实现的太聪明了

消费者拉取消息的逻辑

  • ①消费者有一个后台线程,会去处理拉取消息(PullRequest)
  • ②先去判断有没有过多消息没有消费,如果有的话,那么就间隔一定时间再次从①开始执行拉取消息的逻辑
  • ③消费者没有过多消息没有消费,那么就会直接向MQ发送拉取消息的请求,有消息就返回,没有消息就hold住请求,等有新的消息到的时候才返回
  • ④消费者获取到消息之后,会去找用户自定义的消息处理逻辑的实现(MessageListener的实现)去消费消息,同时会再次拉取消息,继续从①开始执行逻辑

1、消费者拉取消息控制压力源码

当消费者准备去拉消息的时候,会先去判断当前消费者消费的压力再决定是否去拉取消息。

RocketMQ提供了两种判断消费压力逻辑,一种是基于还未消费的消息的数量的大小,还有一种是基于还未消费的消息所占内存的大小。

RocketMQ的push消费方式实现的太聪明了

控制压力源码

  • 判断还未消费消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑
  • 判断还未消费消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑

总的一句话就是,当消费者消费的压力过大时,就不会去拉取消息,而是等待一定的时间再去执行拉取消息的逻辑,如果压力还是很大,就还继续等,如此循环,直到消费者的消费压力小于阈值的时候,才会真正的发送请求到MQ中拉取消息。

2、MQ将请求hold住源码

当服务端未找到消息时,就将请求进行挂起,存起来

RocketMQ的push消费方式实现的太聪明了

请求hold住源码

拉取不到消息时,会调用PullRequestHoldService的suspendPullRequest方法讲请求存储起来。PullRequestHoldService是用来存储拉取请求的类。

RocketMQ的push消费方式实现的太聪明了

PullRequestHoldService

suspendPullRequest方法会将请求分类,放到ManyPullRequest里,然后用一个ConcurrentHashMap进行存储

3、MQ收到消息响应给消费者的源码

RocketMQ的push消费方式实现的太聪明了

NotifyMessageArrivingListener

当生产者发送的消息达到MQ的时候,MQ会回调NotifyMessageArrivingListener的arriving方法,之后就会调用PullRequestHoldService的notifyMessageArriving方法,MQ会重新处理拉取消息的逻辑,此时就能找到最新来的那条消息,从而将最新的消息通过网络返回给消费者。

RocketMQ的push消费方式实现的太聪明了

notifyMessageArriving和返回消息逻辑

最后

所以从以上的分析可以看出,RocketMQ对于push的消费方式的实现是基于长轮询机制来实现的,同时平衡了时实和压力,这其实就很nice了。

最后我想说一句,其实不论是pull还是push,又或是轮询和长轮询,其实都是一种理论或者说是一种思想,不单单是MQ的东西,就比如在Nacos中,也使用了push和长轮询机制。但是这些理论在不同产品的具体实现,实现方式可能不太一样,但都是大同小异,所以当你懂了这些思想,再看其它框架的源码,其实就很容易了。

最后的最后,我再说一句,终于***发年终奖了。。

本文如果对你有点帮助,还请帮忙点赞、在看、转发、非常感谢。

往期热门文章推荐

三万字盘点Spring/Boot的那些常用扩展点

一网打尽异步神器CompletableFuture

@Async注解的坑,小心

万字+28张图带你探秘小而美的规则引擎框

7000字+24张图带你彻底弄懂线程池

扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套面试真题。

RocketMQ的push消费方式实现的太聪明了

Original: https://www.cnblogs.com/zzyang/p/16565620.html
Author: 三友的java日记
Title: RocketMQ的push消费方式实现的太聪明了

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/621444/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • mqtt实现跨平台跨应用通讯

    介绍 最近物联网应用一直很火,也打算做一些这方面的尝试,就边学边做在家花了2天时间做了一个简单demo,功能很简单,使用emq x 作为mqtt broker,用python写了一…

    Java 2023年5月30日
    0102
  • Java中的装箱和拆箱

    一.什么是装箱?什么是拆箱? 在Java SE5之前,如果要生成一个数值为10的Integer对象,必须这样进行: Integer i = new Integer(10); 在从J…

    Java 2023年5月29日
    075
  • 使用JDK的同步容器时,应该避免那些坑?

    摘要:在使用JDK中的同步容器时,应该尽量避免哪些坑 同步容器与并发容器 在JDK中,总体上可以将容器分为同步容器和并发容器。 同步容器一般指的是JDK1.5版本之前的线程安全的容…

    Java 2023年5月30日
    071
  • 强软弱虚引用,只有体会过了,才能记住

    以前学习强软弱虚引用的时候,只是走马观花看看博客,并没有自己写代码去实践、去证明,导致每次看完后,过不了多久就忘了,后来下定决心,一定要自己敲敲代码,这样才能让印象更加深刻,古人云…

    Java 2023年6月5日
    073
  • MyBatis 简介及简单使用

    MyBatis 简介1、MyBatis 历史MyBatis最初是Apache的一个开源项目 iBatis, 2010年6月这个项目由Apache Software Foundati…

    Java 2023年6月7日
    072
  • java重试

    重试 重试,就是多试几次。一次不成功,多试几次说不定就成功了。 什么时候重试? 要执行的逻辑比较重要,或者是服务不稳定,或者是Rpc远程调服务有时不成功,都可以使用重试。 示例代码…

    Java 2023年5月29日
    074
  • CTF大赛模拟-CFS三层内网漫游

    CTF大赛模拟-CFS三层内网漫游 环境: 三台虚拟机,三个网络。 target 1:192.168.161.178 , 192.168.52.132 (linux) target…

    Java 2023年6月6日
    094
  • Java中的static关键字

    在Java中并不存在全局变量的概念,但是我们可以通过static来实现一个”伪全局”的概念,在Java中static表示”全局”或者…

    Java 2023年5月29日
    0114
  • 面试官:@Autowired, @Resource, @Inject 三个注解的区别?一下懵了。。。

    作者:Richard_Yi来源:juejin.cn/post/6844904056230690824 前言 本章的内容主要是想探讨我们在进行Spring 开发过程当中,关于依赖注入…

    Java 2023年6月15日
    073
  • java Math类

    java.lang.Math类包含用于执行基本数学运算的方法,如初等指数、对数、平方根和三角函数。类似这样的工具类,其所有方法均为静态方法,并且不会创建对象,调用起来非常简单 常用…

    Java 2023年5月29日
    081
  • MongoDB高级应用之数据转存与恢复(5)

    1、MongoDB索引 1.1、创建索引 db.books.ensureIndex{{number:1}} 创建索引同时指定索引的名字 db.books.ensureIndex({…

    Java 2023年6月7日
    096
  • 阿里巴巴编码规范-考试认证

    阿里巴巴编码规范-考试认证 雨打梨花深闭门,忘了青春,误了青春。 1、注册阿里云账号 2、购买认证 需要怒支付一顿早餐Q,可以用支付宝支付,选择支付宝支付然后直接输入支付密码就OK…

    Java 2023年6月5日
    0103
  • Java8特性详解 lambda表达式(二):流式处理中的lambda

    ​ 要讲 Stream ,那就不得不先说一下它的左膀右臂 Lambda 和方法引用,你用的 Stream API 其实就是函数式的编程风格,其中的「函数」就是方法引用,「式」就是 …

    Java 2023年5月29日
    070
  • JDBC概述

    JDBC概述 JDBC(Java DataBase Connectivity),它是一种用于执行 SQL语句的 JavaAPI。通过使用JDBC就可以使用 相同的API访问 不同的…

    Java 2023年6月9日
    076
  • 6、接口调试工具

    下面模拟手机微信 向公众号发信息 测试结果: Original: https://www.cnblogs.com/weiapro/p/7732113.htmlAuthor: 天涯越…

    Java 2023年6月13日
    078
  • 我是一个Dubbo数据包…

    hello,大家好呀,我是小楼! 今天给大家带来一篇关于Dubbo IO交互的文章,本文是一位同事写的文章,用有趣的文字把枯燥的知识点写出来,通俗易懂,非常有意思,所以迫不及待找作…

    Java 2023年6月6日
    0111
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球