聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

本系列Netty源码解析文章基于 4.1.56.Final版本

在上篇文章《聊聊Netty那些事儿之从内核角度看IO模型》中我们花了大量的篇幅来从内核角度详细讲述了五种 IO模型的演进过程以及 ReactorIO线程模型的底层基石IO多路复用技术在内核中的实现原理。

最后我们引出了netty中使用的主从Reactor IO线程模型。

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

通过上篇文章的介绍,我们已经清楚了在IO调用的过程中内核帮我们搞了哪些事情,那么俗话说的好 内核领进门,修行在netty,netty在用户空间又帮我们搞了哪些事情?

那么从本文开始,笔者将从源码角度来带大家看下上图中的 Reactor IO线程模型在Netty中是如何实现的。

本文作为Reactor在Netty中实现系列文章中的开篇文章,笔者先来为大家介绍Reactor的骨架是如何创建出来的。

在上篇文章中我们提到Netty采用的是 主从Reactor多线程的模型,但是它在实现上又与 Doug LeaScalable IO in Java论文中提到的经典 主从Reactor多线程模型有所差异。

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

Netty中的 Reactor是以 Group的形式出现的, 主从Reactor在Netty中就是 主从Reactor组,每个 Reactor Group中会有多个 Reactor用来执行具体的 IO任务。当然在netty中 Reactor不只用来执行 IO任务,这个我们后面再说。

  • Main Reactor Group中的 Reactor数量取决于服务端要监听的端口个数,通常我们的服务端程序只会监听一个端口,所以 Main Reactor Group只会有一个 Main Reactor线程来处理最重要的事情: 绑定端口地址接收客户端连接为客户端创建对应的SocketChannel将客户端SocketChannel分配给一个固定的Sub Reactor。也就是上篇文章笔者为大家举的例子,饭店最重要的工作就是先把客人迎接进来。 *“我家大门常打开,开放怀抱等你,拥抱过就有了默契你会爱上这里……”

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)
  • Sub Reactor Group里有多个 Reactor线程, Reactor线程的个数可以通过系统参数 -D io.netty.eventLoopThreads指定。默认的 Reactor的个数为 CPU核数 * 2Sub Reactor线程主要用来 轮询客户端SocketChannel上的IO就绪事件处理IO就绪事件执行异步任务Sub Reactor Group做的事情就是上篇饭店例子中服务员的工作,客人进来了要为客人分配座位,端茶送水,做菜上菜。 *“不管远近都是客人,请不用客气,相约好了在一起,我们欢迎您……”

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

一个 客户端SocketChannel只能分配给一个固定的 Sub Reactor。一个 Sub Reactor负责处理多个 客户端SocketChannel,这样可以将服务端承载的 全量客户端连接分摊到多个 Sub Reactor中处理,同时也能保证 客户端SocketChannel上的IO处理的线程安全性

由于文章篇幅的关系,作为Reactor在netty中实现的第一篇我们主要来介绍 主从Reactor Group的创建流程,骨架脉络先搭好。

下面我们来看一段Netty服务端代码的编写模板,从代码模板的流程中我们来解析下主从Reactor的创建流程以及在这个过程中所涉及到的Netty核心类。

Netty服务端代码模板

/**
 * Echoes back any received data from a client.

 */
