RocketMQ保姆级教程

大家好,我是三友~~

上周花了一点时间从头到尾、从无到有地搭建了一套RocketMQ的环境,觉得还挺easy的,所以就写篇文章分享给大家。

整篇文章可以大致分为三个部分,第一部分属于一些核心概念和工作流程的讲解;第二部分就是纯手动搭建了一套环境;第三部分是基于环境进行测试和集成到SpringBoot,因为整个过程讲的比较细,所以我称之为”保姆级教程”。

好了,废话补多少,直接进入主题。

前言

RocketMQ是阿里巴巴旗下一款开源的MQ框架,经历过双十一考验、Java编程语言实现,有非常好完整生态系统。RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等,总之就是葛大爷的一句话

RocketMQ保姆级教程

核心概念

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
  • Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。
  • 生产者:生产消息的一方就是生产者
  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
  • 消费者:用来消费生产者消息的一方
  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
  • topic(主题):可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。
  • Tag(子主题):比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。

这里有组的概念是因为可以用来做到不同的生产者组或者消费者组有不同的配置,这样就可以使得生产者或者消费者更加灵活。

工作流程

说完核心概念,再来说一下核心的工作流程,这里我先画了一张图。

RocketMQ保姆级教程

通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:

  • Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
  • Producer在启动之后会跟会NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份
  • Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

就跟上面的图一样,整体的工作流程还是比较简单的,这里我简化了很多概念,主要是为了好理解。

环境搭建

终于讲完了一些简单的概念,接下来就来搭建一套RocketMQ的环境。

通过上面分析,我们知道,在RocketMQ中有NameServer、Broker、生产者、消费者四种角色。而生产者和消费者实际上就是业务系统,所以这里不需要搭建,真正要搭建的就是NameServer和Broker,但是为了方便RocketMQ数据的可视化,这里我多搭建一套可视化的服务。

搭建过程比较简单,按照步骤一步一步来就可以完成,如果提示一些命令不存在,那么直接通过yum安装这些命令就行。

一、准备

需要准备一个linux服务器,需要先安装好JDK

关闭防火墙

bash;gutter:true; systemctl stop firewalld systemctl disable firewalld</p> <pre><code> ### 下载并解压RocketMQ ### 1、创建一个目录,用来存放rocketmq相关的东西 ;gutter:true;
mkdir /usr/rocketmq
cd /usr/rocketmq

2、下载并解压rocketmq

下载

bash;gutter:true; wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip</p> <pre><code> 解压 ;gutter:true;
unzip rocketmq-all-4.7.1-bin-release.zip

看到这一个文件夹就完成了

RocketMQ保姆级教程

然后进入rocketmq-all-4.7.1-bin-release文件夹

