一网打尽异步神器CompletableFuture

最近一直畅游在RocketMQ的源码中,发现在RocketMQ中很多地方都使用到了CompletableFuture,所以今天就跟大家来聊一聊JDK1.8提供的异步神器CompletableFuture,并且最后会结合RocketMQ源码分析一下CompletableFuture的使用。

Future接口以及它的局限性

我们都知道,Java中创建线程的方式主要有两种方式,继承Thread或者实现Runnable接口。但是这两种都是有一个共同的缺点,那就是都无法获取到线程执行的结果,也就是没有返回值。于是在JDK1.5 以后为了解决这种没有返回值的问题,提供了Callable和Future接口以及Future对应的实现类FutureTask,通过FutureTask的就可以获取到异步执行的结果。

于是乎,我们想要开启异步线程,执行任务,获取结果,就可以这么实现。

java;gutter:true; FutureTask futureTask = new FutureTask<>(() -> "三友"); new Thread(futureTask).start(); System.out.println(futureTask.get());</p> <pre><code> 或者使用线程池的方式 ;gutter:true;
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

线程池底层也是将提交的Callable的实现先封装成FutureTask,然后通过execute方法来提交任务,执行异步逻辑。

Future接口的局限性

虽然通过Future接口的get方法可以获取任务异步执行的结果,但是get方法会阻塞主线程,也就是异步任务没有完成,主线程会一直阻塞,直到任务结束。

Future也提供了isDone方法来查看异步线程任务执行是否完成,如果完成,就可以获取任务的执行结果,代码如下。

java;gutter:true; ExecutorService executorService = Executors.newSingleThreadExecutor(); Future future = executorService.submit(() -> "三友"); while (!future.isDone()) { //任务有没有完成,没有就继续循环判断 } System.out.println(future.get()); executorService.shutdown();</p> <pre><code> 但是这种轮询查看异步线程任务执行状态,也是非常消耗cpu资源。 同时对于一些复杂的异步操作任务的处理,可能需要各种同步组件来一起完成。 所以,通过上面的介绍可以看出,Future在使用的过程中还是有很强的局限性,所以为了解决这种局限性,在JDK1.8的时候,Doug Lea 大神为我们提供了一种更为强大的类CompletableFuture。 ## 什么是CompletableFuture? CompletableFuture在JDK1.8提供了一种更加强大的异步编程的api。它实现了Future接口,也就是Future的功能特性CompletableFuture也有;除此之外,它也实现了CompletionStage接口,CompletionStage接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。 CompletableFuture相比于Future最大的改进就是提供了类似观察者模式的回调监听的功能,也就是当上一阶段任务执行结束之后,可以回调你指定的下一阶段任务,而不需要阻塞获取结果之后来处理结果。 ## CompletableFuture常见api详解 CompletableFuture的方法api多,但主要可以分为以下几类。 ### 1、实例化CompletableFuture #### 构造方法创建 ;gutter:true;
CompletableFuture completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

此时如果有其它线程执行如下代码,就能执行打印出 三友

java;gutter:true; completableFuture.complete("三友")</p> <pre><code> #### 静态方法创建 除了使用构造方法构造,CompletableFuture还提供了静态方法来创建 ;gutter:true;
public static CompletableFuture supplyAsync(Supplier supplier);
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor);

public static CompletableFuture runAsync(Runnable runnable);
public static CompletableFuture runAsync(Runnable runnable, Executor executor);

supply 和 run 的主要区别就是 supply 可以有返回值,run 没有返回值。至于另一个参数Executor 就是用来执行异步任务的线程池,如果不传Executor 的话,默认是ForkJoinPool这个线程池的实现。

一旦通过静态方法来构造,会立马开启异步线程执行Supplier或者Runnable提交的任务。

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> "三友"); System.out.println(completableFuture.get());</p> <pre><code> 一旦任务执行完成,就可以打印返回值,这里的使用方法跟Future是一样的。 所以对比两个两种实例化的方法,使用静态方法的和使用构造方法主要区别就是,使用构造方法需要其它线程主动调用complete来表示任务执行完成,因为很简单,因为在构造的时候没有执行异步的任务,所以需要其它线程主动调用complete来表示任务执行完成。 ### 2、获取任务执行结果 ;gutter:true;
public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