public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.

        //创建主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)//配置主从Reactor
             .channel(NioServerSocketChannel.class)//配置主Reactor中的channel类型
             .option(ChannelOption.SO_BACKLOG, 100)//设置主Reactor中channel的option选项
             .handler(new LoggingHandler(LogLevel.INFO))//设置主Reactor中Channel->pipline->handler
             .childHandler(new ChannelInitializer() {//设置从Reactor中注册channel的pipeline
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server. 绑定端口启动服务,开始监听accept事件
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.

            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.

            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

  1. 首先我们要创建Netty最核心的部分 -> 创建主从Reactor Group,在Netty中 EventLoopGroup就是 Reactor Group的实现类。对应的 EventLoop就是 Reactor的实现类。
  //创建主从Reactor线程组
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();
  1. 创建用于 IO处理ChannelHandler,实现相应 IO事件的回调函数,编写对应的 IO处理逻辑。注意这里只是简单示例哈,详细的IO事件处理,笔者会单独开一篇文章专门讲述。
final EchoServerHandler serverHandler = new EchoServerHandler();

/**
 * Handler implementation for the echo server.

 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ................省略IO处理逻辑................

        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {

        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.

        cause.printStackTrace();
        ctx.close();
    }
}
  1. 创建 ServerBootstrapNetty服务端启动类,并在启动类中配置启动Netty服务端所需要的一些必备信息。
  2. 通过 serverBootstrap.group(bossGroup, workerGroup)为Netty服务端配置 主从Reactor Group实例。
  3. 通过 serverBootstrap.channel(NioServerSocketChannel.class)配置Netty服务端的 ServerSocketChannel用于 绑定端口地址以及 创建客户端SocketChannel。Netty中的 NioServerSocketChannel.class就是对JDK NIO中 ServerSocketChannel的封装。而用于表示 客户端连接NioSocketChannel是对JDK NIO SocketChannel封装。

    在上篇文章介绍 Socket内核结构小节中我们提到,在编写服务端网络程序时,我们首先要创建一个 Socket用于 listen和bind端口地址,我们把这个叫做 监听Socket,这里对应的就是 NioServerSocketChannel.class。当客户端连接完成三次握手,系统调用 accept函数会基于 监听Socket创建出来一个 新的Socket专门用于与客户端之间的网络通信我们称为 客户端连接Socket,这里对应的就是 NioSocketChannel.class

  4. serverBootstrap.option(ChannelOption.SO_BACKLOG, 100)设置服务端 ServerSocketChannel中的 SocketOption。关于 SocketOption的选项我们后边的文章再聊,本文主要聚焦在Netty Main Reactor Group的创建及工作流程。
  5. serverBootstrap.handler(....)设置服务端 NioServerSocketChannel中对应 Pipieline中的 ChannelHandler

    netty有两种 Channel类型:一种是服务端用于监听绑定端口地址的 NioServerSocketChannel,一种是用于客户端通信的 NioSocketChannel。每种 Channel类型实例都会对应一个 PipeLine用于编排 对应channel实例上的IO事件处理逻辑。 PipeLine中组织的就是 ChannelHandler用于编写特定的IO处理逻辑。
    注意 serverBootstrap.handler设置的是服务端 NioServerSocketChannel PipeLine中的 ChannelHandler

  6. serverBootstrap.childHandler(ChannelHandler childHandler)用于设置客户端 NioSocketChannel中对应 Pipieline中的 ChannelHandler。我们通常配置的编码解码器就是在这里。

    ServerBootstrap启动类方法带有 child前缀的均是设置客户端 NioSocketChannel属性的。
    ChannelInitializer是用于当 SocketChannel成功注册到绑定的 Reactor上后,用于初始化该 SocketChannelPipeline。它的 initChannel方法会在注册成功后执行。这里只是捎带提一下,让大家有个初步印象,后面我会专门介绍。

  7. ChannelFuture f = serverBootstrap.bind(PORT).sync()这一步会是下篇文章要重点分析的主题 Main Reactor Group的启动,绑定端口地址,开始监听客户端连接事件( OP_ACCEPT)。本文我们只关注创建流程。
  8. f.channel().closeFuture().sync()等待服务端 NioServerSocketChannel关闭。Netty服务端到这里正式启动,并准备好接受客户端连接的准备。
  9. shutdownGracefully优雅关闭 主从Reactor线程组里的所有 Reactor线程

Netty对IO模型的支持

在上篇文章中我们介绍了五种 IO模型,Netty中支持 BIO, NIO, AIO以及多种操作系统下的 IO多路复用技术实现。

在Netty中切换这几种 IO模型也是非常的方便,下面我们来看下Netty如何对这几种IO模型进行支持的。

首先我们介绍下几个与 IO模型相关的重要接口:

EventLoop

EventLoop就是Netty中的 Reactor,可以说它就是Netty的引擎,负责Channel上 IO就绪事件的监听IO就绪事件的处理异步任务的执行驱动着整个Netty的运转。

不同 IO模型下, EventLoop有着不同的实现,我们只需要切换不同的实现类就可以完成对Netty IO模型的切换。

BIO NIO AIO ThreadPerChannelEventLoop NioEventLoop AioEventLoop

NIO模型下Netty会 自动根据操作系统以及版本的不同选择对应的 IO多路复用技术实现。比如Linux 2.6版本以上用的是 Epoll,2.6版本以下用的是 Poll,Mac下采用的是 Kqueue

其中Linux kernel 在5.1版本引入的异步IO库io_uring正在netty中孵化。

EventLoopGroup

Netty中的 Reactor是以 Group的形式出现的, EventLoopGroup正是 Reactor组的接口定义,负责管理 Reactor,Netty中的 Channel就是通过 EventLoopGroup注册到具体的 Reactor上的。

Netty的IO线程模型是 主从Reactor多线程模型主从Reactor线程组在Netty源码中对应的其实就是两个 EventLoopGroup实例。

不同的 IO模型也有对应的实现:

BIO NIO AIO ThreadPerChannelEventLoopGroup NioEventLoopGroup AioEventLoopGroup

ServerSocketChannel

用于Netty服务端使用的 ServerSocketChannel,对应于上篇文章提到的 监听Socket,负责绑定监听端口地址,接收客户端连接并创建用于与客户端通信的 SocketChannel

不同的 IO模型下的实现:

BIO NIO AIO OioServerSocketChannel NioServerSocketChannel AioServerSocketChannel

SocketChannel

用于与客户端通信的 SocketChannel,对应于上篇文章提到的 客户端连接Socket,当客户端完成三次握手后,由系统调用 accept函数根据 监听Socket创建。

不同的 IO模型下的实现:

BIO NIO AIO OioSocketChannel NioSocketChannel AioSocketChannel

我们看到在 不同IO模型的实现中,Netty这些围绕 IO模型的核心类只是前缀的不同:

  • BIO对应的前缀为 Oio表示 old io,现在已经废弃不推荐使用。
  • NIO对应的前缀为 Nio,正是Netty推荐也是我们常用的 非阻塞IO模型
  • AIO对应的前缀为 Aio,由于Linux下的 异步IO机制实现的并不成熟,性能提升表现上也不明显,现已被删除。

我们只需要将 IO模型的这些核心接口对应的实现类 前缀改为对应 IO模型的前缀,就可以轻松在Netty中完成对 IO模型的切换。

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

多种NIO的实现

Common Linux Mac NioEventLoopGroup EpollEventLoopGroup KQueueEventLoopGroup NioEventLoop EpollEventLoop KQueueEventLoop NioServerSocketChannel EpollServerSocketChannel KQueueServerSocketChannel NioSocketChannel EpollSocketChannel KQueueSocketChannel

我们通常在使用 NIO模型的时候会使用 Common列下的这些 IO模型核心类, Common类也会根据操作系统的不同自动选择 JDK在对应平台下的 IO多路复用技术的实现。

而Netty自身也根据操作系统的不同提供了自己对 IO多路复用技术的实现,比 JDK的实现性能更优。比如:

  • JDK的 NIO 默认实现是 水平触发,Netty 是 边缘触发(默认)和水平触发可切换。。
  • Netty 实现的垃圾回收更少、性能更好。

我们编写Netty服务端程序的时候也可以根据操作系统的不同,采用Netty自身的实现来进一步优化程序。做法也很简单,直接将上图中红框里的实现类替换成Netty的自身实现类即可完成切换。

经过以上对Netty服务端代码编写模板以及 IO模型相关核心类的简单介绍,我们对Netty的创建流程有了一个简单粗略的总体认识,下面我们来深入剖析下创建流程过程中的每一个步骤以及这个过程中涉及到的核心类实现。

以下源码解析部分我们均采用 Common列NIO 相关的实现进行解析。

创建主从Reactor线程组

在Netty服务端程序编写模板的开始,我们首先会创建两个Reactor线程组:

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)
  • 一个是主Reactor线程组 bossGroup用于监听客户端连接,创建客户端连接 NioSocketChannel,并将创建好的客户端连接 NioSocketChannel注册到从Reactor线程组中一个固定的 Reactor上。
  • 一个是从Reactor线程组 workerGroupworkerGroup中的 Reactor负责监听绑定在其上的客户端连接 NioSocketChannel上的 IO就绪事件,并处理 IO就绪事件执行异步任务
  //创建主从Reactor线程组
  EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  EventLoopGroup workerGroup = new NioEventLoopGroup();

Netty中Reactor线程组的实现类为 NioEventLoopGroup,在创建 bossGroupworkerGroup的时候用到了 NioEventLoopGroup的两个构造函数:

  • nThreads参数的构造函数 public NioEventLoopGroup(int nThreads)
  • 不带 nThreads参数的 默认构造函数 public NioEventLoopGroup()
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    /**
     * Create a new instance using the default number of threads, the default {@link ThreadFactory} and
     * the {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.

     */
    public NioEventLoopGroup() {
        this(0);
    }

    /**
     * Create a new instance using the specified number of threads, {@link ThreadFactory} and the
     * {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.

     */
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

    ......................省略...........................

}

nThreads参数表示当前要创建的 Reactor线程组内包含多少个 Reactor线程。不指定 nThreads参数的话采用默认的 Reactor线程个数,用 0表示。

最终会调用到构造函数

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

下面简单介绍下构造函数中这几个参数的作用,后面我们在讲解本文主线的过程中还会提及这几个参数,到时在详细介绍,这里只是让大家有个初步印象,不必做过多的纠缠。

  • Executor executor:负责启动 Reactor线程进而Reactor才可以开始工作。

Reactor线程组 NioEventLoopGroup负责创建 Reactor线程,在创建的时候会将 executor传入。

  • RejectedExecutionHandler: 当向 Reactor添加异步任务添加失败时,采用的拒绝策略。Reactor的任务不只是监听IO活跃事件和IO任务的处理,还包括对异步任务的处理。这里大家只需有个这样的概念,后面笔者会专门详细介绍。
  • SelectorProvider selectorProvider: Reactor中的IO模型为 IO多路复用模型,对应于JDK NIO中的实现为 java.nio.channels.Selector(就是我们上篇文章中提到的 select,poll,epoll),每个Reator中都包含一个 Selector,用于 轮询注册在该Reactor上的所有 Channel上的 IO事件SelectorProvider就是用来创建 Selector的。
  • SelectStrategyFactory selectStrategyFactory: Reactor最重要的事情就是 轮询注册其上的 Channel上的 IO就绪事件,这里的 SelectStrategyFactory用于指定 轮询策略,默认为 DefaultSelectStrategyFactory.INSTANCE

最终会将这些参数交给 NioEventLoopGroup的父类构造器,下面我们来看下 NioEventLoopGroup类的继承结构:

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

NioEventLoopGroup类的继承结构乍一看比较复杂,大家不要慌,笔者会随着主线的深入慢慢地介绍这些父类接口,我们现在重点关注 Mutithread前缀的类。

我们知道 NioEventLoopGroup是Netty中的 Reactor线程组的实现,既然是线程组那么肯定是负责管理和创建 多个Reactor线程的,所以 Mutithread前缀的类定义的行为自然是对 Reactor线程组内多个 Reactor线程的创建和管理工作。

MultithreadEventLoopGroup

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    //默认Reactor个数
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    ...................省略.....................

}

