Dubbo源码(五)-服务目录

前言

本文基于Dubbo2.6.x版本,中文注释版源码已上传github:xiaoguyu/dubbo

今天,来聊聊Dubbo的 服务目录(Directory)。下面是官方文档对服务目录的定义:

服务目录中存储了一些和服务提供者有关的信息,通过服务目录,服务消费者可获取到服务提供者的信息,比如 ip、端口、服务协议等。

服务目录持有 Invoker对象集合,Dubbo的服务调用均由 Invoker发起。

当服务提供者信息发生变化时(比如某一个服务挂了),服务目录也需要动态调整。

继承体系

Dubbo源码(五)-服务目录

服务目录目前内置的实现有两个,分别为 StaticDirectory 和 RegistryDirectory。它们均继承自AbstractDirectory,而 AbstractDirectory 实现了 Directory 接口。Directory 接口提供了list(Invocation invocation) 方法,这个方法就是用来获取 invoker 集合的。

再看 RegistryDirectory 实现了 NotifyListener 接口,这个接口中只有一个方法,notify(List urls),当注册中心节点信息发生变化后,触发此方法调整服务目录中的配置信息以及 invoker 集合。

源码分析

上面我们讲了,服务调用需求用到 invoker,而服务目录持有 invoker 集合,并通过 list 方法提供 invoker。下面放上 服务消费者Demo中DemoService#sayHello 方法的调用路径

Dubbo源码(五)-服务目录

AbstractDirectory 实现了 Directory 接口的 list 方法

public List> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    // 调用 doList 方法列举 Invoker,doList 是模板方法,由子类实现
    List> invokers = doList(invocation);
    // 获取路由 Router 列表
    List localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                // 获取 runtime 参数,并根据参数决定是否进行路由
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    // 进行服务路由
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

此方法就两段逻辑:

  1. 通过 doList 获取 invoker 集合
  2. 通过路由选择合适的 invoker

路由非本文重点,略过。

doList 是模板方法,由子类实现。

StaticDirectory

StaticDirectory 是一个静态服务目录,其 invokers 集合通过构造方法注入,不应被改变。

// StaticDirectory的doList啥都没做,直接返回持有的invokers
protected List> doList(Invocation invocation) throws RpcException {
    // 列举 Inovker,也就是直接返回 invokers 成员变量
    return invokers;
}

StaticDirectory 的其它方法就不分析了,同样很简单。

RegistryDirectory

RegistryDirectory 是动态调整的服务目录,其持有的 invokers 有内部方法生成。

订阅节点

在上篇博文《Dubbo源码(四) – 服务引用(消费者)》中,我留了一个坑,也就是服务引用过程中,创建了注册中心之后,如何订阅节点数据。在 RegistryProtocol#doRefer方法中。

其中调用了 RegistryDirectory#subscribe(URL url)方法

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}

我们用的注册中心是 zookeeper,所以 registry 是 ZookeeperRegistry,而 subscribe 方法的实现在其父类 FailbackRegistry

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // Sending a subscription request to the server side
        doSubscribe(url, listener);
    } catch (Exception e) {
        ......

        // 订阅失败处理
        addFailedSubscribed(url, listener);
    }
}

