【源码笔记】ThreadPoolExecutor#addWorker


/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.

 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).

 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.

 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).

 * @return true if successful
 */
// firstTask:
//   可以为null。表示启动worker之后,worker自动到queue中获取任务
//   如果不为null,则worker会优先执行firstTask
// core:
//   true 采用corePoolSize限制
//   false 采用maximumPoolSize限制
private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋
    retry:
    for (;;) {
        // 获取当前ctl值,保存到c中
        int c = ctl.get();
        // 取到线程池的runState
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.

        // 条件1:rs >= SHUTDOWN
        //   true --> 说明当前线程池状态不是running
        // 条件2:前置条件 --> 当前线程池状态不是running
        //       rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
        //       --> 表示当前线程池的状态是SHUTDOWN && 提交的任务是null(addWorker这个方法可能不是execute调用的) && 当前任务队列不是空
        //       --> 排除掉这种情况:当前线程池的状态是SHUTDOWN,但是队列里面还有任务尚未完成。这个时候是允许添加worker,但是不允许再次提交task

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 走到这的情况:
            //   1. 线程池状态rs > SHUTDOWN
            //   2. rs == SHUTDOWN
            return false;

        // 上面这些代码就是判断当前线程池状态是否允许添加线程

        // 内部自旋
        for (;;) {
            // 获取当前线程池中的线程数量
            int wc = workerCountOf(c);

            // 条件1:wc >= CAPACITY --> 永远不成立,因为CAPACITY是一个5亿多的数字
            // 条件2:wc >= (core ? corePoolSize : maximumPoolSize)
            //       core == true --> 判断当前线程数量是否 >= corePoolSize
            //       core == false -> 判断当前线程数量是否 >= maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 当前线程数太多了,已经不能添加线程了
                return false;

            // 使用CAS使workerCount+1
            if (compareAndIncrementWorkerCount(c))
                // 行至此处,说明workerCount已经成功+1。相当于申请到一个信号量
                break retry;

            // 行至此处,说明workerCount+1失败。说明可能有其它线程已经修改过ctl的值
            // 情况1.其它线程执行execute()申请过令牌了
            // 情况2.外部线程可能调用过shutdown()或者shutdownNow()导致线程池状态发生变化了(ctl高3位表示状态,状态改变后,ctl也会失败)

            c = ctl.get();  // Re-read ctl
            // 判断线程池状态是否发生过变化。如果外部线程调用过shutdown()或者shutdownNow(),线程池状态会变化
            if (runStateOf(c) != rs)
                // 状态发生变化后,直接跳到外层循环
                // 外层循环负责判断当前线程池状态是否允许创建线程
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 以上:申请一个创建worker的信号量
    //   总结:外层循环负责判断线程池状态能否创建线程;内层循环负责判断线程数量能否创建线程。
    //        线程状态、线程数量都用ctl表示
    //        CAS增加ctl的值,如果有并发(修改了线程池状态,或者线程数量),则重试
    // 以下:添加一个worker

    // 表示创建的worker是否已经启动
    boolean workerStarted = false;
    // 表示创建的worker是否已经添加到线程池中
    boolean workerAdded = false;
    // 表示创建的worker的一个引用
    Worker w = null;
    try {
        // 创建worker
        //   worker中会创建一个线程,线程的target就是worker
        //     Worker#thread.run() --> Worker.run() --> ThreadPoolExecutor.runWorker() --> task.run()
        //   worker的state为初始化中
        //   firstTask设置为Worker的firstTask
        w = new Worker(firstTask);

        // 将worker的线程赋值给t
        final Thread t = w.thread;

        // 为了防止ThreadFactory有bug,因为ThreadFactory是一个接口,任何人都可以实现它
        if (t != null) {
            // 将全局锁的引用保存到mainLock中
            final ReentrantLock mainLock = this.mainLock;

            // 持有全局锁 --> 可能会阻塞
            // 操作线程池内部的相关操作,都必须持锁
            mainLock.lock();
            try {
                // Recheck while holding lock.

                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.

                // 获取了当前线程池的状态
                int rs = runStateOf(ctl.get());

                // 条件1.rs < SHUTDOWN
                //   true --> 最正常的情况,线程池当前状态是RUNNING
                // 条件2.rs == SHUTDOWN && firstTask == null
                //   true --> 前置条件:线程池状态不是running状态。虽然线程池SHUTDOWN了,但是firstTask为null
                //            其实这个判断的就是SHUTDOWN状态下的特殊情况,只不过这里不再判断队列是否为空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // t.isAlive() --> 线程start后,值会为true
                    // 还是防止如果有人自定义ThreadFactory实现的话,ThreadFactory创建的线程在返回给外部之前,线程start了
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将创建的worker加入到线程池中
                    // - 线程池是一个HashSet
                    workers.add(w);
                    // 获取最新的线程池线程数量
                    int s = workers.size();
                    // 维护largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }

                // 线程池中的线程不分核心与否
                // 只是创建的时候,会根据core参数做不同的数量校验
            } finally {
                // 释放了线程池全局锁
                mainLock.unlock();
            }

            // 条件成立:说明添加worker成功
            // 条件失败:说明在lock之前,线程池状态发生了改变
            if (workerAdded) {
                // 启动线程:
                //   worker.thread.start() ---> worker#thread.run() --> thread.target.run()
                //                                                      target是worker
                //   --> ThreadPoolExecutor.runWorker() --> worker.firstTask.run()
                //   --> firstTask.callable.cal() ==> 执行了程序员自己写的逻辑
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 条件成立:说明添加失败 --> 需要做清理工作
        if (! workerStarted)
            // 做一些失败的清理工作:
            // - 释放信号量
            // - 将当前worker清理出worker集合
            addWorkerFailed(w);
    }
    // woker是否启动
    return workerStarted;

    // 返回值信息:
    //   true --> 创建worker && 启动worker成功
    //   false -> 创建失败
    //            case1. 线程池状态 > SHUTDOWN (STOP/TIDYING/TERMINATION)
    //            case2. rs == SHUTDOWN && 队列中已经没有任务了
    //                   rs == SHUTDOWN && 队列中有任务 && firstTask != null
    //            case3. 线程池已经达到数量限制(corePoolSize or maximumPoolSize)
    //            case4. threadFactory创建的线程是null,或者创建的线程有问题
}

/**
 * Rolls back the worker thread creation.

 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 *   worker was holding up termination
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;

    // 持有线程池全局锁。因为操作的是线程池相关的东西
    mainLock.lock();
    try {
        // 条件成立:需要将worker在workers中清理出去
        if (w != null)
            workers.remove(w);
        // 将线程池计数器-1(恢复) --> 相当于归还令牌
        decrementWorkerCount();

        tryTerminate();
    } finally {
        // 释放线程池全局锁
        mainLock.unlock();
    }
}

Original: https://www.cnblogs.com/daheww/p/16723406.html
Author: daheww
Title: 【源码笔记】ThreadPoolExecutor#addWorker

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713265/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Vue(十三)—过滤器filter,filters

    官网:https://cn.vuejs.org/v2/api/#filters https://cn.vuejs.org/v2/api/#Vue-filter 分为全局过滤器和局部…

    技术杂谈 2023年7月25日
    075
  • 一些常用的 std 类型

    【 std::allocator】 标准库中包含一个名为allocator的类,允许我们将分配和初始化分离。使用allocator通常会提供更好的性能和更灵活的内存管理能力。 标准…

    技术杂谈 2023年5月31日
    0102
  • Linux Accounting(中文翻译)(2):Delay Accounting

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    技术杂谈 2023年7月11日
    076
  • 经典注意力机制

    2. 注意力机制的正式引入 前边我们通过机器翻译任务介绍了Attention机制的整体计算。但是还有点小尾巴没有展开,就是那个注意力打分函数的计算,现在我们将来讨论这个事情。但在讲…

    技术杂谈 2023年7月11日
    074
  • Oracle:不能使用AS关键字来命名表别名

    posted @2021-06-07 21:24 Mr_伍先生 阅读(338 ) 评论() 编辑 Original: https://www.cnblogs.com/mr-wuxi…

    技术杂谈 2023年5月31日
    088
  • Vue前端访问控制方案

    1、前端访问控制的常规处理方法 前端访问控制,一般针对界面元素dom element进行可见属性或enable属性进行控制,有权限的,相关元素可见或使能;没权限的,相关元素不可见或…

    技术杂谈 2023年6月21日
    082
  • 势函数法

    https://www.cnblogs.com/huadongw/p/4106290.html 势函数主要用于确定分类面,其思想来源于物理。 1 势函数法基本思想 假设要划分属于两…

    技术杂谈 2023年5月31日
    0119
  • Podman基础用法

    Podman基础 1、什么是Podman? Podman是一种开源的Linux原生工具,旨在根据开放容器倡议(Open Container Initiative,OCI)标准开发、…

    技术杂谈 2023年6月21日
    0106
  • Netty源码分析之ChannelPipeline(五)—异常事件的传播

    ChannelHandler中异常的获取与处理是通过继承重写exceptionCaught方法来实现的,本篇文章我们对ChannelPipeline中exceptionCaught…

    技术杂谈 2023年7月25日
    073
  • jobs 命令

    jobs命令 显示了当前 shell 环境中已启动的作业状态。如果 JobID 参数没有指定特定作业,就显示所有的活动的作业的状态信息。如果报告了一个作业的终止,shell 从当前…

    技术杂谈 2023年5月31日
    0118
  • GO select

    分析: 第一次:i=0; ch 是空的, x:= Original: https://www.cnblogs.com/kaituorensheng/p/15866551.htmlA…

    技术杂谈 2023年5月31日
    088
  • Redis缓存雪崩、缓存穿透、缓存击穿

    缓存雪崩 Redis中的缓存数据是有过期时间的,当在同一时间大量的缓存同时失效时就会造成缓存雪崩。解决方案1、设置Redis中的key永不过期,缺点是会占用很多内存2、使用Redi…

    技术杂谈 2023年7月25日
    079
  • 聊一聊Redis事务

    没错,Redis也有事务管理,但是功能很简单,在正式开发中也并不推荐使用。但是面试中有可能会问到,所以本文简单谈一谈Redis的事务。 通过这篇文章,你会了解 Redis为什么要提…

    技术杂谈 2023年7月23日
    077
  • EMAS Serverless系列~4步教你快速搭建小程序

    体验简介 本实验基于 EMAS Serverless 的云函数、云数据库、云存储等云服务能力一站式快速开发小程序《私人云相册》。Demo 主要包括如下功能:1 相册管理2 上传相片…

    技术杂谈 2023年7月10日
    090
  • 京准发布,PTP1588(NTP卫星授时服务器)使用说明书

    京准发布,PTP1588(NTP卫星授时服务器)使用说明书 京准发布,PTP1588(NTP卫星授时服务器)使用说明书 安徽京准电子科技官微——ahjzsz 1、装置简介 卫星时间…

    技术杂谈 2023年6月21日
    092
  • 亿级消息中心架构方案概述【原创】

    目标 技术目标: 上行到消息队列api吞吐量10000条/秒,下发第三方平台1000条/秒(仅平台自身处理能力,第三方看第三方处理能力极限指标为准);保证消息中心100%高可用。 …

    技术杂谈 2023年7月23日
    066
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球