MultithreadEventLoopGroup类主要的功能就是用来确定 Reactor线程组Reactor的个数。

默认的 Reactor的个数存放于字段 DEFAULT_EVENT_LOOP_THREADS中。

static {}静态代码块中我们可以看出默认 Reactor的个数的获取逻辑:

  • 可以通过系统变量 -D io.netty.eventLoopThreads"指定。
  • 如果不指定,那么默认的就是 NettyRuntime.availableProcessors() * 2

nThread参数设置为 0采用默认设置时, Reactor线程组内的 Reactor个数则设置为 DEFAULT_EVENT_LOOP_THREADS

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup这里就是本小节的核心,主要用来定义创建和管理 Reactor的行为。

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    //Reactor线程组中的Reactor集合
    private final EventExecutor[] children;
    private final Set readonlyChildren;
    //从Reactor group中选择一个特定的Reactor的选择策略 用于channel注册绑定到一个固定的Reactor上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    /**
     * Create a new instance.

     *
     * @param nThreads          the number of threads that will be used by this instance.

     * @param executor          the Executor to use, or {@code null} if the default should be used.

     * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
     */
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    ............................省略................................

}

首先介绍一个新的构造器参数 EventExecutorChooserFactory chooserFactory。当客户端连接完成三次握手后, Main Reactor会创建客户端连接 NioSocketChannel,并将其绑定到 Sub Reactor Group中的一个固定 Reactor,那么具体要绑定到哪个具体的 Sub Reactor上呢?这个绑定策略就是由 chooserFactory来创建的。默认为 DefaultEventExecutorChooserFactory

