线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

前言

上一篇内容写了 Java中线程池的实现原理及源码分析,说好的是实实在在的大满足,想通过一篇文章让大家对线程池有个透彻的了解,但是文章写完总觉得还缺点什么?

上篇文章只提到线程提交的 execute()方法,并没有讲解线程提交的 submit()方法, submit()有一个返回值,可以获取线程执行的结果 Future<t></t>,这一讲就那深入学习下 submit()FutureTask实现原理。

使用场景&示例

使用场景

我能想到的使用场景就是在并行计算的时候,例如一个方法中调用 methodA()&#x3001;methodB(),我们可以通过线程池异步去提交方法A、B,然后在主线程中获取组装方法A、B计算后的结果,能够大大提升方法的吞吐量。

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

使用示例

/**
 * @author wangmeng
 * @date 2020/5/28 15:30
 */
public class FutureTaskTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newCachedThreadPool();

        System.out.println("====执行FutureTask线程任务====");
        Future futureTask = threadPool.submit(new Callable() {
            @Override
            public String call() throws Exception {
                System.out.println("FutureTask执行业务逻辑");
                Thread.sleep(2000);
                System.out.println("FutureTask业务逻辑执行完毕!");
                return "欢迎关注: 一枝花算不算浪漫!";
            }
        });

        System.out.println("====执行主线程任务====");
        Thread.sleep(1000);
        boolean flag = true;
        while(flag){
            if(futureTask.isDone() && !futureTask.isCancelled()){
                System.out.println("FutureTask异步任务执行结果:" + futureTask.get());
                flag = false;
            }
        }

        threadPool.shutdown();
    }
}

上面的使用很简单, submit()内部传递的实际上是个 Callable接口,我们自己实现其中的 call()方法,我们通过 futureTask既可以获取到具体的返回值。

submit()实现原理

submit() 是也是提交任务到线程池,只是它可以获取任务返回结果,返回结果是通过 FutureTask来实现的,先看下 ThreadPoolExecutor中代码实现:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public  Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
}

public abstract class AbstractExecutorService implements ExecutorService {
    protected  RunnableFuture newTaskFor(Callable callable) {
        return new FutureTask(callable);
    }
}

提交任务还是执行 execute()方法,只是 task被包装成了 FutureTask ,也就是在 excute()中启动线程后会执行 FutureTask.run()方法。

再来具体看下它执行的完整链路图:

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

上图可以看到,执行任务并返回执行结果的核心逻辑实在 FutureTask中,我们以 FutureTask.run/get 两个方法为突破口,一点点剖析 FutureTask的实现原理。

FutureTask源码初探

先看下 FutureTask中部分属性:

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!
public class FutureTask implements RunnableFuture {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    private Callable callable;
    private Object outcome;
    private volatile Thread runner;
    private volatile WaitNode waiters;
}
  1. state

当前task状态,共有7中类型。
NEW: 当前任务尚未执行
COMPLETING: 当前任务正在结束,尚未完全结束,一种临界状态
NORMAL:当前任务正常结束
EXCEPTIONAL: 当前任务执行过程中发生了异常。
CANCELLED: 当前任务被取消
INTERRUPTING: 当前任务中断中..

INTERRUPTED: 当前任务已中断

  1. callble

用户提交任务传递的Callable,自定义call方法,实现业务逻辑

  1. outcome

任务结束时,outcome保存执行结果或者异常信息。

  1. runner

当前任务被线程执行期间,保存当前任务的线程对象引用

  1. waiters

因为会有很多线程去get当前任务的结果,所以这里使用了一种stack数据结构来保存

FutureTask.run()实现原理

我们已经知道在线程池 runWorker()中最终会调用到 FutureTask.run()方法中,我们就来看下它的执行原理吧:

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

具体代码如下:

public class FutureTask implements RunnableFuture {
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

首先是判断 FutureTaskstate状态,必须是 NEW才可以继续执行。

然后通过 CAS修改 runner引用为当前线程。

接着执行用户自定义的 call()方法,将返回结果设置到result中,result可能为正常返回也可能为异常信息。这里主要是调用 set()/setException()

FutureTask.set()实现原理

set()方法的实现很简单,直接看下代码:

public class FutureTask implements RunnableFuture {
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }
}