get()和get(long timeout, TimeUnit unit)是实现了Future接口的功能,两者主要区别就是get()会一直阻塞直到获取到结果,get(long timeout, TimeUnit unit)值可以指定超时时间,当到了指定的时间还未获取到任务,就会抛出TimeoutException异常。

getNow(T valueIfAbsent):就是获取任务的执行结果,但不会产生阻塞。如果任务还没执行完成,那么就会返回你传入的 valueIfAbsent 参数值,如果执行完成了,就会返回任务执行的结果。

join():跟get()的主要区别就是,get()会抛出检查时异常,join()不会。

3、主动触发任务完成

java;gutter:true; public boolean complete(T value); public boolean completeExceptionally(Throwable ex);</p> <pre><code> complete:主动触发当前异步任务的完成。调用此方法时如果你的任务已经完成,那么方法就会返回false;如果任务没完成,就会返回true,并且其它线程获取到的任务的结果就是complete的参数值。 completeExceptionally:跟complete的作用差不多,complete是正常结束任务,返回结果,而completeExceptionally就是触发任务执行的异常。 ### 4、对任务执行结果进行下一步处理 #### 只能接收任务正常执行后的回调 ;gutter:true;
public CompletionStage thenApply(Function fn);
public CompletableFuture thenRun(Runnable action);
public CompletionStage thenAccept(Consumer action);

这类回调的特点就是,当任务正常执行完成,没有异常的时候就会回调。

thenApply:可以拿到上一步任务执行的结果进行处理,并且返回处理的结果 thenRun:拿不到上一步任务执行的结果,但会执行Runnable接口的实现 thenAccept:可以拿到上一步任务执行的结果进行处理,但不需要返回处理的结果

thenApply示例:

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenApply(v -> ("上一步的执行的结果为:" + v)); System.out.println(completableFuture.join());</p> <pre><code> 执行结果: ;gutter:true;
上一步的执行的结果为:10

thenRun示例:

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenRun(() -> System.out.println("上一步执行完成"));</p> <pre><code> 执行结果: ;gutter:true;
上一步执行完成

thenAccept示例:

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> 10) .thenAccept(v -> System.out.println("上一步执行完成,结果为:" + v));</p> <pre><code> 执行结果: ;gutter:true;
上一步执行完成,结果为:10

thenApply有异常示例:

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { //模拟异常 int i = 1 / 0; return 10; }).thenApply(v -> ("上一步的执行的结果为:" + v)); System.out.println(completableFuture.join());</p> <pre><code> 执行结果: ;gutter:true;
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

当有异常时是不会回调的

只能接收任务处理异常后的回调

java;gutter:true; public CompletionStage exceptionally(Function fn);</p> <pre><code> 当上面的任务执行过程中出现异常的时候,会回调exceptionally方法指定的回调,但是如果没有出现异常,是不会回调的。 exceptionally能够将异常给吞了,并且fn的返回值会返回回去。 其实这个exceptionally方法有点像降级的味道。当出现异常的时候,走到这个回调,可以返回一个默认值回去。 没有异常情况下: ;gutter:true;
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
return 100;
}).exceptionally(e -> {
System.out.println("出现异常了,返回默认值");
return 110;
});
System.out.println(completableFuture.join());

执行结果:

java;gutter:true; 100</p> <pre><code> 有异常情况下: ;gutter:true;
CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return 100;
}).exceptionally(e -> {
System.out.println("出现异常了,返回默认值");
return 110;
});
System.out.println(completableFuture.join());

执行结果:

java;gutter:true; 出现异常了,返回默认值 110</p> <pre><code> #### 能同时接收任务执行正常和异常的回调 ;gutter:true;
public CompletionStage handle(BiFunction fn);
public CompletionStage whenComplete(BiConsumer actin);

不论前面的任务执行成功还是失败都会回调的这类方法指定的回调方法。

handle : 跟exceptionally有点像,但是exceptionally是出现异常才会回调,两者都有返回值,都能吞了异常,但是handle正常情况下也能回调。

whenComplete:能接受正常或者异常的回调,并且不影响上个阶段的返回值,也就是主线程能获取到上个阶段的返回值;当出现异常时,whenComplete并不能吞了这个异常,也就是说主线程在获取执行异常任务的结果时,会抛出异常。

这里演示一下whenComplete处理异常示例情况,handle跟exceptionally对异常的处理差不多。

whenComplete处理异常示例:

java;gutter:true; CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> { int i = 1 / 0; return 10; }).whenComplete((r, e) -> { System.out.println("whenComplete被调用了"); }); System.out.println(completableFuture.join());</p> <pre><code> 执行结果: ;gutter:true;
whenComplete被调用了
Exception in thread "main" java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)

5、对任务结果进行合并

java;gutter:true; public CompletionStage thenCombine (CompletionStage other, BiFunction fn);</p> <pre><code> 这个方法的意思是,当前任务和other任务都执行结束后,拿到这两个任务的执行结果,回调 BiFunction ,然后返回新的结果。 thenCombine的例子请往下继续看。 ### 6、以Async结尾的方法 上面说的一些方法,比如说thenAccept方法,他有两个对应的Async结尾的方法,如下: ;gutter:true;
public CompletionStage thenAcceptAsync(Consumer action,Executor executor);
public CompletionStage thenAcceptAsync(Consumer action);

thenAcceptAsync跟thenAccept的主要区别就是thenAcceptAsync会重新开一个线程来执行下一阶段的任务,而thenAccept还是用上一阶段任务执行的线程执行。

两个thenAcceptAsync主要区别就是一个使用默认的线程池来执行任务,也就是ForkJoinPool,一个是使用方法参数传入的线程池来执行任务。

当然除了thenAccept方法之外,上述提到的方法还有很多带有Async结尾的对应的方法,他们的主要区别就是执行任务是否开启异步线程来执行的区别。

当然,还有一些其它的api,可以自行查看

CompletableFuture在RocketMQ中的使用

CompletableFuture在RocketMQ中的使用场景比较多,这里我举一个消息存储的场景。

在RocketMQ中,Broker接收到生产者产生的消息的时候,会将消息持久化到磁盘和同步到从节点中。持久化到磁盘和消息同步到从节点是两个独立的任务,互不干扰,可以相互独立执行。当消息持久化到磁盘和同步到从节点中任务完成之后,需要统计整个存储消息消耗的时间,所以统计整个存储消息消耗的时间是依赖前面两个任务的完成。

一网打尽异步神器CompletableFuture

实现代码如下

消息存储刷盘任务和主从复制任务:

java;gutter:true; PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // 提交刷盘的请求 CompletableFuture flushResultFuture = submitFlushRequest(result, msg); //提交主从复制的请求 CompletableFuture replicaResultFuture = submitReplicaRequest(result, msg);</p> <p>//刷盘 和 主从复制 两个异步任务通过thenCombine联合 return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> { // 当两个刷盘和主从复制任务都完成的时候,就会回调 // 如果刷盘没有成功,那么就将消息存储的状态设置为失败 if (flushStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(flushStatus); } // 如果主从复制没有成功,那么就将消息存储的状态设置为失败 if (replicaStatus != PutMessageStatus.PUT_OK) { putMessageResult.setPutMessageStatus(replicaStatus); } // 最终返回消息存储的结果 return putMessageResult; });</p> <pre><code> 对上面两个合并的任务执行结果通过thenAccept方法进行监听,统计消息存储的耗时: ;gutter:true;
//消息存储的开始时间
long beginTime = this.getSystemClock().now();
// 存储消息,然后返回 CompletableFuture,也就是上面一段代码得返回值‍
CompletableFuture putResultFuture = this.commitLog.asyncPutMessage(msg);

//监听消息存储的结果
putResultFuture.thenAccept((result) -> {
// 消息存储完成之后会回调
long elapsedTime = this.getSystemClock().now() – beginTime;
if (elapsedTime > 500) {
log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
}
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().add(1);
}
});

CompletableFuture的优点

1、异步函数式编程,实现优雅,易于维护;

2、它提供了异常管理的机制,让你有机会抛出、管理异步任务执行中发生的异常,监听这些异常的发生;

3、拥有对任务编排的能力。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

参考:

  • [1]https://zhuanlan.zhihu.com/p/344431341

如果觉得这篇文章对你有所帮助,还请帮忙点赞、在看、转发给更多的人,非常感谢!

往期热门文章推荐

扫码或者搜索关注公众号 三友的java日记 ,及时干货不错过,公众号致力于通过画图加上通俗易懂的语言讲解技术,让技术更加容易学习。

一网打尽异步神器CompletableFuture