下面就是本小节的主题 Reactor线程组 的创建过程:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads  0)", nThreads));
        }

        if (executor == null) {
            //用于创建Reactor线程
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];
        //循环创建reaactor group中的Reactor
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //创建reactor
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                     ................省略................

                }
            }
        }
        //创建channel到Reactor的绑定策略
        chooser = chooserFactory.newChooser(children);

         ................省略................

        Set childrenSet = new LinkedHashSet(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

1. 创建用于启动Reactor线程的executor

在Netty Reactor Group中的单个 ReactorIO&#x7EBF;&#x7A0B;&#x6A21;&#x578B;为上篇文章提到的 &#x5355;Reactor&#x5355;&#x7EBF;&#x7A0B;&#x6A21;&#x578B;,一个 Reactor&#x7EBF;&#x7A0B;负责 &#x8F6E;&#x8BE2;注册其上的所有 Channel中的 IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;,处理IO事件,执行Netty中的异步任务等工作。正是这个 Reactor&#x7EBF;&#x7A0B;驱动着整个Netty的运转,可谓是Netty的核心引擎。

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

而这里的 executor就是负责启动 Reactor&#x7EBF;&#x7A0B;的,从创建源码中我们可以看到 executor的类型为 ThreadPerTaskExecutor

ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

我们看到 ThreadPerTaskExecutor做的事情很简单,从它的命名前缀 ThreadPerTask我们就可以猜出它的工作方式,就是来一个任务就创建一个线程执行。而创建的这个线程正是netty的核心引擎Reactor线程。

Reactor&#x7EBF;&#x7A0B;启动的时候,Netty会将 Reactor&#x7EBF;&#x7A0B;要做的事情封装成 Runnable,丢给 exexutor启动。

Reactor&#x7EBF;&#x7A0B;的核心就是一个 &#x6B7B;&#x5FAA;&#x73AF;不停的 &#x8F6E;&#x8BE2;IO就绪事件,处理IO事件,执行异步任务。一刻也不停歇,堪称 996&#x5178;&#x8303;

这里向大家先卖个关子, "Reactor&#x7EBF;&#x7A0B;&#x662F;&#x4F55;&#x65F6;&#x542F;&#x52A8;&#x7684;&#x5462;&#xFF1F;&#xFF1F;"

2. 创建Reactor

Reactor&#x7EBF;&#x7A0B;&#x7EC4;NioEventLoopGroup包含多个 Reactor,存放于 private final EventExecutor[] children数组中。

所以下面的事情就是创建 nThreadReactor,并存放于 EventExecutor[] children字段中,

我们来看下用于创建 ReactornewChild(executor, args)方法:

newChild

newChild方法是 MultithreadEventExecutorGroup中的一个抽象方法,提供给具体子类实现。

protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

这里我们解析的是 NioEventLoopGroup,我们来看下 newChild在该类中的实现:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

前边提到的众多构造器参数,这里会通过可变参数 Object... args传入到Reactor类 NioEventLoop的构造器中。

这里介绍下新的参数 EventLoopTaskQueueFactory queueFactory,前边提到Netty中的 Reactor主要工作是 &#x8F6E;&#x8BE2;注册其上的所有 Channel上的 IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;,处理 IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;。除了这些主要的工作外,Netty为了极致的压榨 Reactor的性能,还会让它做一些异步任务的执行工作。既然要执行异步任务,那么 Reactor中就需要一个 &#x961F;&#x5217;来保存任务。

这里的 EventLoopTaskQueueFactory就是用来创建这样的一个队列来保存 Reactor中待执行的异步任务。

可以把 Reactor 理解成为一个 &#x5355;&#x7EBF;&#x7A0B;&#x7684;&#x7EBF;&#x7A0B;&#x6C60;&#x7C7B;&#x4F3C;JDK 中的 SingleThreadExecutor ,仅用一个线程来执行 &#x8F6E;&#x8BE2;IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;&#x5904;&#x7406;IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;&#x6267;&#x884C;&#x5F02;&#x6B65;&#x4EFB;&#x52A1; 。同时待执行的异步任务保存在 Reactor 里的 taskQueue 中。

NioEventLoop

public final class NioEventLoop extends SingleThreadEventLoop {
    //用于创建JDK NIO Selector,ServerSocketChannel
    private final SelectorProvider provider;
    //Selector轮询策略 决定什么时候轮询,什么时候处理IO事件,什么时候执行异步任务
    private final SelectStrategy selectStrategy;
    /**
     * The NIO {@link Selector}.

     */
    private Selector selector;
    private Selector unwrappedSelector;

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
}

这里就正式开始了 Reactor的创建过程,我们知道 Reactor的核心是采用的 IO&#x591A;&#x8DEF;&#x590D;&#x7528;&#x6A21;&#x578B;来对客户端连接上的 IO&#x4E8B;&#x4EF6;进行 &#x76D1;&#x542C;,所以最重要的事情是创建 Selector(JDK NIO &#x4E2D;IO&#x591A;&#x8DEF;&#x590D;&#x7528;&#x6280;&#x672F;&#x7684;&#x5B9E;&#x73B0;)。

可以把 Selector理解为我们上篇文章介绍的 Select,poll,epoll,它是 JDK NIO对操作系统内核提供的这些 IO&#x591A;&#x8DEF;&#x590D;&#x7528;&#x6280;&#x672F;的封装。

openSelector

openSelectorNioEventLoop&#x7C7B;中用于创建 IO&#x591A;&#x8DEF;&#x590D;&#x7528;Selector,并对创建出来的 JDK NIO 原生的 Selector进行性能优化。

首先会通过 SelectorProvider#openSelector创建JDK NIO原生的 Selector

 private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            //通过JDK NIO SelectorProvider创建Selector
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        ..................省略.............

}

SelectorProvider会根据操作系统的不同选择JDK在不同操作系统版本下的对应 Selector的实现。Linux下会选择 Epoll,Mac下会选择 Kqueue

下面我们就来看下 SelectorProvider是如何做到自动适配不同操作系统下 IO&#x591A;&#x8DEF;&#x590D;&#x7528;实现的

SelectorProvider

    public NioEventLoopGroup(ThreadFactory threadFactory) {
        this(0, threadFactory, SelectorProvider.provider());
    }

SelectorProvider是在前面介绍的 NioEventLoopGroup&#x7C7B;构造函数中通过调用 SelectorProvider.provider()被加载,并通过 NioEventLoopGroup#newChild方法中的可变长参数 Object... args传递到 NioEventLoop中的 private final SelectorProvider provider字段中。

SelectorProvider的加载过程:

public abstract class SelectorProvider {

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
}

SelectorProvider加载源码中我们可以看出, SelectorProvider的加载方式有三种,优先级如下:

  1. 通过系统变量 -D java.nio.channels.spi.SelectorProvider指定 SelectorProvider的自定义实现类 &#x5168;&#x9650;&#x5B9A;&#x540D;。通过 &#x5E94;&#x7528;&#x7A0B;&#x5E8F;&#x7C7B;&#x52A0;&#x8F7D;&#x5668;(Application Classloader)加载。
    private static boolean loadProviderFromProperty() {
        String cn = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (cn == null)
            return false;
        try {
            Class c = Class.forName(cn, true,
                                       ClassLoader.getSystemClassLoader());
            provider = (SelectorProvider)c.newInstance();
            return true;
        }
        .................省略.............

    }
  1. 通过 SPI方式加载。在工程目录 META-INF/services下定义名为 java.nio.channels.spi.SelectorProviderSPI&#x6587;&#x4EF6;,文件中第一个定义的 SelectorProvider实现类全限定名就会被加载。
    private static boolean loadProviderAsService() {

        ServiceLoader sl =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader());
        Iterator i = sl.iterator();
        for (;;) {
            try {
                if (!i.hasNext())
                    return false;
                provider = i.next();
                return true;
            } catch (ServiceConfigurationError sce) {
                if (sce.getCause() instanceof SecurityException) {
                    // Ignore the security exception, try the next provider
                    continue;
                }
                throw sce;
            }
        }
    }
  1. 如果以上两种方式均未被定义,那么就采用 SelectorProvider系统默认实现 sun.nio.ch.DefaultSelectorProvider。笔者当前使用的操作系统是 MacOS,从源码中我们可以看到自动适配了 KQueue实现。
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {
    }

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

