Netty源码分析之ChannelPipeline(三)—入站事件的传播

之前的文章中我们说过ChannelPipeline作为Netty中的数据管道,负责传递Channel中消息的事件传播,事件的传播分为入站和出站两个方向,分别通知ChannelInboundHandler与ChannelOutboundHandler来触发对应事件。这篇文章我们先对Netty中入站事件的传播,也就是ChannelInboundHandler进行下分析:

1、入站事件传播示例

我们通过一个简单的例子看下ChannelPipeline中入站事件channelRead的传播

public class ServerApp {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            // p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 向ChannelPipeline中添加自定义channelHandler
                            p.addLast(new ServerHandlerA());
                            p.addLast(new ServerHandlerB());
                            p.addLast(new ServerHandlerC());
                        }
                    });
            bootstrap.bind(8050).sync();

        } catch (Exception e) {
            // TODO: handle exception
        }

    }

}

public class ServerHandlerA  extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) {
        System.out.println(this.getClass().getName() + "--"+object.toString());
        ctx.fireChannelRead(object);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello word");
    }

}

public class ServerHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) {
        System.out.println(this.getClass().getName() + "--"+object.toString());
        ctx.fireChannelRead(object);
    }
}

public class ServerHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object object) {
        System.out.println(this.getClass().getName() + "--"+object.toString());
        ctx.fireChannelRead(object);
    }
}

客户端连接服务后可看到输出结果

io.netty.example.echo.my.ServerHandlerA--hello word
io.netty.example.echo.my.ServerHandlerB--hello word
io.netty.example.echo.my.ServerHandlerC--hello word

通过输出结果我们可以看到,消息会根据向ChannelPipeline中添加自定义channelHandler的顺序传递,并通过实现channelRead接口处理消息接收事件的。在例子中channelRead事件的传递是通过ctx.fireChannelRead(object)方法实现,接下来我们就从这里入手看下ChannelPipeline事件传递的具体实现。

2、channelRead事件的传播

首先这里需要注意的是我们例子中第一个节点的传递与实际应用中入站数据的传递是通过ChannelPipeline的fireChannelRead方法实现的,因为在实际的应用中,入站事件的传递是由NioUnsafe的read接口实现发起的,需要保证消息是从head结点开始传递的,例子中是为了模拟这一过程。

ctx.channel().pipeline().fireChannelRead("hello word");
@Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);//默认传入head节点
        return this;
    }