bash;gutter:true; cd rocketmq-all-4.7.1-bin-release</p> <pre><code> RocketMQ的东西都在这了 ![](https://pic3.zhimg.com/80/v2-790fd479a3e6f66a02ae7617c20a7e0a_1440w.jpg) ### 二、搭建NameServer ### 修改jvm参数 在启动NameServer之前,强烈建议修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,当然,如果你的内存足够大,可以忽略。 ;gutter:true;
vi bin/runserver.sh

修改画圈的这一行

RocketMQ保姆级教程

这里你可以直接修改成跟我一样的

bash;gutter:true; -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=32m -XX:MaxMetaspaceSize=50m</p> <pre><code> ### 启动NameServer 修改完之后,执行如下命令就可以启动NameServer了 ;gutter:true;
nohup sh bin/mqnamesrv &

查看NameServer日志

bash;gutter:true; tail -f ~/logs/rocketmqlogs/namesrv.log</p> <pre><code> 如果看到如下的日志,就说明启动成功了 ![](https://pic4.zhimg.com/80/v2-03b2e3e738cf0d41af6814d48a5f5e77_1440w.jpg) NameServer日志 ### 三、搭建Broker 这里启动单机版的Broker ### 修改jvm参数 跟启动NameServer一样,也建议去修改jvm参数 ;gutter:true;
vi bin/runbroker.sh

将画圈的地方设置小点,当然也别太小啊

RocketMQ保姆级教程

当然你还是可以跟我设置的一样

bash;gutter:true; -server -Xms1g -Xmx1g -Xmn512m</p> <pre><code> ### 修改Broker配置文件broker.conf 这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册 ;gutter:true;
vi conf/broker.conf

Broker配置文件

RocketMQ保姆级教程

Broker配置文件

这里就能看出Broker的配置了,什么Broker集群的名称啊,Broker的名称啊,Broker的id啊,都跟前面说的对上了。

在文件末尾追加地址

bash;gutter:true; namesrvAddr = localhost:9876</p> <pre><code> 因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。 不过这里我还建议再修改一处信息,因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能你的电脑无法访问到这个自动获取的ip,所以我建议手动指定你的电脑可以访问到的服务器ip。 我的虚拟机的ip是192.168.200.143,所以就指定为192.168.200.143,如下 ;gutter:true;
brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143

如果以上都配置的话,最终的配置文件应该如下,红圈的为新加的

RocketMQ保姆级教程

启动Broker

bash;gutter:true; nohup sh bin/mqbroker -c conf/broker.conf &</p> <pre><code> -c 参数就是指定配置文件 查看日志 ;gutter:true;
tail -f ~/logs/rocketmqlogs/broker.log

当看到如下日志就说明启动成功了

RocketMQ保姆级教程

四、搭建可视化控制台

其实前面NameServer和Broker搭建完成之后,就可以用来收发消息了,但是为了更加直观,可以搭一套可视化的服务。

可视化服务其实就是一个jar包,启动就行了。

jar包可以从这获取

链接:https://pan.baidu.com/s/16s1qwY 2qzE2bxR81t5Wm6w
提取码:s0sd

将jar包上传到服务器,放到/usr/rocketmq的目录底下,当然放哪都无所谓,这里只是为了方便,因为rocketmq的东西都在这里

然后进入/usr/rocketmq下,执行如下命名

bash;gutter:true; nohup java -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 rocketmq-console-ng-1.0.1.jar &</p> <pre><code> rocketmq.config.namesrvAddr就是用来指定NameServer的地址的 查看日志 ;gutter:true;
tail -f ~/logs/consolelogs/rocketmq-console.log

当看到如下日志,就说明启动成功了

RocketMQ保姆级教程

然后在浏览器中输入http://linux ;服务器的ip:8088/就可以看到控制台了,如果无法访问,可以看看防火墙有没有关闭

RocketMQ保姆级教程

右上角可以把语言切换成中文

RocketMQ保姆级教程

Broker集群信息

RocketMQ保姆级教程

topic信息

通过控制台可以查看生产者、消费者、Broker集群等信息,非常直观。

功能很多,这里就不一一介绍了。

测试

环境搭好之后,就可以进行测试了。

引入依赖

html;gutter:true; org.apache.rocketmq rocketmq-client 4.7.1</p> <pre><code> ### 生产者发送消息 ;gutter:true;
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个生产者,指定生产者组为sanyouProducer
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer");

// 指定NameServer的地址
producer.setNamesrvAddr("192.168.200.143:9876");
// 第一次发送可能会超时,我设置的比较大
producer.setSendMsgTimeout(60000);

// 启动生产者
producer.start();

// 创建一条消息
// topic为 sanyouTopic
// 消息内容为 三友的java日记
// tags 为 TagA
Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET));

// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);

// 关闭生产者
producer.shutdown();
}

}

  • 构建一个消息生产者DefaultMQProducer实例,然后指定生产者组为sanyouProducer;
  • 指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息
  • producer.start() 启动生产者
  • 构建一个内容为三友的java日记的消息,然后指定这个消息往sanyouTopic这个topic发送
  • producer.send(msg):发送消息,打印结果
  • 关闭生产者

运行结果如下