不同操作系统中JDK对于 DefaultSelectorProvider会有所不同,Linux内核版本2.6以上对应的 Epoll,Linux内核版本2.6以下对应的 Poll,MacOS对应的是 KQueue

下面我们接着回到 io.netty.channel.nio.NioEventLoop#openSelector的主线上来。

Netty对JDK NIO 原生Selector的优化

首先在 NioEventLoop中有一个Selector优化开关 DISABLE_KEY_SET_OPTIMIZATION,通过系统变量 -D io.netty.noKeySetOptimization指定,默认是开启的,表示需要对JDK NIO原生 Selector进行优化。

public final class NioEventLoop extends SingleThreadEventLoop {
   //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
    private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
}

如果优化开关 DISABLE_KEY_SET_OPTIMIZATION是关闭的,那么直接返回JDK NIO原生的 Selector

private SelectorTuple openSelector() {
        ..........SelectorProvider创建JDK NIO  原生Selector..............

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            //JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
            return new SelectorTuple(unwrappedSelector);
        }
}

下面为Netty对JDK NIO原生的 Selector 的优化过程:

  1. 获取 JDK NIO&#x539F;&#x751F;Selector的抽象实现类 sun.nio.ch.SelectorImplJDK NIO&#x539F;&#x751F;Selector的实现均继承于该抽象类。用于判断由 SelectorProvider创建出来的 Selector是否为 JDK&#x9ED8;&#x8BA4;&#x5B9E;&#x73B0;SelectorProvider第三种加载方式)。因为 SelectorProvider可以是自定义加载,所以它创建出来的 Selector并不一定是JDK NIO 原生的。
       Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

JDK NIO Selector的抽象类 sun.nio.ch.SelectorImpl

public abstract class SelectorImpl extends AbstractSelector {

    // The set of keys with data ready for an operation
    // //IO就绪的SelectionKey(里面包裹着channel)
    protected Set selectedKeys;

    // The set of keys registered with this Selector
    //注册在该Selector上的所有SelectionKey(里面包裹着channel)
    protected HashSet keys;

    // Public views of the key sets
    //用于向调用线程返回的keys,不可变
    private Set publicKeys;             // Immutable
    //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
    private Set publicSelectedKeys;     // Removal allowed, but not addition

    protected SelectorImpl(SelectorProvider sp) {
        super(sp);
        keys = new HashSet();
        selectedKeys = new HashSet();
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            //不可变
            publicKeys = Collections.unmodifiableSet(keys);
            //只可删除其中元素,不可增加
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
}

这里笔者来简单介绍下JDK NIO中的 Selector中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)
  • Set<selectionkey> selectedKeys </selectionkey> 类似于我们上篇文章讲解 Epoll时提到的 &#x5C31;&#x7EEA;&#x961F;&#x5217;eventpoll->rdllistSelector这里大家可以理解为 EpollSelector会将自己监听到的 IO&#x5C31;&#x7EEA;Channel放到 selectedKeys中。

这里的 SelectionKey暂且可以理解为 ChannelSelector中的表示,类比上图中 epitem&#x7ED3;&#x6784;里的 epoll_event,封装IO就绪Socket的信息。
其实 SelectionKey里包含的信息不止是 Channel还有很多IO相关的信息。后面我们在详细介绍。

  • HashSet<selectionkey> keys&#xFF1A;</selectionkey>这里存放的是所有注册到该 Selector上的 Channel。类比 epoll&#x4E2D;&#x7684;&#x7EA2;&#x9ED1;&#x6811;&#x7ED3;&#x6784;rb_root

SelectionKeyChannel注册到 Selector中后生成。

  • Set<selectionkey> publicSelectedKeys</selectionkey> 相当于是 selectedKeys的视图,用于向外部线程返回 IO&#x5C31;&#x7EEA;SelectionKey。这个集合在外部线程中只能做删除操作 &#x4E0D;&#x53EF;&#x589E;&#x52A0;&#x5143;&#x7D20;,并且 &#x4E0D;&#x662F;&#x7EBF;&#x7A0B;&#x5B89;&#x5168;&#x7684;
  • Set<selectionkey> publicKeys</selectionkey>相当于 keys的不可变视图,用于向外部线程返回所有注册在该 Selector上的 SelectionKey

