Netty源码分析之ChannelPipeline(四)—出站事件的传播

上篇文章中我们梳理了ChannelPipeline中入站事件的传播,这篇文章中我们看下出站事件的传播,也就是ChannelOutboundHandler接口的实现。

1、出站事件的传播示例

我们对上篇文章中的示例代码进行改造,在ChannelPipeline中加入ChannelOutboundHandler出站实现

public class ServerApp {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 向ChannelPipeline中添加自定义channelHandler
                            p.addLast(new OutHandlerA());
                            p.addLast(new ServerHandlerA());
                            p.addLast(new ServerHandlerB());
                            p.addLast(new ServerHandlerC());
                            p.addLast(new OutHandlerB());
                            p.addLast(new OutHandlerC());

                        }
                    });
            bootstrap.bind(8050).sync();

        } catch (Exception e) {
            // TODO: handle exception
        }

    }

}

public class OutHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.err.println(this.getClass().getName()+msg);
        ctx.writeAndFlush((ByteBuf)msg);
    }
}

public class OutHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {
        System.out.println(this.getClass().getName()+msg);
        ctx.write((ByteBuf)msg);
    }
}

public class OutHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx,Object msg,ChannelPromise promise) {
        System.out.println(this.getClass().getName()+"--"+msg);
        ctx.write((ByteBuf)msg);
    }
}

然后我们在ServerHandlerA的channelRead方法中执行ctx的write方法,模拟消息出站事件的发生。

public class ServerHandlerA  extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) {
        ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.buffer();
        byteBuf.writeByte(1);
        byteBuf.writeByte(2);
        ctx.channel().write(byteBuf);
        //ctx.write(byteBuf);
    }
}

上面channelRead方法中write方法的调用有两种方式 ctx.channel().write 与 ctx.write,这两种方式有何区别呢,我们首先看下这两种方式的运行结果

ctx.channel().write

io.netty.example.echo.my.OutHandlerC--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
io.netty.example.echo.my.OutHandlerB--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)
io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

ctx.write

io.netty.example.echo.my.OutHandlerA--PooledUnsafeDirectByteBuf(ridx: 0, widx: 2, cap: 256)

可以看到当调用ctx.channel().write时,消息在管道中传播的顺序是从尾部一直传递到最上层的OutboundHandler;而 ctx.write会从所在的 handler 向前找 OutboundHandler。

那么这两种方式区别是否就如结果所示呢,下面我们就开始对这两种方法的内部实现进行分析

2、出站事件传播的分析

ctx.channel().write与 ctx.write 分别用的是AbstractChannel与AbstractChannelHandlerContext的write方法

AbstractChannel 的 write方法

@Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }

AbstractChannelHandlerContext 的 write方法

@Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }

上面代码中AbstractChannel的 wirte方法最终调用的是pipeline的write方法,我们进入pipeline内部查看,可以看到pipeline的write方法默认从尾部AbstractChannelHandlerContext节点开始调用。

@Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

继续向下跟踪最终它们调用的都是AbstractChannelHandlerContext 的 write方法,下面我们看下方法内部的具体实现。