java;gutter:true; SendResult [sendStatus=SEND_OK, msgId=C0A81FAF54F818B4AAC2475FD2010000, offsetMsgId=C0A8C88F00002A9F000000000009AE55, messageQueue=MessageQueue [topic=sanyouTopic, brokerName=broker-a, queueId=0], queueOffset=0]</p> <pre><code> sendStatus=SEND_OK 说明发送成功了,此时就能后控制台看到未消费的消息了。 ![](https://pic4.zhimg.com/80/v2-d58cfd6a3ba454b9f1708518f79e9317_1440w.jpg) 到控制台看到消息那块,然后选定发送的topic,查询的时间范围手动再选一下,不选就查不出来(我怀疑这是个bug),然后查询就能看到了一条消息。 然后点击一下MESSAGE DETAIL就能够看到详情。 ![](https://pic3.zhimg.com/80/v2-cfa74cb8a042c0f37003f58f4dabe07a_1440w.jpg) 这里就能看到发送消息的详细信息。 左下角消息的消费的消费,因为我们还没有消费者订阅这个topic,所以左下角没数据。 ### 消费者消费消息 ;gutter:true;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {

// 通过push模式消费消息,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");

// 指定NameServer的地址
consumer.setNamesrvAddr("192.168.200.143:9876");

// 订阅这个topic下的所有的消息
consumer.subscribe("sanyouTopic", "*");

// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 启动消费者
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

  • 创建一个消费者实例对象,指定消费者组为sanyouConsumer
  • 指定NameServer的地址:服务器的ip:9876
  • 订阅 sanyouTopic 这个topic的所有信息
  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。
  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息,于是控制台就打印出如下信息

java;gutter:true; Consumer Started.</p> <p>消费消息:三友的java日记</p> <pre><code> 此时再去看控制台 ![](https://pic3.zhimg.com/80/v2-dd04fa21be5cf2542a23710d42c36faa_1440w.jpg) 发现被sanyouConsumer这个消费者组给消费了。 ## SpringBoot环境下集成RocketMQ ### 集成 在实际项目中肯定不会像上面测试那样用,都是集成SpringBoot的。 ### 1、引入依赖 ;gutter:true;
org.apache.rocketmq
rocketmq-spring-boot-starter
2.1.1

org.springframework.boot
spring-boot-starter-test
2.1.1.RELEASE

2、yml配置

java;gutter:true; rocketmq: producer: group: sanyouProducer name-server: 192.168.200.143:9876</p> <pre><code> ### 3、创建消费者 SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可 ;gutter:true;
@Component
@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic")
public class SanYouTopicListener implements RocketMQListener {

@Override
public void onMessage(String msg) {
System.out.println("处理消息:" + msg);
}

}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类

4、测试

java;gutter:true; @SpringBootTest(classes = RocketMQApplication.class) @RunWith(SpringRunner.class) public class RocketMQTest {</p> <pre><code>@Autowired private RocketMQTemplate template; @Test public void send() throws InterruptedException { template.convertAndSend("sanyouTopic", "三友的java日记"); Thread.sleep(60000); } </code></pre> <p>}</p> <pre><code> 直接注入一个RocketMQTemplate,然后通过RocketMQTemplate发送消息。 运行结果如下: ;gutter:true;
处理消息:三友的java日记

的确消费到消息了。

原理

其实原理是一样的,只不过在SpringBoot中给封装了一层,让使用起来更加简单。

1、RocketMQTemplate构造代码

RocketMQ保姆级教程

所以从这可以看出,最终在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以可想而知,最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。

2、@RocketMQMessageListener 注解处理

RocketMQ保姆级教程

从这可以看出,会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。

至于监听器,是在这

RocketMQ保姆级教程

遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。

最后

通过上面的理论介绍和实际的环境搭建再加上代码的测试,相信应该可以对RocketMQ有个入门,有兴趣的小伙伴可以手动搭起来,整个过程顺利的话可能就十几二十分钟这样子。

最后我再说一句,从文章整体也可以看出本文没有涉及太深入的一些机制和原理的讲解,比如消息是如何存储的,事务和延迟消息是如何实现的,主从是如何同步的等等,甚至压根就没提到队列这个词,主要是因为我打算后面再写一篇文章,来单独剖析这些机制和原理。

最后的最后,本文所有的代码地址:

https://github.com/sanyou3/rock etmq-demo.git

往期热门文章推荐

三万字盘点Spring/Boot的那些常用扩展点

RocketMQ的push消费方式实现的太聪明了

一网打尽异步神器CompletableFuture

@Async注解的坑,小心

7000字+24张图带你彻底弄懂线程池

扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习,回复 面试 即可获得一套精美的面试真题。

RocketMQ保姆级教程

Original: https://www.cnblogs.com/zzyang/p/16591213.html
Author: 三友的java日记
Title: RocketMQ保姆级教程

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/621442/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 文字与段落标记

    一、 标题字标记 1. 标题字标记h 说明:HTML文档中包含有各种级别的标题,各种级别的标题由 h1 到 h6 元素来定义。其中,h1 代表最高级别的标题,依次递减,h6 级别最…

    Java 2023年6月5日
    0100
  • RocketMQ基本API使用

    RocketMQ基本API使用 基于原生客户端 <dependency> <groupid>org.apache.rocketmq</groupid&…

    Java 2023年6月6日
    090
  • Photoshop CS安装教程

    百度云下载: 软件下载完成后,解压文件,找到Set-up.exe文件,双加运行,一直点击下一步(如需安装到D盘或者其他盘,可更改安装路径,选择试用)即可完成安装。 破解文件下载路径…

    Java 2023年6月16日
    073
  • Python数据库编程

    1.什么是MySQLdb? 2.如何连接数据库? 3.如何创建数据库表? 4.如何执行数据插入? 5.如何执行数据库查询操作? 6.如何更新数据库数据? 7.如何删除数据库数据? …

    Java 2023年6月7日
    076
  • String 部分方法使用

    package com.Mxhlin.String; import java.util.Locale; /** * @author Mxhlin * @Email fuhua277…

    Java 2023年6月7日
    072
  • java-抽象类笔记

    抽象方法 使用 abstract 修饰的方法, 没有方法体,只有声明。 定义的是一种”规范”, 就是告诉子类必须要给抽象方法提供具体的实现。 抽象类 包含 …

    Java 2023年6月15日
    054
  • Mybatis基本流程及配置文件解析

    Mybatis基本流程 1、利用Resources工具类加载配置文件,并转换成输入输出流 2、利用解析的配置,创建SqlSessionFactory工厂 3、生产SqlSessio…

    Java 2023年6月5日
    081
  • JAVA_基础篇(1)_JDK 8 的下载、安装与配置

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年6月15日
    051
  • 删除链表的倒数第k个结点

    删除链表的倒数第k个结点 问题重述: 给你一个链表,删除链表的倒数第 k 个结点,并且返回链表的头结点。 示例 1: &#x8F93;&#x5165;&#x…

    Java 2023年6月7日
    0107
  • 清理忽略springboot控制台启动的banner和启动日志

    清理忽略springboot控制台启动的banner和启动日志 1、springboot的banner spring: main: banner-mode: off 2、mybat…

    Java 2023年6月9日
    070
  • Java基础 while 简单示例

    JDK :OpenJDK-11 OS :CentOS 7.6.1810 IDE :Eclipse 2019‑03 typesetting :Markdown code packag…

    Java 2023年5月29日
    095
  • Activiti 7 源码学习

    启动分析 源码版本是 7.1.0.M6 首先从 ProcessEngineAutoConfiguration 开始 ProcessEngineAutoConfiguration 是…

    Java 2023年6月7日
    078
  • Spring Boot 入门(一)搭建第一个Spring Boot程序

    *maven构建项目 官网地址:https://start.spring.io/ Packaging:这里选择War包,方便Application启动应用 点击”GEN…

    Java 2023年6月5日
    087
  • idea 插件推荐 Translation(翻译插件)

    idea 安装Translation: 我用的 windows idea 2019.1.3不同版本可能会不同 打开idea settings => Plugins 搜索tra…

    Java 2023年6月5日
    095
  • spring-retry使用

    Spring Retry提供了自动重新调用失败的操作的功能。这在错误可能是暂时性的(例如瞬时网络故障)的情况下很有用。Spring Retry提供对流程和基于策略的行为的声明式控制…

    Java 2023年5月30日
    099
  • Java8 新特性,打破你对接口的认知

    Java 8 之前, 接口里面只能写抽象方法,不能写实现方法 Java 8 开始是可以有方法实现的,可以在接口中添加默认方法和静态方法 默认方法用 default 修饰,只能用在接…

    Java 2023年6月5日
    052
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球