这里需要 &#x91CD;&#x70B9;&#x5173;&#x6CE8; 抽象类 sun.nio.ch.SelectorImpl 中的 selectedKeyspublicSelectedKeys 这两个字段,注意它们的类型都是 HashSet ,一会优化的就是这里!!!!

  1. 判断由 SelectorProvider创建出来的 Selector是否是JDK NIO原生的 Selector实现。 因为Netty优化针对的是JDK NIO 原生 Selector。判断标准为 sun.nio.ch.SelectorImpl类是否为 SelectorProvider创建出 Selector的父类。如果不是则直接返回。不在继续下面的优化过程。
        //判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
        //如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
        if (!(maybeSelectorImplClass instanceof Class) ||
            !((Class) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

通过前面对 SelectorProvider的介绍我们知道,这里通过 provider.openSelector()创建出来的 Selector实现类为 KQueueSelectorImpl&#x7C7B;,它继承实现了 sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的 Selector实现

class KQueueSelectorImpl extends SelectorImpl {

}
  1. 创建 SelectedSelectionKeySet通过反射替换掉 sun.nio.ch.SelectorImpl&#x7C7B;selectedKeyspublicSelectedKeys的默认 HashSet实现。

为什么要用 SelectedSelectionKeySet 替换掉原来的 HashSet 呢??

因为这里涉及到对 HashSet&#x7C7B;&#x578B;sun.nio.ch.SelectorImpl#selectedKeys集合的两种操作:

  • 插入操作: 通过前边对 sun.nio.ch.SelectorImpl&#x7C7B;中字段的介绍我们知道,在 Selector监听到 IO&#x5C31;&#x7EEA;SelectionKey后,会将 IO&#x5C31;&#x7EEA;SelectionKey 插入 sun.nio.ch.SelectorImpl#selectedKeys集合中,这时 Reactor&#x7EBF;&#x7A0B;会从 java.nio.channels.Selector#select(long)阻塞调用中返回(类似上篇文章提到的 epoll_wait)。
  • 遍历操作: Reactor&#x7EBF;&#x7A0B;返回后,会从 Selector中获取 IO&#x5C31;&#x7EEA;SelectionKey集合(也就是 sun.nio.ch.SelectorImpl#selectedKeys), Reactor&#x7EBF;&#x7A0B; 遍历 selectedKeys,获取 IO&#x5C31;&#x7EEA;SocketChannel,并处理 SocketChannel上的 IO&#x4E8B;&#x4EF6;

我们都知道 HashSet底层数据结构是一个 &#x54C8;&#x5E0C;&#x8868;,由于 Hash&#x51B2;&#x7A81;这种情况的存在,所以导致对 &#x54C8;&#x5E0C;&#x8868;进行 &#x63D2;&#x5165;&#x904D;&#x5386;操作的性能不如对 &#x6570;&#x7EC4;进行 &#x63D2;&#x5165;&#x904D;&#x5386;操作的性能好。

还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。

所以Netty为了优化对 sun.nio.ch.SelectorImpl#selectedKeys集合的 &#x63D2;&#x5165;&#xFF0C;&#x904D;&#x5386;性能,自己用 &#x6570;&#x7EC4;这种数据结构实现了 SelectedSelectionKeySet,用它来替换原来的 HashSet实现。

SelectedSelectionKeySet

  • 初始化 SelectionKey[] keys数组大小为 1024,当数组容量不够时,扩容为原来的两倍大小。
  • 通过数组尾部指针 size,在向数组插入元素的时候可以直接定位到插入位置 keys[size++]。操作一步到位,不用像 &#x54C8;&#x5E0C;&#x8868;那样还需要解决 Hash&#x51B2;&#x7A81;
  • 对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比 HashSet的迭代器 java.util.HashMap.KeyIterator 遍历方式性能不知高到哪里去了。
final class SelectedSelectionKeySet extends AbstractSet {

    //采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
    SelectionKey[] keys;
    //数组尾部指针
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    /**
     * 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
     * */
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }
        //时间复杂度O(1)
        keys[size++] = o;
        if (size == keys.length) {
            //扩容为原来的两倍大小
            increaseCapacity();
        }

        return true;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }

    /**
     * 采用数组的遍历效率 高于 HashSet
     * */
    @Override
    public Iterator iterator() {
        return new Iterator() {
            private int idx;

            @Override
            public boolean hasNext() {
                return idx < size;
            }

            @Override
            public SelectionKey next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return keys[idx++];
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。

  1. Netty通过反射的方式用 SelectedSelectionKeySet替换掉 sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys这两个集合中原来 HashSet的实现。

  2. 反射获取 sun.nio.ch.SelectorImpl类中 selectedKeyspublicSelectedKeys

  Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
  Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
  • Java9版本以上通过 sun.misc.Unsafe设置字段值的方式
       if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {

                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }

                    }
  • 通过反射的方式用 SelectedSelectionKeySet替换掉 hashSet实现的 sun.nio.ch.SelectorImpl#selectedKeys&#xFF0C;sun.nio.ch.SelectorImpl#publicSelectedKeys
          Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
          if (cause != null) {
                return cause;
          }
          //Java8反射替换字段
          selectedKeysField.set(unwrappedSelector, selectedKeySet);
          publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
  1. 将与 sun.nio.ch.SelectorImpl类中 selectedKeyspublicSelectedKeys关联好的Netty优化实现 SelectedSelectionKeySet,设置到 io.netty.channel.nio.NioEventLoop#selectedKeys字段中保存。
   //会通过反射替换selector对象中的selectedKeySet保存就绪的selectKey
    //该字段为持有selector对象selectedKeys的引用,当IO事件就绪时,直接从这里获取
    private SelectedSelectionKeySet selectedKeys;

后续 Reactor&#x7EBF;&#x7A0B;就会直接从 io.netty.channel.nio.NioEventLoop#selectedKeys中获取 IO&#x5C31;&#x7EEA;SocketChannel

  1. SelectorTuple封装 unwrappedSelectorwrappedSelector返回给 NioEventLoop构造函数。到此 Reactor中的 Selector就创建完毕了。
return new SelectorTuple(unwrappedSelector,
                      new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    private static final class SelectorTuple {
        final Selector unwrappedSelector;
        final Selector selector;

        SelectorTuple(Selector unwrappedSelector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = unwrappedSelector;
        }

        SelectorTuple(Selector unwrappedSelector, Selector selector) {
            this.unwrappedSelector = unwrappedSelector;
            this.selector = selector;
        }
    }
  • 所谓的 unwrappedSelector是指被Netty优化过的JDK NIO原生Selector。
  • 所谓的 wrappedSelector就是用 SelectedSelectionKeySetSelector装饰类将 unwrappedSelector和与 sun.nio.ch.SelectorImpl&#x7C7B;关联好的Netty优化实现 SelectedSelectionKeySet封装装饰起来。

wrappedSelector会将所有对 Selector的操作全部代理给 unwrappedSelector,并在 &#x53D1;&#x8D77;&#x8F6E;&#x8BE2;IO&#x4E8B;&#x4EF6;的相关操作中,重置 SelectedSelectionKeySet清空上一次的轮询结果。

final class SelectedSelectionKeySetSelector extends Selector {
    //Netty优化后的 SelectedKey就绪集合
    private final SelectedSelectionKeySet selectionKeys;
    //优化后的JDK NIO 原生Selector
    private final Selector delegate;

    SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
        this.delegate = delegate;
        this.selectionKeys = selectionKeys;
    }

    @Override
    public boolean isOpen() {
        return delegate.isOpen();
    }

    @Override
    public SelectorProvider provider() {
        return delegate.provider();
    }

    @Override
    public Set keys() {
        return delegate.keys();
    }

    @Override
    public Set selectedKeys() {
        return delegate.selectedKeys();
    }

    @Override
    public int selectNow() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.selectNow();
    }

    @Override
    public int select(long timeout) throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select(timeout);
    }

    @Override
    public int select() throws IOException {
        //重置SelectedKeys集合
        selectionKeys.reset();
        return delegate.select();
    }

    @Override
    public Selector wakeup() {
        return delegate.wakeup();
    }

    @Override
    public void close() throws IOException {
        delegate.close();
    }
}

