【RocketMQ】MQ消息发送

消息发送

首先来看一个RcoketMQ发送消息的例子:

@Service
public class MQService {

    @Autowired
    DefaultMQProducer defaultMQProducer;

    public void sendMsg() {
        String msg = "我是一条消息";
        // 创建消息,指定TOPIC、TAG和消息内容
        Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes());
        SendResult sendResult = null;
        try {
            // 同步发送消息
            sendResult = defaultMQProducer.send(sendMsg);
            System.out.println("消息发送响应:" + sendResult.toString());
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

RocketMQ是通过 DefaultMQProducer进行消息发送的,它实现了 MQProducer接口, MQProducer接口中定义了消息发送的方法,方法主要分为三大类:

public interface MQProducer extends MQAdmin {

    // 同步发送消息
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    // 异步发送消息,SendCallback为回调函数
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;

    // 异步发送消息,没有回调函数
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;

    // 省略其他方法
}

接下来以将以同步消息发送为例来分析消息发送的流程。

DefaultMQProducer里面有一个 DefaultMQProducerImpl类型的成员变量 defaultMQProducerImpl,从默认的无参构造函数中可以看出在构造函数中对 defaultMQProducerImpl进行了实例化,在 send方法中就是调用 defaultMQProducerImpl的方法进行消息发送的:

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * 默认消息生产者实现类
     */
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;

    /**
     * 默认的构造函数
     */
    public DefaultMQProducer() {
        this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
    }
    /**
     * 构造函数
     */
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        // 实例化
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

    /**
     * 同步发送消息
     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 设置主题
        msg.setTopic(withNamespace(msg.getTopic()));
        // 发送消息
        return this.defaultMQProducerImpl.send(msg);
    }
}

DefaultMQProducerImpl中消息的发送在 sendDefaultImpl方法中实现,处理逻辑如下:

public class DefaultMQProducerImpl implements MQProducerInner {
    /**
     * DEFAULT SYNC -------------------------------------------------------
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }

    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }

    /**
     * 发送消息
     * @param msg 发送的消息
     * @param communicationMode
     * @param sendCallback 回调函数
     * @param timeout 超时时间
     */
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        // 开始时间
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //  查找主题路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            // 消息队列
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 获取失败重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // 获取BrokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 根据BrokerName选择一个消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    // 记录本次选择的消息队列
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 记录时间
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.

                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 计算选择消息队列的耗时时间
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        // 如果已经超时,终止发送
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        // 结束时间
                        endTimestamp = System.currentTimeMillis();
                        // 记录向Broker发送消息的请求耗时,消息发送结束时间 - 开始时间
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // 如果发送失败
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // 是否重试
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                // 返回结果
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        // 如果抛出异常,记录请求耗时
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    }
                    // ... 省略其他异常处理
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }
            // ...

        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

}

DefaultMQProducerImpl中有一个路由信息表 topicPublishInfoTable,记录了主题对应的路由信息,其中KEY为topic, value为对应的路由信息对象TopicPublishInfo:

public class DefaultMQProducerImpl implements MQProducerInner {

    // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
    private final ConcurrentMap topicPublishInfoTable =
        new ConcurrentHashMap();
}

TopicPublishInfo中记录了主题所在的消息队列信息、所在Broker等信息:

sendWhichQueue:计数器,选择消息队列的时候增1,以此达到轮询的目的

topicRouteData:从NameServer查询到的主题对应的路由数据,包含了队列和Broker的相关数据

public class TopicPublishInfo {

    // 消息队列列表
    private List messageQueueList = new ArrayList();

    // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

    // 主题路由数据
    private TopicRouteData topicRouteData;

    // ...

}

// 消息队列
public class MessageQueue implements Comparable, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic; // 主题
    private String brokerName; // 所属Broker名称
    private int queueId; // 队列ID
    // ...

}

// 主题路由数据
public class TopicRouteData extends RemotingSerializable {

    private List queueDatas; // 队列数据列表
    private List brokerDatas; // Broker信息列表
    // ...

}

// 队列数据
public class QueueData implements Comparable {
    private String brokerName; // Broker名称
    private int readQueueNums; // 可读队列数量
    private int writeQueueNums; // 可写队列数量
    private int perm;
    private int topicSysFlag;
}

