Netty源码分析之自定义编解码器

在日常的网络开发当中,协议解析都是必须的工作内容,Netty中虽然内置了基于长度、分隔符的编解码器,但在大部分场景中我们使用的都是自定义协议,所以Netty提供了 MessageToByteEncoder 与 ByteToMessageDecoder 两个抽象类,通过继承重写其中的encode与decode方法实现私有协议的编解码。这篇文章我们就对Netty中的自定义编解码器进行实践与分析。

一、编解码器的使用

下面是MessageToByteEncoder与ByteToMessageDecoder使用的简单示例,其中不涉及具体的协议编解码。

创建一个sever端服务

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final CodecHandler codecHandler = new CodecHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            //添加编解码handler
                            p.addLast(new MessagePacketDecoder(),new MessagePacketEncoder());
                            //添加自定义handler
                            p.addLast(codecHandler);
                        }
                    });

            // Start the server.

            ChannelFuture f = b.bind(PORT).sync();

继承MessageToByteEncoder并重写encode方法,实现编码功能

public class MessagePacketEncoder extends MessageToByteEncoder<byte[]> {

    @Override
    protected void encode(ChannelHandlerContext ctx, byte[] bytes, ByteBuf out) throws Exception {
        //进行具体的编码处理 这里对字节数组进行打印
        System.out.println("编码器收到数据:"+BytesUtils.toHexString(bytes));
        //写入并传送数据
        out.writeBytes(bytes);
    }
}

继承ByteToMessageDecoder 并重写decode方法,实现解码功能

public class MessagePacketDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out){
        try {
            if (buffer.readableBytes() > 0) {
                // 待处理的消息包
                byte[] bytesReady = new byte[buffer.readableBytes()];
                buffer.readBytes(bytesReady);
                //进行具体的解码处理
                System.out.println("解码器收到数据:"+ByteUtils.toHexString(bytesReady));
                //这里不做过多处理直接把收到的消息放入链表中,并向后传递
                out.add(bytesReady);

            }
        }catch(Exception ex) {

        }

    }

}

实现自定义的消息处理handler,到这里其实你拿到的已经是编解码后的数据

public class CodecHandler extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("CodecHandler收到数据:"+ByteUtils.toHexString((byte[])msg));
        byte[] sendBytes = new byte[] {0x7E,0x01,0x02,0x7e};
        ctx.write(sendBytes);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.

        cause.printStackTrace();
        ctx.close();
    }
}

运行一个客户端模拟发送字节0x01,0x02,看一下输出的执行结果

解码器收到数据:0102
CodecHandler收到数据:0102
编码器收到数据:7E01027E

根据输出的结果可以看到消息的入站与出站会按照pipeline中自定义的顺序传递,同时通过重写encode与decode方法实现我们需要的具体协议编解码操作。

二、源码分析

通过上面的例子可以看到MessageToByteEncoder 与ByteToMessageDecoder分别继承了ChannelInboundHandlerAdapter与ChannelOutboundHandlerAdapter,所以它们也是channelHandler的具体实现,并在创建sever时被添加到pipeline中, 同时为了方便我们使用,netty在这两个抽象类中内置与封装了一些其操作;消息的出站和入站会分别触发write与channelRead事件方法,所以上面例子中我们重写的encode与decode方法,也都是在父类的write与channelRead方法中被调用,下面我们就别从这两个方法入手,对整个编解码的流程进行梳理与分析。

1、MessageToByteEncoder

编码需要操作的是出站数据,所以在MessageToByteEncoder的write方法中会调用我们重写的encode具体实现, 把我们内部定义的消息实体编码为最终要发送的字节流数据发送出去。

@Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) {//判断传入的msg与你定义的类型是否一致
                @SuppressWarnings("unchecked")
                I cast = (I) msg;//转为你定义的消息类型
                buf = allocateBuffer(ctx, cast, preferDirect);//包装成一个ByteBuf
                try {
                    encode(ctx, cast, buf);//传入声明的ByteBuf,执行具体编码操作
                } finally {
                    /**
                     * 如果你定义的类型就是ByteBuf 这里可以帮助你释放资源,不需要在自己释放
                     * 如果你定义的消息类型中包含ByteBuf,这里是没有作用,需要你自己主动释放
                     */
                    ReferenceCountUtil.release(cast);//释放你传入的资源
                }

                //发送buf
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                //类型不一致的话,就直接发送不再执行encode方法,所以这里要注意如果你传递的消息与泛型类型不一致,其实是不会执行的
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();//释放资源
            }
        }
    }

MessageToByteEncoder的write方法要实现的功能还是比较简单的,就是把你传入的数据类型进行转换和发送;这里有两点需要注意:

  • 一般情况下,需要通过重写encode方法把定义的泛型类型转换为ByteBuf类型, write方法内部自动帮你执行传递或发送操作;
  • 代码中虽然有通过ReferenceCountUtil.release(cast)释放你定义的类型资源,但如果定义的消息类中包含ByteBuf对象,仍需要主动释放该对象资源;

2、ByteToMessageDecoder

从命名上就可以看出ByteToMessageDecoder解码器的作用是把字节流数据编码转换为我们需要的数据格式

作为入站事件,解码操作的入口自然是channelRead方法

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {//如果消息是bytebuff
            CodecOutputList out = CodecOutputList.newInstance();//实例化一个链表
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);//开始解码
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {//不为空且没有可读数据,释放资源
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.

                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);//向下传递消息
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

