从零开始实现一个分布式RPC框架

该rpc框架是一个mini版的dubbo。学习rpc之前,建议先了解NIO,Netty和Dubbo等知识。请移步网络编程

前言:(借用阿里大佬的一段话)

为什么要自己写一个RPC框架,我觉得从个人成长上说,如果一个程序员能清楚的了解RPC框架所具备的要素,掌握RPC框架中涉及的服务注册发现、负载均衡、序列化协议、RPC通信协议、Socket通信、异步调用、熔断降级等技术,可以全方位的提升基本素质。虽然也有相关源码,但是只看源码容易眼高手低,动手写一个才是自己真正掌握这门技术的最优路径。

一.概述

什么是RPC?

  • 远程服务调用
  • 官方:一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想
  • 通俗一点:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。
  • 市面上常见的rpc框架:dobbo,springCloud,gRPC…

那为什么要有 RPC,HTTP 不好么?

  • 因为 RPC 和 HTTP 就不是一个层级的东西,所以严格意义上这两个没有可比性,也不应该来作比较。
  • HTTP 只是传输协议,协议只是规范了一定的交流格式
  • RPC 对比的是本地过程调用,是用来作为分布式系统之间的通信,它可以用 HTTP 来传输,也可以基于 TCP 自定义协议传输。
  • HTTP 协议比较冗余,所以 RPC 大多都是基于 TCP 自定义协议,定制化的才是最适合自己的。

项目总体结构

从零开始实现一个分布式RPC框架

整体架构

从零开始实现一个分布式RPC框架

接下来,分别解释上述的过程

二.自定义注解

服务的提供者和消费者公用一个接口,@ServiceExpose是为了暴露服务,放在生产者的某个实现类上;@ServiceReference是为了引用服务,放在消费者的需要注入的属性上。

  • Target:指定被修饰的Annotation可以放置的位置(被修饰的目标)
  • @Target(ElementType.TYPE) //接口、类
  • @Target(ElementType.FIELD) //属性
  • @Target(ElementType.METHOD) //方法
  • @Target(ElementType.PARAMETER) //方法参数
  • @Target(ElementType.CONSTRUCTOR) //构造函数
  • @Target(ElementType.LOCAL_VARIABLE) //局部变量
  • @Target(ElementType.ANNOTATION_TYPE) //注解
  • @Target(ElementType.PACKAGE) //包
  • Retention:定义注解的保留策略
  • @Retention(RetentionPolicy.SOURCE) //注解仅存在于源码中,在class字节码文件中不包含
  • @Retention(RetentionPolicy.CLASS) //默认的保留策略,注解会在class字节码文件中存在,但运行时无法获得
  • @Retention(RetentionPolicy.RUNTIME) //注解会在class字节码文件中存在,在运行时可以通过反射获取到
  • Documented:指定被修饰的该Annotation可以被javadoc工具提取成文档
  • Inherited:指定被修饰的Annotation将具有继承性

二.启动配置

主要是加载一些rpc相关的配置类,使用SpringBoot 自动装配。可以使用SPI机制加入一些自定义的类,放到指定文件夹中。

三.rpc接口注入/rpc服务扫描

这里主要就是通过反射获得对应注解的属性/类,进行服务暴露/服务引用。 这里需要关注的是什么时候进行服务暴露/引用?如下:

  • 客户端:一般有俩种方案

*
– 饿汉式:饿汉式是通过实现 Spring 的 InitializingBean接口中的 afterPropertiesSet方法,容器通过调用 ReferenceBeanafterPropertiesSet方法时引入服务。(在Spring启动时,给所有的属性注入实现类,包含远程和本地的实现类)
– 懒汉式:只有当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入。
+ 在应用的Spring IOC 容器刷新完毕(spring Context初始化)之后,扫描所有的Bean,将Bean中带有@ServiceExpose/@ServiceReference注解的field获取到,然后创建field类型的代理对象,创建完成后,将代理对象set给此field。后续就通过该代理对象创建服务端连接,并发起调用。(dubbo默认)

  • 服务端:与懒汉式一样。

那么怎么知道Spring IOC刷新完成,这里就使用一个Spring提供的监听器,当Spring IOC刷新完成,就会触发监听器。

从零开始实现一个分布式RPC框架

