Java并发编程-线程池

重点内容

  • 线程池的使⽤
  • 创建线程池
  • 提交任务
  • 关闭线程池
  • 线程池的原理
  • 合理配置线程池
  • 线程池的监控

1.线程池的创建

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
  • corePoolSize:线程池的基本大小。 提前调用prestartAllCoreThreads(),会把所有的基本线程启动 。
  • workQueue: ⽤于保存等待执⾏的任务的阻塞队列。
  • ArrayBlockingQueue 基于数组实现的(先进先出)。
  • LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue。
  • SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存储元素的阻塞队列,得等一个线程做移除操作才能继续进行,要不会一直阻塞。
  • PriorityBlockingQueue 具有优先级的无限阻塞队列。
  • maximumPoolSize: 线程池允许创建的最⼤线程数。
  • threadFactory: ⽤于设置创建线程的工厂可以使用谷歌的开源方法。
  • handler: 饱和策略,阻塞队列和我们的线程的创建数都满了的时候就会饱和选择一个策略对新提交的策略进行处理。
  • AbortPolicy 直接抛出异常。
  • CallerRunsPolicy 只用调用者所在的线程来处理任务。
  • DiscardOldestPolicy 丢弃队列里最近的一个任务。
  • DiscardPolicy 直接丢弃。
  • ⾃定义 自己定义一个处理方式。
  • keepAliveTime:线程池的⼯作线程空闲后,保持存活的时间。
  • unit:线程活动保持时间的单位。

2.提交任务

  • execute:⽤于提交不需要返回值的任务
  • submit:⽤于提交需要返回值的任务
  • shutdown:终止的时候会抛出异常
  • shutdownNow:中止的时候不会抛出异常

  • 线程池测试代码

/** @Classname ThreadPoolDemo @Author XW @Date 2021/12/17 23:15 */
public class ThreadPoolDemo {
  private static ThreadFactory namedThreadFactory =
      new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

  private static ExecutorService pool =
      new ThreadPoolExecutor(
          2,
          20,
          0L,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue(2),
          namedThreadFactory,
          new ThreadPoolExecutor.AbortPolicy());

  public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
      pool.execute(new NoResultThread(i));
      /**
       * submit 验证 Future future = pool.submit(new ResultThread()); try {
       * System.out.println("main thread get result: " + future.get()); // future.get(100,
       * TimeUnit.MICROSECONDS); } catch (Exception e) { e.printStackTrace(); }
       */
    }

    // shutdown 验证
    System.out.println("执行shutdown! ");
    pool.shutdown(); // 会继续执行并且完成所有未执行的任务, 新提交的任务会被reject(通过reject策略)
    for (int i = 10; i < 12; i++) {
      pool.execute(new NoResultThread(i));
    }

    /**
     * shutdownnow 验证 System.out.println("执行shutdownnow! "); List runnableList =
     * pool.shutdownNow(); // 会清除所有未执行的任务并且在运行线程上调用interrupt()
     */
    System.out.println("pool shutdown state: " + pool.isShutdown());
    while (true) {
      if (pool.isTerminated()) {
        System.out.println("pool terminated!");
        break;
      } else {
        System.out.println("pool terminated state: " + pool.isTerminated());
      }
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  private static class NoResultThread implements Runnable {

    private int taskNum;

    public NoResultThread(int taskNum) {
      this.taskNum = taskNum;
    }

    @Override
    public void run() {
      System.out.println("线程 " + Thread.currentThread().getName() + " 开始执行任务 " + this.taskNum);
      try {
        Thread.sleep(1000);
        System.out.println("线程 " + Thread.currentThread().getName() + " 执行完任务 " + this.taskNum);
      } catch (InterruptedException e) {
        System.out.println(
            "线程 "
                + Thread.currentThread().getName()
                + " 在执行任务 "
                + this.taskNum
                + " 时被中断 :"
                + e.getMessage());
      }
    }
  }

  private static class ResultThread implements Callable {
    @Override
    public String call() throws Exception {
      System.out.println(
          Thread.currentThread().getState() + "----------" + Thread.currentThread().getName());
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return Thread.currentThread().getName();
    }
  }
}

3.线程池的实现原理

