使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

大家好,又见面啦。

在项目开发中,后端服务对外提供 API接口一般都会关注 响应时长。但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个 聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最终的结果再返回给调用方,这种情况下,往往会导致我们的接口响应特别的慢。

而如果我们想要动手进行优化的时候呢,就会涉及到 串行处理改 并行处理的问题。在 JAVA中并行处理的能力支持已经相对完善,通过对CompletableFuture的合理利用,可以让我们面对这种聚合类处理的场景会更加的得心应手。

好啦,话不多说,接下来就让我们一起来品尝下JAVA中组合式并行处理这道饕餮大餐吧。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

前菜:先看个实际场景

在开始享用这顿大餐前,我们先来个前菜开开胃。

例如现在有这么个需求:

需求描述
实现一个全网比价服务,比如可以从某宝、某东、某夕夕去获取某个商品的价格、优惠金额,并计算出实际付款金额,最终返回价格最优的平台与价格信息。

📢这里假定每个平台获取原价格与优惠券的接口已经实现、且都是需要调用HTTP接口查询的耗时操作,Mock接口每个耗时 1s左右。

根据最初的需求理解,我们可以很自然的写出对应实现代码:

public PriceResult getCheapestPlatAndPrice(String product) {
    // 获取某宝的价格以及优惠,并计算最终实付价格
    PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product),
            HttpRequestMock.getMouBaoDiscounts(product));
    // 获取某东的价格以及优惠,并计算最终实付价格
    PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product),
            HttpRequestMock.getMouDongDiscounts(product));
    // 获取某夕夕的价格以及优惠,并计算最终实付价格
    PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product),
            HttpRequestMock.getMouXiXiDiscounts(product));

    // 计算并选出实际价格最低的平台
    return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice).

            min(Comparator.comparingInt(PriceResult::getRealPrice))
            .get();
}

一切顺利成章,运行测试下:

05:24:53.759[main|1]获取某宝上 Iphone13的价格
05:24:54.779[main|1]获取某宝上 Iphone13的价格完成: 5199
05:24:54.779[main|1]获取某宝上 Iphone13的优惠
05:24:55.781[main|1]获取某宝上 Iphone13的优惠完成: -200
05:24:55.781[main|1]某宝最终价格计算完成:4999
05:24:55.781[main|1]获取某东上 Iphone13的价格
05:24:56.784[main|1]获取某东上 Iphone13的价格完成: 5299
05:24:56.784[main|1]获取某东上 Iphone13的优惠
05:24:57.786[main|1]获取某东上 Iphone13的优惠完成: -150
05:24:57.786[main|1]某东最终价格计算完成:5149
05:24:57.786[main|1]获取某夕夕上 Iphone13的价格
05:24:58.788[main|1]获取某夕夕上 Iphone13的价格完成: 5399
05:24:58.788[main|1]获取某夕夕上 Iphone13的优惠
05:24:59.791[main|1]获取某夕夕上 Iphone13的优惠完成: -5300
05:24:59.791[main|1]某夕夕最终价格计算完成:99
获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】

结果与第一种实现方式一致,但是接口总耗时从 6s下降到了 2s,效果还是很显著的。但是,是否还能再压缩一些呢?

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

基于上面按照平台拆分并行处理的思路继续推进,我们可以看出每个平台内的处理逻辑其实可以分为3个主要步骤:

  1. 获取原始价格(耗时操作)
  2. 获取折扣优惠(耗时操作)
  3. 得到原始价格和折扣优惠之后,计算实付价格

这3个步骤中,第1、2两个耗时操作也是相对独立的,如果也能并行处理的话,响应时长上应该又会缩短一些,即如下的处理流程:

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

我们当然可以继续使用上面提到的 线程池+Future的方式,但 Future在应对并行结果组合以及后续处理等方面显得力不从心, 弊端明显:

代码写起来会 非常拖沓:先封装 Callable函数放到线程池中去执行查询操作,然后分三组 阻塞等待结果并计算出各自结果,最后再 阻塞等待价格计算完成后汇总得到最终结果。

说到这里呢,就需要我们新的主人公 CompletableFuture登场了,通过它我们可以很轻松的来完成任务的并行处理,以及各个并行任务结果之间的组合再处理等操作。我们使用 CompletableFuture编写实现代码如下:

public PriceResult getCheapestPlatAndPrice3(String product) {
    // 获取并计算某宝的最终价格
    CompletableFuture mouBao =
            CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product))
                    .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)),
                            this::computeRealPrice);
    // 获取并计算某宝的最终价格
    CompletableFuture mouDong =
            CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongPrice(product))
                    .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongDiscounts(product)),
                            this::computeRealPrice);
    // 获取并计算某宝的最终价格
    CompletableFuture mouXiXi =
            CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
                    .thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
                            this::computeRealPrice);

    // 排序并获取最低价格
    return Stream.of(mouBao, mouDong, mouXiXi)
            .map(CompletableFuture::join)
            .sorted(Comparator.comparingInt(PriceResult::getRealPrice))
            .findFirst()
            .get();
}

看下执行结果符合预期,而接口耗时则降到了 1s(因为我们依赖的每一个查询实际操作的接口耗时都是模拟的1s,所以这个结果已经算是此复合接口能达到的极限值了)。

06:01:12.334[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13的优惠
06:01:12.334[ForkJoinPool.commonPool-worker-11|14]获取某东上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-13|16]获取某夕夕上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-4|15]获取某东上 Iphone13的优惠
06:01:12.334[ForkJoinPool.commonPool-worker-6|17]获取某夕夕上 Iphone13的优惠
06:01:13.354[ForkJoinPool.commonPool-worker-6|17]获取某夕夕上 Iphone13的优惠完成: -5300
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]获取某夕夕上 Iphone13的价格完成: 5399
06:01:13.354[ForkJoinPool.commonPool-worker-4|15]获取某东上 Iphone13的优惠完成: -150
06:01:13.354[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13的价格完成: 5199
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]获取某东上 Iphone13的价格完成: 5299
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13的优惠完成: -200
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最终价格计算完成:99
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某东最终价格计算完成:5149
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某宝最终价格计算完成:4999
获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】

现在,我们知道了方法名称带有Async和不带Async的实现策略上的差异点就在于使用哪个线程池来执行而已。那么,对我们实际的指导意义是啥呢?实际使用的时候,我们怎么判断自己应该使用带Async结尾的方法、还是不带Async结尾的方法呢?

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

上面是Async结尾方法默认使用的ForkJoinPool创建的逻辑,这里可以看出,默认的线程池中的工作线程数是 CPU核数 - 1,并且指定了默认的丢弃策略等,这就是一个主要关键点。

所以说,符合以下几个条件的时候,可以考虑使用带有Async后缀的方法,指定自定义线程池:

  • 默认线程池的线程数满足不了实际诉求
  • 默认线程池的类型不符合自己业务诉求
  • 默认线程池的队列满处理策略不满足自己诉求

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

与Stream结合使用的注意点

在我前面的文档中,有细致全面的介绍过 Stream流相关的使用方式(不清楚的同学速点👉👉《吃透JAVA的Stream流操作,多年实践总结》了解下啦)。在涉及批量进行并行处理的时候,通过 StreamCompletableFuture结合使用,可以简化我们的很多编码逻辑。但是 在使用细节方面需要注意下,避免达不到使用 CompletableFuture的预期效果。

需求场景:
在同一个平台内,传入多个商品,查询不同商品对应的价格与优惠信息,并选出实付价格最低的商品信息。

结合前面的介绍分析,我们应该知道最佳的方式,就是同时并行的方式去各自请求数据,最后合并处理即可。所以我们规划按照如下的策略来实现:

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

先看第一种编码实现:

public PriceResult comparePriceInOnePlat(List products) {
    return products.stream()
            .map(product ->
                    CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product))
                            .thenCombine(
                                    CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)),
                                    this::computeRealPrice))
            .map(CompletableFuture::join)
            .sorted(Comparator.comparingInt(PriceResult::getRealPrice))
            .findFirst()
            .get();
}

对于List的处理场景,这里采用了Stream方式来进行遍历与结果的收集、排序与返回。看似正常,但是执行的时候会发现,并没有达到我们预期的效果:

07:37:14.388[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13黑色的价格
07:37:14.388[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13黑色的优惠
07:37:15.408[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13黑色的价格完成: 5199
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13黑色的优惠完成: -200
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某宝最终价格计算完成:4999
07:37:15.408[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13白色的价格
07:37:15.409[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13白色的优惠
07:37:16.410[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13白色的价格完成: 5199
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13白色的优惠完成: -200
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某宝最终价格计算完成:4999
07:37:16.410[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13红色的优惠
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13红色的价格
07:37:17.412[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13红色的价格完成: 5199
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13红色的优惠完成: -200
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某宝最终价格计算完成:4999
获取最优价格信息:【平台:某宝, 原价:5199, 折扣:0, 实付价:4999】

从执行结果可以看出,三个商品并行处理,整体处理耗时相比前面编码方式有很大提升,达到了预期的效果。

📢 归纳下

因为Stream的操作具有 延迟执行的特点,且只有遇到终止操作(比如collect方法)的时候才会真正的执行。所以遇到这种需要并行处理且需要合并多个并行处理流程的情况下,需要将并行流程与合并逻辑放到两个Stream中,这样分别触发完成各自的处理逻辑,就可以了。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

甜点:并发和并行的区别

对一个吃货而言,主餐完毕,总得来点餐后甜点才够满足。

在前面的内容中呢,我们始终是在围绕 并行处理这个话题在展开。实际工作的时候,我们对于并发这个词肯定也不陌生, 高并发这个词,就像高端人士酒杯中那八二年的拉菲一般,成了每一个开发人员简历上用来彰显实力的一个标签。

那么, 并发并行到底啥区别?这里我们也简单的概括下。

并发

所谓 并发,其关注的点是服务器的 吞吐量情况,也就是服务器可以在单位时间内同时处理多少个请求。并发是通过 多线程的方式来实现的,充分利用当前CPU多核能力,同时使用多个进程去处理业务,使得同一个机器在相同时间内可以处理更多的请求,提升吞吐量。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

所有的操作在一个线程中串行推进,如果有多个线程同步处理,则同时有多个请求可以被处理。但是因为是串行处理,所以如果某个环节需要对外交互时,比如等待网络IO的操作,会使得当前线程处于 阻塞状态,直到资源可用时被唤醒继续往后执行。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

对于 高并发场景,服务器的线程资源是非常宝贵的。如果频繁的处于阻塞则会导致浪费,且线程频繁的阻塞、唤醒切换动作,也会加剧整体系统的性能损耗。所以并发这种多线程场景,更适合 CPU密集型的操作。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

并行

所谓 并行,就是将同一个处理流程没有相互依赖的部分放到多个线程中进行同时并行处理,以此来达到相对于串行模式更短的单流程处理耗时的效果,进而提升系统的 整体响应时长吞吐量

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

基于异步编程实现的并行操作也是借助线程池的方式,通过多线程同时执行来实现效率提升的。与并发的区别在于:并行通过将任务切分为一个个可独立处理的小任务块,然后基于系统 调度策略,将需要执行的任务块分配给空闲可用 工作线程去处理,如果出现需要等待的场景(比如IO请求)则工作线程会将此任务先放下,继续处理后续的任务,等之前的任务IO请求好了之后,系统重新分配可用的工作线程来处理。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

根据上面的示意图介绍可以看出,异步并行编程,对于工作线程的利用率上升,不会出现工作线程阻塞的情况,但是因为任务拆分、工作线程间的切换调度等 系统层面的开销也会随之加大。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

如何选择

前面介绍了下并发与并行两种模式的特点、以及各自的优缺点。所以选择采用并发还是并行方式来提升系统的处理性能,还需要结合实际项目场景来确定。

综合而言:

  1. 如果业务处理逻辑是 CPU密集型的操作,优先使用基于线程池实现并发处理方案(可以避免线程间切换导致的系统性能浪费)。
  2. 如果业务处理逻辑中存在较多 需要阻塞等待的耗时场景、且相互之间没有依赖,比如本地IO操作、网络IO请求等等,这种情况优先选择使用 并行处理策略(可以避免宝贵的线程资源被阻塞等待)。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

总结回顾

好啦,关于JAVA中 CompletableFuture的使用,以及并行编程相关的内容呢就介绍到这里啦。看到这里,相信您应该有所收获吧?那么你的项目里有这种适合并行处理的场景吗?你在处理并行场景的时候是怎么做的呢? 评论区一起讨论下吧~~

补充:

本文中有提及 CompletableFuture执行时所使用的默认线程池是 ForkJoinPool,早在JAVA7版本就已经被引入,但是很多人对 ForkJoinPool不是很了解,实际项目中使用的也比较少。其实对 ForkJoinPool的合理利用,可以让我们在面对某些多线程场景时会更加的从容高效。在后面的文章中,我会针对 ForkJoinPool有关的内容进行专门的介绍与探讨,如果有兴趣,可以点个关注,及时获取后续的内容。

此外

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

我是悟道,聊技术、又不仅仅聊技术~

如果觉得有用,请 点赞 + 关注让我感受到您的支持。也可以关注下我的公众号【架构悟道】,获取更及时的更新。

期待与你一起探讨,一起成长为更好的自己。

使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

Original: https://www.cnblogs.com/softwarearch/p/16516980.html
Author: 架构悟道
Title: 使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/584924/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Day3-笔记(java运行机制、基础语法、关键字、标识符、数据类型、字符扩展、类型转换)

    JAVA程序运行机制 编译型 直接翻译成其他语言,全局翻译。compile 解释型 说一句解释一句,实时更新 JAVA既有编译型,又有解释型。 程序运行机制 源代码->编译成…

    Java 2023年6月6日
    080
  • 从Spring框架看设计模式如何灵活使用

    Singleton 单例模式 单例模式是确保每个应用程序只存在一个实例的机制。默认情况下,Spring将所有bean创建为单例。 你用@Autowired获取的bean,全局唯一。…

    Java 2023年5月30日
    082
  • 经典实验–网络聊天室(NetChatRoom)3

    ·方法声明 Chatter package chat; import java.rmi.RemoteException; public interface Chatter exte…

    Java 2023年6月15日
    092
  • 华为暑期实习 通用软件开发 面经

    华为暑期实习 通用软件开发工程师 数据存储与机器视觉 面经 机试 7.6 第一题 字符串匹配 给五行英文句子,找出来其中的网址,网址以http或https开头,以com结尾,不要重…

    Java 2023年6月5日
    0109
  • 冒泡排序

    算法定义(摘抄百度文库): 它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来。走访数列的工作是重复地进行直到没有再需要交换,也就是说该数列已经排序完…

    Java 2023年6月16日
    066
  • 可恶,又是个线上问题

    这几天,在搞 ShardingSphere,这不又来了一个问题嘛,启动的时候报了一个NPE出来。 好在,这个问题不影响使用,只是启动会报点错,接下来,又是辛苦的排查过程。 直接定位…

    Java 2023年6月13日
    075
  • 设计模式

    设计模式 工厂模式 public class Test { public static void main(String[] args) { Computer c1 =Comput…

    Java 2023年6月9日
    077
  • 时序数据库influxdb-1.8.9部署记录步骤

    何谓时间序列数据库? 什么是时间序列数据库,最简单的定义就是数据格式里包含Timestamp字段的数据,比如某一时间环境的温度,CPU的使用率等。但是,有什么数据不包含Timest…

    Java 2023年6月9日
    080
  • Cglib

    Cglib方式jdk自带的代理方式 必须要实现接口,有限制,所以有Cglib方式 原理:通过字节码技术,创建一个目标类的子类,作为代理对象,在子类中拦截目标类中的方法,对方法做一个…

    Java 2023年6月13日
    078
  • mock测试出现Circular view path [trade_records]: would dispatch back to the current handler URL

    这是因为你的Controller中返回的视图名称与你当前的requestMapping名称一样,这并没有很好的解决方案,除非你改掉其中一个名字。 因为springframework…

    Java 2023年6月7日
    0131
  • DBeaver配置Hive连接(转)

    https://blog.csdn.net/weixin_44374374/article/details/123957815 posted on2022-07-26 10:52 …

    Java 2023年6月8日
    088
  • Java序列化流的奇妙之旅

    Java序列化流有何奇妙之处呢?通过一个个案例逐一感受序列化流。 !!!好戏在后头!!! 1.IO流读写文件 先从一个普通文件读写字符串开始讲起。 例子:输出字符串到文件,再从文件…

    Java 2023年6月5日
    090
  • 【每日算法】剑指 Offer字符串的排列

    题目描述 代码实现 题目描述 这是 LeetCode 上的 剑指 Offer 38. 字符串的排列, 难度为 【中】 输入&#x…

    Java 2023年6月9日
    0100
  • Wildfly8 更改response header中的Server参数

    项目经过局方安全检查需要屏蔽掉服务器中间件信息,查了一下午,网上看到的都是修改jboss7的,我们使用的wildfly8(jboss改名为wildfly),修改地方不一样,折磨了半…

    Java 2023年6月7日
    099
  • C字符串和C++中string的区别

    在C++中则把字符串封装成了一种数据类型string,可以直接声明变量并进行赋值等字符串操作。以下是C字符串和C++中string的区别: C字符串 string 对象(C++) …

    Java 2023年6月7日
    079
  • mybatis配置解析

    一.引言:参考官方文档mybatis中提供了如下的配置,其中标红的需要掌握,其余的作为了解即可 二:配置解析之前的环境准备:进行配置之前要先有这几个东西,包括数据库表、Mybati…

    Java 2023年6月9日
    081
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球