// Broker数据
public class BrokerData implements Comparable {
    private String cluster; // 集群名称
    private String brokerName; // Broker名称
    private HashMap brokerAddrs; // Broker地址集合,KEY为Broker ID, value为Broker 地址
    // ...

}

在查找主题路由信息的时候首先从 DefaultMQProducerImpl缓存的路由表 topicPublishInfoTable中根据主题查找路由信息,如果查询成功返回即可,如果未查询到,需要从NameServer中获取路由信息,如果获取失败,则使用默认的主题路由信息:

public class DefaultMQProducerImpl implements MQProducerInner {

    // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
    private final ConcurrentMap topicPublishInfoTable =
        new ConcurrentHashMap();

    /**
     * 根据主题查找路由信息
     * @param topic 主题
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // 根据主题获取对应的主题路由信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // 如果未获取到
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从NameServer中查询路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        // 如果路由信息获取成功
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            // 返回路由信息
            return topicPublishInfo;
        } else {
            // 如果路由信息未获取成功,使用默认主题查询路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            // 返回路由信息
            return topicPublishInfo;
        }
    }
}

从NameServer获取主题路由信息数据是在 MQClientInstance中的 updateTopicRouteInfoFromNameServer方法中实现的:

public class MQClientInstance {
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        // 从NameServer更新路由信息
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

   /**
     * 从NameServer更新路由信息
     * @param topic 主题
     * @param isDefault 是否使用默认的主题
     * @param defaultMQProducer 默认消息生产者
     * @return
     */
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    // 是否使用默认的路由信息
                    if (isDefault && defaultMQProducer != null) {
                        // 使用默认的主题路由信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums); // 设置可读队列数量
                                data.setWriteQueueNums(queueNums); // 设置可写队列数量
                            }
                        }
                    } else {
                        // 从NameServer获取路由信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                    }
                    // 如果路由信息不为空
                    if (topicRouteData != null) {
                        // 从路由表中获取旧的路由信息
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        // 判断路由信息是否发生变化
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            // 是否需要更新路由信息
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        // 如果数据发生变化
                        if (changed) {
                            // 克隆一份新的路由信息
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            // 处理brokerAddrTable中的数据
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                // 更新brokerAddrTable中的数据
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // ...

                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            // 将新的路由信息加入到路由表
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }
}

向NameServer发送请求的代码实现在 MQClientAPIImplgetTopicRouteInfoFromNameServer方法中,可以看到构建了请求命令 RemotingCommand并设置请求类型为 RequestCode.GET_ROUTEINFO_BY_TOPIC,表示从NameServer获取路由信息,之后通过Netty向NameServer发送请求,并解析返回结果:

public class MQClientAPIImpl {
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        // 从NameServer获取路由信息
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }

    /**
     * 从NameServer获取路由信息
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        // 创建请求命令,请求类型为获取主题路由信息GET_ROUTEINFO_BY_TOPIC
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
        // 发送请求
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            // 如果主题不存在
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            // 如果请求发送成功
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                // 返回获取的路由信息
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }
}

主题路由信息数据 TopicPublishInfo获取到之后,需要从中选取一个消息队列,是通过调用MQFaultStrategy的 selectOneMessageQueue方法触发的,之后会进入 MQFaultStrategyselectOneMessageQueue方法从主题路由信息中选择消息队列:

public class DefaultMQProducerImpl implements MQProducerInner {
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 选择消息队列
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
}

MQFaultStrategy的selectOneMessageQueue方法 主要是通过调用 TopicPublishInfo 中的相关方法进行消息队列选择的

启用故障延迟机制

如果启用了故障延迟机制,会遍历 TopicPublishInfo中存储的消息队列列表,对计数器增1,轮询选择一个消息队列, 接着会判断消息队列所属的Broker是否可用,如果Broker可用返回消息队列即可。

如果选出的队列所属Broker不可用,会调用 latencyFaultTolerancepickOneAtLeast方法(下面会讲到)选择一个Broker,从tpInfo中获取此Broker可写的队列数量,如果数量大于0,调用 selectOneMessageQueue()方法选择一个队列。

如果故障延迟机制未选出消息队列,依旧会调用 selectOneMessageQueue()选择出一个消息队列。

未启用故障延迟机制

直接调用的 selectOneMessageQueue(String lastBrokerName)方法并传入上一次使用的Broker名称进行选择。

public class MQFaultStrategy {
     /**
     * 选择消息队列
     * @param tpInfo 主题路由信息
     * @param lastBrokerName 上一次使用的Broker名称
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 如果启用故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                // 计数器增1
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 遍历TopicPublishInfo中存储的消息队列列表
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 轮询选择一个消息队列
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // 如果下标小于0,则使用0
                    if (pos < 0)
                        pos = 0;
                    // 根据下标获取消息队列
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 如果未获取到可用的Broker
                // 调用pickOneAtLeast选择一个
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // 从tpInfo中获取Broker可写的队列数量
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // 如果可写的队列数量大于0
                if (writeQueueNums > 0) {
                    // 选择一个消息队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // 设置消息队列所属的Broker
                        mq.setBrokerName(notBestBroker);
                        // 设置队列ID
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    // 返回消息队列
                    return mq;
                } else {
                    // 移除Broker
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
            return tpInfo.selectOneMessageQueue();
        }
        // 根据上一次使用的BrokerName获取消息队列
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

selectOneMessageQueue方法的实现

selectOneMessageQueue方法中,如果上一次选择的BrokerName为空,则调用无参的 selectOneMessageQueue方法选择消息队列,也是默认的选择方式,首先对计数器增一,然后用计数器的值对 messageQueueList列表的长度取余得到下标值 pos,再从 messageQueueList中获取 pos位置的元素, 以此达到轮询从 messageQueueList 列表中选择消息队列的目的。

如果传入的BrokerName不为空,遍历messageQueueList列表,同样对计数器增一,并对 messageQueueList列表的长度取余,选取一个消息队列, 不同的地方是选择消息队列之后,会判断消息队列所属的Broker是否与上一次选择的Broker名称一致,如果一致则继续循环,轮询选择下一个消息队列,也就是说,如果上一次选择了某个Broker发送消息,本次将不会再选择这个Broker,当然如果最后仍未找到满足要求的消息队列,则仍旧使用默认的选择方式,也就是调用无参的selectOneMessageQueue方法进行选择。

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List messageQueueList = new ArrayList(); // 消息队列列表
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
    private TopicRouteData topicRouteData;

    // ...

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // 如果上一次选择的BrokerName为空
        if (lastBrokerName == null) {
            // 选择消息队列
            return selectOneMessageQueue();
        } else {
            // 遍历消息队列列表
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // 计数器增1
                int index = this.sendWhichQueue.incrementAndGet();
                // 对长度取余
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                // 获取消息队列,也就是使用使用轮询的方式选择消息队列
                MessageQueue mq = this.messageQueueList.get(pos);
                // 如果队列所属的Broker与上一次选择的不同,返回消息队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // 使用默认方式选择
            return selectOneMessageQueue();
        }
    }

    // 选择消息队列
    public MessageQueue selectOneMessageQueue() {
        // 自增
        int index = this.sendWhichQueue.incrementAndGet();
        // 对长度取余
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        // 选择消息队列
        return this.messageQueueList.get(pos);
    }
}

回到发送消息的代码中,可以看到消息发送无论成功与否都会调用 updateFaultItem方法更新失败条目:

public class DefaultMQProducerImpl implements MQProducerInner {

    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();

    // 发送消息
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            // ...

            for (; times < timesTotal; times++) {
                    try {
                        // 开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        // ...

                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        // 结束时间
                        endTimestamp = System.currentTimeMillis();
                        // 更新失败条目
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...

                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        // 更新失败条目
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    }
                    // 省略其他catch
                    // ...

                    catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

          // ...

    }

    // 更新FaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // 调用MQFaultStrategy的updateFaultItem方法
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

}

MQFaultStrategy中有一个类型的成员变量,最终是通过调用 latencyFaultToleranceupdateFaultItem方法进行更新的,并传入了三个参数:

brokerName:Broker名称

currentLatency:当前延迟时间,由上面的调用可知传入的值为 发送消息的耗时时间,即消息发送结束时间 – 开始时间

duration:持续时间,根据 isolation的值决定,如果为true, duration的值为30000ms也就是30s,否则与currentLatency的值一致

public class MQFaultStrategy {

    // 故障延迟机制
    private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();

    /**
     *  更新失败条目
     * @param brokerName Broker名称
     * @param currentLatency 发送消息耗时:请求结束时间 - 开始时间
     * @param isolation
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // 计算duration,isolation为true时使用30000,否则使用发送消息的耗时时间currentLatency
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // 更新到latencyFaultTolerance中
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
}

LatencyFaultToleranceImpl

LatencyFaultToleranceImpl中有一个 faultItemTable,记录了每个Broker对应的 FaultItem,在 updateFaultItem方法中首先根据Broker名称从 faultItemTable获取 FaultItem

  • 如果获取为空,说明需要新增 FaultItem,新建 FaultItem对象,设置传入的 currentLatency延迟时间(消息发送结束时间 – 开始时间)和开始时间即当前时间 + notAvailableDurationnotAvailableDuration值有两种情况,值为30000毫秒或者与 currentLatency的值一致
  • 如果获取不为空,说明之前已经创建过对应的 FaultItem,更新 FaultItem中的 currentLatency延迟时间和 StartTimestamp开始时间
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
    // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
    private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);

    /**
     * 更新FaultItem
     * @param name Broker名称
     * @param currentLatency 延迟时间,也就是发送消息耗时:请求结束时间 - 开始时间
     * @param notAvailableDuration 不可用的持续时间,也就是上一步中的duration
     */
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 获取FaultItem
        FaultItem old = this.faultItemTable.get(name);
        // 如果不存在
        if (null == old) {
            // 新建FaultItem
            final FaultItem faultItem = new FaultItem(name);
            // 设置currentLatency延迟时间
            faultItem.setCurrentLatency(currentLatency);
            // 设置规避故障开始时间,当前时间 + 不可用的持续时间,不可用的持续时间有两种情况:值为30000或者与currentLatency一致
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // 添加到faultItemTable
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // 更新时间
            old.setCurrentLatency(currentLatency);
            // 更新开始时间
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
}

FaultItemLatencyFaultToleranceImpl的一个内部类,里面有三个变量:

  • name:Broker名称。
  • currentLatency:延迟时间,等于发送消息耗时时间:发送消息结束时间 – 开始时间。
  • startTimestamp:规避故障开始时间:新建/更新FaultItem的时间 + 不可用的时间 notAvailableDurationnotAvailableDuration值有两种情况,值为30000毫秒或者与 currentLatency的值一致。

isAvailable方法

isAvailable方法用于开启故障延迟机制时判断Broker是否可用,可用判断方式为:当前时间 – startTimestamp的值大于等于 0,如果小于0则认为不可用。

上面分析可知 startTimestamp的值为新建/更新FaultItem的时间 + 不可用的时间, 如果当前时间减去规避故障开始时间的值大于等于0,说明此Broker已经超过了设置的规避时间,可以重新被选择用于发送消息。

compareTo方法

FaultItem还实现了 Comparable,重写了 compareTo方法,在排序的时候使用,对比大小的规则如下:

isAvailable 方法返回true的时候表示 FaultItem 对象的值越小,因为true代表Broker已经过了规避故障的时间,可以重新被选择。

currentLatency 的值越小表示 FaultItem 的值越小。 currentLatency的值与Broker发送消息的耗时有关,耗时越低,值就越小。

startTimestamp 值越小同样表示整个 FaultItem 的值也越小。 startTimestamp的值与 currentLatency有关(值不为默认的30000毫秒情况下), currentLatency值越小, startTimestamp的值也越小。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
    class FaultItem implements Comparable {
        private final String name; // Broker名称
        private volatile long currentLatency; // 发送消息耗时时间:请求结束时间 - 开始时间
        private volatile long startTimestamp; // 规避开始时间:新建/更新FaultItem的时间 + 不可用的时间notAvailableDuration

        @Override
        public int compareTo(final FaultItem other) {
            // 如果isAvailable不相等,说明一个为true一个为false
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable()) // 如果当前对象为true
                    return -1; // 当前对象小

                if (other.isAvailable())// 如果other对象为true
                    return 1; // other对象大
            }
            // 对比发送消息耗时时间
            if (this.currentLatency < other.currentLatency)
                return -1;// 当前对象小
            else if (this.currentLatency > other.currentLatency) {
                return 1; // other对象大
            }
            // 对比故障规避开始时间
            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }
        // 用于判断Broker是否可用
        public boolean isAvailable() {
            // 当前时间减去startTimestamp的值是否大于等于0,大于等于0表示可用
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
   }
}