Java并发编程-线程池

​ 首先会判断corePoolSize核心线程池是否已经满了,没满就直接创建线程执行任务,满了再去判断队列是否满了,队列没有满的话在把任务放在队列里面,队列如果满的话,会将当前的线程数量跟maximumPoolSize进行对比如果没满的话就创建线程执行任务,maximumPoolSize也满了话就按照策略(handler)处理无法执行的任务。注意线程池只要创建线程就会获取全局锁。

Java并发编程-线程池

Java并发编程-线程池

线程会根据worker去线程池里面拿任务

  • *线程池execute的源码
public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        /** ctl记录着workCount和runState */
        int c = ctl.get();

        /** 第一步: 如果线程池中的线程数量小于核心线程数,那么创建线程并执行*/
        if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): 获取当前活动的线程数
            /**
             * 在线程池中新建一个新的线程
             * command:需要执行的Runnable线程
             * true:新增线程时,【当前活动的线程数】是否 < corePoolSize
             * false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
             */
            if (addWorker(command, true)) {
                // 添加新线程成功,则直接返回。
                return;
            }
            // 添加新线程失败,则重新获取【当前活动的线程数】
            c = ctl.get();
        }

        /**
         * 第二步:如果当前线程池是运行状态 并且 任务添加到队列成功
         * (即:case2: 如果workCount >= corePoolSize,创建线程往workQueue添加线程任务,等待执行)
         */
        // BlockingQueue workQueue 和 Runnable command
        if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中。
            // 重新获取ctl
            int recheck = ctl.get();
            // 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚添加到workQueue中的command移除掉,并调用拒绝策略
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) { // 如果【当前活动的线程数】为0,则执行addWork方法
                /**
                 * null:只创建线程,但不去启动
                 * false:添加线程时,根据maximumPoolSize来判断
                 *
                 * 如果 workerCountOf(recheck) > 0, 则直接返回,在队列中的command稍后会出队列并且执行
                 */
                addWorker(null, false);
            }
        }

        /**
         * 第三步:满足以下两种条件之一,进入第三步判断语句
         *  case1: 线程池不是正在运行状态,即:isRunning(c)==false
         *  case2: workCount >= corePoolSize 并且 添加workQueue队列失败。即:workQueue.offer(command)==false
         *
         * 由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则,进入方法体执行reject(command)
         */
        else if (!addWorker(command, false)) {
            // 执行线程创建失败的拒绝策略
            reject(command);
        }
    }
  • *线程池addWorker的源码
 private boolean addWorker(Runnable firstTask, boolean core) {

        retry:
        /** 步骤一:试图将workerCount+1 */
        for (; ; ) {
            int c = ctl.get();
            // 获得运行状态runState
            int rs = runStateOf(c);

            /**
             * 只有如下两种情况可以新增worker,继续执行下去:
             * case one: rs == RUNNING
             * case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
             */
            if (rs >= SHUTDOWN && // 即:非RUNNING状态(请查看isRunning()方法)。线程池异常,表示不再去接收新的线程任务了,返回false
                    /**
                     * 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
                     * case1:如果firstTask!=null,表示要添加新任务,则:新增worker失败,返回false。
                     * case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。
                     *        则:新增worker失败,返回false
                     */
                    !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                return false;
            }
            for (; ; ) {
                // 获得当前线程池里的线程数
                int wc = workerCountOf(c);
                /**
                 * 满足如下任意情况,则新增worker失败,返回false
                 * case1:大于等于最大线程容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十进制)
                 * case2:当core是true时:>= 核心线程数
                 *        当core是false时:>= 最大线程数
                 */
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                    return false;
                }

                // 当前工作线程数加1
                if (compareAndIncrementWorkerCount(c)) {
                    break retry; // 成功加1,则跳出retry标识的这两层for循环
                }

                // 如果线程数加1操作失败,则获取当前最新的线程池运行状态
                c = ctl.get();

                // 判断线程池运行状态(rs)是否改变;如果不同,则说明方法处理期间线程池运行状态发生了变化,重新获取最新runState
                if (runStateOf(c) != rs) {
                    continue retry; // 跳出内层for循环,继续从第一个for循环执行
                }
            }
        }

        /**
         * 步骤二:workerCount成功+1后,创建Worker,加入集合workers中,并启动Worker线程
         */
        boolean workerStarted = false; /** 用于判断新的worker实例是否已经开始执行Thread.start() */
        boolean workerAdded = false; /** 用于判断新的worker实例是否已经被添加到线程池的workers队列中 */
        Worker w = null; // AQS.Worker
        try {
            w = new Worker(firstTask); /** 创建Worker实例,每个Worker对象都会针对入参firstTask来创建一个线程。 */
            final Thread t = w.thread; /** 从Worker中获得新建的线程t */
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock; /** 加重入锁 */
                /** ----------lock() 尝试加锁操作!!获得锁后继续执行,没获得则等待直到获得锁为止---------- */
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get()); /** 获得线程池当前的运行状态runStatus */
                    /**
                     * 满足如下任意条件,即可向线程池中添加线程:
                     * case1:线程池状态为RUNNING。(请查看isRunning()方法)
                     * case2:线程池状态为SHUTDOWN并且firstTask为null。
                     */
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) { /** 因为t是新构建的线程,还没有启动。所以,如果是alive状态,说明已经被启动了,则抛出异常 */
                            throw new IllegalThreadStateException();
                        }
                        workers.add(w); /** workers中保存线程池中存在的所有work实例集合 */
                        int s = workers.size();
                        if (s > largestPoolSize) { /** largestPoolSize用于记录线程池中曾经存在的最大的线程数量 */
                            largestPoolSize = s;
                        }
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock(); /** ----------unlock 解锁操作!!---------- */
                }
                if (workerAdded) {
                    t.start(); /** 开启线程,执行Worker.run() */
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) { // 如果没有开启线程
                addWorkerFailed(w); // 往线程池中添加worker失败了
            }
        }
        return workerStarted;
    }
  • *线程池runWorker的源码
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                /**
                 * 如果线程池正在停止,请确保线程被中断;否则,请确保线程不被中断。
                 * 这需要在第二种情况下重新检查以处理shutdownNow竞赛,同时清除中断
                 *
                 * 同时满足如下两个条件,则执行wt.interrupt()
                 * 1>线程状态为STOP、TIDYING、TERMINATED 或者 (当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                 * 2>当前线程wt没有被标记中断
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted()) {
                    wt.interrupt();
                }
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); /** 真正做事儿的地方了 */
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

