读写Channel(READ)的创建和注册
在NioEventLoop#run中提到,当有IO事件时,会调用 processSelectedKeys
方法来处理。
当客户端连接服务端,会触发服务端的ACCEPT事件,创建负责READ事件的channel并注册到workerGroup中
跟踪 processSelectedKeys
的调用
NioEventLoop#processSelectedKeys()
-->
NioEventLoop#processSelectedKeysOptimized()
-->
NioEventLoop#processSelectedKey(SelectionKey k, AbstractNioChannel ch)
-->
// AbstractNioMessageChannel#read()
public void read() {
。。。。。。
try {
try {
do {
// 用于读取bossGroup中EventLoop的NIOServerSocketChannel接收到的请求数据,并把这些请求数据放入到readBuf
// 结束后,readBuf中存放了一个处理客户端后续请求的NioSocketChannel
// 与java nio对应的就是serverSocketChannel的accept生成SocketChannel,并封装成NioSocketChannel放入到readBuf中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 核心功能
// 依次触发NioServerSocketChannel的pipeline中所有入站Handler中的channelRead()方法的执行
// 注意:此处还是在bossGroup的线程,不是workGroup
// 所以,执行可能是LoggingHandler
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 触发管道中所有handler的channelReadComplete方法
pipeline.fireChannelReadComplete();
。。。。。。
} finally {
。。。。。。
}
}
这里主要关注两个方法:
- doReadMessages 调用Java NIO的API,获取ACCEPT产生的SocketChannel,并封装成NioSocketChannel
protected int doReadMessages(List buf) throws Exception {
// 调用服务端ServerSocketChannel的accept方法产生一个处理客户端后续请求的SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 将这个SocketChannel封装成NioSocketChannel添加到buf容器中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
。。。。。。
}
return 0;
}
- pipeline.fireChannelRead 依次触发管道中所有入站Handler中的channelRead()方法(从HeadContext开始)。 再次复习下管道中的所有Handler,看图: 忽略前面的Handler,直接来到ServerBootstrapAcceptor
// 类ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 添加用户自定义的handler
child.pipeline().addLast(childHandler);
// 设置相关属性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 将channel注册到workerGroup的EventLoop
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
到了childGroup.register这里,就和前面bossGroup的channel注册一样了,前面的代码长这样config().group().register,请擅用搜索。 区别在于,注册进bossGroup的是 NioServerSocketChannel
,负责ACCEPT事件。 注册进workerGroup的是 NioSocketChannel
,负责READ事件。 小结
客户端连接时,触发ACCEPT事件(在bossGroup中),生成
NioSocketChannel
并注册进workerGroup的EventLoop中。然后触发READ事件(在workerGroup中)进行读写数据。
往通道写入数据
demo中的workerGroup中的channel的管道如下图:
在netty的管道pipeline中,头尾是固定的,addLast方法,插入的handler在tail前
head的类是 HeadContext
,类型是in、out
Tail的类是 TailContext
,类型是in
有两种方式写入数据
- channelHandlerContext.write()
- channel.write()
区别在于:第一种是从管道当前位置往前找,第二种从tail往前找
比如在MyEchoHandler中使用channelHandlerContext.write(),则路径是
MyEchoHandler → HeadContext
如果使用channel.write(),路径是
TailContext → MyEchoHandler → HeadContext
源码跟踪路径:
- ctx.write()
AbstractChannelHandlerContext#write(Object msg)-->
AbstractChannelHandlerContext#write(final Object msg, final ChannelPromise promise)-->
AbstractChannelHandlerContext#write(Object msg, boolean flush, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite(Object msg, ChannelPromise promise)-->
AbstractChannelHandlerContext#invokeWrite0(Object msg, ChannelPromise promise)-->
// 一个一个outboundHandler往前调用write,直到HeadContext
HeadContext#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)-->
AbstractUnsafe#write(Object msg, ChannelPromise promise)
- ctx.channel().write()
AbstractChannel#write(Object msg)-->
DefaultChannelPipeline#write(Object msg)-->
// TailContext继承自AbstractChannelHandlerContext
AbstractChannelHandlerContext#write(Object msg)-->
// 这里就和ctx.write()一样了
注意:
write只是将内容写入到channel的缓存 ChannelOutboundBuffer
中,并且会判断如果大小大于高水位,会将channel置为不可写( isWritable
判断)
想要写入到socket,需要调用flush方法
即使调用 writeAndFlush
,效果也是先执行全部outboundHandler的write,再执行flush
Original: https://www.cnblogs.com/konghuanxi/p/16381404.html
Author: 王谷雨
Title: Netty源码解读(四)-读写数据
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/583260/
转载文章受原作者版权保护。转载请注明原作者出处!