到这里Reactor的核心Selector就创建好了,下面我们来看下用于保存异步任务的队列是如何创建出来的。

newTaskQueue

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        //通过用SelectedSelectionKeySet装饰后的unwrappedSelector
        this.selector = selectorTuple.selector;
        //Netty优化过的JDK NIO远程Selector
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

我们继续回到创建 Reactor的主线上,到目前为止 Reactor的核心 Selector就创建好了,前边我们提到 Reactor除了需要 &#x76D1;&#x542C;IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;以及处理 IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;外,还需要执行一些异步任务,当外部线程向 Reactor提交异步任务后, Reactor就需要一个队列来保存这些异步任务,等待 Reactor&#x7EBF;&#x7A0B;执行。

下面我们来看下 Reactor中任务队列的创建过程:

    //任务队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private static Queue newTaskQueue(
            EventLoopTaskQueueFactory queueFactory) {
        if (queueFactory == null) {
            return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
        }
        return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    private static Queue newTaskQueue0(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue()
                : PlatformDependent.newMpscQueue(maxPendingTasks);
    }
  • NioEventLoop的父类 SingleThreadEventLoop中提供了一个静态变量 DEFAULT_MAX_PENDING_TASKS用来指定 Reactor任务队列的大小。可以通过系统变量 -D io.netty.eventLoop.maxPendingTasks进行设置,默认为 Integer.MAX_VALUE,表示任务队列默认为 &#x65E0;&#x754C;&#x961F;&#x5217;
  • 根据 DEFAULT_MAX_PENDING_TASKS变量的设定,来决定创建无界任务队列还是有界任务队列。
    //创建无界任务队列
    PlatformDependent.newMpscQueue()
    //创建有界任务队列
    PlatformDependent.newMpscQueue(maxPendingTasks)

    public static  Queue newMpscQueue() {
        return Mpsc.newMpscQueue();
    }

    public static  Queue newMpscQueue(final int maxCapacity) {
        return Mpsc.newMpscQueue(maxCapacity);
    }

Reactor内的异步任务队列的类型为 MpscQueue,它是由 JCTools提供的一个高性能无锁队列,从命名前缀 Mpsc可以看出,它适用于 &#x591A;&#x751F;&#x4EA7;&#x8005;&#x5355;&#x6D88;&#x8D39;&#x8005;的场景,它支持多个生产者线程安全的访问队列,同一时刻只允许一个消费者线程读取队列中的元素。

我们知道Netty中的 Reactor可以 &#x7EBF;&#x7A0B;&#x5B89;&#x5168;的处理注册其上的多个 SocketChannel上的 IO&#x6570;&#x636E;,保证 Reactor&#x7EBF;&#x7A0B;&#x5B89;&#x5168;的核心原因正是因为这个 MpscQueue,它可以支持多个业务线程在处理完业务逻辑后,线程安全的向 MpscQueue添加 &#x5F02;&#x6B65;&#x5199;&#x4EFB;&#x52A1;,然后由单个 Reactor&#x7EBF;&#x7A0B;来执行这些 &#x5199;&#x4EFB;&#x52A1;。既然是单线程执行,那肯定是线程安全的了。

Reactor对应的NioEventLoop类型继承结构

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

NioEventLoop的继承结构也是比较复杂,这里我们只关注在 Reactor创建过程中涉及的到两个父类 SingleThreadEventLoop, SingleThreadEventExecutor

剩下的继承体系,我们在后边随着 Netty源码的深入在慢慢介绍。

前边我们提到,其实 Reactor我们可以看作是一个单线程的线程池,只有一个线程用来执行 IO&#x5C31;&#x7EEA;&#x4E8B;&#x4EF6;&#x7684;&#x76D1;&#x542C;IO&#x4E8B;&#x4EF6;&#x7684;&#x5904;&#x7406;&#x5F02;&#x6B65;&#x4EFB;&#x52A1;&#x7684;&#x6267;&#x884C;。用 MpscQueue来存储待执行的异步任务。

命名前缀为 SingleThread的父类都是对 Reactor这些行为的分层定义。也是本小节要介绍的对象

SingleThreadEventLoop

Reactor负责执行的异步任务分为三类:

  • &#x666E;&#x901A;&#x4EFB;&#x52A1;&#xFF1A;这是Netty最主要执行的异步任务,存放在普通任务队列 taskQueue中。在 NioEventLoop构造函数中创建。
  • &#x5B9A;&#x65F6;&#x4EFB;&#x52A1;&#xFF1A; 存放在优先级队列中。后续我们介绍。
  • &#x5C3E;&#x90E8;&#x4EFB;&#x52A1;&#xFF1A; 存放于尾部任务队列 tailTasks中,尾部任务一般不常用,在普通任务执行完后 Reactor线程会执行尾部任务。 使用场景:比如对Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

SingleThreadEventLoop负责对 &#x5C3E;&#x90E8;&#x4EFB;&#x52A1;&#x961F;&#x5217;tailTasks进行管理。并且提供 ChannelReactor注册的行为。

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    //任务队列大小,默认是无界队列
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    //尾部任务队列
    private final Queue tailTasks;

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, Queue taskQueue, Queue tailTaskQueue,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
        //尾部队列 执行一些统计任务 不常用
        tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    }

    @Override
    public ChannelFuture register(Channel channel) {
        //注册channel到绑定的Reactor上
        return register(new DefaultChannelPromise(channel, this));
    }
}