模板方法,调用子类的 doSubscribe 方法

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            ...

        } else {
            List urls = new ArrayList();
            // 切割路径(providers、configurators、routers等)
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                    listeners = zkListeners.get(url);
                }
                // 缓存操作,获取节点监听器
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List currentChilds) {
                            // 这里和方法末尾的 notify(url, listener, urls); 是调用的同一个方法
                            // 节点变更时触发变更操作
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                // 注册节点监听器
                List children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 触发节点变更操作
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

订阅方法做了3个操作:

  1. 切割url,拆分订阅路径
  2. 创建节点监听器
  3. 触发节点变更操作

这里注意下,订阅时节点数据并没有发生变更,所以需要手动触发 notify 方法。

下面继续看节点变更操作做了什么,调用路径有点深,就不一步一步调试了,直接把路径写在注释上。

// FailbackRegistry#notify(URL url, NotifyListener listener, List urls) ->
// FailbackRegistry#doNotify(URL url, NotifyListener listener, List urls) ->
// AbstractRegistry#notify(URL url, NotifyListener listener, List urls)
protected void notify(URL url, NotifyListener listener, List urls) {
    ......

    Map> result = new HashMap>();
    // 将urls按分类分组转成map
    ......

    for (Map.Entry> entry : result.entrySet()) {
        String category = entry.getKey();
        List categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        saveProperties(url);
        listener.notify(categoryList);
    }
}

此处的 listener变量,就是本节的主角 RegistryDirectory,下面来分析 listener.notify(categoryList)

public synchronized void notify(List urls) {
    // 定义三个集合,分别用于存放服务提供者 url,路由 url,配置器 url
    List invokerUrls = new ArrayList();
    List routerUrls = new ArrayList();
    List configuratorUrls = new ArrayList();
    // 根据 category 参数分别对3种url进行处理
    ......

    // 刷新 Invoker 列表
    refreshInvoker(invokerUrls);
}

此方法分别对服务提供者 url,路由 url,配置器 url各自进行了处理,这里我省略了对路由 url 和配置器 url 的处理,感兴趣的自行去看源码。咱们聚焦在 Invoker 的处理中

private void refreshInvoker(List invokerUrls) {
    // invokerUrls 仅有一个元素,且 url 协议头为 empty,此时表示禁用所有服务
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 设置 forbidden 为 true
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        // 销毁所有 Invoker
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            // 添加缓存 url 到 invokerUrls 中
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet();
            // 缓存 invokerUrls
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 将 url 转成 Invoker
        Map> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // 将 newUrlInvokerMap 转成方法名到 Invoker 列表的映射
        Map>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.

        // 转换出错,直接打印异常,并返回
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // 合并多个组的 Invoker
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // 销毁无用 Invoker
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

此方法中的逻辑有点多,

  1. 判断是否要销毁所有 invoker
  2. 创建 invoker
  3. 处理映射
  4. 销毁无用 invoker

我们关注下 invoker 的创建,toInvokers(invokerUrls)

private Map> toInvokers(List urls) {
    ......

    // 获取服务消费端配置的协议
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        ......

        // 将本地 Invoker 缓存赋值给 localUrlInvokerMap
        Map> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    // 获取 disable 配置,取反,然后赋值给 enable 变量
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    // 获取 enable 配置,并赋值给 enable 变量
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // 调用 refer 获取 Invoker
                    invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // Put new invoker in cache
                // 缓存 Invoker 实例
                newUrlInvokerMap.put(key, invoker);
            }
            // 缓存命中
        } else {
            // 将 invoker 存储到 newUrlInvokerMap 中
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

这里的判断有点复杂,会对协议各种判断(是否支持、是否为empty)等,然后如果缓存未命中,则需要创建invoker,也就是 protocol.refer(serviceType, url)这一段代码。

此时,我们上一篇文章留下的另一个坑也填上了,也就是 DubboProtocol#refer的调用时机。

获取invoker集合

public List> doList(Invocation invocation) {
    ......

    List> invokers = null;
    // 获取 Invoker 本地缓存
    Map>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        // 获取方法名和参数列表
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        // 检测参数列表的第一个参数是否为 String 或 enum 类型
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            // 通过 方法名 + 第一个参数名称 查询 Invoker 列表,具体的使用场景暂时没想到
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
        }
        if (invokers == null) {
            // 通过方法名获取 Invoker 列表
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            // 通过星号 * 获取 Invoker 列表
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }

        // 冗余逻辑,pull request #2861 移除了下面的 if 分支代码
        if (invokers == null) {
            Iterator>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    // 返回 Invoker 列表
    return invokers == null ? new ArrayList>(0) : invokers;
}

这里的逻辑也很简单,就是从类变量 methodInvokerMap 中获取invoker,所有我们需要去看看 methodInvokerMap 的赋值。

我们在上一小节的 refreshInvoker 方法中,讲了 invoker 的生成。refreshInvoker 方法中还有对methodInvokerMap 的处理。也就是 toMethodInvokers(newUrlInvokerMap) 方法

这里面会将 url-invoker 的映射转成 方法名-invoker 的映射。

总结

Dubbo的服务调用,需要通过服务目录拿到 invoker 才能发起。当注册中心发生变化时,服务目录同样需要动态调整,并刷新持有的 invoker 集合。服务目录是 Dubbo 集群容错的一部分,也是比较基础的部分。

PS:以上讲的不包含本地服务调用,别杠

参考资料

Dubbo开发指南

Original: https://www.cnblogs.com/konghuanxi/p/16531775.html
Author: 王谷雨
Title: Dubbo源码(五)-服务目录

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/599025/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • SQL语言的总结

    SQL语言分类:1.数据查询语言(DQL:Data Query Language),也称为”数据检索语句”,用以从表中查询获得数据,常用关键字SELECT …

    数据库 2023年6月16日
    084
  • 雷军传-怀揣梦想,砥砺前行

    最近几天看完了一本书,是一本个人传记–《雷军传-站在风口上》,我总结为”怀揣梦想,砥砺前行”。 其实在我高中时期就已经把雷军视为偶像,只不过当时…

    数据库 2023年6月11日
    077
  • go interface{}使用

    先上代码 为什么会报错? 因为空接口拥有两个指针,内存布局上会占用两个机器字长。 对于长度为n的空接口切片而言,它的每个元素都是以2机器字长为单位的连续空间,因此总共会占用 2n个…

    数据库 2023年6月9日
    066
  • MySQL实战45讲 17

    17 | 如何正确地显示随机消息? 场景:从一个单词表中随机选出三个单词。 表的建表语句和初始数据的命令如下,在这个表里面插入了 10000 行记录: CREATE TABLE w…

    数据库 2023年6月14日
    054
  • MySQL索引(一)

    一、索引概念 二、索引类型 (一)业务逻辑分类 1、NORMAL – 普通索引 2、UNIQUE – 唯一索引 3、PRIMARY KEY – …

    数据库 2023年6月16日
    083
  • 刚入职没多久,连夜手写了一个代码生成器,项目开发速度瞬间屌炸了!

    一、简介 最近刚入职一个新团队,还没来得及熟悉业务,甲方爸爸就要求项目要在2个月内完成开发并上线! 本想着往后推迟1个月在交付,但是甲方爸爸不同意,只能赶鸭子上架了! 然后根据业务…

    数据库 2023年6月14日
    095
  • 关于在linux上部署.netcore项目,只能Linux访问,不能外部主机访问的问题

    在我们在Linux上部署完.netcore项目之后,是进入到部署项目的文件夹之下启动项目,比如我的就是在www/core文件夹下。 首先cd 之后我们直接启动项目 之后我们在win…

    数据库 2023年6月11日
    0161
  • 设计模式之建造者模式

    一、建造者模式:如果创建某个对象要经过多个组件组装才能完成,我们可以设计一个充当建造者角色的类和一个充当指挥者的类,通过指挥者控制建造者按步骤组装需要创建的对象,这样客户端就只依赖…

    数据库 2023年6月14日
    062
  • Golang实现set

    Golang语言本身未实现set,但是实现了map golang的map是一种无序的键值对的集合,其中键是唯一的 而set是键的不重复的集合,因此可以用map来实现set 由于ma…

    数据库 2023年6月14日
    060
  • maven项目编译报错处理

    1、问题一: [ERROR] Failed to execute goal on project data-common:Could not resolve dependencie…

    数据库 2023年6月11日
    085
  • Intellij IDEA个人常用快捷键

    分享一下个人常用快捷键。 说明:字母排序规则遵循字母表(a->z) 快捷键 介绍 ctrl+b 快速打开当前光标处的类或方法 ctrl+d 复制当前光标所在行至下一行 ctr…

    数据库 2023年6月14日
    074
  • Binlog分析利器-binlog_summary.py

    ​Binlog中,除了具体的SQL,其实,还包含了很多有价值的信息,如, 拿到上面这些信息,我们可以做哪些事情呢? 开发了一个简单的Binlog分析工具-binlog_summar…

    数据库 2023年6月11日
    089
  • Linux(CentOS)安装Redis保姆级教程

    Linux(CentOs)安装Redis教程 一,下载Redis(两种方式) 1,找到redis官网(https://redis.io/download ) 如果想下载指定版本就去…

    数据库 2023年6月11日
    078
  • 在CentOS 7系统安装StoneDB数据库

    今天我会进行StoneDB数据库在CentOS 7系统下的安装。 在官方的快速部署文档中有详细的安装流程,我会严格遵循流程。 [En] There is a detailed in…

    数据库 2023年5月24日
    074
  • 重写Feign编码器

    有个spring cloud 架构的项目需要调用php小组的api接口,但php提供的接口入参大部分是下划线命名,而Java这边的实体类是按照驼峰编写,如果使用Fegin调用会导致…

    数据库 2023年6月6日
    079
  • MyBatis-Plus入门教程及基本API使用案例

    一、MyBatisPlus简介 1. 入门案例 问题导入 MyBatisPlus环境搭建的步骤? 1.1 SpringBoot整合MyBatisPlus入门程序 ①:创建新模块,选…

    数据库 2023年5月24日
    094
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球