Java异步执行器CompletableFuture源码解析

CompletableFuture是对 Future的一种强有力的扩展, Future只能通过轮询 isDone()方法或者调用 get()阻塞等待获取一个异步任务的结果,才能继续执行下一步,当我们执行的异步任务很多,而且相互之前还要依赖结果的时候,可能会创建很多这样的 Future,并通过 get或者轮询等待执行结果返回之后继续执行,这样的代码显得很不方便而且也不高效。

CompletableFuture同时继承了 CompletionStageFuture,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过 lambda表达式的风格处理各个执行阶段的结果。

CompletionStageJava8新增得一个接口,用于异步执行中的阶段处理,其大量用在 Lambda表达式计算过程中,目前只有 CompletableFuture一个实现类。

CompletionStage定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等,一般来说要执行下一个阶段都需要上一个阶段正常完成,当然这个类也提供了对异常结果的处理接口。

CompletionStage的接口方法可以从多种角度进行分类,从最宏观的横向划分, CompletionStage的接口主要分三类:

还有一组特别的方法带有 compose字样,它以依赖阶段本身作为参数而不是阶段产生的结果进行产出型(或函数型)操作。

在以上三类横向划分方法的基础上,又可以按照以下的规则对这些接口方法进行纵向的划分:

CompletionStage的异常规则

除了 whenComplete不要求其依赖的阶段是正常完成还是异常完成,以及 handle前缀的方法只要求其依赖的阶段异常完成之外,其余所有接口方法都要求其依赖的阶段正常完成。

  • 如果一个阶段的执行由于一个(未捕获的)异常或错误而突然终止,那么所有要求其完成的相关阶段也将异常地完成,并通过 CompletionException包装其具体异常堆栈。
  • 如果一个阶段同时依赖于两个阶段,并且两个阶段都异常地完成,那么 CompletionException可以对应于这两个异常中的任何一个。
  • 如果一个阶段依赖于另外两个阶段中的任何一个,并且其中只有一个异常完成,则不能保证依赖阶段是正常完成还是异常完成。
  • 在使用方法 whenComplete的情况下,当提供的操作本身遇到异常时,如果前面的阶段没有异常完成,则阶段将以其异常作为原因异常完成。

所有方法都遵循上述触发、执行和异常完成规范,此外,虽然用于传递一个表示完成结果的参数(也就是说,对于T类型的参数)可以为 null,但是如果为其它任何参数传递 null都将导致 NullPointerException

此接口不定义用于初始创建、强制正常或异常完成、探测完成状态或结果或等待阶段完成的方法。 CompletionStage的实现类可以提供适当的方法来实现这些效果。

方法 toCompletableFuture通过提供一个公共转换类型,支持该接口的不同实现之间的互操作性。

CompletableFuture通过以下策略实现了接口 CompletionStage

CompletableFuture通过以下策略实现了接口 Future

以下是 CompletableFuture的内部实现概述:

由于 CompletableFuture可以依赖其他一个或多个 CompletableFuture,所以在内部实现的时候,每一个 CompletableFuture都拥有一个依赖操作栈,栈中的元素是 Completion的子类,它包含相关的操作、 CompletableFuture以及源操作。

当一个 CompletableFuture完成之后会从栈中弹出并递归执行那些依赖它的 CompletableFuture。由于依赖栈中的 Completion元素也包含 CompletableFuture对象,其 CompletableFuture对象可能也拥有一个依赖栈,因此将形成一个非常复杂的依赖树。

CompletableFuture对每一种形式的实现使用了不同的 Completion子类,例如:单输入(UniCompletion)、双输入(BiCompletion)、投影(使用 BiCompletion两个输入中的任何一个(而不是两个)的双输入)、共享(CoCompletion,由两个源中的第二个使用)、零输入(不消费不产出的 Runnable)操作和解除阻塞等待(get()join()方法)的信号器 Signallers

Completion类扩展了 ForkJoinTask来启用异步执行(不增加空间开销,因为我们利用它的”标记”方法来维护声明).它还被声明为Runnable,可以被任意线程池调度执行。

CompletableFuture又在 UniCompletionBiCompletionCoCompletion等这几种 Completion子类的基础上扩展出了实现 CompletionStage具体接口方法的前缀为”Uni”, “Bi”, “Or”的子类。例如实现单个输入、两个输入、两者之一的 thenApply对应的就是 UniApplyBiApplyOrApply

