Netty源码解读(二)-服务端源码讲解

简单Echo案例

代码是netty的源码,我添加了自己理解的中文注释。

了解了Netty的线程模型和组件之后,我们先看看如何写一个简单的Echo案例,后续的源码讲解都基于此案例。以下是服务端的代码:

public final class MyEchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final MyEchoServerHandler serverHandler = new MyEchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    // 说明服务器端通道的实现类(便于 Netty 做反射处理)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    // 对服务端的 NioServerSocketChannel 添加 Handler
                    // LoggingHandler 是 netty 内置的一种 ChannelDuplexHandler,
                    // 既可以处理出站事件,又可以处理入站事件,即 LoggingHandler
                    // 既记录出站日志又记录入站日志。
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // 对服务端接收到的、与客户端之间建立的 SocketChannel 添加 Handler
                    .childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.

            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.

            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.

            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

EventLoopGroup的创建与初始化

对应代码

EventLoopGroup bossGroup = new NioEventLoopGroup();

跟踪 NioEventLoopGroup的无参构造

NioEventLoopGroup()
-->
NioEventLoopGroup(int nThreads)
-->
NioEventLoopGroup(int nThreads, Executor executor)
-->
NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider)
-->
NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory)
-->
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里能看到,如果构造传入的线程数为0,则使用DEFAULT_EVENT_LOOP_THREADS

值为系统变量 io.netty.eventLoopThreads,没有环境变量就取cpu逻辑线程数*2

例如我的电脑为8核16线程,nThreads = 16 * 2

继续跟踪代码,以下代码有部分省略

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 检查线程数量不能小于1
    checkPositive(nThreads, "nThreads");

    // 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建EventLoop(重点)
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            。。。。。。
        }
    }

    // chooser 的作用是为了实现 next()方法,即从 EventLoopGroup 中挑选
    // 一个 NioEventLoop 来处理连接上 IO 事件的方法
    chooser = chooserFactory.newChooser(children);

    。。。。。。
}

ThreadPerTaskExecutor很简单,实现了Executor接口

public final class ThreadPerTaskExecutor implements Executor {
    。。。。。。
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

这意味着每次执行executor.execute方法,都会开启一个线程。

EventLoop的创建是在newChild中

// 类NioEventLoopGroup
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // selector工厂
    SelectorProvider selectorProvider = (SelectorProvider) args[0];
    // 选择策略工厂
    SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
    // 拒绝执行处理器(任务添加到队列中失败时调用)
    RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
    EventLoopTaskQueueFactory taskQueueFactory = null;
    EventLoopTaskQueueFactory tailTaskQueueFactory = null;

    int argsLength = args.length;
    if (argsLength > 3) {
        taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
    }
    if (argsLength > 4) {
        tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
    }
    // 创建NioEventLoop并返回
    return new NioEventLoop(this, executor, selectorProvider,
            selectStrategyFactory.newSelectStrategy(),
            rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}

NioEventLoopGroup的创建,初始化了selector工厂,选择策略,拒绝执行处理器等。

并创建了同样线程数的NioEventLoop

服务端引导类 ServerBootstrap的创建与设置

对应代码

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)......

public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}

提供了两个设置EventLoopGroup的方法,也就是parentGroup和childGroup可以是同一个group,

而parentGroup对应线程图中的bossGroup,childGroup对应线程图中的workerGroup

