Netty源码解读(四)-读写数据

读写Channel(READ)的创建和注册

在NioEventLoop#run中提到,当有IO事件时,会调用 processSelectedKeys方法来处理。

当客户端连接服务端,会触发服务端的ACCEPT事件,创建负责READ事件的channel并注册到workerGroup中

跟踪 processSelectedKeys的调用

NioEventLoop#processSelectedKeys()
-->
NioEventLoop#processSelectedKeysOptimized()
-->
NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
-->
// AbstractNioMessageChannel#read()
public void read() {
    。。。。。。
    try {
        try {
            do {
                // 用于读取bossGroup中EventLoop的NIOServerSocketChannel接收到的请求数据,并把这些请求数据放入到readBuf
                // 结束后,readBuf中存放了一个处理客户端后续请求的NioSocketChannel
                // 与java nio对应的就是serverSocketChannel的accept生成SocketChannel,并封装成NioSocketChannel放入到readBuf中
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 核心功能
            // 依次触发NioServerSocketChannel的pipeline中所有入站Handler中的channelRead()方法的执行
            // 注意:此处还是在bossGroup的线程,不是workGroup
            // 所以,执行可能是LoggingHandler
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 触发管道中所有handler的channelReadComplete方法
        pipeline.fireChannelReadComplete();

        。。。。。。
    } finally {
        。。。。。。
    }
}

这里主要关注两个方法:

  1. doReadMessages 调用Java NIO的API,获取ACCEPT产生的SocketChannel,并封装成NioSocketChannel
protected int doReadMessages(List buf) throws Exception {
    // 调用服务端ServerSocketChannel的accept方法产生一个处理客户端后续请求的SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 将这个SocketChannel封装成NioSocketChannel添加到buf容器中
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        。。。。。。
    }
    return 0;
}
  1. pipeline.fireChannelRead 依次触发管道中所有入站Handler中的channelRead()方法(从HeadContext开始)。 再次复习下管道中的所有Handler,看图: Netty源码解读(四)-读写数据 忽略前面的Handler,直接来到ServerBootstrapAcceptor
// 类ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // 添加用户自定义的handler
    child.pipeline().addLast(childHandler);

    // 设置相关属性
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // 将channel注册到workerGroup的EventLoop
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

到了childGroup.register这里,就和前面bossGroup的channel注册一样了,前面的代码长这样config().group().register,请擅用搜索。 区别在于,注册进bossGroup的是 NioServerSocketChannel,负责ACCEPT事件。 注册进workerGroup的是 NioSocketChannel,负责READ事件。 小结

客户端连接时,触发ACCEPT事件(在bossGroup中),生成 NioSocketChannel并注册进workerGroup的EventLoop中。然后触发READ事件(在workerGroup中)进行读写数据。

往通道写入数据

demo中的workerGroup中的channel的管道如下图:

Netty源码解读(四)-读写数据

在netty的管道pipeline中,头尾是固定的,addLast方法,插入的handler在tail前

head的类是 HeadContext,类型是in、out

Tail的类是 TailContext,类型是in

有两种方式写入数据

  • channelHandlerContext.write()
  • channel.write()

区别在于:第一种是从管道当前位置往前找,第二种从tail往前找

比如在MyEchoHandler中使用channelHandlerContext.write(),则路径是

MyEchoHandler → HeadContext

如果使用channel.write(),路径是

TailContext → MyEchoHandler → HeadContext

源码跟踪路径:

  1. ctx.write()
AbstractChannelHandlerContext#write(Object msg)-->
AbstractChannelHandlerContext#write(final Object msg, final ChannelPromise promise)-->
AbstractChannelHandlerContext#write(Object msg, boolean flush, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite(Object msg, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite0(Object msg, ChannelPromise promise)-->
// 一个一个outboundHandler往前调用write,直到HeadContext
HeadContext#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)-->
AbstractUnsafe#write(Object msg, ChannelPromise promise)
  1. ctx.channel().write()
AbstractChannel#write(Object msg)-->
DefaultChannelPipeline#write(Object msg)-->
// TailContext继承自AbstractChannelHandlerContext
AbstractChannelHandlerContext#write(Object msg)-->
// 这里就和ctx.write()一样了

注意:

write只是将内容写入到channel的缓存 ChannelOutboundBuffer中,并且会判断如果大小大于高水位,会将channel置为不可写( isWritable判断)

想要写入到socket,需要调用flush方法

即使调用 writeAndFlush,效果也是先执行全部outboundHandler的write,再执行flush

Original: https://www.cnblogs.com/konghuanxi/p/16381404.html
Author: 王谷雨
Title: Netty源码解读(四)-读写数据

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/583260/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 【MQTT】使用MQTT.fx上报温度到腾讯云

    打开 腾讯云官网, 注册并登录. 2.登录之后点击右上角的控制台点进去 3.在搜索框[物联网通信],点击进入 4.点击创建新产品 5.选择普通产品,名称随便,选择密钥认证,选择js…

    Linux 2023年6月13日
    0101
  • shell命令重置版整理(经典推荐)

    文件 ls&#xA0;-rtl&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&#xA0;&…

    Linux 2023年5月28日
    098
  • Linux 查看端口被占用

    端口被占用网上很多,这种频繁操作的命令容易忘记,写这边文章的目的主要是加深操作命令的印象, Liux 查看端口占用情况可以使用 lsof 和 netstat 命令。 lsof ls…

    Linux 2023年6月6日
    091
  • Linux常用磁盘管理命令详解

    du du命令用于查看文件和目录磁盘的使用空间。 命令语法: du [&#x53C2;&#x6570;] [&#x6587;&#x4EF6;&amp…

    Linux 2023年5月27日
    0113
  • K8S-kubeadm安装

    K8S-kubeadmin快速安装K8S集群 1.IP规划 节点 IP 组件 MASTER01(4C/6G,cpu核心数大于2) 192.168.80.20 docker、kube…

    Linux 2023年6月13日
    0107
  • Seata-初体验以及避坑

    Seata是什么 这里引用官方解释 Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 …

    Linux 2023年6月7日
    0102
  • linux配置yum源的三种方法

    镜像下载、域名解析、时间同步请点击阿里云开源镜像站 linux配置yum源的三种方法: 1.配置网络yum源 2.通过上传镜像文件配置本地yum源 3.通过连接存储或本地镜像文件配…

    Linux 2023年5月27日
    090
  • pip 换源与python虚拟环境的创建

    pip换源 临时换源: pip install pypi源下载源码,在国外,比较慢—》镜像(pypi在国内备份)—>豆瓣,清华,阿里 pip install -i 源…

    Linux 2023年6月14日
    0100
  • 同一台电脑生成多份ssh私钥和公钥,映射多个GitHub账号

    当我们使用 Git 进行代码版本控制时,经常出现一台电脑需要连接多个Git 账号的情况,此时需要在一台电脑上生成多份 ssh 私钥和密钥,同时映射多个 Git 账号;这里我们需要同…

    Linux 2023年6月14日
    082
  • k8s-简介

    Kubenetes是一个针对容器应用,进行自动部署,弹性伸缩和管理的开源系统,K8s 作为缩写的结果来自计算”K”和”s”之间的八个…

    Linux 2023年6月13日
    088
  • Linux——配置主从数据库服务

    主从数据库 Linux中,数据库服务有三种:互为主主,互为主从,一主一从(主从数据库) 服务名 mariadb 协议名 mysql 进程名称 mysqld 端口号 3306 一、改…

    Linux 2023年5月27日
    0115
  • 四大高阶函数、匿名函数、递归

    四大高阶函数: map、reduce、filter、sorted 1.map函数: 根据提供的函数对指定序列做映射 使用可迭代对象(指定的序列)中的每个元素调用函数,将返回值作为新…

    Linux 2023年6月8日
    0116
  • CentOS7.6下安装Redis5.0.7

    此次安装是在CentOS7下安装Redis5.0.7 &#x8FD9;&#x91CC;&#x4E0B;&#x8F7D;&#x7684;&am…

    Linux 2023年6月14日
    0114
  • ElasticSearch7.2安装

    下载JDK压缩包,通过SFTP客户端(WinSCP)上传到CentOS7相应的目录下。然后解压JDK,解压命令为: tar -zxvf jdk-12.0.2_linux-x64_b…

    Linux 2023年6月7日
    0112
  • MySQL安装卸载、idea中Database的使用、常用的sql语句

    MySQL安装卸载 MySQL安装 在下面的 &#x8D44;&#x6E90;&#x94FE;&#x63A5;中下载MySQL软件 压缩包(绿色版)…

    Linux 2023年6月6日
    0132
  • .NET Core 3.0, 发布将于今晚开始!

    期待已久的.NET Core 3.0即将发布! .NET Core 3.0在.NET Conf上发布。大约还有9个多小时后,.NET Conf开始启动。 第1天-9月23日 9:0…

    Linux 2023年6月7日
    087
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球