CompletableFuture在实现 CompletionStage接口方法甚至自己独有的方法使都采用了相同的模式,以及调度策略,因此只要立即了一种方法的实现,其他方法都是类似的原理。

3.1 runAsync和supplyAsync

虽然 CompletableFuture提供了无参的构造方法,但我们一般从它的静态方法开始,根据是否有返回值,它对外提供了两种形式的执行异步任务的方法:

// 执行无返回值的异步任务
public static CompletableFuture runAsync(Runnable runnable);
public static CompletableFuture runAsync(Runnable runnable, Executor executor);

// 执行有返回值的异步任务
public static  CompletableFuture supplyAsync(Supplier supplier);
public static  CompletableFuture supplyAsync(Supplier supplier, Executor executor);

它们都以 async为后缀,根据 CompletionStage的接口定义规律也可以知道是通过异步安排执行。

对于不支持并行运算的环境,例如单核 CPUCompletableFuture默认将采用一个任务创建一个 Thread实例的方式执行。

supplyAsync方法源码为例,继续向下分析:

public static  CompletableFuture supplyAsync(Supplier supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static  CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
    if (f == null) throw new NullPointerException();
    // 新创建一个CompletableFuture,以此构建AsyncSupply作为Executor的执行参数
    CompletableFuture d = new CompletableFuture();
    // 安排异步执行
    // AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口
    e.execute(new AsyncSupply(d, f));
    // 立即返回
    return d;
 }

可见, supplyAsync具体的实现调用了 asyncSupplyStage,这也是 CompletableFuture的内部实现惯例,每一种方法的实现都对应一个 XXXStage方法,用于创建 stage对象(这里就是实现了 CompletionStage接口的 CompletableFuture),并安排任务的执行。

这里由于是异步任务,所以直接创建了异步任务实例 AsyncSupply,然后交给线程池执行。接着看 AsyncSupply实现:

// 实现了ForkJoinTask,Runnable可以被ForkJoinPool,或者其他实现Executor的自定义线程池调度
static final class AsyncSupply extends ForkJoinTask
        implements Runnable, AsynchronousCompletionTask {

    // 依赖的CompletableFuture
    // AsyncSupply作为一个依赖Task,dep作为这个Task的Future
    CompletableFuture dep;
    // 具体的任务执行逻辑
    // fn作为这个Task的具体执行逻辑,函数式编程
    Supplier fn;

    AsyncSupply(CompletableFuture dep, Supplier fn) {
        this.dep = dep; this.fn = fn;
    }

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}

    // exec由ForkJoinPool调度执行,最终直接调用run
    public final boolean exec() { run(); return true; }

    // 由ForkJoinPool间接调度,或其他自定义线程池直接调用
    public void run() {
        CompletableFuture d; Supplier f;
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            if (d.result == null) { // result为空表示任务还没完成
                try {
                    // 执行任务并将结果设置到依赖的CompletableFuture
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    // 异常完成的情况
                    d.completeThrowable(ex);
                }
            }
            // 从依赖栈中弹出并触发执行依赖当前CompletableFuture的其他阶段
            d.postComplete();
        }
    }
}

AsyncSupply实现了 ForkJoinTaskRunnable是为了兼容 ForkJoinPool线程池和其他自定义的 Executor线程池实现, run方法就是线程调度时执行任务的逻辑,就是执行给定的操作,并将结果设置到当前任务对应的 CompletableFuture对象d(也就是依赖该任务的阶段),最后通过 d.postComplete触发其他依赖阶段d的其他任务执行。postComplete的逻辑如下:

//递归触发其他依赖当前阶段的其他阶段执行
final void postComplete() {

    CompletableFuture f = this; Completion h;
    while ((h = f.stack) != null ||        // f对应的栈不为空
           (f != this && (h = (f = this).stack) != null)) { // f对应的栈为空了,重新回到this,继续另一条路径
        CompletableFuture d; Completion t;
        if (f.casStack(h, t = h.next)) { // 将f的栈顶元素h出栈
            if (t != null) { // 表示出栈的h不是最后一个元素
                if (f != this) { // f不是this,即不是当前栈
                    pushStack(h); // 将f出栈的元素h压入当前的栈,这里是为了避免递归层次太深
                    continue;
                }
                h.next = null;    // detach 辅助GC
            }
            // tryFire就是触发当前栈的栈顶h被执行,完成之后又返回依赖h的其它CompletableFuture,
            // 使其通过while循环继续触发依赖它的其余阶段任务的执行
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

postComplete的代码很简短,但是其代表的逻辑含义却非常不容易理解,这主要是因为形成的依赖树结构复杂,总之, postComplete就是递归的触发依赖当前阶段的其他任务的执行,它一次只沿着一条路径将其压入当前栈,避免递归调用的层次太深。

具体的触发其他任务的执行是通过内嵌模式的 tryFire方法来完成的,嵌套模式只用于这里。为了理解 tryFire,我们再以 thenApply为例。

3.2 CompletionStage实现之thenApply

这是实现 CompletionStage的接口方法 thenApply,它包含三种形式(同步、默认线程池的异步、指定线程池的异步)这里我以同步模式为例,一般来说,我们都会以下面这种形式使用 thenApply

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
    }).thenApply(s -> s + " world").thenAccept(System.out::println);

按照 CompletableFuture的惯例, thenApply由一个 uniApplyStage方法实现,创建一个新的 CompletableFuture,并安排任务执行:

private  CompletableFuture uniApplyStage(Executor e, Function f) {
    if (f == null) throw new NullPointerException();
    // 创建CompletableFuture实例
    CompletableFuture d =  new CompletableFuture();
    if (e != null         // Executor不为空,表示需要安排异步执行
            || !d.uniApply(this, f, null)) {    // 尝试立即同步执行
        // 需要被安排异步执行,或者依赖的上一个阶段this还没完成,需要等待
        UniApply c = new UniApply(e, d, this, f);    //构造UniApply实例
        push(c);    // 将当前任务入栈,注意这里入的上一个阶段即this的栈,作为依赖其的阶段
        c.tryFire(SYNC);    //
    }
    return d;
}

对于同步任务,在入栈等待前会通过一个布尔型的 uniApply方法先尝试安排执行这个任务,这个布尔型的方法也是 CompletableFuture实现其他多种形式的方法的惯例,对应每一种形式的方法实现都有一个这样的返回布尔型的方法: uniAcceptuniRununiWhenCompleteuniHandleuniExceptionallyuniComposebiApply等等。

// 根据依赖的上一个阶段a是否完成,看要不要立即安排当前任务执行。
// 返回true表示已经同步完成执行了当前任务,
// 为false表示依赖的阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话)
final  boolean uniApply(CompletableFuture a,
                           Function f,
                           UniApply c) {
    Object r; Throwable x;
    if (a == null || (r = a.result) == null || f == null)
        return false;    // 表示依赖的阶段a还没完成,还不能执行当前阶段
    tryComplete: if (result == null) { // 依赖的阶段a已经完成,当前阶段还没完成
        if (r instanceof AltResult) {
            // 如果依赖的阶段a是异常结束,那么当前阶段也异常结束
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        // 到这里表示依赖的阶段a是正常结束
        try {
            if (c != null && !c.claim())
                return false; // 只有在c不为空,并且不能被执行或者已经安排异步执行才会返回false

            // 拿到已经完成的依赖阶段a的结果,执行同步执行当前任务,并把结果设置到当前CompletableFuture阶段
            @SuppressWarnings("unchecked") S s = (S) r;
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            // 异常完成的处理
            completeThrowable(ex);
        }
    }
    return true;
}

// 通过自定义TAG,标记任务正在被执行,保证任务只会被执行一次。
// 该方法只会在不能被执行或者已经安排异步执行才会返回false
final boolean claim() {
    Executor e = executor;
    // 解锁成功,表示可以执行了
    if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
        if (e == null)    // 需要被安排同步执行,立即返回true
            return true;
        executor = null;    // disable 赋值GC
        e.execute(this);    // 否则立即安排异步执行
    }
    return false;
}

这个布尔型的方法返回 true表示已经同步完成执行了当前任务。为 false表示依赖的上一个阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话),其中的 claim方法通过 CAS加锁保证任务只会被执行一次,同时还可以安排异步任务的执行。

