SpringCloud之RocketMQ

1.RocketMQ 与其他区别

SpringCloud之RocketMQ

2.名词之间关系、架构及图

2.1 架构图

SpringCloud之RocketMQ
SpringCloud之RocketMQ

2.2 消息模型(Message Model)

(1)图

SpringCloud之RocketMQ

(2)组成

a.主要由 Producer、Broker、Consumer 三部分组成。
其中 Producer 负责生产消息, Consumer 负责消费消息, Broker 负责存储消息。

b. Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息
也可以分片存储于不同的 Broker。

c. Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
ConsumerGroup 由多个Consumer 实例构成。

2.3 Name Server

(1)概念:名称服务,充当路由消息的提供者。

是一个几乎 无状态节点,可集群部署,节点之间无任何信息同步。
在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。

(2)2个功能

NameServer没有状态,可以横向扩展。
每个broker在启动的时候会到NameServer注册;
Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;
Consumer也会定时获取topic路由信息。

a.接收broker的请求,注册broker的路由信息

b.接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息

2.4 Broker
(1)消息中转角色,负责存储消息,转发消息。
可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。
broker是RocketMQ的核心,所以需要保证broker的高可用

(2)broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。

(3)Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

(4)每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。

2.5 Producer(生产者)
与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。

2.6 Consumer(消费者)
(1)与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息

(2)并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。

(3)Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。

2.7 Broker中名词
(1)Broker、topic、queue关系图

SpringCloud之RocketMQ
(2)topic
消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息…… 一条消息必须有一个Topic。
(3)Queue
a.主题被划分为一个或多个子主题,称为”message queues”。
b.一个topic下,我们可以设置多个queue(消息队列)。当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。
c.Queue是Topic在一个Broker上的分片,在分片基础上再等分为若干份(可指定份数)后的其中一份,是负载均衡过程中资源分配的基本单元。
d.集群消费模式下一个消费者只消费该Topic中部分Queue中的消息,当一个消费者开启广播模式时则会消费该Topic下所有Queue中的消息。
(4)Tags
a.Topic下的次级消息类型/二级类型(注:Tags也支持TagA || TagB这样的表达式),可以在同一个Topic下基于Tags进行消息过滤。

b.Tags的过滤需要经过两次比对,首先会在Broker端通过Tag hashcode进行一次比对过滤,匹配成功传到consumer端后再对具体Tags进行比对,以防止Tag hashcode重复的情况。

c.比如交易消息又可以分为:交易创建消息,交易完成消息等….. 一条消息可以没有Tag。有了标签,来自同一个业务模块的不同目的的消息可能具有相同的主题和不同的标签。

d.Queue中具体的存储单元结构如下图,最后面的8个Byte存储Tag信息。

SpringCloud之RocketMQ
2.8 Producer 与 Producer Group

(1)Producer
a.表示消息队列的生产者,用来生产和发送消息的,一般指业务系统。

b.RocketMQ提供了发送:普通消息(同步、异步和单向(one-way)消息)、定时消息、延时消息、事务消息。

(2)Producer Group
a.是一类Producer的集合名称。

b.这类Producer通常发送一类消息,且发送逻辑一致。相同角色的生产者被分组在一起

c.同一生产者组的另一个生产者实例可能被broker联系,以提交或回滚事务,以防原始生产者在交易后崩溃。

d.每个生产者组只允许一个实例,以避免对生产者实例进行不必要的初始化。

2.8 Consumer 与 Consumer Group

(1)Consumer
a.消息消费者,一般由业务后台系统异步的消费消息。

b.Push Consumer(对应推模式)。Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。

c.Pull Consumer(对应拉模式)。Consumer 的另一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

(2)Consumer Group
a.是一类Consumer的集合名称。

b.这类通常消费一类消息,且消费逻辑一致(使用相同 Group ID 的订阅者属于同一个集群。同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点)。

c.消费者群体实现了负载平衡和容错的目标,在信息消费方面,是非常容易的。

d.消费者群体的消费者实例必须订阅完全相同的主题。

3.组件关系

3.1 Broker,Producer和Consumer
如果不考虑负载均衡和高可用,最简单的Broker,Producer和Consumer之间的关系如下图所示:

SpringCloud之RocketMQ
3.2 Topic,Topic分片和Queue

SpringCloud之RocketMQ

3.2.1 关系

(1)Topic分片
RocketMQ的一个Topic可以分布在各个Broker上,我们可以把一个Topic分布在一个Broker上的子集定义为一个Topic分片。

(2)Queue
将Topic分片再切分为若干等分,其中的一份就是一个Queue。

3.2.2 queue数量指定方式

每个Topic分片等分的Queue的数量可以不同,由用户在创建Topic时指定,方式如下:

(1)代码指定
producer.setDefaultTopicQueueNums(8);

(2)配置文件指定
同时设置broker服务器的配置文件broker.properties:defaultTopicQueueNums=16

(3)rocket-console控制台指定

SpringCloud之RocketMQ
  1. 消息的种类

4.1 按照发送的特点分

4.1.1 同步消息
(1)属于可靠同步发送
是指消息发送方发出数据后,会阻塞直到MQ服务方发回响应消息。

(2)应用场景:
如重要通知邮件、报名短信通知、营销短信系统等。

(3)流程图

SpringCloud之RocketMQ
(4)代码
SendResult sendResult = producer.send(msg);

4.1.2 异步消息
(1)属于可靠异步发送
a.是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。
b.需要用户实现异步发送回调接口(SendCallback),在执行消息的异步发送时,应用不需要等待服务器响应即可直接返
回,通过回调接口接收服务器响应,并对服务器的响应结果进行处理。

(2)应用场景:
a.异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景。

b.例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

(3)流程图

SpringCloud之RocketMQ

(4)代码

