Dubbo源码(七)-集群

前言

本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

集群(cluster)就是一组计算机,它们作为一个总体向用户提供一组网络资源。这些单个的计算机系统就是集群的节点(node)。

在Dubbo中,为了避免单点故障,同一个服务允许有多个服务提供者,也允许同时连接多个注册中心。那么,服务消费者引用服务时,该请求哪个注册中心的服务提供者以及调用失败之后该如何处理呢?这些就是Dubbo集群所做的事。

集群容错

在分析集群源码之前,先看看集群容错的所有组件,下图是官方文档的组件图

Dubbo源码(七)-集群

Dubbo 定义了集群接口 Cluster 以及 Cluster Invoker:

  • Cluster 是接口,其只有一个方法,负责生成Cluster Invoker
  • Cluster Invoker继承了Invoker接口,是一个 Invoker,是主要逻辑实现的地方

将上图从中间切分,可将集群工作过程分为两个阶段,左边为第一阶段。

  1. 第一个阶段是在服务消费者初始化期间。 集群 Cluster 实现类为服务消费者创建 Cluster Invoker,即图上的 merge 操作,也就是将多个服务提供者合并为一个 Cluster Invoker
  2. 第二个阶段是在服务消费者进行远程调用时。 步骤大体上就如图所示:list → route → select → invoke
  3. list:从服务目录拿到 invoker 集合
  4. route:通过路由过滤出符合规则的 invoker 集合
  5. select:通过负载均衡从 invoker 集合中选择一个
  6. invoke:执行 invoker 的 invoke 方法,进行真正的远程调用 其中,list、route操作在之前文章讲过了,传送门:《服务目录》《服务路由》 select 不是本文重点,后续负载均衡时讲解。

以上就是集群工作的整个流程,这里并没有介绍集群是如何容错的,也就是 invoke 步骤调用失败的处理。Dubbo提供了多种容错方式:集群容错示例

下面的源码我们以默认的 Failover Cluster – 失败自动切换 进行分析

源码分析

Cluster

首先来看看 Cluster 接口,这是一个自适应拓展类,默认实现为 FailoverCluster

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        return new FailoverClusterInvoker(directory);
    }

}

前面讲了,Cluster 的作用就是将多个服务提供者合并为一个 Cluster Invoker

多个服务提供者合并也就是 服务目录(Directory) 中的 invoker 集合。join 方法返回了一个 Cluster Invoker

接下来,我们看看调用路径。 Cluster 接口在多个地方被调用,我们看服务消费者初始化期间的调用。

// 调用路径如下:
// ReferenceBean#getObject()
// ReferenceConfig#get()
// ReferenceConfig#init()
// ReferenceConfig#createProxy(Map map)
// RegistryProtocol#refer(Class type, URL url)
// RegistryProtocol#doRefer(Cluster cluster, Registry registry, Class type, URL url)
private  Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
    // 创建 RegistryDirectory 实例
    RegistryDirectory directory = new RegistryDirectory(type, url);
    // 设置注册中心和协议
    directory.setRegistry(registry);
    directory.setProtocol(protocol);
    // all attributes of REFER_KEY
    Map parameters = new HashMap(directory.getUrl().getParameters());
    // 生成服务消费者链接
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    // 注册服务消费者,在 consumers 目录下新节点
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }

    // 订阅 providers、configurators、routers 等节点数据
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));

    // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
    Invoker invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

调用在倒数第三行代码。

如果看过我之前写的《服务引用》那篇文章,想必对 doRefer 方法不陌生了。在服务目录订阅完注册中心的数据后,就调用 join 方法生成 Cluster Invoker

啰嗦多一句:

可以这么理解,实际负责远程调用的,是服务目录中的 invoker 集合中的 invoker,而 Cluster Invoker 则对服务目录中的 invoker 集合进行处理。

Cluster Invoker

默认的 Cluster Invoker 是 FailoverClusterInvoker,既然是一个 Invoker,我们就从它的 invoke 方法入手。

AbstractClusterInvoker

invoke 方法在它的父类 AbstractClusterInvoker

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();
    LoadBalance loadbalance = null;

    // 绑定 attachments 到 invocation 中.

    Map contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }

    // 列举 Invoker
    List> invokers = list(invocation);
    if (invokers != null && !invokers.isEmpty()) {
        // 加载 LoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    }
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // 调用 doInvoke 进行后续操作
    return doInvoke(invocation, invokers, loadbalance);
}