private void write(Object msg, boolean flush, ChannelPromise promise) {
        ObjectUtil.checkNotNull(msg, "msg");
        try {
            if (isNotValidPromise(promise, true)) {//检查ChannelPromise是否有效
                ReferenceCountUtil.release(msg);
                // cancelled
                return;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }

        //寻找上一个AbstractChannelHandlerContext节点
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {//与当前线程是否一致
            if (flush) {//确定是否要把数据冲刷到远程节点
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else { //如果不一致的封装成writeTask任务线程
            final AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            //把该线程任务交给对应的EventExecutor执行
            if (!safeExecute(executor, task, promise, m)) {
                // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.

                //
                // See https://github.com/netty/netty/issues/8343.

                task.cancel();
            }
        }
    }

主要关注下findContextOutbound(),这个方法的作用就是获取当前AbstractChannelHandlerContext节点的上一个节点prev

private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;//获取当前节点的上一个节点
        } while (!ctx.outbound);//判断是不是出站节点
        return ctx;
    }

最终通过next.invokeWrite(m, promise)回调方法,调用下一个节点中封装的ChannelOutboundHandler的write方法,从而实现write方法事件的传递

private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {//判断当前ChannelOutboundHandler是否已经被添加到pipeline中(handlerAdded事件触发)
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

    private boolean invokeHandler() {
        // Store in local variable to reduce volatile reads.

        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }

到这里整个出站事件的传播流程已经基本清晰了,wirte方法本身就是一个寻找并回调下一个节点中wirte方法的过程。

3、write与writeAndFlush

在上面代码中可以看到这两个方法主要在于是否会在执行write方法后,是否会执行flush方法。

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) { //是否调用回调方法
            //调用write与flush回调方法,最终调用自定义hander的对应实现
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }

这里需要注意的是invokeFlush0()在invokeWrite0后执行,也就是必须等到消息出站事件传递完毕后,才会调用flush把数据冲刷到远程节点。简单理解就是你无论是在OutHandlerA、OutHandlerB还是OutHandlerC中调用writeAndFlush,最后都是要在write事件传递完毕才会flush数据的。

同时我们需要注意到当write与flush事件从OutHandlerA再往上传递时,OutHandlerA的的上一个节点就是Pipeline的头节点HeadContext,我们看下HeadContext的write与flush方法实现;

@Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

到这里我们可以看出,消息的真正入队与发送最终是通过HeadContext的write与flush方法实现。

通过以上的分析我们可以看到Pipeline出站事件的传播流程,同时我们需要注意ctx.write与ctx.channel().write的区别以及消息的发送最终是通头部节点调用unsafe的write与flush方法实现的,其中如有不足与不正确的地方还望指出与海涵。

关注微信公众号,查看更多技术文章。

Netty源码分析之ChannelPipeline(四)—出站事件的传播

Original: https://www.cnblogs.com/dafanjoy/p/12433929.html
Author: DaFanJoy
Title: Netty源码分析之ChannelPipeline(四)—出站事件的传播

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/593449/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Netty源码分析之自定义编解码器

    在日常的网络开发当中,协议解析都是必须的工作内容,Netty中虽然内置了基于长度、分隔符的编解码器,但在大部分场景中我们使用的都是自定义协议,所以Netty提供了 MessageT…

    Java 2023年6月9日
    070
  • RxJava简析

    rxjava文档地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 这个是中文版的 android studio 添加依赖 impleme…

    Java 2023年5月29日
    089
  • 花生壳内网穿透

    Original: https://www.cnblogs.com/weiapro/p/7688796.htmlAuthor: 天涯越野Title: 花生壳内网穿透

    Java 2023年6月13日
    059
  • 这个开源组织里的项目都是精品(第二弹)

    前言 之前我写过一篇文章——《这个开源组织里的项目都是精品》,里面列举了Dromara开源组织的4个java项目,每一个都轻量且实用,受到了很多小伙伴的喜爱。Dromara这个开源…

    Java 2023年6月8日
    080
  • Mybatis的联合查询

    数据库表结构departmentemployee 要求一 现在的要求是输入 id 把 employee 表的对应员工数据查询出来,并且查询出该员工的所处部门信息 JavaBean …

    Java 2023年6月5日
    062
  • mybatis-plus报错解决Invalid bound statement (not found)错误

    mybatis-plus报错解决Invalid bound statement (not found)错误 org.apache.ibatis.binding.BindingExc…

    Java 2023年5月30日
    095
  • mybatis学习笔记(一)for 概念

    mybaits相关概念 1.1 mybatis简介 mybatis是是一款优秀的基于ORM的半自动轻量级持久层框架,它支持定制化SQL、存储过程以及高级映射。(与另一基于ORM的持…

    Java 2023年6月5日
    084
  • (转)CUDA软件架构—网格(Grid)、线程块(Block)和线程(Thread)的组织关系以及线程索引的计算公式

    网格(Grid)、线程块(Block)和线程(Thread)的最大数量 CUDA中可以创建的网格数量跟GPU的计算能力有关,可创建的Grid、Block和Thread的最大数量参看…

    Java 2023年5月29日
    063
  • 索引底层实现原理

    要了解数据库索引的底层原理,我们就得先了解一种叫树的数据结构,而树中很经典的一种数据结构就是二叉树!所以下面我们就从二叉树到平衡二叉树,再到B-树,最后到B+树来一步一步了解数据库…

    Java 2023年6月8日
    079
  • JVM虚拟机类加载机制(一)

    类从被加载到虚拟机内存中开始,到卸载出内存截止,整个生命周期包括:加载、验证、准备、解析,初始化、使用、卸载七个阶段。其中验证、准备、解析三个部分统称为连接。 类初始化情况: 遇到…

    Java 2023年6月9日
    057
  • “假学习”与“真学习”

    什么叫做”假学习”? **一、看书 **买一堆书,有空看看。看书,这是典型的假学习。看书看不懂还在看,就是假学习,欺骗自己,安慰自己正在学习而已。专业书都写…

    Java 2023年6月9日
    084
  • MySQL-指定排序

    where twui.id = #{operatorId} order by FIELD(cardStatus, 2, 1, 3), tpa.create_time Origina…

    Java 2023年6月9日
    067
  • 稀疏数组详细讲解

    稀疏数组的应用场景 稀疏sparsearray数组 稀疏:从字面意思理解就是为了压缩重复冗余的数据 基本介绍: 当一个数组中大部分元素为0,或者为同一个值的数组时,可以使用稀疏数组…

    Java 2023年6月6日
    071
  • java中的集合

    数组的缺点引出集合的好处 数组有很多不足的地方 长度从一开始就必须指定的大小 元素的类型必须一致 使用数组的增删改查,代码比价多比较麻烦 集合的好处 可以动态的保存任意对象 提供了…

    Java 2023年6月6日
    067
  • 小学生四则运算–软件工程

    1 package sizeyunsuan; 2 3 import java.util.List; 4 import java.math.BigDecimal; 5 import …

    Java 2023年6月6日
    0103
  • 【工作记录】JDBC连接MySQL,跨时区调查CST转Asia/Shangha

    根据业务要求,不同的国家设置jvm参数,来确定当前时区。 // -Duser.timezone=Asia/Kolkata 印度&…

    Java 2023年6月6日
    089
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球