回到 uniApplyStage,如果是异步任务,或者还不能立即执行的同步任务(因为上一个阶段还没结束),则创建 UniApply实例,并入栈,但在入栈之后,还会通过 tryFire进行一次尝试同步执行,下面来看其 tryFire实现:

// UniCompletion是Completion的子类,
// Completion继承了ForkJoinTask,实现了Runnable, AsynchronousCompletionTask
// UniCompletion的子类,主要就是实现tryFire
static final class UniApply extends UniCompletion {
    Function fn;
    UniApply(Executor executor, CompletableFuture dep,
             CompletableFuture src,
             Function fn) {
        super(executor, dep, src); this.fn = fn;
    }

    //根据不同的模式,尝试触发执行
    final CompletableFuture tryFire(int mode) {
        CompletableFuture d; CompletableFuture a;
        if ((d = dep) == null || //已经执行过的阶段的dep才会为null
            !d.uniApply(a = src, fn, mode > 0 ? null : this)) //尝试执行
            return null;    //已经安排异步任务异步执行,或者同步任务需要等待,返回null
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);    //同步任务已经执行完成,触发依赖它的其他阶段执行
    }
}

首先, UniApplyUniCompletion的子类, UniCompletionCompletion的子类, Completion继承了 ForkJoinTask,实现了 Runnable, AsynchronousCompletionTask,所以不论是同步任务还是异常任务其实都是 AsynchronousCompletionTask的实现类。

可见 tryFire还是调用 uniApply方法尝试执行的,不过这时候其第三个参数 c不再是 null,而是当前任务。因此会有机会执行 claim方法来安排异步任务被线程池调度执行。在同步任务完成之后,通过 postFire清理栈,并触发其他依赖该阶段的其他阶段执行。

final CompletableFuture postFire(CompletableFuture a, int mode) {
    if (a != null && a.stack != null) {     // 源栈不为空
        if (mode < 0 || a.result == null) // 是内嵌模式或者源阶段a还没完成,清理一下其栈
            a.cleanStack();
        else
            a.postComplete(); // 否则不是内嵌模式,并且源阶段a已经结束,继续触发依赖该阶段的其他阶段
    }
    if (result != null && stack != null) { // 当前阶段已经完成,并且有依赖它的其他阶段
        if (mode < 0)    // 内嵌模式,返回当前阶段
            return this;
        else
            postComplete(); // 同步或者异常模式,触发依赖它的其他阶段执行
    }
    return null;
}

这里的 postFiremode小于 0的内嵌模式就是上面 supplyAsync&#x4E2D;postCompletewhile循环中传递给 tryFire的参数,它会返回 this,避免了递归太深。

通过以上源码的分析,可见跟在 supplyAsync/runAsync异步阶段后面的同步阶段的执行可能会是调用整个阶段的外部主线程,也可能是执行异步阶段的线程池中的线程。

如果在安排同步任务的时候,刚好上一个异步阶段已经结束,那么就会使用外部主线程执行,否则入栈之后,在异步任务完成后,会通过内嵌的方式由执行异步任务的线程池中的线程调度执行,而异步任务则始终会被线程池中的线程调度执行。

实现 CompletionStage的其他接口方法都是类似 thenApply相同的套路模式,就不一一列举了。其中有一个与用户方法不对应的”Relay”类/方法。它们将结果从一个阶段复制到另一个阶段,用于辅助实现其中的一些方法。

3.3 Future实现

// 如果尚未完成,则使用CancellationException异常完成此CompletableFuture。
// 尚未完成的依赖CompletableFutures也将以一个CancellationException导致的CompletionException异常完成。
public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = (result == null) &&     // 还没完成
      internalComplete(new AltResult(new CancellationException()));//尝试以CancellationException异常完成
    postComplete();     // 触发依赖它的其他阶段也异常结束
    return cancelled || isCancelled();
}

// 如果此CompletableFuture在正常完成之前被取消,则返回true。
public boolean isCancelled() {
    Object r;
    // 若结果是CancellationException异常,则表示是被取消的
    return ((r = result) instanceof AltResult) &&
        (((AltResult)r).ex instanceof CancellationException);
}