invoke 方法逻辑也很简单:

  1. 列举 invoker
  2. 加载 LoadBalance(自适应拓展类)
  3. 调用 doInvoke 进行后续操作

其中列举 invoker 如下

protected List> list(Invocation invocation) throws RpcException {
    List> invokers = directory.list(invocation);
    return invokers;
}

list 方法就是调用服务目录的 list 方法,里面做了两件事(结合前面的组件图):

  • list:从服务目录拿到 invoker 集合
  • route:通过路由过滤出符合规则的 invoker 集合

FailoverClusterInvoker

doInvoke 方法具体实现在 FailoverClusterInvoker中,此 invoker 的容错方式为失败自动切换。

public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
    List> copyinvokers = invokers;
    checkInvokers(copyinvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    // 获取重试次数
    int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len > invoked = new ArrayList>(copyinvokers.size()); // invoked invokers.

    Set providers = new HashSet(len);
    for (int i = 0; i < len; i++) {
        if (i > 0) {
            checkWhetherDestroyed();
            // 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
            // 通过调用 list 可得到最新可用的 Invoker 列表
            copyinvokers = list(invocation);
            // check again
            // 对 copyinvokers 进行判空检查
            checkInvokers(copyinvokers, invocation);
        }
        // 通过负载均衡选择 Invoker
        Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
        // 添加到 invoker 到 invoked 列表中
        invoked.add(invoker);
        // 设置 invoked 到 RPC 上下文中
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // 调用目标 Invoker 的 invoke 方法
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.

                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // 若重试失败,则抛出异常
    throw new RpcException(xxx);
}

doInvoke 方法代码量不少,但是逻辑简化之后也很简单,就是根据重试次数,在 for 循环中进行远程调用,成功则返回,失败就重试。如果重试次数耗尽还无法调用成功,则抛出异常。

从这里可以知道,Dubbo的默认失败重试次数是3次。

此方法中我们关注下 select 方法,它负责从 invoker 集合中选出一个 infoker

protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 获取调用方法名
    String methodName = invocation == null ? "" : invocation.getMethodName();

    // 获取 sticky 配置,sticky 表示粘滞连接。所谓粘滞连接是指让服务消费者尽可能的
    // 调用同一个服务提供者,除非该提供者挂了再进行切换
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    {
        // 检测 invokers 列表是否包含 stickyInvoker,如果不包含,
        // 说明 stickyInvoker 代表的服务提供者挂了,此时需要将其置空
        if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
            stickyInvoker = null;
        }
        // 在 sticky 为 true,且 stickyInvoker != null 的情况下。如果 selected 包含
        // stickyInvoker,表明 stickyInvoker 对应的服务提供者可能因网络原因未能成功提供服务。
        // 但是该提供者并没挂,此时 invokers 列表中仍存在该服务提供者对应的 Invoker。
        if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
            // availablecheck 表示是否开启了可用性检查,如果开启了,则调用 stickyInvoker 的
            // isAvailable 方法进行检查,如果检查通过,则直接返回 stickyInvoker。
            if (availablecheck && stickyInvoker.isAvailable()) {
                return stickyInvoker;
            }
        }
    }

    // 如果线程走到当前代码处,说明前面的 stickyInvoker 为空,或者不可用。
    // 此时继续调用 doSelect 选择 Invoker
    Invoker invoker = doSelect(loadbalance, invocation, invokers, selected);

    // 如果 sticky 为 true,则将负载均衡组件选出的 Invoker 赋值给 stickyInvoker
    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

select 方法主要处理对粘滞连接特性的支持。注释写的很清楚了。选择 invoker 的操作在 doSelect 方法