SingleThreadEventExecutor

SingleThreadEventExecutor主要负责对 &#x666E;&#x901A;&#x4EFB;&#x52A1;&#x961F;&#x5217;的管理,以及 &#x5F02;&#x6B65;&#x4EFB;&#x52A1;&#x7684;&#x6267;&#x884C;Reactor&#x7EBF;&#x7A0B;&#x7684;&#x542F;&#x505C;

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, Queue taskQueue, RejectedExecutionHandler rejectedHandler) {
        //parent为Reactor所属的NioEventLoopGroup Reactor线程组
        super(parent);
        //向Reactor添加任务时,是否唤醒Selector停止轮询IO就绪事件,马上执行异步任务
        this.addTaskWakesUp = addTaskWakesUp;
        //Reactor异步任务队列的大小
        this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
        //用于启动Reactor线程的executor -> ThreadPerTaskExecutor
        this.executor = ThreadExecutorMap.apply(executor, this);
        //普通任务队列
        this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
        //任务队列满时的拒绝策略
        this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}

到现在为止,一个完整的 Reactor&#x67B6;&#x6784;就被创建出来了。

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

3. 创建Channel到Reactor的绑定策略

到这一步,Reactor线程组 NioEventLoopGroup里边的所有 Reactor就已经全部创建完毕。

无论是Netty服务端 NioServerSocketChannel关注的 OP_ACCEPT事件也好,还是Netty客户端 NioSocketChannel关注的 OP_READOP_WRITE事件也好,都需要先注册到 Reactor上, Reactor才能监听 Channel上关注的 IO&#x4E8B;&#x4EF6;实现 IO&#x591A;&#x8DEF;&#x590D;&#x7528;

NioEventLoopGroup(Reactor线程组)里边有众多的 Reactor,那么以上提到的这些 Channel究竟应该注册到哪个 Reactor上呢?这就需要一个绑定的策略来平均分配。

还记得我们前边介绍 MultithreadEventExecutorGroup&#x7C7B;的时候提到的构造器参数 EventExecutorChooserFactory吗?

这时候它就派上用场了,它主要用来创建 ChannelReactor的绑定策略。默认为 DefaultEventExecutorChooserFactory.INSTANCE

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
   //从Reactor集合中选择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    chooser = chooserFactory.newChooser(children);
}

下面我们来看下具体的绑定策略:

DefaultEventExecutorChooserFactory

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }
    ...................省略.................

}

我们看到在 newChooser方法绑定策略有两个分支,不同之处在于需要判断Reactor线程组中的 Reactor个数是否为 2&#x7684;&#x6B21;&#x5E42;

Netty中的绑定策略就是采用 round-robin轮询的方式来挨个选择 Reactor进行绑定。

采用 round-robin的方式进行负载均衡,我们一般会用 round % reactor.length取余的方式来挨个平均的定位到对应的 Reactor上。

如果 Reactor的个数 reactor.length恰好是 2&#x7684;&#x6B21;&#x5E42;,那么就可以用位操作 &运算 round & reactor.length -1来代替 %运算 round % reactor.length,因为位运算的性能更高。具体为什么 &运算能够代替 %运算,笔者会在后面讲述时间轮的时候为大家详细证明,这里大家只需记住这个公式,我们还是聚焦本文的主线。

了解了优化原理,我们在看代码实现就很容易理解了。

利用 % 运算的方式 Math.abs(idx.getAndIncrement() % executors.length) 来进行绑定。

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

利用 & 运算的方式 idx.getAndIncrement() & executors.length - 1 来进行绑定。

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

又一次被Netty对性能的极致追求所折服~~~~

4. 向Reactor线程组中所有的Reactor注册terminated回调函数

当Reactor线程组 NioEventLoopGroup中所有的 Reactor已经创建完毕, ChannelReactor的绑定策略也创建完毕后,我们就来到了创建 NioEventGroup的最后一步。

俗话说的好,有创建就有启动,有启动就有关闭,这里会创建 Reactor&#x5173;&#x95ED;的回调函数 terminationListener,在 Reactor关闭时回调。

terminationListener回调的逻辑很简单:

  • 通过 AtomicInteger terminatedChildren变量记录已经关闭的 Reactor个数,用来判断 NioEventLoopGroup中的 Reactor是否已经全部关闭。
  • 如果所有 Reactor均已关闭,设置 NioEventLoopGroup中的 terminationFuturesuccess。表示 Reactor&#x7EBF;&#x7A0B;&#x7EC4;关闭成功。
       //记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        //关闭future
        private final Promise terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

        final FutureListener terminationListener = new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    //当所有Reactor关闭后 才认为是关闭成功
                    terminationFuture.setSuccess(null);
                }
            }
        };

        //为所有Reactor添加terminationListener
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

我们在回到文章开头 Netty&#x670D;&#x52A1;&#x7AEF;&#x4EE3;&#x7801;&#x6A21;&#x677F;

public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.

        //创建主从Reactor线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        ...........省略............

    }
}

现在Netty的 &#x4E3B;&#x4ECE;Reactor&#x7EBF;&#x7A0B;&#x7EC4;就已经创建完毕,此时Netty服务端的骨架已经搭建完毕,骨架如下:

聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

总结

本文介绍了首先介绍了Netty对各种 IO&#x6A21;&#x578B;的支持以及如何轻松切换各种 IO&#x6A21;&#x578B;

还花了大量的篇幅介绍Netty服务端的核心引擎 &#x4E3B;&#x4ECE;Reactor&#x7EBF;&#x7A0B;&#x7EC4;的创建过程。在这个过程中,我们还提到了Netty对各种细节进行的优化,展现了Netty对性能极致的追求。

好了,Netty服务端的骨架已经搭好,剩下的事情就该绑定端口地址然后接收连接了,我们下篇文章再见~~~

Original: https://www.cnblogs.com/binlovetech/p/16440319.html
Author: bin的技术小屋
Title: 聊聊 Netty 那些事儿之 Reactor 在 Netty 中的实现(创建篇)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/685614/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球