四.服务注册到ZK/从Zk获得服务

Zookeeper采用节点树的数据模型,类似linux文件系统,/,/node1,/node2 比较简单。不懂Zookeeper请移步:Zookeeper原理

我们采用的是对每个服务名创建一个持久节点,服务注册时实际上就是在zookeeper中该持久节点下创建了一个临时节点,该临时节点存储了服务的IP、端口、序列化方式等。

从零开始实现一个分布式RPC框架

客户端获取服务时通过获取持久节点下的临时节点列表,解析服务地址数据:

从零开始实现一个分布式RPC框架

客户端监听服务变化:

从零开始实现一个分布式RPC框架

五.生成代理类对象

这里使用JDK的动态代理,也可以使用cglib或者Javassist(dobbo使用)。

public class ClientProxyFactory {
    /**
     * 获取代理对象,绑定 invoke 行为
     *
     * @param clazz 接口 class 对象
     * @param    类型
     * @return 代理对象
     */public  T getProxyInstance(Class clazz) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, new InvocationHandler() {
            final Random random = new Random();

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 第一步:通过服务发现机制选择一个服务提供者暴露的服务
                String serviceName = clazz.getName();
                final List serviceInfos = serviceDiscovery.listServices(serviceName);
                logger.info("Rpc server instance list: {}", serviceInfos);
                if (CollectionUtils.isEmpty(serviceInfos)) {
                    throw new RpcException("No rpc servers found.");
                }

                // TODO: 这里模拟负载均衡,从多个服务提供者暴露的服务中随机挑选一个,后期写方法实现负载均衡
                final ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));

                // 第二步:构造 rpc 请求对象
                final RpcRequest rpcRequest = new RpcRequest();
                rpcRequest.setServiceName(serviceName);
                rpcRequest.setMethod(method.getName());
                rpcRequest.setParameterTypes(method.getParameterTypes());
                rpcRequest.setParameters(args);

                // 第三步:编码请求消息, TODO: 这里可以配置多种编码方式
                byte[] data = messageProtocol.marshallingReqMessage(rpcRequest);

                // 第四步:调用 rpc client 开始发送消息
                byte[] byteResponse = rpcClient.sendMessage(data, serviceInfo);

                // 第五步:解码响应消息
                final RpcResponse rpcResponse = messageProtocol.unmarshallingRespMessage(byteResponse);

                // 第六步:解析返回结果进行处理
                if (rpcResponse.getException() != null) {
                    throw rpcResponse.getException();
                }
                return rpcResponse.getRetValue();
            }
        });
    }
}

六.负载均衡

本实现支持两种主要负载均衡策略,随机和轮询,其中他们都支持带权重的随机和轮询,其实也就是四种策略。

七.Netty通信

服务端和客户端基本一样,这里只展示服务端的代码。代理对象在Spring启动的时候就生成了,但是没有调用,每一个调用(请求)都会生成一个Netty的连接。

public class NettyRpcServer extends RpcServer {