在选择消息队列时,如果开启故障延迟机制并且未找到合适的消息队列,会调用 pickOneAtLeast方法选择一个Broker,那么是如何选择Broker的呢?

FaultItemcompareTo 方法可知,currentLatency和startTimestamp的值越小,整个 FaultItem 的值也就越小,正序排序时越靠前,靠前表示向Broker发送消息的延迟越低,在选择Broker时优先级越高,所以如果 half 值小于等于0的时候,取链表中的第一个元素, half 值大于0的时候,处于链表前half个的Brokerddd,延迟都是相对较低的,此时轮询从前haft个Broker中选择一个Broker。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
    // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
    private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);

    @Override
    public String pickOneAtLeast() {
        final Enumeration elements = this.faultItemTable.elements();
        List tmpList = new LinkedList();
        // 遍历faultItemTable
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            // 将FaultItem添加到列表中
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            // 排序
            Collections.sort(tmpList);
            // 计算中间数
            final int half = tmpList.size() / 2;
            // 如果中位数小于等于0
            if (half

再回到 MQFaultStrategy中选择消息队列的地方,在开启故障延迟机制的时候,选择队列后会调用 LatencyFaultToleranceImplisAvailable方法来判断Broker是否可用,而 LatencyFaultToleranceImplisAvailable方法又是调用Broker对应 FaultItemisAvailable方法来判断的。

由上面的分析可知, isAvailable返回true表示Broker已经过了规避时间可以用于发送消息,返回false表示还在规避时间内,需要避免选择此Broker, 所以故障延迟机制指的是在发送消息时记录每个Broker的耗时时间,如果某个Broker发生故障,但是生产者还未感知(NameServer 30s检测一次心跳,有可能Broker已经发生故障但未到检测时间,所以会有一定的延迟),用耗时时间做为一个故障规避时间(也可以是30000ms),此时消息会发送失败,在重试或者下次选择消息队列的时候,如果在规避时间内,可以避免再次选择到此Broker,以此达到故障规避的目的。

如果某个主题所在的所有Broker都处于不可用状态,此时调用 pickOneAtLeast方法尽量选择延迟时间最短、规避时间最短(排序后的失败条目中靠前的元素)的Broker作为此次发生消息的Broker。

public class MQFaultStrategy {
    private final LatencyFaultTolerance latencyFaultTolerance = new LatencyFaultToleranceImpl();
     /**
     * 选择消息队列
     * @param tpInfo 主题路由信息
     * @param lastBrokerName 上一次使用的Broker名称
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 如果启用故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                // 计数器增1
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 遍历TopicPublishInfo中存储的消息队列列表
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 轮询选择一个消息队列
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // 如果下标小于0,则使用0
                    if (pos < 0)
                        pos = 0;
                    // 根据下标获取消息队列
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 如果未获取到可用的Broker
                // 调用pickOneAtLeast选择一个
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // 从tpInfo中获取Broker可写的队列数量
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // 如果可写的队列数量大于0
                if (writeQueueNums > 0) {
                    // 选择一个消息队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // 设置消息队列所属的Broker
                        mq.setBrokerName(notBestBroker);
                        // 设置队列ID
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    // 返回消息队列
                    return mq;
                } else {
                    // 移除Broker
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
            return tpInfo.selectOneMessageQueue();
        }
        // 根据上一次使用的BrokerName获取消息队列
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
    private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            // 调用FaultItem的isAvailable方法判断是否可用
            return faultItem.isAvailable();
        }
        return true;
    }
}

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3

Original: https://www.cnblogs.com/shanml/p/16387192.html
Author: shanml
Title: 【RocketMQ】MQ消息发送

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/586287/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 头秃了,Spring Boot 自动配置源码解析了解一波~

    前言 源码版本 @SpringBootApplication 干了什么? @EnableAutoConfiguration 干了什么? 总结 为什么 Spring Boot这么火?…

    Java 2023年6月14日
    079
  • java通过freemarker导出包含富文本图片的word文档

    废话不多说,进入正题! 本文重点在于:对富文本图片的导出(基础的freemarker+word模板导出这里不做详细解说哈) 参考文章:http://www.cnblogs.com/…

    Java 2023年6月7日
    094
  • 机器学习(2)文本特征抽取

    博客园 :当前访问的博文已被密码保护 请输入阅读密码: Original: https://www.cnblogs.com/cgy1995/p/9974732.htmlAuthor…

    Java 2023年6月8日
    073
  • MyBatis 和 jeesite多表查询

    有时候经常碰到多级联查,比如通过某个功能A表查角色信息,但是A表和角色表没有直接的关联关系,需要通过用户表进行关联,所以就需要多级关联查询出来了(下面的只是举例,实际应用用户和角色…

    Java 2023年6月5日
    081
  • 多账号登录控制

    多账号登录控制 场景:java系统中用户账号登录实现控制,实现用户同时只能在一处登录 思路: 用户登录时添加用户的登录信息 用户退出时删除用户的登录信息 用户请求的session超…

    Java 2023年6月8日
    083
  • 使用CXF发布webservice服务及注意要点

    1、什么是webservice Web service是一个平台独立的,低耦合的,自包含的、基于可编程的web的应用程序,可使用开放的XML标准来描述、发布、发现、协调和配置这些应…

    Java 2023年6月15日
    063
  • Android RTL 语言适配

    使用 start/end 代替 left/right 属性值。 官方给出的需要替换的属性值列表如下: Android 对 RTL 的支持,是从 Android 4.2 版本开始的。…

    Java 2023年6月7日
    0105
  • Java SE-集合

    Java 的集合体系 Java集合可分为两大体系:Collection 和 Map 1.常见的Java集合如下: Collection接口:单列数据,定义了存取一组对象的方法的集合…

    Java 2023年6月10日
    089
  • MongoDB 学习笔记

    概述 MongoDB 是一个介于关系型数据库和非关系型数据库之间的产品,是非关系型数据库中功能最丰富,最像关系型数据库的。 MongoDB 支持的数据结构非常松散,类似 json …

    Java 2023年6月8日
    0169
  • SpringBoot 整合Swagger2 踩坑记录

    SpringBoot 整合Swagger2 踩坑记录 Failed to start bean ‘documentationPluginsBootstrapper&#8…

    Java 2023年6月5日
    096
  • MySQL的Explain总结

    Explain简介 MySQL优化器在基于成本的计算和基于规则的SQL优化会生成一个所谓的 执行计划,我们就可以使用执行计划查看MySQL对该语句具体的执行方式。 介绍这个好啰嗦就…

    Java 2023年6月16日
    067
  • js金额转中文大写

    基础参数: 主方法: 整数部分转换: 小数部分转换; 格式化处理: Original: https://www.cnblogs.com/tangzeqi/p/13044993.ht…

    Java 2023年6月6日
    0102
  • 3 垃圾收集算法

    1 垃圾收集三件事 2 对象存活判定算法 2.1 引用计数算法 2.2 可达性分析算法 2.2.1 不可达对象的后置处理 2.3 方法区回收判定 5 垃圾收集算法介绍 5.1 分代…

    Java 2023年6月7日
    090
  • Java连载155-IO总结(二)

    一、四种方式分别举例 1.FileInputStream &#xA0;&#xA0;InputStream&#xA0;is&#xA0;=&#x…

    Java 2023年6月13日
    078
  • 清理忽略springboot控制台启动的banner和启动日志

    清理忽略springboot控制台启动的banner和启动日志 1、springboot的banner spring: main: banner-mode: off 2、mybat…

    Java 2023年6月9日
    080
  • Fuck Sharepoint 2013

    最近遇到一个貌似是bug的问题,每次点击页面的时候页面的地址多出一行/_layouts/15/start.aspx#/ 解决办法: 1,点击 设置到 网站设置 2,打开 网站集管理…

    Java 2023年6月7日
    079
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球