大家好,又见面啦。
在项目开发中,后端服务对外提供 API接口一般都会关注 响应时长
。但是某些情况下,由于业务规划逻辑的原因,我们的接口可能会是一个 聚合信息处理类的处理逻辑,比如我们从多个不同的地方获取数据,然后汇总处理为最终的结果再返回给调用方,这种情况下,往往会导致我们的接口响应特别的慢。
而如果我们想要动手进行优化的时候呢,就会涉及到 串行
处理改 并行
处理的问题。在 JAVA
中并行处理的能力支持已经相对完善,通过对CompletableFuture的合理利用,可以让我们面对这种聚合类处理的场景会更加的得心应手。
好啦,话不多说,接下来就让我们一起来品尝下JAVA中组合式并行处理这道饕餮大餐吧。
前菜:先看个实际场景
在开始享用这顿大餐前,我们先来个前菜开开胃。
例如现在有这么个需求:
需求描述:
实现一个全网比价服务,比如可以从某宝、某东、某夕夕去获取某个商品的价格、优惠金额,并计算出实际付款金额,最终返回价格最优的平台与价格信息。
📢这里假定每个平台获取原价格与优惠券的接口已经实现、且都是需要调用HTTP接口查询的耗时操作,Mock接口每个耗时 1s
左右。
根据最初的需求理解,我们可以很自然的写出对应实现代码:
public PriceResult getCheapestPlatAndPrice(String product) {
// 获取某宝的价格以及优惠,并计算最终实付价格
PriceResult mouBaoPrice = computeRealPrice(HttpRequestMock.getMouBaoPrice(product),
HttpRequestMock.getMouBaoDiscounts(product));
// 获取某东的价格以及优惠,并计算最终实付价格
PriceResult mouDongPrice = computeRealPrice(HttpRequestMock.getMouDongPrice(product),
HttpRequestMock.getMouDongDiscounts(product));
// 获取某夕夕的价格以及优惠,并计算最终实付价格
PriceResult mouXiXiPrice = computeRealPrice(HttpRequestMock.getMouXiXiPrice(product),
HttpRequestMock.getMouXiXiDiscounts(product));
// 计算并选出实际价格最低的平台
return Stream.of(mouBaoPrice, mouDongPrice, mouXiXiPrice).
min(Comparator.comparingInt(PriceResult::getRealPrice))
.get();
}
一切顺利成章,运行测试下:
05:24:53.759[main|1]获取某宝上 Iphone13的价格
05:24:54.779[main|1]获取某宝上 Iphone13的价格完成: 5199
05:24:54.779[main|1]获取某宝上 Iphone13的优惠
05:24:55.781[main|1]获取某宝上 Iphone13的优惠完成: -200
05:24:55.781[main|1]某宝最终价格计算完成:4999
05:24:55.781[main|1]获取某东上 Iphone13的价格
05:24:56.784[main|1]获取某东上 Iphone13的价格完成: 5299
05:24:56.784[main|1]获取某东上 Iphone13的优惠
05:24:57.786[main|1]获取某东上 Iphone13的优惠完成: -150
05:24:57.786[main|1]某东最终价格计算完成:5149
05:24:57.786[main|1]获取某夕夕上 Iphone13的价格
05:24:58.788[main|1]获取某夕夕上 Iphone13的价格完成: 5399
05:24:58.788[main|1]获取某夕夕上 Iphone13的优惠
05:24:59.791[main|1]获取某夕夕上 Iphone13的优惠完成: -5300
05:24:59.791[main|1]某夕夕最终价格计算完成:99
获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】
结果与第一种实现方式一致,但是接口总耗时从 6s
下降到了 2s
,效果还是很显著的。但是,是否还能再压缩一些呢?
基于上面按照平台拆分并行处理的思路继续推进,我们可以看出每个平台内的处理逻辑其实可以分为3个主要步骤:
- 获取原始价格(耗时操作)
- 获取折扣优惠(耗时操作)
- 得到原始价格和折扣优惠之后,计算实付价格
这3个步骤中,第1、2两个耗时操作也是相对独立的,如果也能并行处理的话,响应时长上应该又会缩短一些,即如下的处理流程:
我们当然可以继续使用上面提到的 线程池+Future
的方式,但 Future
在应对并行结果组合以及后续处理等方面显得力不从心, 弊端明显:
代码写起来会 非常拖沓:先封装
Callable
函数放到线程池中去执行查询操作,然后分三组阻塞等待
结果并计算出各自结果,最后再阻塞等待
价格计算完成后汇总得到最终结果。
说到这里呢,就需要我们新的主人公 CompletableFuture
登场了,通过它我们可以很轻松的来完成任务的并行处理,以及各个并行任务结果之间的组合再处理等操作。我们使用 CompletableFuture
编写实现代码如下:
public PriceResult getCheapestPlatAndPrice3(String product) {
// 获取并计算某宝的最终价格
CompletableFuture mouBao =
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)),
this::computeRealPrice);
// 获取并计算某宝的最终价格
CompletableFuture mouDong =
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouDongDiscounts(product)),
this::computeRealPrice);
// 获取并计算某宝的最终价格
CompletableFuture mouXiXi =
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiPrice(product))
.thenCombine(CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouXiXiDiscounts(product)),
this::computeRealPrice);
// 排序并获取最低价格
return Stream.of(mouBao, mouDong, mouXiXi)
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
}
看下执行结果符合预期,而接口耗时则降到了 1s
(因为我们依赖的每一个查询实际操作的接口耗时都是模拟的1s,所以这个结果已经算是此复合接口能达到的极限值了)。
06:01:12.334[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13的优惠
06:01:12.334[ForkJoinPool.commonPool-worker-11|14]获取某东上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-13|16]获取某夕夕上 Iphone13的价格
06:01:12.334[ForkJoinPool.commonPool-worker-4|15]获取某东上 Iphone13的优惠
06:01:12.334[ForkJoinPool.commonPool-worker-6|17]获取某夕夕上 Iphone13的优惠
06:01:13.354[ForkJoinPool.commonPool-worker-6|17]获取某夕夕上 Iphone13的优惠完成: -5300
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]获取某夕夕上 Iphone13的价格完成: 5399
06:01:13.354[ForkJoinPool.commonPool-worker-4|15]获取某东上 Iphone13的优惠完成: -150
06:01:13.354[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13的价格完成: 5199
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]获取某东上 Iphone13的价格完成: 5299
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13的优惠完成: -200
06:01:13.354[ForkJoinPool.commonPool-worker-13|16]某夕夕最终价格计算完成:99
06:01:13.354[ForkJoinPool.commonPool-worker-11|14]某东最终价格计算完成:5149
06:01:13.354[ForkJoinPool.commonPool-worker-2|13]某宝最终价格计算完成:4999
获取最优价格信息:【平台:某夕夕, 原价:5399, 折扣:0, 实付价:99】
现在,我们知道了方法名称带有Async和不带Async的实现策略上的差异点就在于使用哪个线程池来执行而已。那么,对我们实际的指导意义是啥呢?实际使用的时候,我们怎么判断自己应该使用带Async结尾的方法、还是不带Async结尾的方法呢?
上面是Async结尾方法默认使用的ForkJoinPool创建的逻辑,这里可以看出,默认的线程池中的工作线程数是 CPU核数 - 1
,并且指定了默认的丢弃策略等,这就是一个主要关键点。
所以说,符合以下几个条件的时候,可以考虑使用带有Async后缀的方法,指定自定义线程池:
- 默认线程池的线程数满足不了实际诉求
- 默认线程池的类型不符合自己业务诉求
- 默认线程池的队列满处理策略不满足自己诉求
与Stream结合使用的注意点
在我前面的文档中,有细致全面的介绍过 Stream
流相关的使用方式(不清楚的同学速点👉👉《吃透JAVA的Stream流操作,多年实践总结》了解下啦)。在涉及批量进行并行处理的时候,通过 Stream
与 CompletableFuture
结合使用,可以简化我们的很多编码逻辑。但是 在使用细节方面需要注意下,避免达不到使用 CompletableFuture
的预期效果。
需求场景:
在同一个平台内,传入多个商品,查询不同商品对应的价格与优惠信息,并选出实付价格最低的商品信息。
结合前面的介绍分析,我们应该知道最佳的方式,就是同时并行的方式去各自请求数据,最后合并处理即可。所以我们规划按照如下的策略来实现:
先看第一种编码实现:
public PriceResult comparePriceInOnePlat(List products) {
return products.stream()
.map(product ->
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> HttpRequestMock.getMouBaoDiscounts(product)),
this::computeRealPrice))
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(PriceResult::getRealPrice))
.findFirst()
.get();
}
对于List的处理场景,这里采用了Stream方式来进行遍历与结果的收集、排序与返回。看似正常,但是执行的时候会发现,并没有达到我们预期的效果:
07:37:14.388[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13黑色的价格
07:37:14.388[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13黑色的优惠
07:37:15.408[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13黑色的价格完成: 5199
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]获取某宝上 Iphone13黑色的优惠完成: -200
07:37:15.408[ForkJoinPool.commonPool-worker-2|13]某宝最终价格计算完成:4999
07:37:15.408[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13白色的价格
07:37:15.409[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13白色的优惠
07:37:16.410[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13白色的价格完成: 5199
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13白色的优惠完成: -200
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]某宝最终价格计算完成:4999
07:37:16.410[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13红色的优惠
07:37:16.410[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13红色的价格
07:37:17.412[ForkJoinPool.commonPool-worker-11|14]获取某宝上 Iphone13红色的价格完成: 5199
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]获取某宝上 Iphone13红色的优惠完成: -200
07:37:17.412[ForkJoinPool.commonPool-worker-9|12]某宝最终价格计算完成:4999
获取最优价格信息:【平台:某宝, 原价:5199, 折扣:0, 实付价:4999】
从执行结果可以看出,三个商品并行处理,整体处理耗时相比前面编码方式有很大提升,达到了预期的效果。
📢 归纳下:
因为Stream的操作具有 延迟执行的特点,且只有遇到终止操作(比如collect方法)的时候才会真正的执行。所以遇到这种需要并行处理且需要合并多个并行处理流程的情况下,需要将并行流程与合并逻辑放到两个Stream中,这样分别触发完成各自的处理逻辑,就可以了。
甜点:并发和并行的区别
对一个吃货而言,主餐完毕,总得来点餐后甜点才够满足。
在前面的内容中呢,我们始终是在围绕 并行
处理这个话题在展开。实际工作的时候,我们对于并发这个词肯定也不陌生, 高并发
这个词,就像高端人士酒杯中那八二年的拉菲一般,成了每一个开发人员简历上用来彰显实力的一个标签。
那么, 并发和 并行到底啥区别?这里我们也简单的概括下。
并发
所谓 并发,其关注的点是服务器的 吞吐量
情况,也就是服务器可以在单位时间内同时处理多少个请求。并发是通过 多线程
的方式来实现的,充分利用当前CPU多核能力,同时使用多个进程去处理业务,使得同一个机器在相同时间内可以处理更多的请求,提升吞吐量。
所有的操作在一个线程中串行推进,如果有多个线程同步处理,则同时有多个请求可以被处理。但是因为是串行处理,所以如果某个环节需要对外交互时,比如等待网络IO的操作,会使得当前线程处于 阻塞状态
,直到资源可用时被唤醒继续往后执行。
对于 高并发场景,服务器的线程资源是非常宝贵的。如果频繁的处于阻塞则会导致浪费,且线程频繁的阻塞、唤醒切换动作,也会加剧整体系统的性能损耗。所以并发这种多线程场景,更适合 CPU密集型的操作。
并行
所谓 并行,就是将同一个处理流程没有相互依赖的部分放到多个线程中进行同时并行处理,以此来达到相对于串行模式更短的单流程处理耗时的效果,进而提升系统的 整体响应时长与 吞吐量。
基于异步编程实现的并行操作也是借助线程池的方式,通过多线程同时执行来实现效率提升的。与并发的区别在于:并行通过将任务切分为一个个可独立处理的小任务块,然后基于系统 调度策略
,将需要执行的任务块分配给空闲可用 工作线程去处理,如果出现需要等待的场景(比如IO请求)则工作线程会将此任务先放下,继续处理后续的任务,等之前的任务IO请求好了之后,系统重新分配可用的工作线程来处理。
根据上面的示意图介绍可以看出,异步并行编程,对于工作线程的利用率上升,不会出现工作线程阻塞的情况,但是因为任务拆分、工作线程间的切换调度等 系统层面的开销也会随之加大。
如何选择
前面介绍了下并发与并行两种模式的特点、以及各自的优缺点。所以选择采用并发还是并行方式来提升系统的处理性能,还需要结合实际项目场景来确定。
综合而言:
- 如果业务处理逻辑是 CPU密集型的操作,优先使用基于线程池实现并发处理方案(可以避免线程间切换导致的系统性能浪费)。
- 如果业务处理逻辑中存在较多 需要阻塞等待的耗时场景、且相互之间没有依赖,比如本地IO操作、网络IO请求等等,这种情况优先选择使用 并行处理策略(可以避免宝贵的线程资源被阻塞等待)。
总结回顾
好啦,关于JAVA中 CompletableFuture
的使用,以及并行编程相关的内容呢就介绍到这里啦。看到这里,相信您应该有所收获吧?那么你的项目里有这种适合并行处理的场景吗?你在处理并行场景的时候是怎么做的呢? 评论区一起讨论下吧~~
补充:
本文中有提及 CompletableFuture执行时所使用的默认线程池是 ForkJoinPool
,早在JAVA7版本就已经被引入,但是很多人对 ForkJoinPool
不是很了解,实际项目中使用的也比较少。其实对 ForkJoinPool
的合理利用,可以让我们在面对某些多线程场景时会更加的从容高效。在后面的文章中,我会针对 ForkJoinPool
有关的内容进行专门的介绍与探讨,如果有兴趣,可以点个关注,及时获取后续的内容。
此外:
- 关于本文中涉及的 演示代码的完整示例,我已经整理并提交到github中,如果您有需要,可以自取:https://github.com/veezean/JavaBasicSkills
我是悟道,聊技术、又不仅仅聊技术~
如果觉得有用,请 点赞 + 关注让我感受到您的支持。也可以关注下我的公众号【架构悟道】,获取更及时的更新。
期待与你一起探讨,一起成长为更好的自己。
Original: https://www.cnblogs.com/softwarearch/p/16516980.html
Author: 架构悟道
Title: 使用JAVA CompletableFuture实现流水线化的并行处理,深度实践总结
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/584924/
转载文章受原作者版权保护。转载请注明原作者出处!