CompletableFuture
是对 Future
的一种强有力的扩展, Future
只能通过轮询 isDone()
方法或者调用 get()
阻塞等待获取一个异步任务的结果,才能继续执行下一步,当我们执行的异步任务很多,而且相互之前还要依赖结果的时候,可能会创建很多这样的 Future
,并通过 get
或者轮询等待执行结果返回之后继续执行,这样的代码显得很不方便而且也不高效。
CompletableFuture
同时继承了 CompletionStage
和 Future
,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过 lambda
表达式的风格处理各个执行阶段的结果。
CompletionStage
是 Java8
新增得一个接口,用于异步执行中的阶段处理,其大量用在 Lambda
表达式计算过程中,目前只有 CompletableFuture
一个实现类。
CompletionStage
定义了一组接口用于在一个阶段执行结束之后,要么继续执行下一个阶段,要么对结果进行转换产生新的结果等等,一般来说要执行下一个阶段都需要上一个阶段正常完成,当然这个类也提供了对异常结果的处理接口。
CompletionStage
的接口方法可以从多种角度进行分类,从最宏观的横向划分, CompletionStage
的接口主要分三类:
还有一组特别的方法带有 compose
字样,它以依赖阶段本身作为参数而不是阶段产生的结果进行产出型(或函数型)操作。
在以上三类横向划分方法的基础上,又可以按照以下的规则对这些接口方法进行纵向的划分:
CompletionStage
的异常规则
除了 whenComplete
不要求其依赖的阶段是正常完成还是异常完成,以及 handle
前缀的方法只要求其依赖的阶段异常完成之外,其余所有接口方法都要求其依赖的阶段正常完成。
- 如果一个阶段的执行由于一个(未捕获的)异常或错误而突然终止,那么所有要求其完成的相关阶段也将异常地完成,并通过
CompletionException
包装其具体异常堆栈。 - 如果一个阶段同时依赖于两个阶段,并且两个阶段都异常地完成,那么
CompletionException
可以对应于这两个异常中的任何一个。 - 如果一个阶段依赖于另外两个阶段中的任何一个,并且其中只有一个异常完成,则不能保证依赖阶段是正常完成还是异常完成。
- 在使用方法
whenComplete
的情况下,当提供的操作本身遇到异常时,如果前面的阶段没有异常完成,则阶段将以其异常作为原因异常完成。
所有方法都遵循上述触发、执行和异常完成规范,此外,虽然用于传递一个表示完成结果的参数(也就是说,对于T类型的参数)可以为 null
,但是如果为其它任何参数传递 null
都将导致 NullPointerException
。
此接口不定义用于初始创建、强制正常或异常完成、探测完成状态或结果或等待阶段完成的方法。 CompletionStage
的实现类可以提供适当的方法来实现这些效果。
方法 toCompletableFuture
通过提供一个公共转换类型,支持该接口的不同实现之间的互操作性。
CompletableFuture
通过以下策略实现了接口 CompletionStage
:
CompletableFuture
通过以下策略实现了接口 Future
:
以下是 CompletableFuture
的内部实现概述:
由于 CompletableFuture
可以依赖其他一个或多个 CompletableFuture
,所以在内部实现的时候,每一个 CompletableFuture
都拥有一个依赖操作栈,栈中的元素是 Completion
的子类,它包含相关的操作、 CompletableFuture
以及源操作。
当一个 CompletableFuture
完成之后会从栈中弹出并递归执行那些依赖它的 CompletableFuture
。由于依赖栈中的 Completion
元素也包含 CompletableFuture
对象,其 CompletableFuture
对象可能也拥有一个依赖栈,因此将形成一个非常复杂的依赖树。
CompletableFuture
对每一种形式的实现使用了不同的 Completion
子类,例如:单输入(UniCompletion
)、双输入(BiCompletion
)、投影(使用 BiCompletion
两个输入中的任何一个(而不是两个)的双输入)、共享(CoCompletion
,由两个源中的第二个使用)、零输入(不消费不产出的 Runnable
)操作和解除阻塞等待(get()
、 join()
方法)的信号器 Signallers
。
Completion
类扩展了 ForkJoinTask
来启用异步执行(不增加空间开销,因为我们利用它的”标记”方法来维护声明).它还被声明为Runnable,可以被任意线程池调度执行。
CompletableFuture
又在 UniCompletion
、 BiCompletion
、 CoCompletion
等这几种 Completion
子类的基础上扩展出了实现 CompletionStage
具体接口方法的前缀为”Uni”, “Bi”, “Or”的子类。例如实现单个输入、两个输入、两者之一的 thenApply
对应的就是 UniApply
、 BiApply
、 OrApply
。
CompletableFuture
在实现 CompletionStage
接口方法甚至自己独有的方法使都采用了相同的模式,以及调度策略,因此只要立即了一种方法的实现,其他方法都是类似的原理。
3.1 runAsync和supplyAsync
虽然 CompletableFuture
提供了无参的构造方法,但我们一般从它的静态方法开始,根据是否有返回值,它对外提供了两种形式的执行异步任务的方法:
// 执行无返回值的异步任务
public static CompletableFuture runAsync(Runnable runnable);
public static CompletableFuture runAsync(Runnable runnable, Executor executor);
// 执行有返回值的异步任务
public static CompletableFuture supplyAsync(Supplier supplier);
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);
它们都以 async为后缀,根据 CompletionStage
的接口定义规律也可以知道是通过异步安排执行。
对于不支持并行运算的环境,例如单核 CPU
, CompletableFuture
默认将采用一个任务创建一个 Thread
实例的方式执行。
以 supplyAsync
方法源码为例,继续向下分析:
public static CompletableFuture supplyAsync(Supplier supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
static CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
if (f == null) throw new NullPointerException();
// 新创建一个CompletableFuture,以此构建AsyncSupply作为Executor的执行参数
CompletableFuture d = new CompletableFuture();
// 安排异步执行
// AsyncSupply继承了ForkJoinTask, 实现了Runnable, AsynchronousCompletionTask接口
e.execute(new AsyncSupply(d, f));
// 立即返回
return d;
}
可见, supplyAsync
具体的实现调用了 asyncSupplyStage
,这也是 CompletableFuture
的内部实现惯例,每一种方法的实现都对应一个 XXXStage
方法,用于创建 stage
对象(这里就是实现了 CompletionStage
接口的 CompletableFuture
),并安排任务的执行。
这里由于是异步任务,所以直接创建了异步任务实例 AsyncSupply
,然后交给线程池执行。接着看 AsyncSupply
实现:
// 实现了ForkJoinTask,Runnable可以被ForkJoinPool,或者其他实现Executor的自定义线程池调度
static final class AsyncSupply extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
// 依赖的CompletableFuture
// AsyncSupply作为一个依赖Task,dep作为这个Task的Future
CompletableFuture dep;
// 具体的任务执行逻辑
// fn作为这个Task的具体执行逻辑,函数式编程
Supplier fn;
AsyncSupply(CompletableFuture dep, Supplier fn) {
this.dep = dep; this.fn = fn;
}
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
// exec由ForkJoinPool调度执行,最终直接调用run
public final boolean exec() { run(); return true; }
// 由ForkJoinPool间接调度,或其他自定义线程池直接调用
public void run() {
CompletableFuture d; Supplier f;
if ((d = dep) != null && (f = fn) != null) {
dep = null; fn = null;
if (d.result == null) { // result为空表示任务还没完成
try {
// 执行任务并将结果设置到依赖的CompletableFuture
d.completeValue(f.get());
} catch (Throwable ex) {
// 异常完成的情况
d.completeThrowable(ex);
}
}
// 从依赖栈中弹出并触发执行依赖当前CompletableFuture的其他阶段
d.postComplete();
}
}
}
AsyncSupply
实现了 ForkJoinTask
, Runnable
是为了兼容 ForkJoinPool
线程池和其他自定义的 Executor
线程池实现, run
方法就是线程调度时执行任务的逻辑,就是执行给定的操作,并将结果设置到当前任务对应的 CompletableFuture
对象d(也就是依赖该任务的阶段),最后通过 d.postComplete
触发其他依赖阶段d的其他任务执行。postComplete的逻辑如下:
//递归触发其他依赖当前阶段的其他阶段执行
final void postComplete() {
CompletableFuture f = this; Completion h;
while ((h = f.stack) != null || // f对应的栈不为空
(f != this && (h = (f = this).stack) != null)) { // f对应的栈为空了,重新回到this,继续另一条路径
CompletableFuture d; Completion t;
if (f.casStack(h, t = h.next)) { // 将f的栈顶元素h出栈
if (t != null) { // 表示出栈的h不是最后一个元素
if (f != this) { // f不是this,即不是当前栈
pushStack(h); // 将f出栈的元素h压入当前的栈,这里是为了避免递归层次太深
continue;
}
h.next = null; // detach 辅助GC
}
// tryFire就是触发当前栈的栈顶h被执行,完成之后又返回依赖h的其它CompletableFuture,
// 使其通过while循环继续触发依赖它的其余阶段任务的执行
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
postComplete
的代码很简短,但是其代表的逻辑含义却非常不容易理解,这主要是因为形成的依赖树结构复杂,总之, postComplete
就是递归的触发依赖当前阶段的其他任务的执行,它一次只沿着一条路径将其压入当前栈,避免递归调用的层次太深。
具体的触发其他任务的执行是通过内嵌模式的 tryFire
方法来完成的,嵌套模式只用于这里。为了理解 tryFire
,我们再以 thenApply
为例。
3.2 CompletionStage实现之thenApply
这是实现 CompletionStage
的接口方法 thenApply
,它包含三种形式(同步、默认线程池的异步、指定线程池的异步)这里我以同步模式为例,一般来说,我们都会以下面这种形式使用 thenApply
:
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
}).thenApply(s -> s + " world").thenAccept(System.out::println);
按照 CompletableFuture
的惯例, thenApply
由一个 uniApplyStage方法实现,创建一个新的 CompletableFuture
,并安排任务执行:
private CompletableFuture uniApplyStage(Executor e, Function f) {
if (f == null) throw new NullPointerException();
// 创建CompletableFuture实例
CompletableFuture d = new CompletableFuture();
if (e != null // Executor不为空,表示需要安排异步执行
|| !d.uniApply(this, f, null)) { // 尝试立即同步执行
// 需要被安排异步执行,或者依赖的上一个阶段this还没完成,需要等待
UniApply c = new UniApply(e, d, this, f); //构造UniApply实例
push(c); // 将当前任务入栈,注意这里入的上一个阶段即this的栈,作为依赖其的阶段
c.tryFire(SYNC); //
}
return d;
}
对于同步任务,在入栈等待前会通过一个布尔型的 uniApply
方法先尝试安排执行这个任务,这个布尔型的方法也是 CompletableFuture
实现其他多种形式的方法的惯例,对应每一种形式的方法实现都有一个这样的返回布尔型的方法: uniAccept
、 uniRun
、 uniWhenComplete
、 uniHandle
、 uniExceptionally
、 uniCompose
、 biApply
等等。
// 根据依赖的上一个阶段a是否完成,看要不要立即安排当前任务执行。
// 返回true表示已经同步完成执行了当前任务,
// 为false表示依赖的阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话)
final boolean uniApply(CompletableFuture a,
Function f,
UniApply c) {
Object r; Throwable x;
if (a == null || (r = a.result) == null || f == null)
return false; // 表示依赖的阶段a还没完成,还不能执行当前阶段
tryComplete: if (result == null) { // 依赖的阶段a已经完成,当前阶段还没完成
if (r instanceof AltResult) {
// 如果依赖的阶段a是异常结束,那么当前阶段也异常结束
if ((x = ((AltResult)r).ex) != null) {
completeThrowable(x, r);
break tryComplete;
}
r = null;
}
// 到这里表示依赖的阶段a是正常结束
try {
if (c != null && !c.claim())
return false; // 只有在c不为空,并且不能被执行或者已经安排异步执行才会返回false
// 拿到已经完成的依赖阶段a的结果,执行同步执行当前任务,并把结果设置到当前CompletableFuture阶段
@SuppressWarnings("unchecked") S s = (S) r;
completeValue(f.apply(s));
} catch (Throwable ex) {
// 异常完成的处理
completeThrowable(ex);
}
}
return true;
}
// 通过自定义TAG,标记任务正在被执行,保证任务只会被执行一次。
// 该方法只会在不能被执行或者已经安排异步执行才会返回false
final boolean claim() {
Executor e = executor;
// 解锁成功,表示可以执行了
if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
if (e == null) // 需要被安排同步执行,立即返回true
return true;
executor = null; // disable 赋值GC
e.execute(this); // 否则立即安排异步执行
}
return false;
}
这个布尔型的方法返回 true
表示已经同步完成执行了当前任务。为 false
表示依赖的上一个阶段a还没完成,需要等待,或者已经安排异步执行(如果是异步任务的话),其中的 claim
方法通过 CAS
加锁保证任务只会被执行一次,同时还可以安排异步任务的执行。
回到 uniApplyStage
,如果是异步任务,或者还不能立即执行的同步任务(因为上一个阶段还没结束),则创建 UniApply
实例,并入栈,但在入栈之后,还会通过 tryFire
进行一次尝试同步执行,下面来看其 tryFire
实现:
// UniCompletion是Completion的子类,
// Completion继承了ForkJoinTask,实现了Runnable, AsynchronousCompletionTask
// UniCompletion的子类,主要就是实现tryFire
static final class UniApply extends UniCompletion {
Function fn;
UniApply(Executor executor, CompletableFuture dep,
CompletableFuture src,
Function fn) {
super(executor, dep, src); this.fn = fn;
}
//根据不同的模式,尝试触发执行
final CompletableFuture tryFire(int mode) {
CompletableFuture d; CompletableFuture a;
if ((d = dep) == null || //已经执行过的阶段的dep才会为null
!d.uniApply(a = src, fn, mode > 0 ? null : this)) //尝试执行
return null; //已经安排异步任务异步执行,或者同步任务需要等待,返回null
dep = null; src = null; fn = null;
return d.postFire(a, mode); //同步任务已经执行完成,触发依赖它的其他阶段执行
}
}
首先, UniApply
是 UniCompletion
的子类, UniCompletion
是 Completion
的子类, Completion
继承了 ForkJoinTask
,实现了 Runnable
, AsynchronousCompletionTask
,所以不论是同步任务还是异常任务其实都是 AsynchronousCompletionTask
的实现类。
可见 tryFire
还是调用 uniApply
方法尝试执行的,不过这时候其第三个参数 c
不再是 null
,而是当前任务。因此会有机会执行 claim
方法来安排异步任务被线程池调度执行。在同步任务完成之后,通过 postFire
清理栈,并触发其他依赖该阶段的其他阶段执行。
final CompletableFuture postFire(CompletableFuture a, int mode) {
if (a != null && a.stack != null) { // 源栈不为空
if (mode < 0 || a.result == null) // 是内嵌模式或者源阶段a还没完成,清理一下其栈
a.cleanStack();
else
a.postComplete(); // 否则不是内嵌模式,并且源阶段a已经结束,继续触发依赖该阶段的其他阶段
}
if (result != null && stack != null) { // 当前阶段已经完成,并且有依赖它的其他阶段
if (mode < 0) // 内嵌模式,返回当前阶段
return this;
else
postComplete(); // 同步或者异常模式,触发依赖它的其他阶段执行
}
return null;
}
这里的 postFire
中 mode
小于 0
的内嵌模式就是上面 supplyAsync中postComplete
的 while
循环中传递给 tryFire
的参数,它会返回 this
,避免了递归太深。
通过以上源码的分析,可见跟在 supplyAsync/runAsync
异步阶段后面的同步阶段的执行可能会是调用整个阶段的外部主线程,也可能是执行异步阶段的线程池中的线程。
如果在安排同步任务的时候,刚好上一个异步阶段已经结束,那么就会使用外部主线程执行,否则入栈之后,在异步任务完成后,会通过内嵌的方式由执行异步任务的线程池中的线程调度执行,而异步任务则始终会被线程池中的线程调度执行。
实现 CompletionStage
的其他接口方法都是类似 thenApply
相同的套路模式,就不一一列举了。其中有一个与用户方法不对应的”Relay”类/方法。它们将结果从一个阶段复制到另一个阶段,用于辅助实现其中的一些方法。
3.3 Future实现
// 如果尚未完成,则使用CancellationException异常完成此CompletableFuture。
// 尚未完成的依赖CompletableFutures也将以一个CancellationException导致的CompletionException异常完成。
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = (result == null) && // 还没完成
internalComplete(new AltResult(new CancellationException()));//尝试以CancellationException异常完成
postComplete(); // 触发依赖它的其他阶段也异常结束
return cancelled || isCancelled();
}
// 如果此CompletableFuture在正常完成之前被取消,则返回true。
public boolean isCancelled() {
Object r;
// 若结果是CancellationException异常,则表示是被取消的
return ((r = result) instanceof AltResult) &&
(((AltResult)r).ex instanceof CancellationException);
}
可见, cancel
只会尝试取消还没完成的 CompletableFuture
(即还没有设置结果字段result),由于 cancel
的参数 mayInterruptIfRunning
并没有使用,一旦任务已经在执行了,则不会中断其执行。取消行为只会在其未完成之前修改其结果指示其已经被取消,即使已经处于执行中的任务最后成功完成也不能再修改其结果。
取消成功(即把 result
成功修改为被取消状态)的将会以 CancellationException
异常完成。不论取消是否成功,都会通过 postComplete
递归的触发依赖当前阶段的其他任务的尝试执行。
isDone()
方法只要有了结果(result
字段不为 null
),不论是正常还是异常结束都会返回 true
,由于 CompletableFutures
内部实现的时候将本身返回 null
的结果包装成了 AltResult
对象,所以当返回 null
结果时也不例外。
对Future的实现最重要的就是非get莫属了:
// 阻塞等待结果,可以被中断。
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
// 在等待之后返回原始结果,如果是可中断或已经中断的,则返回null。
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) { // 只要没完成就继续
if (spins < 0) // 设置多处理上的自旋次数
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) { // 自旋
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
} // 自旋结束,还没完成,初始化Signaller
else if (q == null)
q = new Signaller(interruptible, 0L, 0L);
else if (!queued) // 将Signaller入栈
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) { // 可以被中断,并且已经被中断
q.thread = null; // 辅助GC
cleanStack(); // 清理一下其栈
return null; 返回null
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q); // 阻塞等待
} catch (InterruptedException ie) {
q.interruptControl = -1; // 被中断了
}
}
}
// 到这里说明result已经有了结果了
if (q != null) {
q.thread = null; // 辅助GC
if (q.interruptControl < 0) { // 有被中断过
if (interruptible)
r = null; // report interruption // 若支持中断,则返回null
else
Thread.currentThread().interrupt(); // 不支持中断,则补偿中断标记
}
}
postComplete(); // 递归触发其他依赖当前阶段的其他阶段执行
return r; // 支持中断并且被中断过,返回null,否则返回原始结果
}
// 使用Future报告结果。
private static T reportGet(Object r)
throws InterruptedException, ExecutionException {
if (r == null) // 结果为null表示可以被中断,并且被中断了,立即抛出中断异常
throw new InterruptedException();
if (r instanceof AltResult) { // 空结果或者异常结果
Throwable x, cause;
if ((x = ((AltResult)r).ex) == null) // 空结果返回null
return null;
if (x instanceof CancellationException) // 被取消的,返回CancellationException异常
throw (CancellationException)x;
if ((x instanceof CompletionException) &&
(cause = x.getCause()) != null) // CompletionException异常结果的,返回导致其异常结束的异常
x = cause;
throw new ExecutionException(x); // 其他运行时异常,包装成CompletionException异常抛出
}
@SuppressWarnings("unchecked") T t = (T) r; // 非异常结束,返回正常结果
return t;
}
先说结果: get()
方法是可以被中断的,因此发生中断的话将会抛出 InterruptedException
(即使已经拿到结果),被取消的则会抛出 CancellationException
异常,任务执行产生的其他异常导致异常结束的,其异常会被封装成 ExecutionException
抛出。
其实现过程首先使用waitingGet方法返回获得的结果,如果支持中断并且被中断过则返回null,否则返回响应的结果,等待的过程采用了 自旋 + ForkJoinPool.managedBlock方式,它将调用 get
方法等待结果也视为一种任务,
构造成 Completion
的子类 Signaller
进入被等待执行结束的 CompletableFutures
依赖栈,一旦它完成就会通过 postComplete
方法的 tryFire
触发执行,其tryFire会唤醒阻塞的线程,从而使 get
方法返回,
值得注意的是,这里阻塞使用了 ForkJoinPool.managedBlock
的方法,因为阻塞的线程可能是 ForkJoinPool
线程池中的工作线程,为了不让线程池中的任务由于过多的工作线程被阻塞导致饥饿堆积等待, ForkJoinPool.managedBlock
在阻塞的时候会激活新的线程补偿当前阻塞的线程,保证线程池的并行性。
最后, reportGet
方法根据 waitingGet
方法返回的结果,该跑异常的跑异常,该返回正常结果的,返回正常结果。
超时版本的 get
方法,其实现阻塞等待的方法 timedGet
方法与 waitingGet
原理差不多,除了因为也支持中断可能抛出 InterruptedException
之外,还可能会因超时抛出 TimeoutException
,就不再详细说明了。
3.4 CompletableFutures常用方法
join
方法不是实现 Future
接口的方法,是 CompletableFutures
自己的方法,它与 get
方法的作用都是等待执行返回结果,但是它不支持中断,必须要等到执行结果:
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
// 解析结果,或者抛出未捕获的异常
private static T reportJoin(Object r) {
if (r instanceof AltResult) { // 空结果或者异常结果
Throwable x;
if ((x = ((AltResult)r).ex) == null) // 空结果返回null
return null;
if (x instanceof CancellationException)
throw (CancellationException)x; // 被取消的,返回CancellationException异常
if (x instanceof CompletionException)
throw (CompletionException)x; // CompletionException异常
throw new CompletionException(x); // 其他异常也被包装成CompletionException异常
}
@SuppressWarnings("unchecked") T t = (T) r;
return t;
}
可见, join和get方法的区别有两点:
public static CompletableFuture allOf(CompletableFuture... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture andTree(CompletableFuture[] cfs,
int lo, int hi) {
CompletableFuture d = new CompletableFuture();
if (lo > hi) // empty
d.result = NIL;
else {
CompletableFuture a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
andTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
andTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.biRelay(a, b)) {
BiRelay c = new BiRelay<>(d, a, b);
a.bipush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
当所有给定的 CompletableFutures
都执行结束时,它就完成,或者其中任何一个异常结束,它也会立即以同样的异常结束。列表中任何一个 CompletableFuture
的结果都不会反映到返回的 CompletableFuture
中,但可以遍历列表通过其 get
方法获取。
如果一个 CompletableFuture
都没指定,即 cfs
为长度为 0
的空数组,那么将返回一个完成结果为 null
的 CompletableFuture
。
由于 allOf
返回的是无返回结果的 CompletableFuture
,因此不能直接获取那些列表中每一个 CompletableFuture
的结果,例如这样将打印出null:
System.out.println(CompletableFuture.allOf(a, b, c, ....).thenApply(V -> V).join());
public static CompletableFuture anyOf(CompletableFuture... cfs) {
return orTree(cfs, 0, cfs.length - 1);
}
static CompletableFuture orTree(CompletableFuture[] cfs,
int lo, int hi) {
CompletableFuture d = new CompletableFuture();
if (lo a, b;
int mid = (lo + hi) >>> 1;
if ((a = (lo == mid ? cfs[lo] :
orTree(cfs, lo, mid))) == null ||
(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
orTree(cfs, mid+1, hi))) == null)
throw new NullPointerException();
if (!d.orRelay(a, b)) {
OrRelay c = new OrRelay<>(d, a, b);
a.orpush(b, c);
c.tryFire(SYNC);
}
}
return d;
}
当给定的 CompletableFutures
其中任何一个执行结束时,不论是正常还是异常结束,它就以相同的结果或者异常结束。注意异常结束的会被包装成 CompletionException
异常。
如果一个 CompletableFuture
都没指定,即 cfs
为长度为 0
的空数组,那么将返回一个执行没有结束的 CompletableFuture
。
注意,如果给定的 cfs
数组中的 CompletableFuture
其返回类型不一致,那么 anyOf
的最终返回的 CompletableFuture
的结果的类型也将不确定。不像 allOf
返回的 Void
的结果, anyOf
可以用返回的结果继续处理,例如:
CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> "Tom"),
CompletableFuture.supplyAsync(() -> "John"),
CompletableFuture.supplyAsync(() -> "Jack")
).thenApply(name -> "hello "+ name)
.thenAccept(System.out::println);
public T getNow(T valueIfAbsent) {
Object r;
return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
}
如果已经执行完成,就返回与执行 join
时相同的结果:即被取消的抛出 CancellationException
,正常结果的返回结果,其他异常一律被包装成 CompletionException
抛出。如果调用该方法的时候还没执行结束,则返回指定的结果。注意,在发现执行还没有结束时,该方法并没有强制使执行结束。
public boolean complete(T value) {
// 以指定的值设置到结果result字段,只会在result还是null的时候成功
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
如果执行还没结束,就以指定的值作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该 result
,所以如果此方法成功,那么调用该阶段的 get
、 join
方法返回的值就是该方法指定的 value
值。
如果还没执行结果,就以指定的异常作为其执行结果,并触发依赖它的其他阶段执行。由于该方法直接修改了执行的结果,即使后面该任务真正的逻辑执行完之后也不能再更新该 result
,所以如果此方法成功,那么调用该阶段的 get
、 join
方法也将抛出异常,不同的是, get
可能抛出被封装成 ExecutionException
的异常, join
可能抛出被封装成 CompletionException
的异常。
强制以指定的值或者异常作为当前阶段的执行结果,不论其是否已经完成。与 complete
和 completeExceptionally
不同, complete
和 completeExceptionally
只会在执行还没结束的情况,更新结果。
在这只后返回的 get
、 join
也将返回相应的结果或异常。这两方法一般用于错误恢复,但也不一定有用。
如果执行以异常完成,则该方法返回 true
,这里的异常结束包括被取消,被 completeExceptionally
和 obtrudeException
方法主动异常结束,当然也包括任务执行过程中异常结束的情况。
public int getNumberOfDependents() {
int count = 0;
for (Completion p = stack; p != null; p = p.next)
++count;
return count;
}
返回等待该 CompletableFutures
完成的其他 CompletableFutures
的估计个数,只是一个瞬态值,一般用于监控系统状态。
3.5 构造方法
除了使用静态方法 runAsync
, supplyAsync
之外,构造一个 CompletableFutures
还可以通过以下几种途径,首先就是构造方法:
CompletableFutures
提供了两个构造方法,一个无参,一个可以指定结果:
public CompletableFuture() {
}
private CompletableFuture(Object r) {
this.result = r;
}
另一种就是使用静态方法 completedFuture
:
public static CompletableFuture completedFuture(U value)
其内部还是使用的有参的构造方法返回一个 CompletableFuture
实例。利用构造无参的构造方法可以这些实现异步任务的执行:
public static CompletableFuture asyncDoSomething(String a, String b) {
CompletableFuture future = new CompletableFuture();
new Thread(() -> {
try{
Thread.sleep(TimeUnit.SECONDS.toSeconds(10000));
future.complete(a + b);
} catch (Exception e) {
future.completeExceptionally(e);
}
}).start();
return future;
}
public static void main(String[] args) {
asyncDoSomething("hello"," world").thenAccept(System.out::println);
}
但一般我们也不会这样用了,我这里只是为了举一个例子而已。
CompletableFuture
是 CompletionStage
与 Future
接口的实现类,它提供了大量的用于异步或同步执行任务的方法,其通过链表栈形成的树形结构组织那些具有依赖关系的各个阶段 CompletableFuture
,异步任务的执行默认通过 ForkJoinPool.commonPool
执行(其创建的工作线程都是守护线程,不用担心 JVM
挂起),除非指定了 Executor
参数,因为默认的线程池并行度为CPU核心数并发度并不高,因此大多数时候我们都会指定自定义的 Executor
实现类。
同步任务的执行有可能是被主线程执行,也可能是被完成上一个阶段的线程池中的线程执行。另外所有实现 CompletionStage
的接口方法都是独立于其他公共方法实现的,基本上每一个方法都对应了一个内部类,因此一个方法的行为不会受到子类中其他方法重写的影响。
5.1 get和join
实现 Future
的 cancel
方法不会中断已经被安排执行的任务,仅仅是在任务的结果还没被回写之前,更新其结果为被取消状态,一旦将其结果设置为被取消状态,还没有开始仔执行的将不会被调度执行,已经在执行的,最后就算正常完成也不能再修改结果。
Future.get
与 CompletableFuture
的 join
方法除了 join
不能被中断之外,对异常结束分别会将异常包装成 ExecutionException
、 CompletionException
,当然 InterruptedException
、 TimeoutException
、 CancellationException
除外,通常我们都会使用 join
而不是 get
,大概是 join
方法不需要处理异常,而 get
方法有 InterruptedException
、 TimeoutException
异常需要处理吧。
Original: https://www.cnblogs.com/ciel717/p/16436288.html
Author: 夏尔_717
Title: Java异步执行器CompletableFuture源码解析
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/599135/
转载文章受原作者版权保护。转载请注明原作者出处!