可见, cancel只会尝试取消还没完成的 CompletableFuture(即还没有设置结果字段result),由于 cancel的参数 mayInterruptIfRunning并没有使用,一旦任务已经在执行了,则不会中断其执行。取消行为只会在其未完成之前修改其结果指示其已经被取消,即使已经处于执行中的任务最后成功完成也不能再修改其结果。

取消成功(即把 result成功修改为被取消状态)的将会以 CancellationException异常完成。不论取消是否成功,都会通过 postComplete递归的触发依赖当前阶段的其他任务的尝试执行。

isDone()方法只要有了结果(result字段不为 null),不论是正常还是异常结束都会返回 true,由于 CompletableFutures内部实现的时候将本身返回 null的结果包装成了 AltResult对象,所以当返回 null结果时也不例外。

对Future的实现最重要的就是非get莫属了:

// 阻塞等待结果,可以被中断。
public T get() throws InterruptedException, ExecutionException {
    Object r;
    return reportGet((r = result) == null ? waitingGet(true) : r);
}

// 在等待之后返回原始结果,如果是可中断或已经中断的,则返回null。
private Object waitingGet(boolean interruptible) {
    Signaller q = null;
    boolean queued = false;
    int spins = -1;
    Object r;
    while ((r = result) == null) { // 只要没完成就继续
        if (spins < 0) // 设置多处理上的自旋次数
            spins = (Runtime.getRuntime().availableProcessors() > 1) ?
                1 << 8 : 0; // Use brief spin-wait on multiprocessors
        else if (spins > 0) { // 自旋
            if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                --spins;
        } // 自旋结束,还没完成,初始化Signaller
        else if (q == null)
            q = new Signaller(interruptible, 0L, 0L);
        else if (!queued) // 将Signaller入栈
            queued = tryPushStack(q);
        else if (interruptible && q.interruptControl < 0) { // 可以被中断,并且已经被中断
            q.thread = null;    // 辅助GC
            cleanStack(); // 清理一下其栈
            return null;  返回null
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q); // 阻塞等待
            } catch (InterruptedException ie) {
                q.interruptControl = -1;  // 被中断了
            }
        }
    }

    // 到这里说明result已经有了结果了
    if (q != null) {
        q.thread = null; // 辅助GC
        if (q.interruptControl < 0) { // 有被中断过
            if (interruptible)
                r = null; // report interruption    // 若支持中断,则返回null
            else
                Thread.currentThread().interrupt(); // 不支持中断,则补偿中断标记
        }
    }
    postComplete(); // 递归触发其他依赖当前阶段的其他阶段执行
    return r;    // 支持中断并且被中断过,返回null,否则返回原始结果
}

// 使用Future报告结果。
private static  T reportGet(Object r)
    throws InterruptedException, ExecutionException {
    if (r == null) // 结果为null表示可以被中断,并且被中断了,立即抛出中断异常
        throw new InterruptedException();
    if (r instanceof AltResult) { // 空结果或者异常结果
        Throwable x, cause;
        if ((x = ((AltResult)r).ex) == null) // 空结果返回null
            return null;
        if (x instanceof CancellationException) // 被取消的,返回CancellationException异常
            throw (CancellationException)x;
        if ((x instanceof CompletionException) &&
            (cause = x.getCause()) != null)    // CompletionException异常结果的,返回导致其异常结束的异常
            x = cause;
        throw new ExecutionException(x); // 其他运行时异常,包装成CompletionException异常抛出
    }
    @SuppressWarnings("unchecked") T t = (T) r; // 非异常结束,返回正常结果
    return t;
}

先说结果: get()方法是可以被中断的,因此发生中断的话将会抛出 InterruptedException(即使已经拿到结果),被取消的则会抛出 CancellationException异常,任务执行产生的其他异常导致异常结束的,其异常会被封装成 ExecutionException抛出。

其实现过程首先使用waitingGet方法返回获得的结果,如果支持中断并且被中断过则返回null,否则返回响应的结果,等待的过程采用了 自旋 + ForkJoinPool.managedBlock方式,它将调用 get方法等待结果也视为一种任务,

构造成 Completion的子类 Signaller进入被等待执行结束的 CompletableFutures依赖栈,一旦它完成就会通过 postComplete方法的 tryFire触发执行,其tryFire会唤醒阻塞的线程,从而使 get方法返回,