    @Override
    public void start() {
        // 创建两个线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建服务端的启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap()
                    // 设置两个线程组
                    .group(bossGroup, workerGroup)
                    // 设置服务端通道实现类型
                    .channel(NioServerSocketChannel.class)
                    // 服务端用于接收进来的连接,也就是boosGroup线程, 线程队列大小
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // child 通道,worker 线程处理器
                    .childHandler(new ChannelInitializer() {
                        // 给 pipeline 管道设置自定义的处理器
                        @Override
                        public void initChannel(SocketChannel channel) {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 绑定端口号,同步启动服务
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            channel = channelFuture.channel();
            // 对关闭通道进行监听,变为同步
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            logger.error("server error.", e);
        } finally {
            // 释放线程组资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

实现具体handler

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //进行记录
        logger.info("channel active: {}", ctx);
    }

    //读取数据实际(这里我们可以读取客户端发送的消息)
    @Override
    public void channelRead(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        //将数据读到buffer中
        final ByteBuf msgBuf = (ByteBuf) msg;
        final byte[] reqBytes = new byte[msgBuf.readableBytes()];
        msgBuf.readBytes(reqBytes);
    }

    //数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //使用反射获找到目标方法进行返回
        final byte[] respBytes = requestHandler.handleRequest(reqBytes);
        ctx.writeAndFlush(respBytes);
    }

    //处理异常, 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

八.序列化协议

对计算机网络稍微有一点了解的同学都知道,数据在网络中传输是二进制的:01010101010101010,类似这种,只有二进制数据才能在网络中传输。但是在编码之前我们一般先进行序列化,目的是为了优化传输的数据量。因为有的数据太大,需要进行空间优化。

那么我们来区分一下序列化和编码:我画一张图大家都全明白了

从零开始实现一个分布式RPC框架

定义一个序列化协议,放入作为一个handler放入pipeline中。

Netty支持多种序列化,比如jdk,Json,ProtoBuf 等,这里使用ProtoBuf,其序列化后码流小性能高,非常适合RPC调用。接下来看怎么使用ProtoBuf?

  • 1.编写需要序列化的类xxx.proto :ProtoBuf有自己的语法规则(自行百度)

从零开始实现一个分布式RPC框架
  • 2.通过官网提供的protoc.exe生成对应的Java代码
  • 3.前面通过工具生成的代码(AnimalProto)已经帮我们封装好了序列化和反序列化的方法,我们只需要调用对应方法即可

引入Protobuf的依赖

com.google.protobuf
    protobuf-java
    2.4.1

序列化:

/**
 * 调用对象构造好的Builder,完成属性赋值和序列化操作
 * @return
 */
public static byte[] protobufSerializer(){
    AnimalProto.Animal.Builder builder = AnimalProto.Animal.newBuilder();
    builder.setId(1L);
    builder.setName("小猪");
    List actions = new ArrayList<>();
    actions.add("eat");
    actions.add("run");
    builder.addAllActions(actions);
    return builder.build().toByteArray();
}

反序列化:

/**
 * 通过调用parseFrom则完成反序列化
 * @param bytes
 * @return
 * @throws InvalidProtocolBufferException
 */
public static Animal deserialize(byte[] bytes) throws Exception {
    AnimalProto.Animal pAnimal = AnimalProto.Animal.parseFrom(bytes);
    Animal animal = new Animal();
    animal.setId(pAnimal.getId());
    animal.setName(pAnimal.getName());
    animal.setActions(pAnimal.getActionsList());
    return animal;
}

测试:

public static void main(String[] args) throws Exception {
    byte[] bytes = serializer();
    Animal animal = deserialize(bytes);
    System.out.println(animal);
}

以下看到是能正常序列化和反序列化的:

从零开始实现一个分布式RPC框架

九.通信协议

通信协议主要是解决网络传输问题,比如TCP拆包粘包问题。

TCP问题:

  • TCP拆包粘包主要就是把一些数据合并或者分割开进行发送,这时候有的数据就不完整,有的数据就多出一部分,就会造成问题。一般使用TCP协议都需要考虑拆包粘包问题
  • tcp粘包和半包问题就是因为滑动窗口。 因为不管你的数据是多少长度,怎么分割每一条数据。但是tcp只按照我滑动窗口的长度发送。
  • 本质是因为TCP是流式协议,消息无边界。

解决方案:业界的主流协议的解决方案可以归纳如下

  • 消息定长:例如每个报文的大小为固定长度100字节,如果不够用空格补足。(定长解码器)
    从零开始实现一个分布式RPC框架

从零开始实现一个分布式RPC框架
  • 消息长度+消息:将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段。
  • Netty自带:

从零开始实现一个分布式RPC框架

*
– 自定义编解码器

从零开始实现一个分布式RPC框架

这里只是列举出来编码过程,解码是逆过程。(说白了,编码就是找着固定的格式进行写入,解码就是照着固定的格式读)

从零开始实现一个分布式RPC框架

恭喜你,已经学会写RPC框架了,想深入了解的朋友可以参照源码。进行学习,升级。

该rpc最终打成一个Spring Boot starter,如果不会的请参照手写一个Spring Boot starter

寄语:生命只有一次,你要活得畅快淋漓

参考文章:

https://mp.weixin.qq.com/s/yaIOCfEigkQMm2kt6I7Orw

https://mp.weixin.qq.com/s/ltos1nEgktec5pn47xAgMw

Original: https://www.cnblogs.com/monkey-xuan/p/15893604.html
Author: 小猴子_X
Title: 从零开始实现一个分布式RPC框架

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/579534/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 日志技术简介

    java日志体系 日志的主流体系 日志的用途 软件开发中,我们经常需要去调试程序,做一些信息,状态的输出便于我们查询程序的运行状况。为了让我们能够更加灵活和方便的控制这些调试的信息…

    Java 2023年6月8日
    0121
  • Java之POI导出Excel(一):单sheet

    相信在大部分的web项目中都会有导出导入Excel的需求,今天我们就来看看如何用Java代码去实现 用POI导出Excel表格。 一、pom引用 pom文件中,添加以下依赖 查看代…

    Java 2023年6月15日
    067
  • Mybatis源码分析

    一、Mybatis的使用 创建maven工程。 添加maven依赖 org.mybatis mybatis 3.5.7 mysql mysql-connector-java 8.0…

    Java 2023年6月13日
    055
  • Spring Boot 入门(十三)使用Elasticsearch

    maven <dependency> <groupId>org.elasticsearch.clientgroupId> <artifactId…

    Java 2023年6月5日
    091
  • Springboot笔记

    SpringBoot HelloWorld 1.创建Meven工程 2.引入依赖 pom.xml org.springframework.boot spring-boot-star…

    Java 2023年6月16日
    0135
  • ssh 登陆远程nohup java 脚本无效,但设置生效后日志中文乱码

    该日志说到了两个问题,分别解答: (1)ssh 登陆远程调用含有nohup java的启动脚本失败 (2)ssh 登陆远程调用启动脚本生效后,但是日志中出现中文乱码 场景:jenk…

    Java 2023年5月29日
    075
  • Mybatis完整版详解

    一、简介 1.什么是MyBatis MyBatis 是一款优秀的持久层框架 它支持自定义 SQL、存储过程以及高级映射。 MyBatis 免除了几乎所有的 JDBC 代码以及设置参…

    Java 2023年6月14日
    065
  • MySQL的主从复制和分库分表初探

    主从复制 + 分库分表 要讲主从复制,首先来看看MySQL自带的日志文件。 日志 错误日志 错误日志是 MySQL 中最重要的日志之一,它记录了当 mysqld 启动和停止时,以及…

    Java 2023年6月15日
    082
  • C# 反射 操作列表类型属性

    本文介绍对列表进行创建及赋值的反射操作 我们现在有TestA、TestB类,TestA中有TestB类型列表的属性List,如下: 下面通过反射,给TestA.List进行赋值,o…

    Java 2023年5月30日
    0104
  • Java集合原理分析和知识点大杂烩(多图初学者必备!!)

    一、数据结构 ​ 数据结构就是计算机存储、组织数据的方式。 ​ 在计算机科学中,算法的时间复杂度是一个函数,它定性描述了该算法的运行时间,常用O符号来表述。​ 时间复杂度是同一问题…

    Java 2023年6月7日
    091
  • Jedis案例

    案例: 案例需求: 提供index.html页面,页面中有一个省份 下拉列表 当页面加载完成后 发送ajax请求,加载所有省份 代码实现: ProvinceDao package …

    Java 2023年6月6日
    084
  • CentOS 7安装Docker整理常规命令集合

    XShell CentOS 7 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。 容器是…

    Java 2023年6月7日
    0129
  • java基础

    一:java反射机制的原理java反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法,对于任意一个对象,都能够调用它的任意一个方法,这种动态获取节点信息以及动…

    Java 2023年6月5日
    095
  • Vue

    Vue 的核心库只关注视图层,方便与 HTML + CSS + JS : 视图 : &#x7ED9;&#x7528;&#x6237;&#x770B;…

    Java 2023年6月7日
    096
  • Spring Boot 3.0.0 M3、2.7.0发布,2.5.x将停止维护

    昨晚(5月19日),Spring Boot官方发布了一系列Spring Boot的版本更新,其中包括: Spring Boot 3.0.0-M3 Spring Boot 2.7.0…

    Java 2023年6月9日
    0102
  • Java连载151-JUnit简介以及HashMap初步分析

    一、配置JUnit环境 JUnit是一个集成测试单元框架,我们先下载软件包,来配置环境 <span class="hljs-keyword">pac…

    Java 2023年6月13日
    0102
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球