4.合理配置线程池

  • 任务的性质
  • CPU密集型 : N cpu + 1 配置尽可能小的线程,线程数要少一点,减少cpu频繁的上下文切换,提高cpu的利用率
  • IO 密集型 :2 * N cpu 需要配置尽可能多的线程,这样才能保证cpu能被充分的利用
  • 混合型 :拆分成CPU密集型和IO密集型
  • N = Runtime.getRuntime().availableProcessors()
  • 任务的优先级 :PriorityBlockingQueue
  • 任务的执⾏时间
  • 不同规模的线程池
  • PriorityBlockingQueue 让执行时间比较短的线程先执行
  • 任务的依赖性
  • 增加线程数量
  • 使⽤有界队列保证系统的稳定性

5.线程池的监控

  • taskCount 任务的数量
  • completedTaskCount 运行的过程中完成的任务数量
  • largestPoolSize 曾经创建过的最大的线程数量
  • getPoolSize 线程数量
  • getActiveCount 获取活动的线程数
  • 扩展线程池:beforeExecute、afterExecute 在线程执行前,执行后做点什么

Original: https://www.cnblogs.com/arrorzz/p/15707783.html
Author: 秃头版胡歌
Title: Java并发编程-线程池

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/684182/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Oracle删除唯一索引失败提示ORA-01418:指定的索引不存在 ORACLE

    前言 oracle数据库执行drop index index_name 时报指定的索引不存在。 select * from user_ind_columns where index…

    技术杂谈 2023年6月1日
    098
  • prim算法求最小生成树

    例题链接 最小生成树的含义是,假设给定n个点,m条边(m > n – 1),在m条边中选择n – 1条边将n个点连接成一个连通图,即一棵生成树。因为每…

    技术杂谈 2023年6月21日
    086
  • 字符串

    20、【剑指Offer学习】【面试题20:表示数值的字符串】 38、【剑指Offer学习】【面试题38:字符串的排列】 46、【剑指Offer学习】【面试题46:把数字翻译成字符串…

    技术杂谈 2023年6月22日
    066
  • 为在线数据库构建基于Kudu的实时数据同步

    Kudu 是 Cloudera 开源的新型列式存储系统,是 Apache Hadoop 生态圈的成员之一。它专门为了对快速变化的数据进行快速的分析,填补了以往Hadoop 存储层的…

    技术杂谈 2023年7月23日
    066
  • 如何成为一名优秀的博主(PPT)

    如何成为一名优秀的博主?这是一个问题。我一直到处建议别人写博客,写技术博客,但是还真没想过或总结过这个问题。上个星期微软”社区经营计划”讲解了微软内部关于撰…

    技术杂谈 2023年5月31日
    0107
  • 2022.30 微内核架构

    微内核架构(Microkernel Architecture),也被称为插件化架构(Plug-in Architecture),是一种面向功能进行拆分的可扩展性架构。 微内核架构最…

    技术杂谈 2023年5月30日
    097
  • 耗时几个月,终于找到了JVM停顿十几秒的原因

    原创:打码日记(微信公众号ID:codelogs),欢迎分享,转载请保留出处。 简介 最近我们系统出现了一些奇怪的现象,系统每隔几个星期会在大半夜重启一次,分析过程花费了很长时间,…

    技术杂谈 2023年7月25日
    095
  • 三次Bezier曲线算法

    三次Bezier曲线算法 给定(n+1)个控制点(p_i(i=0,1,2,…,n)),则(n)次Bezier曲线定义为: [p(t)=\sum_{i=1}^np_iB_…

    技术杂谈 2023年7月11日
    080
  • 拒绝蛮力,高效查看Linux日志文件!

    原创:扣钉日记(微信公众号ID:codelogs),欢迎分享,转载请保留出处。 简介 日常分析问题时,会频繁地查看分析日志,但如果蛮力去查看日志,耗时费力还不一定有效果,因此我总结…

    技术杂谈 2023年7月25日
    091
  • zuul实现的限流

    限流一般可以根据客户端IP,请求的URL,用户登陆信息进行限制,每秒钟限制多次数,这从别一方面也提升了系统的性能,无用的并发没那么多了。 依赖包 org.springframewo…

    技术杂谈 2023年5月31日
    095
  • 2-08. 用扑克牌计算24点(25) (ZJU_PAT 数学 枚举)

    一副扑克牌的每张牌表示一个数(J、Q、K分别表示11、12、13,两个司令都表示6)。任取4张牌。即得到4个1~13的数,请加入运算符(规定为加+ 减- 乘* 除/ 四种)使之成为…

    技术杂谈 2023年5月31日
    0156
  • Zookeeper-3.4.9安装

    环境: centos7 Zookeeper-3.4.9.tar.gz 官网下载 步骤 下载后上传压缩文件并解压 tar -zxvf zookeeper-3.4.9.tar.gz -…

    技术杂谈 2023年7月25日
    086
  • 自动化测试之争:code vs codeless

    在TesterHome看到的一个话题,当我们选择做自动化时是否需要code 或者codeless。 code方案 用code去做自动化,实现过程就是拿个IDE撸代码。 python…

    技术杂谈 2023年5月31日
    091
  • flexible如何实现自动判断dpr?

    判断机型, 找出样本机型去适配. 比如iphone以6为样本, 宽度375px, dpr是2 Java Program! Original: https://www.cnblogs…

    技术杂谈 2023年5月31日
    0106
  • 关于 QA 和自动化测试

    现在流行叫 QA,而不是测试。这是因为大家意识到:保证软件质量,仅仅靠编码完成后的测试是不够的,从需求分析、设计阶段开始就要严格把关。QA 的职责从之前的”编码完成后测…

    技术杂谈 2023年7月11日
    085
  • 记一次数据库查询超时优化问题

    问题发现 在七月份时,经常发现有几个定时任务报错,查看了下异常原因,大概定位是数据库执行异常 ### Error querying database. Cause: com.mys…

    技术杂谈 2023年7月25日
    095
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球