public B channel(Class channelClass) {
    return channelFactory(new (
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

这里设置的是channel反射工厂,该工厂会使用反射生成 NioServerSocketChannel对象。

创建并绑定channel

对应代码

ChannelFuture f = b.bind(PORT).sync();

准确点说,是负责创建连接(ACCEPT)的channel的创建

AbstractBootstrap#bind(int inetPort) -->
AbstractBootstrap#bind(SocketAddress localAddress) -->
private ChannelFuture doBind(final SocketAddress localAddress) {
    // 初始化 NioServerSocketChannel 的实例,并且将其注册到
    // bossGroup 中的 EvenLoop 中的 Selector 中,initAndRegister()

    // 实例的初始化和注册(此方法是异步的):
    // (1) 初始化:将handler注册进通道,并执行handler的handlerAdded、channelRegistered方法
    // (2) 将channel注册进selector
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.

        // 若异步过程 initAndRegister()已经执行完毕,则进入该分支
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.

        // 若异步过程 initAndRegister()还未执行完毕,则进入该分支
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            // 监听 regFuture 的完成事件,完成之后再调用
            // doBind0(regFuture, channel, localAddress, promise);
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.

                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.

                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

initAndRegister方法中主要做了3个操作,channel的创建、初始化以及将channel注册到EventLoop中

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // 无参构造会创建pipelile
        // NioServerSocketChannel
        channel = channelFactory.newChannel();
        // 初始化相关属性
        // 如果是ServerBoottrap,还会设置bossGroup的handler,
        // 其中包括ServerBootstrap.handler设置的handler,以及最后添加ServerBootstrapAcceptor
        // ServerBootstrapAcceptor就是将channel注册到workerGroup的类
        init(channel);
    } catch (Throwable t) {
        。。。。。。
    }
    // 将channel注册进selector(监听ACCEPT事件)
    // 依然是通过开启eventLoop线程的方式进行注册
    // MultithreadEventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

重新回到AbstractBootstrap#doBind方法中

// 如果上面的initAndRegister方法执行完毕(异步执行的),则执行doBind0
if (regFuture.isDone()) {
    // At this point we know that the registration was complete and successful.

    // 若异步过程 initAndRegister()已经执行完毕,则进入该分支
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
} else {
    // Registration future is almost always fulfilled already, but just in case it's not.

    // 若异步过程 initAndRegister()还未执行完毕,则进入该分支
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        // 监听 regFuture 的完成事件,完成之后再调用
        // doBind0(regFuture, channel, localAddress, promise);
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.

                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.

                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

上面这段if/esle做了同一件是,就是自行doBind0方法,区别在于如果initAndRegister执行完毕,则执行调用doBind0,否则添加监听器,等执行完成触发调用doBind0

继续看doBind0

// 类AbstractBootstrap
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.

    // execute方法会将这个Runnable加入到taskQueue中,并开线程执行EventLoop的run方法(死循环)
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel.eventLoop().execute这个后面再说,可以看到,里面的逻辑是调用channel.bind在实现绑定的,继续跟踪

AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)
-->
// 类AbstractChannel
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    // tail就是TailContext
    return tail.bind(localAddress, promise);
}
-->
// 类AbstractChannel
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
    // 在管道中从当前handlerContext往前查找实现了bind方法的handlerContext
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 执行handlerContext的bind方法
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}
-->
// 类AbstractChannel
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

从上面可以看到,最终会执行handler的bind方法,拿 LoggingHandler的bind方法举例

// 类LoggingHandler
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    if (logger.isEnabled(internalLevel)) {
        logger.log(internalLevel, format(ctx, "BIND", localAddress));
    }
    ctx.bind(localAddress, promise);
}

ctx.bind(localAddress, promise)是不是很眼熟,没错,就是AbstractChannel#bind(final SocketAddress localAddress, final ChannelPromise promise)

就像一个循环,每一次都在当前handlerContext往前找有实现了bind方法的handlerContext,执行bind,然后继续往前找。

最终找到管道中的第一个handler,也就是 HeadContext,看看它实现的bind方法

// 类HeadContext
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
-->
AbstractChannel#bind(final SocketAddress localAddress, final ChannelPromise promise)
-->
NioServerSocketChannel#doBind(SocketAddress localAddress)

最后,还是Java NIO的API来绑定

参考资料:

《Netty in Action》,Norman Maurer

《Scalable IO in Java》,Doug Lea

