Netty源码解读(四)-读写数据

读写Channel(READ)的创建和注册

在NioEventLoop#run中提到,当有IO事件时,会调用 processSelectedKeys方法来处理。

当客户端连接服务端,会触发服务端的ACCEPT事件,创建负责READ事件的channel并注册到workerGroup中

跟踪 processSelectedKeys的调用

NioEventLoop#processSelectedKeys()
-->
NioEventLoop#processSelectedKeysOptimized()
-->
NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
-->
// AbstractNioMessageChannel#read()
public void read() {
    。。。。。。
    try {
        try {
            do {
                // 用于读取bossGroup中EventLoop的NIOServerSocketChannel接收到的请求数据,并把这些请求数据放入到readBuf
                // 结束后,readBuf中存放了一个处理客户端后续请求的NioSocketChannel
                // 与java nio对应的就是serverSocketChannel的accept生成SocketChannel,并封装成NioSocketChannel放入到readBuf中
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            // 核心功能
            // 依次触发NioServerSocketChannel的pipeline中所有入站Handler中的channelRead()方法的执行
            // 注意:此处还是在bossGroup的线程,不是workGroup
            // 所以,执行可能是LoggingHandler
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 触发管道中所有handler的channelReadComplete方法
        pipeline.fireChannelReadComplete();

        。。。。。。
    } finally {
        。。。。。。
    }
}

这里主要关注两个方法:

  1. doReadMessages 调用Java NIO的API,获取ACCEPT产生的SocketChannel,并封装成NioSocketChannel
protected int doReadMessages(List buf) throws Exception {
    // 调用服务端ServerSocketChannel的accept方法产生一个处理客户端后续请求的SocketChannel
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            // 将这个SocketChannel封装成NioSocketChannel添加到buf容器中
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        。。。。。。
    }
    return 0;
}
  1. pipeline.fireChannelRead 依次触发管道中所有入站Handler中的channelRead()方法(从HeadContext开始)。 再次复习下管道中的所有Handler,看图: Netty源码解读(四)-读写数据 忽略前面的Handler,直接来到ServerBootstrapAcceptor
// 类ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // 添加用户自定义的handler
    child.pipeline().addLast(childHandler);

    // 设置相关属性
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // 将channel注册到workerGroup的EventLoop
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

到了childGroup.register这里,就和前面bossGroup的channel注册一样了,前面的代码长这样config().group().register,请擅用搜索。 区别在于,注册进bossGroup的是 NioServerSocketChannel,负责ACCEPT事件。 注册进workerGroup的是 NioSocketChannel,负责READ事件。 小结

客户端连接时,触发ACCEPT事件(在bossGroup中),生成 NioSocketChannel并注册进workerGroup的EventLoop中。然后触发READ事件(在workerGroup中)进行读写数据。

往通道写入数据

demo中的workerGroup中的channel的管道如下图:

Netty源码解读(四)-读写数据

在netty的管道pipeline中,头尾是固定的,addLast方法,插入的handler在tail前

head的类是 HeadContext,类型是in、out

Tail的类是 TailContext,类型是in

有两种方式写入数据

  • channelHandlerContext.write()
  • channel.write()

区别在于:第一种是从管道当前位置往前找,第二种从tail往前找

比如在MyEchoHandler中使用channelHandlerContext.write(),则路径是

MyEchoHandler → HeadContext

如果使用channel.write(),路径是

TailContext → MyEchoHandler → HeadContext

源码跟踪路径:

  1. ctx.write()
AbstractChannelHandlerContext#write(Object msg)-->
AbstractChannelHandlerContext#write(final Object msg, final ChannelPromise promise)-->
AbstractChannelHandlerContext#write(Object msg, boolean flush, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite(Object msg, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite0(Object msg, ChannelPromise promise)-->
// 一个一个outboundHandler往前调用write,直到HeadContext
HeadContext#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)-->
AbstractUnsafe#write(Object msg, ChannelPromise promise)
  1. ctx.channel().write()
AbstractChannel#write(Object msg)-->
DefaultChannelPipeline#write(Object msg)-->
// TailContext继承自AbstractChannelHandlerContext
AbstractChannelHandlerContext#write(Object msg)-->
// 这里就和ctx.write()一样了

注意:

write只是将内容写入到channel的缓存 ChannelOutboundBuffer中,并且会判断如果大小大于高水位,会将channel置为不可写( isWritable判断)

想要写入到socket,需要调用flush方法

即使调用 writeAndFlush,效果也是先执行全部outboundHandler的write,再执行flush

Original: https://www.cnblogs.com/konghuanxi/p/16381404.html
Author: 王谷雨
Title: Netty源码解读(四)-读写数据

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/583260/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • pod(二):创建包含多个容器的pod(sidecar)

    服务器版本 docker软件版本 CPU架构 CentOS Linux release 7.4.1708 (Core) Docker version 20.10.12 x86_64…

    Linux 2023年6月7日
    093
  • 【实测】Python 和 C++ 下字符串查找的速度对比

    最近在备战一场算法竞赛,语言误选了 Python ,无奈只能着手对常见场景进行语言迁移。而字符串查找的场景在算法竞赛中时有出现。本文即对此场景在 Python 和竞赛常用语言 C+…

    Linux 2023年6月13日
    0101
  • 常用的分布式锁和redis和zk两种分布式锁的对比

    常用的分布式锁 一、基于数据库实现分布式锁 1. 悲观锁 利用select … where … for update 排他锁 注意: 其他附加功能与实现一基…

    Linux 2023年5月28日
    071
  • 特殊进制

    //0xaaaaaaaa = 10101010101010101010101010101010 (偶数位为1,奇数位为0) //0x55555555 = 1010101010101…

    Linux 2023年6月13日
    090
  • Python导入cx_Oracle报错

    系统环境:RHEL5.4 python2.5(手动编译安装,系统带有2.4版本) 在使用python脚本访问数据库时,需要导入cx_Oracle模块 $>>>im…

    Linux 2023年6月14日
    080
  • Linux指令_曾佳豪

    一、基础指令 1、ls指令 含义:ls (list) 用法1 :#ls 含义:列出当前工作目录下所有文件/文件夹的名称 [En] Meaning: list the names o…

    Linux 2023年5月27日
    0114
  • 磁盘空间满,如何处理?

    作为运维人员,磁盘空间报警是最寻常遇到的情况,那么遇到空间如何处理呢?这里介绍一下处理办法及处理思路。 操作系统:centos系统 、Ubuntu 1 定位目录 收到空间报警信息,…

    Linux 2023年6月6日
    096
  • JavaScript 做的网页版扫雷小游戏

    闲来无事做了个网页版扫雷小游戏,基本实现了扫雷客户端的全部功能。但是感觉面向对象用的还不是很好,有待优化。 游戏地址:http://twgdh.com/saolei/index.h…

    Linux 2023年6月13日
    0108
  • redis集群部署

    Redis 是一个开源的 key-value 存储系统,由于出众的性能,大部分互联网企业都用来做服务器端缓存。Redis 在3.0版本前只支持单实例模式,虽然支持主从模式、哨兵模式…

    Linux 2023年6月7日
    083
  • ASP.NET Core 3.0 : 二十八. 在Docker中的部署以及docker-compose的使用

    本文简要说一下ASP.NET Core 在Docker中部署以及docker-compose的使用 (ASP.NET Core 系列目录)。 系统环境为CentOS 8 。 一、概…

    Linux 2023年6月7日
    0101
  • MySQL注入 利用系统读、写文件

    MySQL能读写系统文件的前提 不同系统、不同的数据库版本有细微差异,以下实验在Windows10和Mysql 5.7.26下操作; 1.拥有该File的读权限 、 该目录写的权限…

    Linux 2023年6月6日
    0107
  • 国庆专属头像一键生成搭建教程,附源码!

    国庆节马上就要来啦! 没有一个像样的微信头像怎么行。 为此小编为大家带来了一款国庆节国旗头像生成源码,有服务器、域名的朋友可以自行下载上传至服务器之后提供给大家使用。 没有服务器、…

    Linux 2023年6月7日
    0102
  • [20210917]ssh: error while loading shared libraries: libcrypto.so.1.0.0.txt

    [20210917]ssh: error while loading shared libraries: libcrypto.so.1.0.0.txt –//以后写一些…

    Linux 2023年5月27日
    0128
  • Django中信号的使用

    信号种类及用法 Django中提供了”信号调度”,用于在框架执行操作时解耦. 一些动作发生的时候,系统会根据信号定义的函数执行相应的操作 Django中内置…

    Linux 2023年6月14日
    087
  • Linux中安装JDK详细步骤

    一、下载Linux版本的JDK 进入官网下载对应的JDK,下载之前需要先登录 官网地址 -> https://www.oracle.com/ 登录成功后,找到对应的下载位置 …

    Linux 2023年6月7日
    098
  • 分布式系统中数据存储方案实践

    数据膨胀的时候,必然放大细节。 一、背景简介 在项目研发的过程中,对于数据存储能力的依赖无处不在,项目初期,相比系统层面的组件选型与框架设计,由于数据体量不大,在存储管理方面通常容…

    Linux 2023年6月14日
    075
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球