callDecode方法内部通过while循环的方式对ByteBuf数据进行解码,直到其中没有可读数据

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) {
        try {
            while (in.isReadable()) {//判断ByteBuf是还有可读数据
                int outSize = out.size();//获取记录链表大小

                if (outSize > 0) {//判断链表中是否已经有数据
                    fireChannelRead(ctx, out, outSize);//如果有数据继续向下传递
                    out.clear();//清空链表

                    // Check if this handler was removed before continuing with decoding.

                    // If it was removed, it is not safe to continue to operate on the buffer.

                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);//开始调用decode方法

                // Check if this handler was removed before continuing the loop.

                // If it was removed, it is not safe to continue to operate on the buffer.

                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                //这里如果链表为空且bytebuf没有可读数据,就跳出循环
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {//有可读数据继续读取
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {//beytebuf没有读取,但却进行了解码
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {//是否设置了每条入站数据只解码一次,默认false
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

decodeRemovalReentryProtection方法内部会调用我们重写的decode解码实现

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;//标记状态
        try {
            decode(ctx, in, out);//调用我们重写的decode解码实现
        } finally {
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {//这里判断标记,防止handlerRemoved事件与解码操作冲突
                handlerRemoved(ctx);
            }
        }
    }

channelRead方法中接受到数据经过一系列逻辑处理,最终会调用我们重写的decode方法实现具体的解码功能;在decode方法中我们只需要ByteBuf类型的数据解析为我们需要的数据格式直接放入 List

三、总结

通过上面的讲解,我们可以对Netty中内置自定义编解码器MessageToByteEncoder与ByteToMessageDecoder有一定的了解,其实它们本质上是Netty封装的一组专门用于自定义编解码的channelHandler实现类。在实际开发当中基于这两个抽象类的实现非常具有实用性,所以在这里稍作分析, 其中如有不足与不正确的地方还望指出与海涵。

关注微信公众号,查看更多技术文章。

Netty源码分析之自定义编解码器

转载说明:未经授权不得转载,授权后务必注明来源(注明:来源于公众号:架构空间, 作者:大凡)

Original: https://www.cnblogs.com/dafanjoy/p/13049158.html
Author: DaFanJoy
Title: Netty源码分析之自定义编解码器

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/593443/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 文件输入/输出流

    文件输入/输出流程序 运行期间,大部分数据都被存储在 内存中,当程序结束或被关闭时,存储在内存中的数据将会 消失。如果要 永久保存数据,那么最好的办法就是把数据保存到 磁盘的文件中…

    Java 2023年6月9日
    076
  • input框设置禁用状态

    input设置为不可编辑的状态(三种方法,可自行选择) 1. disabled 属性规定应该禁用 input 元素,被禁用的 input 元素,不可编辑,不可复制,不可选择,不能接…

    Java 2023年6月5日
    078
  • 4.MySQL动态拼接

    举例如下: posted @2022-09-06 23:07 NIANER2011 阅读(7 ) 评论() 编辑 Original: https://www.cnblogs.com…

    Java 2023年6月13日
    084
  • 实验设计

    统计学是什么? 统计学是对令人困惑费解的问题作出 数字设想的艺术 一、对照实验 该部分的第一个例子由脊髓灰质炎的疫苗引入了 随机对照双盲实验。 其所总结出的实验设计的原则: 减小混…

    Java 2023年6月7日
    080
  • Kafka 基础概念及架构

    一、Kafka 介绍 Kafka是⼀个分布式、分区的、多副本的、多⽣产者、多订阅者,基于zookeeper协调的分布式⽇志系统(也可以当做MQ系统),常⻅可以⽤于web/nginx…

    Java 2023年6月5日
    094
  • Filter 过滤器

    什么是Filter过滤器? 1、Filter 过滤器它是 JavaWeb 的三大组件之一。三大组件分别是:Servlet 程序、Listener 监听器、Filter 过滤器2、F…

    Java 2023年6月15日
    092
  • RabbMQ

    RabbitMQ基本概念 Broker: 简单来说就是消息队列服务器实体 Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列 Queue: 消息队列载体,每个消息…

    Java 2023年6月13日
    050
  • 域名+端口号 访问minio服务问题

    业务上需要用到分布式文件服务,选择了minio作为文件服务的组件,搭建好服务后使用IP+端口号(http://xx.xx.xx.xx:9001)的形式访问在所有环境下都没有问题。上…

    Java 2023年6月13日
    081
  • 课堂笔记

    初始化Linux 修改静态IP cd /etc/sysconfig/network-scripts/ vim ifcfg-ens33 文件内容: TYPE="Ethern…

    Java 2023年6月13日
    065
  • Nginx location匹配规则

    url匹配规则 = : 表示精确匹配后面的url ~ : 表示正则匹配,但是区分大小写 ~* : 正则匹配,不区分大小写 ^~ : 表示普通字符匹配,如果该选项匹配,只匹配该选项,…

    Java 2023年5月30日
    092
  • FastDFS 原理、安装、使用

    介绍 技术论坛: http://bbs.chinaunix.net/forum-240-1.html FAQ:http://bbs.chinaunix.net/thread-192…

    Java 2023年6月7日
    081
  • 基于数据库的自动化生成工具,自动生成JavaBean、自动生成数据库文档等(v4.1.2版)

    v4.1.2版更新震撼发布,功能更加强大,速度过来围观,此次版本更新如下: 1、随着程序的功能越来越强大,原来的定位和设计已经无法满足更高的要求,所以决定对本程序更名,更名为Tab…

    Java 2023年6月9日
    082
  • kubernetes V1.16 Ingress-nginx部署

    在Kubernetes中,服务和Pod的IP地址仅可以在集群网络内部使用,对于集群外的应用是不可见的。为了使外部的应用能够访问集群内的服务,在Kubernetes中可以通过Node…

    Java 2023年5月30日
    0105
  • 如何控制多线程执行顺序

    package com.thread; import java.util.concurrent.ExecutorService; import java.util.concurre…

    Java 2023年5月30日
    0109
  • Springboot学习

    具体内容: 包含核心基础、Web原理、单元测试、数据访问、指标监控等章节 SpringBoot 官方文档 https://www.cnblogs.com/youcoding/p/1…

    Java 2023年5月30日
    0107
  • @Inherited 原注解功能介绍

    @Inherited 底层 package java.lang.annotation; /** * Indicates that an annotation type is aut…

    Java 2023年6月5日
    0154
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球