Original: https://www.cnblogs.com/konghuanxi/p/16381317.html
Author: 王谷雨
Title: Netty源码解读(二)-服务端源码讲解

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/583264/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • oracle 触发器trigger(主键自增长)

    触发器我们也可以认为是存储过程,是一种特殊的存储过程。 存储过程:有输入参数和输出参数,定义之后需要调用 触发器:没有输入参数和输出参数,定义之后无需调用,在 适当的时候会自动执行…

    Linux 2023年6月7日
    080
  • 【微信篇】电脑版微信的照片视频文件位置变化

    新版的微信视频图片更新了位置,感觉有好有坏吧,好的方面就是以后查找视频、图片、文档等可能更方便;不好就是越更新占用体积越大,还多很多数据,不懂是否流氓?!—【蘇小沐】 …

    Linux 2023年6月13日
    0117
  • 1:文件与目录

    CD 切换当前工作目录 mkdir 创建目录 re -dir 删除目录 pwd 打印当前工作目录 绝对路径和相对路径 硬链接 和软链接 CP拷贝 MV 移动 dirname 和 b…

    Linux 2023年6月7日
    0135
  • 试吃香甜可口的《程序员面试指南》

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Linux 2023年6月6日
    069
  • 如何逃离框架孤井?

    前言 前面我发过一篇文章,脱离了Spring询问大家能不能继续开发,结果文章下面的评论和回复都告诉我大家伙的基础打得很牢固,该咋写还是咋写。看得我在这内卷的时代瞬间躺平。 那么今天…

    Linux 2023年6月13日
    091
  • Zookeeper集群搭建及原理

    1 概述 1.1 简介 ZooKeeper 是 Apache 的一个顶级项目,为分布式应用提供高效、高可用的分布式协调服务,提供了诸如数据发布/订阅、负载均衡、命名服务、分布式协调…

    Linux 2023年6月13日
    082
  • 如何提高团队开发质量

    年轻的时候去面过一个相对于当时我的比较高端的管理岗位,当时的我情况是,开发经验相对丰富, 但管理经验还欠缺。对方当时面临一个具体的问题。 “我们最近生产上,出现了一个比…

    Linux 2023年6月13日
    074
  • WPF 制作一个占用文件的测试工具

    我在开发软件进行测试时,需要测试拖入的文件被占用时软件的行为,于是就做了一个文件占用工具,此工具可以将某个文件进行占用,以及获取某个文件被哪个进程占用 先给大家看一下效果: 以上是…

    Linux 2023年6月6日
    0113
  • Harbor部署

    harbor 无论是使用Docker-distribution去自建仓库,还是通过官方镜像跑容器的方式去自建仓库,通过前面的演示我们可以发现其是非常的简陋的,还不如直接使用官方的D…

    Linux 2023年6月7日
    095
  • 2017年腾讯 秋招软件开发笔试编程题回忆版

    2017 年腾讯 秋招软件开发笔试编程题回忆版 (所有题目大致描述如下,并非完整的题目回忆,但意思大致一样) 1、又一个魔法城市,城市里面有n个魔法城堡,序号为0,1,2。。。n-…

    Linux 2023年6月6日
    092
  • MAC安装redis

    一、安装命令使用mac的包管理工具brew一行命令搞定安装。若未安装brew,命令行先输入以下命令安装brew。 /usr/bin/ruby -e “$(curl -f…

    Linux 2023年5月28日
    088
  • podman对容器映像签名和分发

    熟悉podman 如何使用 Podman 对容器映像进行签名和分发 熟悉podman 此示例容器将运行一个非常基本的 httpd 服务器,该服务器仅为其索引页提供服务 [root@…

    Linux 2023年6月7日
    092
  • 个人的游戏紧张程度排行

    玩游戏是为了放松,适当的紧张刺激能让人兴奋愉悦,但如果过度紧张就会适得其反,不仅达不到放松和休息的效果,甚至还可能会损害健康。所以本人将自己常玩的网游和游戏总结了一下,按从低到高的…

    Linux 2023年6月6日
    096
  • Mysql数据库体系

    Mysql数据库体系如下(手绘): 描述: 1.DBMS:database system management是数据库管理软件,平时我们使用的数据库的全称,是C/S架构(clien…

    Linux 2023年6月14日
    091
  • centos7安装zabbix-agent客户端

    1.官方下载zabbix-agent安装包链接:wget https://cdn.zabbix.com/zabbix/binaries/stable/5.4/5.4.6/zabbi…

    Linux 2023年6月13日
    0119
  • 启动springboot项目很慢的解决方案-InetAddress.getLocalHost().getHostName()

    https://blog.csdn.net/qq_39595769/article/details/119573111 Original: https://www.cnblogs….

    Linux 2023年6月13日
    084
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球