RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

前言

本次分析基于 RocketMQ release-4.5.2 版本。

分析的目标是: RocketMQProducer 是怎么将消息发送至 Broker 的?

说到学习源码,首先当然是要把源代码下载下来,官方地址。使用 git clone https://github.com/apache/rocketmq.git 将源代码 clone 至本地。

项目结构

IDEA打开该项目

RocketMQ 源码学习笔记  Producer 是怎么将消息发送至 Broker 的?

rocketmq-client 模块

可以看到有很多子模块,这次学习的是 Producer 故打开 rocketmq-client 模块,可以在单元测试用找到测试 Producer 功能的类。

RocketMQ 源码学习笔记  Producer 是怎么将消息发送至 Broker 的?

DefaultMQProducerTest

打开该类,观察其方法

RocketMQ 源码学习笔记  Producer 是怎么将消息发送至 Broker 的?

可以看出以 test 开头的方法都是单元测试方法,可以直接运行。 init 方法和 terminate 分别是单元测试的初始化方法和销毁方法。

init 和 terminate

    // 创建一个默认的客户端实例
    @Spy
    private MQClientInstance mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
    // mock 一个真正与 broker 交互的对象
    @Mock
    private MQClientAPIImpl mQClientAPIImpl;
    @Mock
    private NettyRemotingClient nettyRemotingClient;

    private DefaultMQProducer producer;
    private Message message;
    private Message zeroMsg;
    private Message bigMessage;
    private String topic = "FooBar";
    private String producerGroupPrefix = "FooBar_PID";

    // 初始化
    @Before
    public void init() throws Exception {
        String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
        // 创建一个默认的 producer
        producer = new DefaultMQProducer(producerGroupTemp);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setCompressMsgBodyOverHowmuch(16);
        message = new Message(topic, new byte[] {'a'});
        zeroMsg = new Message(topic, new byte[] {});
        bigMessage = new Message(topic, "This is a very huge message!".getBytes());

        producer.start();

        // 反射将客户端实例设置到 producer 对象中
        Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory");
        field.setAccessible(true);
        field.set(producer.getDefaultMQProducerImpl(), mQClientFactory);

        // 反射将一个真正与 broker 交互的对象 设置到客户端实例中
        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
        field.setAccessible(true);
        field.set(mQClientFactory, mQClientAPIImpl);

        // 注册 客户端实例
        producer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());

        // mock 交互对象发消息
        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
            nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
        when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
            nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
            .thenReturn(createSendResult(SendStatus.SEND_OK));
    }

    // 销毁
    @After
    public void terminate() {
        producer.shutdown();
    }

testSendMessageSync_Success

这里选 testSendMessageSync_Success() 方法作为这次分析入口。(该方法用来测试成功的发送同步消息)。

RocketMQ 源码学习笔记  Producer 是怎么将消息发送至 Broker 的?

DEBUG 跟踪调用链可以看出 MQClientAPIImpl#sendMessage ,才是发送消息给 broker 的底层封装,其通过引入 rocketmq-remoting 模块的 org.apache.rocketmq.remoting.netty.NettyRemotingClient 类与 Broker 交互。至于与 Broker 基于 NettyRPC 协议分析,这里不展开分析。可以通过阅读上文提到的 NettyRemotingClient 类进一步了解。

PS:因为使用 mockito 所以调用链中会有些与 producer 发送消息不相关的栈。

PS:通过查看调用链的栈信息,可以快速了解源码中,某一行为的整体流程。

以下源码按调用链从底层往上走

MQClientAPIImpl#sendMessage
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        // 发送消息
        return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
    }

    // 发送消息
    public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // RPC 请求对象
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            // 该类的 field 全为 a,b,c,d 等,可以加速 FastJson 反序列化
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            // 根据 request code 创建 RPC 请求对象
            // 该设计是通过类型码的形式,来标识不同类型的请求
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }
        // 设置请求体,也就是消息的消息体
        request.setBody(msg.getBody());

        // 根据模式 选择 oneway 或者 同步 或者 异步
        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }
DefaultMQProducerImpl#sendKernelImpl
    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        // 查找 brokerName 对应 broker,master 节点的地址
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        // 查找失败,尝试重新从 NameServer 拉取
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

        SendMessageContext context = null;
        if (brokerAddr != null) {
            // 根据 VIP Channel 设置,更新 broker 节点地址
            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

            byte[] prevBody = msg.getBody();
            try {
                //for MessageBatch,ID has been set in the generating process
                if (!(msg instanceof MessageBatch)) {
                    // 设置 自定义属性 UNIQ_KEY -> 0A0A15A01F3C18B4AAC22DB7B6AC0000
                    MessageClientIDSetter.setUniqID(msg);
                }

                boolean topicWithNamespace = false;
                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                    // 设置 自定义属性 INSTANCE_ID ->
                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                    topicWithNamespace = true;
                }
                // 消息设置 处理标识,用于标识消息经过什么样的处理,可以查看该类 org.apache.rocketmq.common.sysflag.MessageSysFlag ,该类是设计较好的标识处理,可以借鉴
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                // 根据 DefaultMQProducer#compressMsgBodyOverHowmuch 选择是否压缩,默认超过 4K 则压缩,压缩算法为 zip
                if (this.tryToCompressMessage(msg)) {
                    // 设置压缩标识,COMPRESSED_FLAG = 0x1
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }

                // 获取属性,判断是否是事务消息, PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    // 设置事务标识,TRANSACTION_PREPARED_TYPE = 0x1 << 2
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

                // hook 操作,这段是检测是否有发送权限 hook 操作, Hook 接口为 org.apache.rocketmq.client.hook.CheckForbiddenHook, 注意:在 DefaultMQProducerImpl 中,该类是以列表形式存在的
                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                // hook 操作,这段是执行发送消息前的 hook 操作, Hook 接口为 org.apache.rocketmq.client.hook.SendMessageHook, 注意:在 DefaultMQProducerImpl 中,该类是以列表形式存在的
                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    context.setNamespace(this.defaultMQProducer.getNamespace());
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

                // 设置 request header
                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

                SendResult sendResult = null;
                switch (communicationMode) {
                    case ASYNC:
                        Message tmpMessage = msg;
                        boolean messageCloned = false;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.

                            //Clone new message using commpressed message body and recover origin massage.

                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                            msg.setBody(prevBody);
                        }

                        if (topicWithNamespace) {
                            if (!messageCloned) {
                                tmpMessage = MessageAccessor.cloneMessage(msg);
                                messageCloned = true;
                            }
                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                        }

                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // 异步发送消息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        // oneway 或同步发送消息
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

                // hook 操作,这段是执行发送消息后的 hook 操作, Hook 接口为 org.apache.rocketmq.client.hook.SendMessageHook, 注意:在 DefaultMQProducerImpl 中,该类是以列表形式存在的
                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            } catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            } finally {
                msg.setBody(prevBody);
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
            }
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
DefaultMQProducerImpl#sendDefaultImpl
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 确保 Producer 状态为 RUNNING 态,所有状态可查看 org.apache.rocketmq.common.ServiceState 枚举类
        this.makeSureStateOK();
        // 校验消息是否符合规则,该工具类是比较好的参数校验封装形式,可以参考借鉴
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        // 第一次执行发送消息前的时间戳
        long beginTimestampFirst = System.currentTimeMillis();
        // 当前次发送消息前的时间戳
        long beginTimestampPrev = beginTimestampFirst;
        // 当前次发送消息后的时间戳
        long endTimestamp = beginTimestampFirst;

        // 从 NameServer 获取 topic 相关信息,包含 topic 中的 queue 相关信息; queue 路由相关信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // 当 (topic 相关信息不为 null) 并且 (topic 中的 queue 列表不为 null 或者 空队列)
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 当模式为 SYNC 时, 默认执行次数为 3 次,包含 1 次正常调用,2 次重试;其他只执行 1 次
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            // 第几次发送对应的 broker 信息
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // 获取上次发送的 broker 名称
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 选择一个 queue 进行发送。有失败重试策略,默认使用 RoundRobin 算法,可以通过 DefaultMQProducer#setSendLatencyFaultEnable 设置启用 LatencyFault 策略
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 记录发送前的时间戳
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.

                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 计算花费时间
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        // 花费时间 超过了 timeout ,则超时处理
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 当设置启用 LatencyFault 策略时,更新 FaultItem
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // 根据模式,选择发送消息后的处理方式
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // 模式为 SYNC 时,会有重试处理
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }

                                return sendResult;
                            default:
                                break;
                        }
                    // 以下代码为异常处理
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }

                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        List nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }

        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