Original: https://www.cnblogs.com/zzyang/p/16446007.html
Author: 三友的java日记
Title: 一网打尽异步神器CompletableFuture

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/622709/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • IDEA常用设置及插件

    设置 1.设置打开后不直接进入项目 IDEA默认打开时会直接进入上次打开的目录,有的时候加载很长时间,但这个时候可能我们并不是要打开这个项目,这里有一个设置,在如图的位置,去掉勾选…

    Java 2023年6月5日
    070
  • mybatis if else if 条件判断SQL片段表达式取值和拼接

    前言 最近在开发项目的时候涉及到复杂的动态条件查询,但是 mybaits本身不支持i f elseif类似的判断但是我们可以间接通过 chose when otherwise 去实…

    Java 2023年6月13日
    084
  • Java(4)字符串

    从概念上讲,Java字符串就是 Unicode字符序列。例如,字符串 “Java\u2122″由5个 Unicode字符 J、 a、 v、 a和 &#x2122;组成。Ja…

    Java 2023年6月9日
    082
  • 老年模式 之 fontScale篇

    之前整理过一篇全局字体设置 || 老年模式的文章,提到过4种方法,各有利弊。 最后推荐了方法4,自定义binding属性来实现。这里扩展一篇。 自定义binding实现的确不错,最…

    Java 2023年6月7日
    084
  • 重复文件查找工具:Duplicate Cleaner V4.11绿色免费版

    Duplicate Cleaner是一款可以帮助你在你的计算机上找到并且快速查找出重复文件并标记出不同的颜色,让你轻松查阅处理。你可以立即搜索多个文件夹结构并且设置识别副本文件的标…

    Java 2023年5月30日
    0112
  • MyBatisPlus实现分页和查询操作就这么简单

    《SpringBoot整合MybatisPlus基本的增删改查,保姆级教程》在这篇文章中,我们详细介绍了分页的具体实现方法。但是,在日常的开发中还需要搜索功能的。下面让我们一起动起…

    Java 2023年6月8日
    076
  • git clone、git pull和git fetch的用法及区别

    1.1 git clone与git fetch的区别 git clone:克隆的是整个远程库。git fetch:克隆的是远程库的一个分支。 1.2 git clone git c…

    Java 2023年6月13日
    067
  • HTML笔记整理–上节

    一、认识WEB 「网页」主要是由 &#x6587;&#x5B57;、 &#x56FE;&#x50CF;和 &#x8D85;&#x94…

    Java 2023年6月7日
    097
  • 【每日一题】leetcode3无重复字符的最长子串

    题目描述 给定一个字符串 s ,请你找出其中不含有重复字符的最长子串的长度。 示例 输入: s = “abcabcbb” 输出: 3 解释: 因为无重复字符…

    Java 2023年6月9日
    069
  • Java实现HttpClient发送GET、POST请求(https、http)

    package com.ruoyi.common.utils.http; import org.apache.http.HttpEntity; import org.apache….

    Java 2023年5月29日
    079
  • Redis入门讲解(介绍、安装、常用命令)

    Redis入门讲解(介绍、安装、常用命令) Redis是非关系型数据库 关系型数据库 关系型数据库是采用了关系模型来组织数据的数据库,以行和列的形式存储数据,由二维表及其之间的关系…

    Java 2023年6月15日
    073
  • 设计模式 16 命令模式

    命令模式(Command Pattern)属于 行为型模式 概述 现在各大电子厂商都在推智能家居,即可以通过手机这一个终端控制多个家用电器,比之前的单个设备智能由对应遥控器控制的方…

    Java 2023年6月6日
    073
  • Java中装饰者模式

    装饰模式可以在不改变原来类中代码的基础上,增强类中的方法 装饰类LDHWrapper和被装饰类LDH必须实现同个接口Star 装饰类中要定义属性为:被装饰类对象,同时定义构造方法 …

    Java 2023年6月15日
    068
  • Nginx 配置反向代理及负载均衡的实现

    一、nginx的反向代理及多台虚拟机负载均衡的实现 主机负载均衡服务器:192.168.232.132 的nginx.conf 配置如下 http { include mime.t…

    Java 2023年5月30日
    054
  • 【转】cron表达式详解

    Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式: (1) Seconds Minutes Hours Day…

    Java 2023年6月6日
    0105
  • java Builder模式

    Builder 模式也叫建造者模式,builder模式的作用将一个复杂对象的构建与他的表示分离,一步一步创建一个复杂对象的创建型模式。在不知道内部建造细节的情况下,可以更精细的控制…

    Java 2023年6月16日
    090
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球