值得注意的是,这里阻塞使用了 ForkJoinPool.managedBlock的方法,因为阻塞的线程可能是 ForkJoinPool线程池中的工作线程,为了不让线程池中的任务由于过多的工作线程被阻塞导致饥饿堆积等待, ForkJoinPool.managedBlock在阻塞的时候会激活新的线程补偿当前阻塞的线程,保证线程池的并行性。

最后, reportGet方法根据 waitingGet方法返回的结果,该跑异常的跑异常,该返回正常结果的,返回正常结果。

超时版本的 get方法,其实现阻塞等待的方法 timedGet方法与 waitingGet原理差不多,除了因为也支持中断可能抛出 InterruptedException之外,还可能会因超时抛出 TimeoutException,就不再详细说明了。

3.4 CompletableFutures常用方法

join方法不是实现 Future接口的方法,是 CompletableFutures自己的方法,它与 get方法的作用都是等待执行返回结果,但是它不支持中断,必须要等到执行结果:

public T join() {
    Object r;
    return reportJoin((r = result) == null ? waitingGet(false) : r);
}

// 解析结果,或者抛出未捕获的异常
private static  T reportJoin(Object r) {
    if (r instanceof AltResult) { // 空结果或者异常结果
        Throwable x;
        if ((x = ((AltResult)r).ex) == null) // 空结果返回null
            return null;
        if (x instanceof CancellationException)
            throw (CancellationException)x; // 被取消的,返回CancellationException异常
        if (x instanceof CompletionException)
            throw (CompletionException)x;  // CompletionException异常
        throw new CompletionException(x);    // 其他异常也被包装成CompletionException异常
    }
    @SuppressWarnings("unchecked") T t = (T) r;
    return t;
}

可见, join和get方法的区别有两点:

public static CompletableFuture allOf(CompletableFuture... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture andTree(CompletableFuture[] cfs,
                                       int lo, int hi) {
    CompletableFuture d = new CompletableFuture();
    if (lo > hi) // empty
        d.result = NIL;
    else {
        CompletableFuture a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                     andTree(cfs, lo, mid))) == null ||
               (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                     andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

当所有给定的 CompletableFutures都执行结束时,它就完成,或者其中任何一个异常结束,它也会立即以同样的异常结束。列表中任何一个 CompletableFuture的结果都不会反映到返回的 CompletableFuture中,但可以遍历列表通过其 get方法获取。

如果一个 CompletableFuture都没指定,即 cfs为长度为 0的空数组,那么将返回一个完成结果为 nullCompletableFuture

由于 allOf返回的是无返回结果的 CompletableFuture,因此不能直接获取那些列表中每一个 CompletableFuture的结果,例如这样将打印出null:

System.out.println(CompletableFuture.allOf(a, b, c, ....).thenApply(V -> V).join());
public static CompletableFuture anyOf(CompletableFuture... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture orTree(CompletableFuture[] cfs,
                                        int lo, int hi) {
    CompletableFuture d = new CompletableFuture();
    if (lo  a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                    orTree(cfs, lo, mid))) == null ||
              (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                    orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
    }
    return d;
}

当给定的 CompletableFutures其中任何一个执行结束时,不论是正常还是异常结束,它就以相同的结果或者异常结束。注意异常结束的会被包装成 CompletionException异常。

如果一个 CompletableFuture都没指定,即 cfs为长度为 0的空数组,那么将返回一个执行没有结束的 CompletableFuture

注意,如果给定的 cfs数组中的 CompletableFuture其返回类型不一致,那么 anyOf的最终返回的 CompletableFuture的结果的类型也将不确定。不像 allOf返回的 Void的结果, anyOf可以用返回的结果继续处理,例如:

CompletableFuture.anyOf(
                CompletableFuture.supplyAsync(() -> "Tom"),
                CompletableFuture.supplyAsync(() -> "John"),
                CompletableFuture.supplyAsync(() -> "Jack")
        ).thenApply(name -> "hello "+ name)
        .thenAccept(System.out::println);
public T getNow(T valueIfAbsent) {
    Object r;
    return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}

如果已经执行完成,就返回与执行 join时相同的结果:即被取消的抛出 CancellationException,正常结果的返回结果,其他异常一律被包装成 CompletionException抛出。如果调用该方法的时候还没执行结束,则返回指定的结果。注意,在发现执行还没有结束时,该方法并没有强制使执行结束。

public boolean complete(T value) {
    // 以指定的值设置到结果result字段,只会在result还是null的时候成功
    boolean triggered = completeValue(value);
    postComplete();
    return triggered;
}

如果执行还没结束,就以指定的值作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该 result,所以如果此方法成功,那么调用该阶段的 getjoin方法返回的值就是该方法指定的 value值。

如果还没执行结果,就以指定的异常作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该 result,所以如果此方法成功,那么调用该阶段的 getjoin方法也将抛出异常,不同的是, get可能抛出被封装成 ExecutionException的异常, join可能抛出被封装成 CompletionException的异常。

强制以指定的值或者异常作为当前阶段的执行结果,不论其是否已经完成。与 completecompleteExceptionally不同, completecompleteExceptionally只会在执行还没结束的情况,更新结果。

在这只后返回的 getjoin也将返回相应的结果或异常。这两方法一般用于错误恢复,但也不一定有用。

如果执行以异常完成,则该方法返回 true,这里的异常结束包括被取消,被 completeExceptionallyobtrudeException方法主动异常结束,当然也包括任务执行过程中异常结束的情况。

public int getNumberOfDependents() {
    int count = 0;
    for (Completion p = stack; p != null; p = p.next)
        ++count;
    return count;
}

返回等待该 CompletableFutures完成的其他 CompletableFutures的估计个数,只是一个瞬态值,一般用于监控系统状态。

3.5 构造方法

除了使用静态方法 runAsyncsupplyAsync之外,构造一个 CompletableFutures还可以通过以下几种途径,首先就是构造方法:

CompletableFutures提供了两个构造方法,一个无参,一个可以指定结果:

public CompletableFuture() {
}

private CompletableFuture(Object r) {
    this.result = r;
}

另一种就是使用静态方法 completedFuture

public static  CompletableFuture completedFuture(U value)

其内部还是使用的有参的构造方法返回一个 CompletableFuture实例。利用构造无参的构造方法可以这些实现异步任务的执行:

public static CompletableFuture asyncDoSomething(String a, String b) {
    CompletableFuture future = new CompletableFuture();
    new Thread(() -> {
        try{
            Thread.sleep(TimeUnit.SECONDS.toSeconds(10000));
            future.complete(a + b);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
    }).start();
    return future;
}

public static void main(String[] args) {
    asyncDoSomething("hello"," world").thenAccept(System.out::println);
}

但一般我们也不会这样用了,我这里只是为了举一个例子而已。

CompletableFutureCompletionStageFuture接口的实现类,它提供了大量的用于异步或同步执行任务的方法,其通过链表栈形成的树形结构组织那些具有依赖关系的各个阶段 CompletableFuture,异步任务的执行默认通过 ForkJoinPool.commonPool执行(其创建的工作线程都是守护线程,不用担心 JVM挂起),除非指定了 Executor参数,因为默认的线程池并行度为CPU核心数并发度并不高,因此大多数时候我们都会指定自定义的 Executor实现类。

同步任务的执行有可能是被主线程执行,也可能是被完成上一个阶段的线程池中的线程执行。另外所有实现 CompletionStage的接口方法都是独立于其他公共方法实现的,基本上每一个方法都对应了一个内部类,因此一个方法的行为不会受到子类中其他方法重写的影响。

5.1 get和join

实现 Futurecancel方法不会中断已经被安排执行的任务,仅仅是在任务的结果还没被回写之前,更新其结果为被取消状态,一旦将其结果设置为被取消状态,还没有开始仔执行的将不会被调度执行,已经在执行的,最后就算正常完成也不能再修改结果。

Future.getCompletableFuturejoin方法除了 join不能被中断之外,对异常结束分别会将异常包装成 ExecutionExceptionCompletionException,当然 InterruptedExceptionTimeoutExceptionCancellationException除外,通常我们都会使用 join而不是 get,大概是 join方法不需要处理异常,而 get方法有 InterruptedExceptionTimeoutException异常需要处理吧。

Original: https://www.cnblogs.com/ciel717/p/16436288.html
Author: 夏尔_717
Title: Java异步执行器CompletableFuture源码解析

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/599135/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • day04-1群聊功能

    多用户即时通讯系统04 4.编码实现03 4.5功能实现-群聊功能实现 4.5.1思路分析 群聊的实现思路和私聊的实现非常类似。 不同的是:私聊时,服务端接收到消息后,只需要找出接…

    数据库 2023年6月11日
    083
  • ZooKeeper集群部署

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    数据库 2023年6月16日
    0102
  • 在CentOS 7系统安装StoneDB数据库

    今天我会进行StoneDB数据库在CentOS 7系统下的安装。 在官方的快速部署文档中有详细的安装流程,我会严格遵循流程。 [En] There is a detailed in…

    数据库 2023年5月24日
    084
  • jQuery基础

    作者导言: 引用偶像刘德华的一句话 “学到的就要教人,赚到的就要给人”! 以下是关联的web前端基础知识文章,通过这些文章,您既可以系统地学习和了解这些知识…

    数据库 2023年6月14日
    091
  • Tomcat配置文件Server.xml解析

    一、Sax的事件驱动模型 类图 基础实现类 DefaultHandler2: 此类扩展了SAX2基本处理程序类,以支持SAX2 LexicalHandler , DeclHandl…

    数据库 2023年6月11日
    057
  • MySQL实战45讲 17

    17 | 如何正确地显示随机消息? 场景:从一个单词表中随机选出三个单词。 表的建表语句和初始数据的命令如下,在这个表里面插入了 10000 行记录: CREATE TABLE w…

    数据库 2023年6月14日
    065
  • 关于Mysql索引的数据结构

    索引的数据结构 1、为什么使用索引 &#x6982;&#x5FF5;: 索引是存储索引用于快速找到数据记录的一种数据结构,就好比一本书的目录部分,通过目录中对应的文…

    数据库 2023年5月24日
    090
  • 【MySQL】MySQL的安装、卸载、配置、登陆和退出

    1 MySQL安装 安装环境:Win10 64位软件版本:MySQL 5.7.24 解压版 1.1 下载 https://downloads.mysql.com/archives/…

    数据库 2023年6月16日
    089
  • 分布式消息队列RocketMQ(一)安装与启动

    分布式消息队列RocketMQ 一、RocketMQ简介 RocketMQ(火箭MQ) 出自于阿里,后开源给apache成为apache的顶级开源项目之一,顶住了淘宝10年的 双1…

    数据库 2023年6月6日
    098
  • 2022-08-20 数据库连接池

    每次去初始化一个连接池,连接池中会有很多个连接等待被使用,使用完连接之后,不需要关闭连接,只需要把连接还回到连接池,还回到连接池的操作不需要我们手动控制。 数据库连接池负责分配、管…

    数据库 2023年6月14日
    088
  • Activiti 7 源码学习

    启动分析 源码版本是 7.1.0.M6 首先从 ProcessEngineAutoConfiguration 开始 ProcessEngineAutoConfiguration 是…

    数据库 2023年6月14日
    092
  • JWT简介

    JWT简介 在用户注册或登录后,我们想记录用户的登录状态,或者为用户创建身份认证的凭证。我们不再使用Session认证机制,而使用Json Web Token认证机制。 (1) 什…

    数据库 2023年6月14日
    099
  • Question07-查询学过”张三”老师授课的同学的信息

    * SELECT DISTINCT Student.* FROM Student , SC , Course , Teacher WHERE Student.SID = SC.SI…

    数据库 2023年6月16日
    054
  • datatable 转化成xml以及json

    datatable dt=xxx获取 赋值给应用的字段 var pp=dt.row[0][“datatable里面的字段”].tostring() var …

    数据库 2023年6月9日
    078
  • 学习笔记——Django项目的删除数据、查询数据(filter、get、exclude)

    2022-09-30 删除数据: 方式一: 打开pycharm,进入虚拟环境,进入shell环境(python manage.py shell)。 删除数据,接上面的笔记——&#8…

    数据库 2023年6月14日
    0118
  • 达梦产品技术支持培训-day8-DM8数据库备份与还原-实操

    Disql 工具:联机数据备份与还原,包括库备份、表空间备份与还原、表备份与还原; DMRMAN 工具:脱机数据库备份还原与恢复; 客户端工具 MANAGER和CONSOLE:对应…

    数据库 2023年6月11日
    073
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球