DefaultMQProducerImpl#send
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息 同步模式
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

    /**
     * DEFAULT SYNC -------------------------------------------------------
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息,默认超时时间为3000ms
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
DefaultMQProducer#send

该类使用了门面模式,简单来说就是通过一个门面类,将内部复杂的细节封装好,给客户端提供统一的调用接口。

RocketMQ 源码学习笔记  Producer 是怎么将消息发送至 Broker 的?

扩展可以参考博主之前写的博客《设计模式学习笔记 —— 外观 (Facade) 模式》

    /**
     * Send message in synchronous mode. This method returns only when the sending procedure totally completes.

     *
     *
     * Warn: this method has internal retry-mechanism, that is, internal implementation will retry
     * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
     * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.

     *
     * @param msg Message to send.

     * @return {@link SendResult} instance to inform senders details of the deliverable, say Message ID of the message,
     * {@link SendStatus} indicating broker storage/replication status, message queue sent to, etc.

     * @throws MQClientException if there is any client error.

     * @throws RemotingException if there is any network-tier error.

     * @throws MQBrokerException if there is any error with broker.

     * @throws InterruptedException if the sending thread is interrupted.

     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 校验消息是否符合规则,该工具类是比较好的参数校验封装形式,可以参考借鉴
        Validators.checkMessage(msg, this);
        // 使用 NamespaceUtil 工具类包装 namespace ,逻辑看 org.apache.rocketmq.common.protocol.NamespaceUtilTest#testWrapNamespace 单元测试
        msg.setTopic(withNamespace(msg.getTopic()));
        // 发送消息
        return this.defaultMQProducerImpl.send(msg);
    }
DefaultMQProducerTest#testSendMessageSync_Success
    @Test
    public void testSendMessageSync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
        // mock 从 NameServer 获取 Topic 的路由信息
        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());

        // 发送消息
        SendResult sendResult = producer.send(message);

        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
        assertThat(sendResult.getQueueOffset()).isEqualTo(456L);
    }

本文由博客一文多发平台 OpenWrite 发布!
分享并记录所学所见

Original: https://www.cnblogs.com/switchvov/p/15095101.html
Author: switchvov
Title: RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/572874/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • SpringBoot自动装配-自定义Start

    SpringBoot自动装配 JAVA技术交流群:737698533 SpringBootApplication注解 什么是自动装配,也就是说帮你把需要的类自动添加到Spring容…

    Java 2023年6月6日
    091
  • javaweb实现下载

    实现代码 java代码 package com.kuang.servlet; &#x200B; import javax.servlet.ServletException;…

    Java 2023年6月13日
    061
  • 分享一款好玩的工具

    闲着无聊,想看看电视剧,可惜像我这么懒的人是不会冲会员的,也无法忍受某酷那些网站的广告的毒害,然后我还懒得为了看个电视剧去各个网站找视频,麻烦呀 记得之前分享过一个直接复制那些收费…

    Java 2023年6月6日
    080
  • 「免费开源」基于Vue和Quasar的前端SPA项目crudapi后台管理系统实战之数据库逆向(十二)

    基于Vue和Quasar的前端SPA项目实战之数据库逆向(十二) 回顾 通过之前文章基于Vue和Quasar的前端SPA项目实战之动态表单(五)的介绍,实现了动态表单功能。如果是全…

    Java 2023年6月6日
    0145
  • Java new Date() 获取的时间不正确 【已解决】

    ▌问题描述new Date()获取正确,使用TimeUtils.timeInUTC()转换日期格式后,时间早了比北京时间晚了8小时 ▌原因分析时区不正确,TimeUtils默认使用…

    Java 2023年5月29日
    0102
  • mac(m1)配置my.cnf

    今天开始学习了数据库,在安装MySQL之后启动一直报错,然后在网上找了很多解决方法,最后用以下方法解决 对于习惯了windows的小伙伴来说,直接去安装目录里边修改my.ini就可…

    Java 2023年6月14日
    065
  • AC自动机:Tire树+KMP

    简介 AC自动机是一个多模式匹配算法,在模式匹配领域被广泛应用,举一个经典的例子,违禁词查找并替换为***。AC自动机其实是Trie树和KMP 算法的结合,首先将多模式串建立一个T…

    Java 2023年6月9日
    072
  • keepalived 踩坑记录

    坑之一 在执行脚本的时候返回:-bash: ./nginx_check.sh: /bin/bash^M: 坏 的 解 释 器 : 没 有 那 个 文 件 或 目 录 这个问题是我们…

    Java 2023年6月8日
    075
  • Spring笔记–@ConditionalOnBean坑

    @ConditionalOnBean 巨坑 场景:SpringBoot 引入 redis-starter , 加载 RabbitAutoConfiguration ,进而存在 St…

    Java 2023年5月30日
    077
  • 【Unity Shader学习笔记】Unity光照-阴影

    1、原理 由一个物体向其他物体投射阴影,以及一个物体如何接收其他物体的阴影。 实时渲染中经常使用 Shadow Map 技术。它会首先把摄像机的位置放在与光源重合的位置上, 那么场…

    Java 2023年6月9日
    078
  • 3.门面Slf4j+logback

    1.导入pomyil org.slf4j slf4j-api 1.7.27 ch.qos.logback logback-classic 1.2.3 2.默认使用logback.x…

    Java 2023年6月13日
    077
  • 人到中年,做管理真的需要懂的管理必备知识

    课堂三点要求: 认真听讲,记笔记 * – 讲义电子版会给补充,不要急于找资料 – 跟着课堂节奏 积极参与课堂互动,远程依然有温度 * – 课堂提…

    Java 2023年6月16日
    069
  • MCU软件最佳实践——独立按键

    短小精悍,适用于有rtos和无rtos场合的按键驱动程序 引子 在进行mcu驱动和应用开发时,经常会遇到独立按键驱动的开发,独立按键似乎是每一个嵌入式工程师的入门必修课。笔者翻阅了…

    Java 2023年6月6日
    0112
  • Git命令备忘录

    前言 基本内容 开始之前 基础内容 基础概念 基础命令 远程仓库 分支管理 基本命令 stash命令 rebase命令 冲突问题 标签管理 前言 Git在平时的开发中经常使用,整理…

    Java 2023年6月10日
    060
  • Java利用ShutDownHook关闭系统资源

    Java关闭钩子 在Java程序中能够通过加入关闭钩子,实如今程序退出时关闭资源的功能。使用Runtime.addShutdownHook(Thread hook)向JVM加入关闭…

    Java 2023年5月29日
    056
  • springboot系列十二、springboot集成RestTemplate及常见用法

    一、背景介绍 在微服务都是以HTTP接口的形式暴露自身服务的,因此在调用远程服务时就必须使用HTTP客户端。我们可以使用JDK原生的 URLConnection、Apache的 H…

    Java 2023年5月30日
    078
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球