进入invokeChannelRead方法内部看下具体实现;

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        //ObjectUtil.checkNotNull 判断传入的消息数据是否为空
        //next.pipeline.touch 对消息类型进行判断
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();//获取ChannelHandlerContext对应的线程
        if (executor.inEventLoop()) {//是否为当前线程
            next.invokeChannelRead(m);//调用ChannelHandlerContext中invokeChannelRead的回调方法
        } else {
            executor.execute(new Runnable() {//如果线程不是当前线程
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

其中invokeChannelRead方法会获取该ChannelHandlerContext所封装的handler实现;

private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                //获取封装的ChannelInboundHandler实现,并调用我们实现的channelRead方法,
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

前面我们知道首先传入的ChannelPipeline中ChannelHandlerContext链表的head头部节点HeadContext,看下其channelRead的方法实现;

@Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

调用当前ChannelHandlerContext的fireChannelRead方法,进入ctx.fireChannelRead(object)方法内部看下具体的源码实现;

@Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        //开始消息传递,findContextInbound方法按顺序获取当前ChannelHandlerContext的next节点
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

findContextInbound方法获取的是HeadContext的下一个节点,也就是我们例子中向ChannelPipeline中添加自定义ServerHandlerA;

到这里其实就可以看出Pipeline中channelRead事件的传播主要就是通过ctx.fireChannelRead(msg),获取当前ChannelHandlerContext下一个节点中封装的ChannelInboundHandler来实现的,最后一步一步传递到Tail尾部节点。

3、资源的释放及SimpleChannelInboundHandler

Netty中对象的生命周期由它们的引用计数管理的,为保证入站对象资源被释放,我们需要通过ReferenceCountUtil.release方法减少引用计数,确保对象的的最终计数器最后被置为0,从而被回收释放。我们看下Netty在入站事件中默认是如何减少引用计数的。

第一种方法,如果我们跟上面示例一样,在实现的每一个ChannelInboundHandler中都调用了ctx.fireChannelRead(msg),最后消息会被传递到Tail尾节点,我们看下Tail节点中的channelRead方法

@Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
   }

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

Tail节点的channelRead方法最终会调用ReferenceCountUtil.release方法来减少引用计数的,所以如果你在处理入站消息的过程中没有增加引用并且通过ctx.fireChannelRead(msg)方法把消息传到了Tail节点,你就不需要自己显式调用ReferenceCountUtil.release方法了。

其次如果继承的是SimpleChannelInboundHandler,可以看到SimpleChannelInboundHandler的channelRead方法实现中也已经调用了ReferenceCountUtil.release方法来减少引用计数;

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

所以关于入站消息的资源释放方式总结如下:

  • 1、继承ChannelInboundHandlerAdapter ,在channelRead的方法实现中调用ctx.fireChannelRead(object)方法,把消息一直向下传递,直到传递到Tail尾部节点,由Tail节点执行 ReferenceCountUtil.release来减少计数器,保证资源释放;
  • 2、继承SimpleChannelInboundHandler,SimpleChannelInboundHandler本身的ChannelRead方法中会执行 ReferenceCountUtil.release来减少引用;
  • 3、如果以上两点都没有做到,那就需要手动调用ReferenceCountUtil.release来减少引用来释放资源;

到这里我们基本了解了ChannelPipeline中入站事件是如何传播与相应的的,以及Netty中入站消息的资源释放机制。其中如有不足与不正确的地方还望指出与海涵。

关注微信公众号,查看更多技术文章。

Netty源码分析之ChannelPipeline(三)—入站事件的传播

Original: https://www.cnblogs.com/dafanjoy/p/12274701.html
Author: DaFanJoy
Title: Netty源码分析之ChannelPipeline(三)—入站事件的传播

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713989/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • collection v1.3.1升级全记录

    collection v1.3.1升级全记录 项目地址: https://github.com/jianfengye/collection 欢迎star。 collection 手…

    技术杂谈 2023年6月1日
    0109
  • golang 笔记

    for循环,一个key在一个map中,则一直迭代 go总是使用值传递,但是有些数据类型是引用类型,比如map, pointer, channel, slice是部分引用类型 在给函…

    技术杂谈 2023年7月11日
    093
  • 心存好奇,心怀敬意

    走出舒适圈难,是因为走出去,除了要吃学习的苦,还要忍受心里的苦。 看到郭德纲的一段话: 从出生就挨打,一天八个嘴巴。这到 25 岁,铁罗汉活金刚一样,什么都不在乎。吃亏要趁早,一帆…

    技术杂谈 2023年7月11日
    0116
  • [转]到底什么是“信创”

    本文转自:https://m.thepaper.cn/baijiahao_13661473 以下文章来源于鲜枣课堂 ,作者小枣君 鲜枣课堂 学通信,学5G,就上鲜枣课堂! 我已加入…

    技术杂谈 2023年5月30日
    0107
  • Linux命令行如何实现sftp限速传输

    上周遇到一个需要在Linux命令行模式下进行sftp限速传输的场景(公司带宽占用限制) 百度后无果,问老江湖F哥也没办法(百度出的结果都是用lftp指令,内网环境无法安装) 实在不…

    技术杂谈 2023年7月11日
    0124
  • 基本运算符

    运算符 JAVA语言支持入下运算符。 %:余数 !=:不等于 &&:and ||:or !:not 二元运算符 整数默认为int类型,按类型优先级自动转换,下图说明…

    技术杂谈 2023年6月21日
    0118
  • HTTP长连接和短链接代理与网关

    长连接和短链接 代理与网关 HTTP/)服务器端网关:网关与客户端使用HTTP协议通信,使用其他协议与服务端通信 (/HTTP)客户端网关:网关与客户端使用其他协议通信,使用HTT…

    技术杂谈 2023年7月24日
    099
  • JavaCV的摄像头实战之四:抓图

    欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos 本篇概览 本文是《JavaCV的摄…

    技术杂谈 2023年7月11日
    0108
  • Redis变慢?深入浅出Redis性能诊断系列文章(一)

    (本文首发于”数据库架构师”公号,订阅”数据库架构师”公号,一起学习数据库技术) Redis 作为一款业内使用率最高的内存数据库,其…

    技术杂谈 2023年7月25日
    091
  • 高企必备项目—SSM框架项目CRM客户管理系统

    首先我们来了解一下什么是CRM客户管理系统? CRM系统包括一些核心的客户关系管理业务功能,如:潜在客户、客户管理、拜访管理、商机管理、订单管理等模块,满足企业客户关系信息化的要求…

    技术杂谈 2023年7月25日
    0103
  • 关闭显示器API及命令

    window下命令powercfg /change “Home/Office Desk” /moniter-timeout-ac 1C#中实现[DllImp…

    技术杂谈 2023年6月1日
    089
  • MySql主要性能指标说明

    在项目当中数据库一般都会成为主要的性能与负载瓶颈,那么针对数据库各项性能指标的监控与对应的优化是开发与运维人员需要面对的主要工作,而且这部分的工作会贯穿项目从开发到运行的整个周期里…

    技术杂谈 2023年7月25日
    077
  • 不需要服务器,教你仅用30行代码搞定实时健康码识别

    此次新冠疫情,波及范围之广,持续时间之久已经超出了我们的预料。自打疫情发生以来,几乎所有人的生活都受到了影响,还好现在已经是数字化的时代,为了防控疫情,健康码成了我们的通行证,已经…

    技术杂谈 2023年5月31日
    0128
  • java Script

    JavaScript JavaScript(简称”JS”)是一种具有函数优先的轻量级,解释型或即时编译型的高级编程语言,弱类型,脚本语言 三大部分 核心(E…

    技术杂谈 2023年6月21日
    0120
  • 碰到shiro反序列化漏洞,大家都是怎么解决的

    项目是借用一个开源项目,然后被发现有shiro反序列化漏洞,如下图: 有了以上漏洞,就可以在服务器执行任意指令。如下图: 解决过程: 1、升级shiro到最新版本1.9.1,却发现…

    技术杂谈 2023年7月11日
    0129
  • 人生苦短,我用python之二

    今天的主题爬取网页的通用代码框架 我们首先打开IDLE选择File->new window命令(或者可以直接按键Ctrl+N键,在很多地方这个按键是新建文件的意思) 我们填入…

    技术杂谈 2023年7月25日
    0102
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球