producer.sendAsync(msg, new SendCallback() {//...});

4.1.3 单向消息
(1)one-way
a.发送特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。

b.发送消息的过程耗时非常短,一般在微秒级别。

(2)应用场景:
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

(3)流程图

SpringCloud之RocketMQ

(4)代码

producer.sendOneway(msg);

以上三种消息特点及区别:

SpringCloud之RocketMQ

4.2 按照使用功能特点分
4.2.1 普通消息
(1)无特性的消息,区别于有特性的定时、延时、顺序和事务消息。
(2)包括上面的同步、异步、单向消息

4.2.2 顺序消息
(1)消息队列 RocketMQ 提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息

(2)全局顺序消息
a.概念
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

b.流程图:

SpringCloud之RocketMQ
c.示例
在证券处理中,以人民币兑换美元为 Topic,在价格相同的情况下,先出价者优先处理,则可以通过全局顺序的方式按照 FIFO 的方式进行发布和消费。

(3)分区顺序消息
a.概念
对于指定的一个 Topic,所有消息根据 Sharding Key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。

b.注意:Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的 Message Key是完全不同的概念。

c.流程图:

SpringCloud之RocketMQ
e.示例
例一:用户注册需要发送发验证码,以用户 ID 作为 sharding key, 那么同一个用户发送的消息都会按照先后顺序来发布和消费。

例二:电商的订单创建,以订单 ID 作为 sharding key,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照先后顺序来发布和消费。

(4)发送顺序消息-MessageQueueSelector()

String orderId = "Order_0000001";

//msg-key: "PAY_201907151223001" 标识此条消息业务id
//msg-key: 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
String payId = "PAY_201907151223001";
Message msg = new Message("pay", "TAG1", "PAY_201907151223001" ,
        ("支付消息,内容为:xxxxx " ).getBytes(RemotingHelper.DEFAULT_CHARSET));

 // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空一个字符串常量即可。
String shardingKey = orderId;
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List mqs, Message msg, Object arg) {
        //arg为后续传递的shardingKey,可以根据hash算法or其他方法来计算出id;
        //可参考hashmap的hash算法;
        int id = hash(arg);
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, shardingKey);

(5)接收顺序消息-MessageListenerOrderly()

consumer.registerMessageListener(new MessageListenerOrderly() {

       @Override
       public ConsumeOrderlyStatus consumeMessage(List msgs,
                                                  ConsumeOrderlyContext context) {
          //处理消息...

          return ConsumeOrderlyStatus.SUCCESS;

       }
    });

4.2.3 延时、定时消息

(1)延时消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到 Consumer 进行消费

(2)定时消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费

(3)代码

// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递
long delayTime = System.currentTimeMillis() + 3000;
// 设置消息需要被投递的时间
msg.setStartDeliverTime(delayTime);

// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2019-08-01 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2019-08-01 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);

4.2.4 事务消息
(1)概念
消息队列 RocketMQ 提供类似 X/Open XA 的分布事务功能,通过消息队列 RocketMQ 的事务消息能达到分布式事务的最终一致.

(2)流程图

SpringCloud之RocketMQ
(3)以上流程图步骤
  1. 发送方向消息队列 RocketMQ 服务端发送消息。
  2. 服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
  3. 发送方开始执行本地事务逻辑。
  4. 发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
  5. 在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查。
  6. 发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作。

说明:事务消息发送对应步骤 1、2、3、4,事务消息回查对应步骤 5、6、7。

(4)代码

a.producer

TransactionListener transactionListener = new DeducationTransactionListenerImpl();
//producer需要绑定transactionListener
producer.setTransactionListener(transactionListener);

//producer需要sendMessageInTransaction方法发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);

b.TransactionListener

public class DeducationTransactionListenerImpl implements TransactionListener {

    //当发送prepare(half)消息成功后,会执行此逻辑
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        LocalTransactionState state ;
        //todo 执行业务方法,并根据执行结果,返回state
        return state;
    }

    /**
     * 当没有回应prepare(half)消息时,brokder会检查此条消息的状态
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        LocalTransactionState state ;
        //todo 查看订单bizNo的状态,并返回state
        return  state;
    }
}

c.TransactionStatus

TransactionStatus.CommitTransaction 提交事务,允许订阅方消费该消息。
TransactionStatus.RollbackTransaction 回滚事务,消息将被丢弃不允许消费。
TransactionStatus.Unknow 暂时无法判断状态,期待固定时间以后消息队列 RocketMQ 服务端向发送方进行消息回查。

d.Message设置消息回查时间

/**
 *  在消息属性中添加第一次消息回查的最快时间,单位秒。
 *  例如,以下设置实际第一次回查时间为 120 秒 ~ 125 秒之间
 *
 *  以上方式只确定事务消息的第一次回查的最快时间,实际回查时间向后浮动0~5秒;
 *  如第一次回查后事务仍未提交,后续每隔5秒回查一次。
     */
msg.putUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS,"120");

4.2.5 其他消息
广播消息、 批量消息

4.2.6 附录
消息类型对比:

SpringCloud之RocketMQ

发送方式对比:

SpringCloud之RocketMQ
  1. 发布订阅

5.1 收发模型

SpringCloud之RocketMQ

5.2 producer端消息发布原理图

SpringCloud之RocketMQ

5.3 consumer端两种消息获取模式
(1)push模式
MQServer主动向消费端推送

(2)pull模式
消费端在需要时,主动到MQServer拉取

5.4 consumer端有两种消息消费模式
5.4.1 集群消费
(1)集群:使用相同 Group ID 的订阅者属于同一个集群。
同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。

(2)集群消费:
任意一条消息只需要被集群内的任意一个消费者处理即可。

(3)注意事项
a.消费端集群化部署,每条消息只需要被处理一次。
b.由于消费进度在服务端维护,可靠性更高。
c.集群消费模式下,每一条消息都只会被分发到一台机器上处理。
d.集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。

(4)流程图

SpringCloud之RocketMQ

5.4.2 广播消费
(1)RocketMQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

(2)广播消费模式下不支持顺序消息。
a.不支持重置消费位点。
b.每条消息都需要被相同逻辑的多台机器处理。
c.消费进度在客户端维护,出现重复的概率稍大于集群模式。
d.消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
e.客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
f.每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
h.目前仅 Java 客户端支持广播模式。
i.服务端不维护消费进度,所以控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

(3)流程图

SpringCloud之RocketMQ