private Invoker doSelect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
    if (invokers == null || invokers.isEmpty())
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    if (loadbalance == null) {
        // 如果 loadbalance 为空,这里通过 SPI 加载 Loadbalance,默认为 RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // 通过负载均衡组件选择 Invoker
    Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);

    // 如果 selected 包含负载均衡选择出的 Invoker,或者该 Invoker 无法经过可用性检查,此时进行重选
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // 进行重选
            Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rinvoker != null) {
                invoker = rinvoker;
            } else {
                // rinvoker 为空,定位 invoker 在 invokers 中的位置
                int index = invokers.indexOf(invoker);
                try {
                    // 获取 index + 1 位置处的 Invoker,以下代码等价于:
                    //     invoker = invokers.get((index + 1) % invokers.size());
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

这里通过负载均衡选出 invoker,如果 invoker 在 selected 中(就是在doInvoke方法中调用失败的invoker)或者不可用,则调用 reselect 方法进行重选。如果重选还是选不出 invoker,则返回 invoker 集合中的下一个元素。这里的繁琐判断,就是为了尽量保证拿到可用的 invoker

我们继续看看 reselect 方法

private Invoker reselect(LoadBalance loadbalance, Invocation invocation,
                                List> invokers, List> selected, boolean availablecheck)
            throws RpcException {

    List> reselectInvokers = new ArrayList>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

    // 下面的 if-else 分支逻辑有些冗余,pull request #2826 对这段代码进行了简化,可以参考一下
    // 根据 availablecheck 进行不同的处理
    if (availablecheck) { // invoker.isAvailable() should be checked
        for (Invoker invoker : invokers) {
            if (invoker.isAvailable()) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    } else { // do not check invoker.isAvailable()
        for (Invoker invoker : invokers) {
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    {
        // 若线程走到此处,说明 reselectInvokers 集合为空,此时不会调用负载均衡组件进行筛选。
        // 这里从 selected 列表中查找可用的 Invoker,并将其添加到 reselectInvokers 集合中
        if (selected != null) {
            for (Invoker invoker : selected) {
                if ((invoker.isAvailable()) // available first
                        && !reselectInvokers.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (!reselectInvokers.isEmpty()) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    return null;
}

这个方法可以分成两部分:

  1. 在非 selected 的 invoker 集合中,调用负载均衡选择一个 invoker
  2. 在步骤1无法选出 invoker 时,在 selected 中选出 invoker

至此,Dubbo的集群就讲完了。负载均衡有空再说。

再论Cluster

前面我们提到 Cluster 接口在多个地方被调用,也讲了同一个服务有多个服务提供者时的处理。那么,有多个注册中心呢,该如何处理?

// 类ReferenceConfig
private T createProxy(Map map) {
    ......

    // 本地引用
    if (isJvmRefer) {
        ......

    // 远程引用
    } else {
        ......

        // 单个注册中心或服务提供者(服务直连,下同)
        if (urls.size() == 1) {
            // 调用 RegistryProtocol 的 refer 构建 Invoker 实例
            invoker = refprotocol.refer(interfaceClass, urls.get(0));

        // 多个注册中心或多个服务提供者,或者两者混合
        } else {
            List> invokers = new ArrayList>();
            URL registryURL = null;

            // 获取所有的 Invoker
            for (URL url : urls) {
                // 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
                // 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                // 如果注册中心链接不为空,则将使用 AvailableCluster
                URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                // 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    // 生成代理类
    return (T) proxyFactory.getProxy(invoker);
}

createProxy 是服务引用时,生成服务代理对象的方法。这里会判断,如果有多个注册中心,会再封装一层集群,也就是先选择注册中心,再选择服务提供者。

这里一般情况 registryURL 不为空,cluster 使用的是 AvailableCluster

public class AvailableCluster implements Cluster {
    public static final String NAME = "available";

    @Override
    public  Invoker join(Directory directory) throws RpcException {
        return new AbstractClusterInvoker(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
    }
}

AvailableCluster的逻辑很简单,按顺序选择可使用的 invoker (这里的invoker其实就是每个注册中心)

总结

本篇文章介绍了Dubbo集群容错的整体工作过程和调用逻辑。Dubbo提供了多种集群实现,本文只介绍了Failover Cluster,其余实现感兴趣的可以自行查看源码。

参考资料

Dubbo开发指南

Original: https://www.cnblogs.com/konghuanxi/p/16572167.html
Author: 王谷雨
Title: Dubbo源码(七)-集群

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/599019/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 高并发组件了解

    消息队列 A服务和多个服务耦合,内部维护对多个服务发送数据的接口,那么这些接口如果有的挂了,有的不需要了,那么还得修改A内部的代码,如果使用MQ,A发送消息就好,不必考虑那么多事情…

    数据库 2023年6月16日
    055
  • 设计模式之享元模式

    一、享元模式模式:享元模式是实现对象重用的一种方式,适用于为了尽可能的减少对象的重复创建而增大资源开销的情况,与单例模式有类似的作用。 二、实现思路 :对象被第一次创建后,如果后续…

    数据库 2023年6月14日
    076
  • 2_爬豆瓣电影_ajax动态加载

    什么是 AJAX ? AJAX 是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术。 AJAX = Asynchronous JavaScript and XML(AJA…

    数据库 2023年6月11日
    090
  • 工厂模式

    工厂模式是java中最常用的设计模式之一,这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。在工厂模式中,我们在创建对象时不会对客户端暴露创建逻辑,并且是通过使用一…

    数据库 2023年6月11日
    079
  • mysql

    mysql 1.1数据库 关系型数据库:数据存储在硬盘上 [En] Relational database: the data is stored in the hard disk…

    数据库 2023年5月24日
    081
  • idea热部署

    idea热部署 一、修改 pom.xml 文件 修改配置文件 二、打开自动build: File -> Settings -> Build,Exe… -&g…

    数据库 2023年6月16日
    081
  • 从SQL Server到MySQL,携程核心系统无感迁移实战

    前言 携程酒店订单系统的存储设计从1999年收录第一单以来,已经完成了从单一SQLServer数据库到多IDC容灾、完成分库分表等多个阶段,在见证了大量业务奇迹的同时,也开始逐渐暴…

    数据库 2023年5月24日
    073
  • 数据库中异常与隔离级别

    数据库相对于其它存储软件一个核心的特征是它支持事务,所谓事务的ACID就是原子性,一致性,隔离性和持久性。其中原子性,一致性,持久性更多是关注单个事务本身,比如,原子性要求事务中的…

    数据库 2023年6月9日
    069
  • Golang context

    Context Go 语言中提供了 context 包,通过显示传递 context, 实现请求级别的元数据、取消信号、终止信号的传递。context 包提供了从现有的上下文值(c…

    数据库 2023年6月16日
    077
  • 23种设计模式之访问者模式(Visitor Pattern)

    文章目录 概述 访问者模式的优缺点 访问者模式的使用场景 访问者模式的结构和实现 * 模式结构 模式实现 总结 概述 访问者模式把数据结构和作用于结构上的操作解耦合,使得操作集合可…

    数据库 2023年6月6日
    081
  • 强烈推荐一款优秀且通用的后台管理系统

    最近看到一款优秀的通用管理后台——likeadmin,推荐给大家。likeadmin的部署方式简单,界面美观,基于MIT协议,完全免费,非常值得一用。 likeadmin快速开发通…

    数据库 2023年6月14日
    083
  • 如何干涉MySQL优化器使用hash join

    GreatSQL社区原创内容未经授权不得随意使用,转载请联系小编并注明来源。 GreatSQL是MySQL的国产分支版本,使用上与MySQL一致。 前言 实验 总结 前言 数据库的…

    数据库 2023年6月11日
    082
  • 第十四章 静态代理设计模式

    1.为什么需要代理设计模式 1.1 问题 在javaEE分层开发中,那个层对我们最重要? DAO–>Service–>Controller 在javaEE分层开发中…

    数据库 2023年6月14日
    074
  • MySQL实战45讲 18

    18 | 为什么这些SQL语句逻辑相同,性能却差异巨大? 在 MySQL 中,有很多看上去逻辑相同,但性能却差异巨大的 SQL 语句。对这些语句使用不当的话,就会不经意间导致整个数…

    数据库 2023年6月14日
    071
  • 生成随机数的若干种方法

    背景: 创建账户时我们需要配置初始随机密码,使用手机号注册时需要随机验证码,抽奖活动需要随机点名,俄罗斯方块游戏需要随机出形状。这些案例都在说明一个问题,随机数据很重要!而在 Sh…

    数据库 2023年6月14日
    051
  • 数据结构堆

    引用 [](https://zh.wikipedia.org/wiki/%E5%A0%86%E7%A9%8D “维基百科堆结构”) [堆结构](https:…

    数据库 2023年6月9日
    085
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球