Java并发编程-线程池

重点内容

  • 线程池的使⽤
  • 创建线程池
  • 提交任务
  • 关闭线程池
  • 线程池的原理
  • 合理配置线程池
  • 线程池的监控

1.线程池的创建

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
  • corePoolSize:线程池的基本大小。 提前调用prestartAllCoreThreads(),会把所有的基本线程启动 。
  • workQueue: ⽤于保存等待执⾏的任务的阻塞队列。
  • ArrayBlockingQueue 基于数组实现的(先进先出)。
  • LinkedBlockingQueue 吞吐量要高于ArrayBlockingQueue。
  • SynchronousQueue 吞吐量要高于LinkedBlockingQueue 不存储元素的阻塞队列,得等一个线程做移除操作才能继续进行,要不会一直阻塞。
  • PriorityBlockingQueue 具有优先级的无限阻塞队列。
  • maximumPoolSize: 线程池允许创建的最⼤线程数。
  • threadFactory: ⽤于设置创建线程的工厂可以使用谷歌的开源方法。
  • handler: 饱和策略,阻塞队列和我们的线程的创建数都满了的时候就会饱和选择一个策略对新提交的策略进行处理。
  • AbortPolicy 直接抛出异常。
  • CallerRunsPolicy 只用调用者所在的线程来处理任务。
  • DiscardOldestPolicy 丢弃队列里最近的一个任务。
  • DiscardPolicy 直接丢弃。
  • ⾃定义 自己定义一个处理方式。
  • keepAliveTime:线程池的⼯作线程空闲后,保持存活的时间。
  • unit:线程活动保持时间的单位。

2.提交任务

  • execute:⽤于提交不需要返回值的任务
  • submit:⽤于提交需要返回值的任务
  • shutdown:终止的时候会抛出异常
  • shutdownNow:中止的时候不会抛出异常

  • 线程池测试代码

/** @Classname ThreadPoolDemo @Author XW @Date 2021/12/17 23:15 */
public class ThreadPoolDemo {
  private static ThreadFactory namedThreadFactory =
      new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

  private static ExecutorService pool =
      new ThreadPoolExecutor(
          2,
          20,
          0L,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue(2),
          namedThreadFactory,
          new ThreadPoolExecutor.AbortPolicy());

  public static void main(String[] args) {

    for (int i = 0; i < 10; i++) {
      pool.execute(new NoResultThread(i));
      /**
       * submit 验证 Future future = pool.submit(new ResultThread()); try {
       * System.out.println("main thread get result: " + future.get()); // future.get(100,
       * TimeUnit.MICROSECONDS); } catch (Exception e) { e.printStackTrace(); }
       */
    }

    // shutdown 验证
    System.out.println("执行shutdown! ");
    pool.shutdown(); // 会继续执行并且完成所有未执行的任务, 新提交的任务会被reject(通过reject策略)
    for (int i = 10; i < 12; i++) {
      pool.execute(new NoResultThread(i));
    }

    /**
     * shutdownnow 验证 System.out.println("执行shutdownnow! "); List runnableList =
     * pool.shutdownNow(); // 会清除所有未执行的任务并且在运行线程上调用interrupt()
     */
    System.out.println("pool shutdown state: " + pool.isShutdown());
    while (true) {
      if (pool.isTerminated()) {
        System.out.println("pool terminated!");
        break;
      } else {
        System.out.println("pool terminated state: " + pool.isTerminated());
      }
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

  private static class NoResultThread implements Runnable {

    private int taskNum;

    public NoResultThread(int taskNum) {
      this.taskNum = taskNum;
    }

    @Override
    public void run() {
      System.out.println("线程 " + Thread.currentThread().getName() + " 开始执行任务 " + this.taskNum);
      try {
        Thread.sleep(1000);
        System.out.println("线程 " + Thread.currentThread().getName() + " 执行完任务 " + this.taskNum);
      } catch (InterruptedException e) {
        System.out.println(
            "线程 "
                + Thread.currentThread().getName()
                + " 在执行任务 "
                + this.taskNum
                + " 时被中断 :"
                + e.getMessage());
      }
    }
  }

  private static class ResultThread implements Callable {
    @Override
    public String call() throws Exception {
      System.out.println(
          Thread.currentThread().getState() + "----------" + Thread.currentThread().getName());
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return Thread.currentThread().getName();
    }
  }
}

3.线程池的实现原理

Java并发编程-线程池

​ 首先会判断corePoolSize核心线程池是否已经满了,没满就直接创建线程执行任务,满了再去判断队列是否满了,队列没有满的话在把任务放在队列里面,队列如果满的话,会将当前的线程数量跟maximumPoolSize进行对比如果没满的话就创建线程执行任务,maximumPoolSize也满了话就按照策略(handler)处理无法执行的任务。注意线程池只要创建线程就会获取全局锁。

Java并发编程-线程池

Java并发编程-线程池

线程会根据worker去线程池里面拿任务

  • *线程池execute的源码
public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }

        /** ctl记录着workCount和runState */
        int c = ctl.get();

        /** 第一步: 如果线程池中的线程数量小于核心线程数,那么创建线程并执行*/
        if (workerCountOf(c) < corePoolSize) { // workerCountOf(c): 获取当前活动的线程数
            /**
             * 在线程池中新建一个新的线程
             * command:需要执行的Runnable线程
             * true:新增线程时,【当前活动的线程数】是否 < corePoolSize
             * false:新增线程时,【当前活动的线程数】是否 < maximumPoolSize
             */
            if (addWorker(command, true)) {
                // 添加新线程成功,则直接返回。
                return;
            }
            // 添加新线程失败,则重新获取【当前活动的线程数】
            c = ctl.get();
        }

        /**
         * 第二步:如果当前线程池是运行状态 并且 任务添加到队列成功
         * (即:case2: 如果workCount >= corePoolSize,创建线程往workQueue添加线程任务,等待执行)
         */
        // BlockingQueue workQueue 和 Runnable command
        if (isRunning(c) && workQueue.offer(command)) { // 添加command到workQueue队列中。
            // 重新获取ctl
            int recheck = ctl.get();
            // 再次check一下,当前线程池是否是运行状态,如果不是运行时状态,则把刚刚添加到workQueue中的command移除掉,并调用拒绝策略
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) { // 如果【当前活动的线程数】为0,则执行addWork方法
                /**
                 * null:只创建线程,但不去启动
                 * false:添加线程时,根据maximumPoolSize来判断
                 *
                 * 如果 workerCountOf(recheck) > 0, 则直接返回,在队列中的command稍后会出队列并且执行
                 */
                addWorker(null, false);
            }
        }

        /**
         * 第三步:满足以下两种条件之一,进入第三步判断语句
         *  case1: 线程池不是正在运行状态,即:isRunning(c)==false
         *  case2: workCount >= corePoolSize 并且 添加workQueue队列失败。即:workQueue.offer(command)==false
         *
         * 由于第二个参数传的是false,所以如果workCount < maximumPoolSize,则创建执行线程;否则,进入方法体执行reject(command)
         */
        else if (!addWorker(command, false)) {
            // 执行线程创建失败的拒绝策略
            reject(command);
        }
    }
  • *线程池addWorker的源码
 private boolean addWorker(Runnable firstTask, boolean core) {

        retry:
        /** 步骤一:试图将workerCount+1 */
        for (; ; ) {
            int c = ctl.get();
            // 获得运行状态runState
            int rs = runStateOf(c);

            /**
             * 只有如下两种情况可以新增worker,继续执行下去:
             * case one: rs == RUNNING
             * case two: rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
             */
            if (rs >= SHUTDOWN && // 即:非RUNNING状态(请查看isRunning()方法)。线程池异常,表示不再去接收新的线程任务了,返回false
                    /**
                     * 当线程池是SHUTDOWN状态时,表示不再接收新的任务了,所以:
                     * case1:如果firstTask!=null,表示要添加新任务,则:新增worker失败,返回false。
                     * case2:如果firstTask==null并且workQueue为空,表示队列中的任务已经处理完毕,不需要添加新任务了。
                     *        则:新增worker失败,返回false
                     */
                    !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
                return false;
            }
            for (; ; ) {
                // 获得当前线程池里的线程数
                int wc = workerCountOf(c);
                /**
                 * 满足如下任意情况,则新增worker失败,返回false
                 * case1:大于等于最大线程容量,即:int CAPACITY = 00011111111111111111111111111111 = 536870911(十进制)
                 * case2:当core是true时:>= 核心线程数
                 *        当core是false时:>= 最大线程数
                 */
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) {
                    return false;
                }

                // 当前工作线程数加1
                if (compareAndIncrementWorkerCount(c)) {
                    break retry; // 成功加1,则跳出retry标识的这两层for循环
                }

                // 如果线程数加1操作失败,则获取当前最新的线程池运行状态
                c = ctl.get();

                // 判断线程池运行状态(rs)是否改变;如果不同,则说明方法处理期间线程池运行状态发生了变化,重新获取最新runState
                if (runStateOf(c) != rs) {
                    continue retry; // 跳出内层for循环,继续从第一个for循环执行
                }
            }
        }

        /**
         * 步骤二:workerCount成功+1后,创建Worker,加入集合workers中,并启动Worker线程
         */
        boolean workerStarted = false; /** 用于判断新的worker实例是否已经开始执行Thread.start() */
        boolean workerAdded = false; /** 用于判断新的worker实例是否已经被添加到线程池的workers队列中 */
        Worker w = null; // AQS.Worker
        try {
            w = new Worker(firstTask); /** 创建Worker实例,每个Worker对象都会针对入参firstTask来创建一个线程。 */
            final Thread t = w.thread; /** 从Worker中获得新建的线程t */
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock; /** 加重入锁 */
                /** ----------lock() 尝试加锁操作!!获得锁后继续执行,没获得则等待直到获得锁为止---------- */
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get()); /** 获得线程池当前的运行状态runStatus */
                    /**
                     * 满足如下任意条件,即可向线程池中添加线程:
                     * case1:线程池状态为RUNNING。(请查看isRunning()方法)
                     * case2:线程池状态为SHUTDOWN并且firstTask为null。
                     */
                    if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) { /** 因为t是新构建的线程,还没有启动。所以,如果是alive状态,说明已经被启动了,则抛出异常 */
                            throw new IllegalThreadStateException();
                        }
                        workers.add(w); /** workers中保存线程池中存在的所有work实例集合 */
                        int s = workers.size();
                        if (s > largestPoolSize) { /** largestPoolSize用于记录线程池中曾经存在的最大的线程数量 */
                            largestPoolSize = s;
                        }
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock(); /** ----------unlock 解锁操作!!---------- */
                }
                if (workerAdded) {
                    t.start(); /** 开启线程,执行Worker.run() */
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted) { // 如果没有开启线程
                addWorkerFailed(w); // 往线程池中添加worker失败了
            }
        }
        return workerStarted;
    }
  • *线程池runWorker的源码
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                /**
                 * 如果线程池正在停止,请确保线程被中断;否则,请确保线程不被中断。
                 * 这需要在第二种情况下重新检查以处理shutdownNow竞赛,同时清除中断
                 *
                 * 同时满足如下两个条件,则执行wt.interrupt()
                 * 1>线程状态为STOP、TIDYING、TERMINATED 或者 (当前线程被中断(清除中断标记)并且线程状态为STOP、TIDYING、TERMINATED)
                 * 2>当前线程wt没有被标记中断
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted()) {
                    wt.interrupt();
                }
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); /** 真正做事儿的地方了 */
                    } catch (RuntimeException x) {
                        thrown = x;
                        throw x;
                    } catch (Error x) {
                        thrown = x;
                        throw x;
                    } catch (Throwable x) {
                        thrown = x;
                        throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

4.合理配置线程池

  • 任务的性质
  • CPU密集型 : N cpu + 1 配置尽可能小的线程,线程数要少一点,减少cpu频繁的上下文切换,提高cpu的利用率
  • IO 密集型 :2 * N cpu 需要配置尽可能多的线程,这样才能保证cpu能被充分的利用
  • 混合型 :拆分成CPU密集型和IO密集型
  • N = Runtime.getRuntime().availableProcessors()
  • 任务的优先级 :PriorityBlockingQueue
  • 任务的执⾏时间
  • 不同规模的线程池
  • PriorityBlockingQueue 让执行时间比较短的线程先执行
  • 任务的依赖性
  • 增加线程数量
  • 使⽤有界队列保证系统的稳定性

5.线程池的监控

  • taskCount 任务的数量
  • completedTaskCount 运行的过程中完成的任务数量
  • largestPoolSize 曾经创建过的最大的线程数量
  • getPoolSize 线程数量
  • getActiveCount 获取活动的线程数
  • 扩展线程池:beforeExecute、afterExecute 在线程执行前,执行后做点什么

Original: https://www.cnblogs.com/arrorzz/p/15707783.html
Author: 秃头版胡歌
Title: Java并发编程-线程池

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/575681/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Jmeter环境变量配置你不得不知道的事情

    在安装Jmeter的过程中大家肯定需要配置环境,但是为什么要配置JDK的环境变量呢?大家有没有好奇过,有没有仔细去像一下呢,其实在安装Jmeter前,大家应该都知道Jmeter是我…

    数据库 2023年6月6日
    080
  • Redis进阶(一)

    通过简单的KV数据库理解Redis 分为访问模块,操作模块,索引模块,存储模块 底层数据结构 除了String类型,其他类型都是一个键对应一个集合,键值对的存储结构采用哈希表 哈希…

    数据库 2023年6月16日
    082
  • English words 930 2022

    low hanging fruit 本文来自博客园,作者:ukyo–BlackJesus,转载请注明原文链接:https://www.cnblogs.com/ukzq/…

    数据库 2023年6月11日
    095
  • Docker下Nginx、ES、Kibana、 Redis 安装和配置

    1.Nginx 1.拉取镜像 注意:不带版本默认会下载docker仓库里面最新的版本 docker pull nginx #下载最新版 镜像名:版本名(标签) docker pul…

    数据库 2023年6月6日
    077
  • 2022-8-29 javaweb 第一天 servlet/tomcat

    软件架构 1、C/S架构:客户端 / 服务器——–QQ,Typora,腾讯会议。 2、B/S架构:浏览器 / 服务器——…

    数据库 2023年6月14日
    077
  • 5 float f = 3.4,是否正确

    不正确,赋值运算符 “=” 左右两边的精度类型不匹配。 Java中,有小数点的默认被存储为double类型,即双精度;而float类型的变量为单精度。 可以…

    数据库 2023年6月6日
    071
  • String字符串用逗号拼接,防止最后一位是逗号

    StringBuilder sb = new StringBuilder(); for(String s strArr) { if (sb.length() > 0) {//…

    数据库 2023年6月16日
    097
  • Ajax请求下载文件的解决方案

    写这个博客之前我并不清楚 ajax请求是下载不了文件的 😅 这段时间在写一个自己的项目,用到了ajax下载文件,请求到了controller层并返回文件下载成功 但是浏览器就是没有…

    数据库 2023年6月9日
    0243
  • IDEA中如何查看接口的所有实现类呢?

    接口是我们日常开发中常用的操作,那么如何查看一个接口有哪些实现类呢?下文笔者将讲述IDEA编辑器中 查看实现类的快捷方法,如下所示 在spring源码阅读中,每一个接口都有很多实现…

    数据库 2023年6月11日
    076
  • 第一篇博客

    这是我在博客园的第一篇博客,用来纪念以下,同时也是写博客的试水标记 Original: https://www.cnblogs.com/zht1702/p/15081310.htm…

    数据库 2023年6月14日
    065
  • 2_Git

    一. 引言 在单人开发过程中, 需要进行版本管理, 以利于开发进度的控制 在多人开发过程中, 不仅需要版本管理, 还需要进行多人协同控制 二. 介绍 Git是一个 开源的分布式版本…

    数据库 2023年6月11日
    083
  • 面试现场!月薪3w+的这些数据挖掘SQL面试题你都掌握了吗? ⛵

    💡 作者:韩信子@ShowMeAI📘 数据分析实战系列:https://www.showmeai.tech/tutorials/40📘 AI 面试题库系列:https://www….

    数据库 2023年5月24日
    095
  • 实现一个简单的Database1(译文)

    “What I cannot create, I do not understand.” – Richard Feynman I’m build…

    数据库 2023年6月11日
    099
  • 做自动化测试选择Python还是Java?

    你好,我是测试蔡坨坨。 今天,我们来聊一聊测试人员想要进阶,想要做自动化测试,甚至测试开发,如何选择编程语言。 自动化测试,这几年行业内的热词,也是测试人员进阶的必备技能,更是软件…

    数据库 2023年6月11日
    099
  • mysql主从

    mysql主从 mysql主从 1.主从原理 1.1 主从介绍 1.2 主从作用 1.3 主从形式 1.4 主从复制原理 2.主从复制配置 2.1 mysql安装 2.2 mysq…

    数据库 2023年5月24日
    073
  • Python学习笔记(十一)– Django API RESTful

    (1)路由;(2)视图类(提供给用户访问相当于原来的视图函数);(3)序列化类(提供给视图类使用,把对象序列化成Json) 注意:使用rest_framework,需先 setti…

    数据库 2023年6月16日
    078
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球