2.为什么要用RocketMQ?
2.1 应用解耦
2..1.1 系统的耦合性越高,容错性就越低。以电商为例,用户创建完订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个系统出现故障或者因为升级等原因暂时不可用,都回造成下单的异常,影响用户的体验。
2.1.2 如下图:
(1)子系统间耦合性太大的问题,使用mq之后,我们只需要依赖于mq,避免了各个子系统间的强依赖问题。

(2)订单系统作为消息生产者,保证它自己没有异常即可,不会受到支付系统等业务子系统的异常影响,并且各个消费者业务子系统之间,也互不影响。

SpringCloud之RocketMQ

2.2 流量削峰
2.2.1 应用系统如果遇到系统请求流量瞬间猛增,有可能会将系统压垮。如果有消息队列,遇到此情况,可以将大量请求存储起来,将一瞬间的峰值请求分散到一段时间进行处理,这样可以大大提高系统的稳定性

2.2.2 如下图:
(1)由于突然出现的请求峰值,导致系统不稳定的问题。使用mq后,能够起到消峰的作用。

(2)订单系统接收到用户请求之后,将请求直接发送到mq,然后订单消费者从mq中消费消息,做写库操作。

(3)如果出现请求峰值的情况,由于消费者的消费能力有限,会按照自己的节奏来消费消息,多的请求不处理,保留在mq的队列中,不会对系统的稳定性造成影响

SpringCloud之RocketMQ
2.3 异步
2.3.1
(1)用户调用一个接口的时候,可能该接口调用了别的方法。例如:用户注册的时候,后台可能需要调用:查询数据库,插入数据库,发送邮件等等…

(2)但是用户可能并不需要后台将所有的任务执行完毕,那么此时在初入数据口后面加入MQ,用户就能很快得到注册成功的响应而去做一些别的事情。mq的机制又能保证最终的一致性,所以使用起来很安全很稳定。

2.3.2 如下图:
(1)同步接口调用导致响应时间长的问题,使用mq之后,将同步调用改成异步,能够显著减少系统响应时间。

(2)系统A作为消息的生产者,在完成本职工作后,就能直接返回结果了。而无需等待消息消费者的返回,它们最终会独立完成所有的业务功能。

(3)这样能避免总耗时比较长,从而影响用户的体验的问题。

SpringCloud之RocketMQ

3.部署:Dledger快速搭建

3.1 源码构建

构建分为两个部分,需要先构建 DLedger,然后 构建 RocketMQ

3.1.1 构建 DLedger

git clone https://github.com/openmessaging/openmessaging-storage-dledger.git

cd openmessaging-storage-dledger

mvn clean install -DskipTests

3.1.2 构建 RocketMQ

git clone https://github.com/apache/rocketmq.git

cd rocketmq

git checkout -b store_with_dledger origin/store_with_dledger

mvn -Prelease-all -DskipTests clean install -U

3.2 快速部署

在构建成功后

cd distribution/target/apache-rocketmq

sh bin/dledger/fast-try.sh start

如果上面的步骤执行成功,可以通过 mqadmin 运维命令查看集群状态。

sh bin/mqadmin clusterList -n 127.0.0.1:9876

(BID 为 0 的表示 Master,其余都是 Follower)

启动成功,现在可以向集群收发消息,并进行容灾切换测试了。

关闭快速集群,可以执行:

sh bin/dledger/fast-try.sh stop

快速部署,默认配置在 conf/dledger 里面,默认的存储路径在 /tmp/rmqstore。

3.3 容灾切换

部署成功,杀掉 Leader 之后(在上面的例子中,杀掉端口 30931 所在的进程),等待约 10s 左右,用 clusterList 命令查看集群,就会发现 Leader 切换到另一个节点了。

4.其他

4.1 异步复制和同步双写总结

SpringCloud之RocketMQ

4.2 集群方式对比

SpringCloud之RocketMQ

官方文档

_ 参考链接1_

_ 参考链接2_

_ 参考链接3_

_ 参考链接4_

随心所往,看见未来。Follow your heart,see night!

欢迎点赞、关注、留言,收藏及转发,一起学习、交流!

Original: https://www.cnblogs.com/folyh/p/16584533.html
Author: folyh
Title: SpringCloud之RocketMQ

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/608386/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球