call()返回的数据赋值给全局变量 outcome上,然后修改 state状态为 NORMAL,最后调用 finishCompletion()来做挂起线程的唤醒操作,这个方法等到 get()后面再来讲解。

FutureTask.get()实现原理

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

接着看下代码:

public class FutureTask implements RunnableFuture {
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s

如果 FutureTaskstateNORMAL或者 COMPLETING,说明当前任务并没有执行完成,调用 get()方法会被阻塞,具体的阻塞逻辑在 awaitDone()方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {

        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos

这个方法可以说是 FutureTask中最核心的方法了,一步步来分析:

如果 timed不为空,这说明指定 nanos时间还未返回结果,线程就会退出。

q是一个 WaitNode对象,是将当前引用线程封装在一个 stack数据结构中, WaitNode对象属性如下:

 static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

接着判断当前线程是否中断,如果中断则抛出中断异常。

下面就进入一轮轮的 if... else if...判断逻辑,我们还是采用分支的方式去分析。

分支一:if (s > COMPLETING) {

此时 get()方法已经有结果了,无论是正常返回的结果,还是异常、中断、取消等,此时直接返回 state状态,然后执行 report()方法。

分支二:else if (s == COMPLETING)

条件成立,说明当前任务接近完成状态,这里让当前线程再释放 cpu,进行下一轮抢占 cpu

分支三:else if (q == null)

第一次自旋执行, WaitNode还没有初始化,初始化 q=new WaitNode();

分支四:else if (!queued){

queued代表当前线程是否入栈,如果没有入栈则进行入栈操作,顺便将全局变量 waiters指向栈顶元素。

分支五/六:LockSupport.park

如果设置了超时时间,则使用 parkNanos来挂起当前线程,否则使用 park()

经过这么一轮自旋循环后,如果执行 call()还没有返回结果,那么调用 get()方法的线程都会被挂起。

被挂起的线程会等待 run()返回结果后依次唤醒,具体的执行逻辑在 finishCompletion()中。

最终 stack结构中数据如下:

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

FutureTask.finishCompletion()实现原理

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

具体实现代码如下:

private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null;
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;
}

代码实现很简单,看过 get()方法后,我们知道所有调用 get()方法的线程,在 run()还没有返回结果前,都会保存到一个有 WaitNode构成的 statck数据结构中,而且每个线程都会被挂起。

这里是遍历 waiters栈顶元素,然后依次查询起 next节点进行唤醒,唤醒后的节点接着会往后调用 report()方法。

FutureTask.report()实现原理

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

具体代码如下:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

这个方法很简单,因为执行到了这里,说明当前 state状态肯定大于 COMPLETING,判断如果是正常返回,那么返回 outcome数据。

如果 state是取消状态,抛出 CancellationException异常。

如果状态都不满足,则说明执行中出现了差错,直接抛出 ExecutionException

FutureTask.cancel()实现原理

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!
public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally {
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

cancel()方法的逻辑很简单,就是修改 state状态为 CANCELLED,然后调用 finishCompletion()来唤醒等待的线程。

这里如果 mayInterruptIfRunning,就会先中断当前线程,然后再去唤醒等待的线程。

总结

FutureTask的实现原理其实很简单,每个方法基本上都画了一个简单的流程图来方便立即。

后面还打算分享一个 BlockingQueue相关的源码解读,这样线程池也可以算是完结了。

在这之前可能会先分享一个 SpringCloud常见配置代码分析、最佳实践等手册,方便工作中使用,也是对之前看过的源码一种总结。敬请期待!
欢迎关注:

线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

Original: https://www.cnblogs.com/wang-meng/p/13023710.html
Author: 一枝花算不算浪漫
Title: 线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/542098/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 有意思的面试记录

    碰见一个很有意思的【百度面试】,和我好像hhh链接:https://www.bilibili.com/video/BV18Z4y1T7yZ/ 【类加载机制】类加载过程:加载+链接+…

    Java 2023年6月7日
    0100
  • 在Idea中创建类时自动添加作者信息

    新换了电脑需要给Idea做一个小设置,在Idea中创建类时自动添加作者信息 前言 阿里开发手册强制的建议——所有的类都必须添加创建者和创建日期,我觉得很合适,自己写的过了几个月忘记…

    Java 2023年6月9日
    072
  • 共读《redis设计与实现》-单机(一)

    上一章我们讲了 redis 基本类型的 &#x6570;&#x636E;&#x7ED3;&#x6784; 和 &#x5BF9;&#x…

    Java 2023年6月7日
    096
  • 插入排序(java)

    一、概念及其介绍 插入排序(InsertionSort),一般也被称为直接插入排序。 对于少量元素的排序,它是一个有效的算法。插入排序是一种最简单的排序方法,它的基本思想是将一个记…

    Java 2023年6月5日
    0121
  • 大厂钟爱的全链路压测有什么意义?四种压测方案详细对比分析

    全链路压测? 基于实际的生产业务场景和系统环境,模拟海量的用户请求和数据,对整个业务链路进行各种场景的测试验证,持续发现并进行瓶颈调优,保障系统稳定性的一个技术工程。 针对业务场景…

    Java 2023年6月15日
    098
  • 链表相加_2_445

    给你两个 非空 的链表,表示两个非负的整数。它们每位数字都是按照 逆序 的方式存储的,并且每个节点只能存储 一位 数字。 请你将两个数相加,并以相同形式返回一个表示和的链表。你可以…

    Java 2023年6月5日
    083
  • CompletableFuture的入门

    runAsync 和 supplyAsync runAsync接受一个Runable的实现,无返回值 CompletableFuture.runAsync(()->Syste…

    Java 2023年6月9日
    077
  • @Autowired和@Resouce的区【转】

    @Resource和@Autowired都是做bean的注入时使用,其实@Resource并不是Spring的注解,它的包是javax.annotation.Resource,需要…

    Java 2023年6月9日
    066
  • 尚医通项目总结(二)———-Mybatis与Mybatis Plus

    Mybatis是持久层解决方案,它是一个半自动化的ORM框架,底层封装了JDBC,可以简化对数据库的增删改查操作。Mybatis的半自动体现在,在查询关联对象或者关联集合对象时,需…

    Java 2023年6月5日
    0107
  • 进程通讯 & Binder机制 & Service 笔记

    每个 app 都处于不同进程,每启动一个 APP,默认会启动一个虚拟机上,一个虚拟机就是一个进程。 分享通过 intent 传递数据,成功后回到 app;当你需要把本地数据库对外提…

    Java 2023年6月7日
    0119
  • 机器学习(6)K近邻算法

    k-近邻,通过离你最近的来判断你的类别 例子: 定义:如果一个样本在特征空间中的k个最相似(即特征空间中最邻近的样本中大多数属于某一类别),则该样本属于这个类别 K近邻需要做标准化…

    Java 2023年6月8日
    077
  • 封装RabbitTemplate,使用自定义消息转换器

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Java 2023年6月7日
    071
  • SQL执行顺序

    首先select语句中都会用到哪些关键字: select ,from,join,where,group by,having,order by,limit 其次,要知道每执行一步就会…

    Java 2023年6月9日
    095
  • Java调用阿里云OSS下载文件

    1、准备工作 2、项目需求 首先需要引入阿里云的依赖包,如下所示: 如果只是想将oss文件下载到服务器的磁盘里面,可以使用下面的案例。需要注意的就是OSS的文件目录下面的文件,指定…

    Java 2023年5月29日
    089
  • java~Map集合整理

    Map图 HashMap 是 Java Collection Framework 的重要成员,也是Map族(如下图所示)中我们最为常用的一种。不过遗憾的是,HashMap是无序的,…

    Java 2023年5月29日
    078
  • SSM框架的整合

    B站楠哥的笔记 视频链接:https://www.bilibili.com/video/BV1hE411F77L 学习SSM框架的整合 SSM框架整合 Spring + Sprin…

    Java 2023年6月7日
    089
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球