Netty源码解读(三)-NioEventLoop

先看看EventLoop类图

我们在Netty第二篇文章中的代码中,看到有多次用到eventLoop.execute()方法,这个方法就是EventLoop开启线程执行任务的关键,跟踪进去看看

// 类SingleThreadEventExecutor
SingleThreadEventExecutor#execute(Runnable task)
-->
SingleThreadEventExecutor#execute0(@Schedule Runnable task)
-->
private void execute(Runnable task, boolean immediate) {
    // 判断当前线程是否为eventLoop的线程
    boolean inEventLoop = inEventLoop();
    // 将任务添加进taskQueue
    addTask(task);
    if (!inEventLoop) {
        // 开启eventLoop的线程
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.

                // In worst case we will log on termination.

            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && immediate) {
        wakeup(inEventLoop);
    }
}

这段代码,我们分3部分解读

将任务添加进队列,等待调用

对应代码

// addTask(task);
// 类SingleThreadEventExecutor
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    // 添加任务
    if (!offerTask(task)) {
        // 如果添加失败,执行拒绝执行处理器
        reject(task);
    }
}

SingleThreadEventExecutor有一个类变量来存储task

private final Queue taskQueue;

开启线程是以下这段逻辑

// 判断当前线程是否eventLoop的线程
// 如果不是,则开启EventLoop的线程
if (!inEventLoop) {
    // 开启eventLoop的线程
    startThread();
    。。。。。。
}
-->
// 类SingleThreadEventExecutor
private void startThread() {
    // 一些状态判断,保证doStartThread只会被执行一次
    if (state == ST_NOT_STARTED) {
        // cas修改状态
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                // 实际开启线程的方法
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
-->
// 类SingleThreadEventExecutor
private void doStartThread() {
    assert thread == null;
    // 这个Executor在EventLoopGroup构造时,就已经注入
    // MultithreadEventExecutorGroup
    // executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                // 这个整个NioEventLoop的核心,里面是个死循环
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                。。。。。。
            }
        }
    });
}

在前面 EventLoopGroup的创建与初始化一节有说到,executor的实例化

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
executor.execute(new Runnable() {...});
// 不深究细节的话,上面两行代码效果类比下面两行
Thread thread = new Thread(new Runnable() {...});
thread.start();

所以,executor.execute这里,就开了一个新的线程。

而这个线程,主要处理的是 SingleThreadEventExecutor.**this**.run();,也就是NioEventLoop#run()

那这个run方法是干嘛的,看下图红框部分

因为run方法比较重要,此处不做代码省略,看注释

protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                // 如果没有任务,则返回SELECT
                // 有任务,则获取io事件的个数。此时如果strategy >= 0
                // 如果有io任务,优先io任务,然后才执行普通任务
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO

                case SelectStrategy.SELECT:
                    // 下一次定时任务触发截止时间
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        // 再次判断没有任务
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        // 阻止不必要的唤醒
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                // the selector and retry. https://github.com/netty/netty/issues/8566
                rebuildSelector0();
                selectCnt = 0;
                handleLoopException(e);
                continue;
            }

            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            // 控制处理io事件的事件占用比例,默认是百分之50,一半时间用来处理io事件,一半时间用来处理任务
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            // 100%表示执行完全部任务,才进入下一轮循环
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        // 在对应的 Channel 上处理 IO 事件
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.

                    // 执行queueTask中全部的任务
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    // 在对应的 Channel 上处理 IO 事件
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.

                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                // 0表示运行运行最小数量的任务,即63个
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }

            if (ranTasks || strategy > 0) {
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
                selectCnt = 0;
            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                selectCnt = 0;
            }
        } catch (CancelledKeyException e) {
            // Harmless exception - log anyway
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Always handle shutdown even if the loop processing threw an exception.

            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Error e) {
                throw e;
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

run里面是个死循环,关键方法有 selectprocessSelectedKeysrunAllTasks

  • select select方法很简单,就是选择合适的阻塞时间,等待IO事件触发
private int select(long deadlineNanos) throws IOException {
    if (deadlineNanos == NONE) {
        return selector.select();
    }
    // Timeout will only be 0 if deadline is within 5 microsecs
    // 如果deadlineNanos小于5纳秒,则为0,,否则取整为1毫秒
    // 这段操作是为了向上取整,转成毫秒
    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
    // 如果timeoutMillis大于0,就阻塞selector同样的时间
    // 这段是为了获取最近的延时任务
    return timeoutMillis
  • runAllTasks 此方法会取出可执行的任务,并执行。 上一节 添加任务讲的addTask,放入的任务就是在这里被执行的。
  • processSelectedKeys 此方法在有IO事件时才触发,这个下面细讲

小结一下,run的死循环中主要判断有无IO事件,有则处理,处理完IO事件,再处理队列中的任务。
如果没有IO事件,也没有待处理的任务,则阻塞等待。

对应代码

// wakeup(inEventLoop);
// 类NioEventLoop
protected void wakeup(boolean inEventLoop) {
    // 不是当前EventLoop在执行的时候,才需要唤醒
    // nextWakeupNanos放入AWAKE是阻止不必要的唤醒
    if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
        selector.wakeup();
    }
}

阻塞是selecto.select,那么唤醒就是selector.wakeup

Original: https://www.cnblogs.com/konghuanxi/p/16381385.html
Author: 王谷雨
Title: Netty源码解读(三)-NioEventLoop

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/583262/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • kubenetes无法创建pod/创建RC时无法自动创建pod的问题

    一、问题概述 问题1:虽然每次通过yaml创建rc都显示成功了,但是kubectl get pod却没显示任何的pod. 问题2:直接通过yaml创建pod提示apixxx 问题3…

    Linux 2023年6月14日
    0103
  • MVC(二)

    通过前一篇文章,我们对MVC有了一定的了解。 三、MVC能做什么 网站——服务器返回页面——实际上就是一段文本(response header+html)。 实际上mvc的acti…

    Linux 2023年6月13日
    0104
  • JavaScript编程基础(一)

    目的: 熟练掌握JavaScript的语言基础,包括数据类型、常量和变量、运算符和表达式、基本语句和函数。 理解JavaScript如何动态产生HTML代码,以及控制CSS。 要求…

    Linux 2023年6月13日
    082
  • Python 中 map() zip() list() 函数的介绍

    map() map(function , iterable, ….) : 依次将函数function作用在可迭代的list上,并返回对应的函数返回值,组成新的list(…

    Linux 2023年6月7日
    096
  • 【设计模式】Java设计模式-建造者模式

    【设计模式】Java设计模式 – 建造者模式 😄 不断学习才是王道🔥 继续踏上学习之路,学之分享笔记👊 总有一天我也能像各位大佬一样🏆 @一个有梦有戏的人 @怒放吧德德…

    Linux 2023年6月6日
    0118
  • Spring 对Controller异常的统一处理

    对于Controller的异常处理,分为两种,一种是对已知的异常处理,一种是未知的异常处理 1、定义自定义异常类 /** * @author hzc * */ public cla…

    Linux 2023年6月7日
    0102
  • VirtualAlloc加载shellcode免杀一点记录

    一个很好的学习网站 推荐一下: https://docs.microsoft.com/zh-cn/windows/win32/api/ 0x01 VirtualAlloc Virt…

    Linux 2023年5月28日
    075
  • Python Django构建简易CMDB

    前言 本文仅是对以前写的小示例进行一次梳理和代码优化,由于本人菜鸟一枚,前端,系统运维知识只知皮毛,后端python也没有实际开发经验,在开发过程中也没有考虑堡垒机等一系列服务器的…

    Linux 2023年6月7日
    0123
  • Git简介

    Git是一个开源的分布式版本控制系统,是目前主流的版本控制系统,很多软件项目都会用它做源代码管理。Git的常用操作想必很多人都会,但是可能了解Git内部原理的人并不多。了解一些底层…

    Linux 2023年6月6日
    074
  • [ Skill ] 如何获取库中的 top cell

    https://www.cnblogs.com/yeungchie/ top cell 的一个特点就是没有被其他的单元所调用,下面举例获取某个库中的 top cell。 1. 获取…

    Linux 2023年6月7日
    086
  • 【k8s】kubeadm init 时报错 unknown service runtime.v1alpha2.RuntimeService

    在测试机器中准备搭建 k8s 集群,在 master 节点执行 kubeadm init 时出现报错, unknown service runtime.v1alpha2.Runti…

    Linux 2023年6月7日
    0113
  • LINUX 终端显示错乱问题

    问题描述 Linux Terminal进入vi/vim界面退出后会覆盖前面的显示,如下图: 解决办法 找一台显示正常和的机器(和故障机器架构一致),去家目录查看.bashrc文件,…

    Linux 2023年5月27日
    079
  • SQL中连接(JOIN)子句介绍

    本文主要介绍 SQL(Structured Query Language)中连接(JOIN)子句的相关知识,同时通过用法示例介绍连接的常见用法。 说明:本文的用法示例是面向 MyS…

    Linux 2023年6月13日
    073
  • Mac安装php和redis扩展

    Mac上有特定的包管理工具homebrew,也叫brew,这里的php安装用的就是brew 1安装php brew install php@7.0。 brw安装会自动管理依赖,所以…

    Linux 2023年5月28日
    090
  • 操作系统实现-外中断

    博客网址:www.shicoder.top微信:18223081347欢迎加群聊天 :452380935 这一次我们来对中断中的外中断进行讲解,先给下中断的分类和中断号分配把。 中…

    Linux 2023年6月13日
    0115
  • mycat数据库集群系列之mysql主从同步设置

    最近在梳理数据库集群的相关操作,现在花点时间整理一下关于mysql数据库集群的操作总结,恰好你又在看这一块,供一份参考。本次系列终结大概包括以下内容:多数据库安装、mycat部署安…

    Linux 2023年6月14日
    089
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球