当前位置: 首页 > news >正文

JUC实战经验-CompletableFuture 异步编程利器

开发中为什么使用线程池

  • 降低资源的消耗
    • 通过重复利用已经创建好的线程降低线程的创建和销毁带来的损耗
  • 提高响应速度
    • 因为线程池中的线程数没有超过线程池的最大上限时,有的线程处于等待分配任务的状态,当任务来时无需创建新的线程就能执行
  • 提高线程的可管理性
    • 线程池会根据当前系统特点对池内的线程进行优化处理,减少创建和销毁线程带来的系统开销。无限的创建和销毁线程不仅消耗系统资源,还降低系统的稳定性,使用线程池进行统一分配

常见的 4 种线程池

  • newCachedThreadPool
    • 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若
      无可回收,则新建线程。
  • newFixedThreadPool
    • 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool
    • 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor
    • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

我们异步执行一个任务时,一般是用线程池Executor去创建。如果不需要有返回值,任务实现Runnable接口;如果需要有返回值,任务实现Callable接口,调用Executor的submit方法,再使用Future获取即可。如果多个线程存在依赖组合的话,则可以使用CompeletableFuture。

Future和Callable接口

因为CompletableFuture实现了Future接口,我们先从Future接口说起

Future是Java5新加的一个接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。,它提供了一种异步并行计算的功能。如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。主线程继续处理其他任务,处理完成后,再通过Future获取计算结果。

如下例,假设我们有两个任务服务,一个是sku基本信息获取,一个是获取spu的销售属性组合。

public class GoodsSkuService {

    public GoodsSkuInfo getGoodsSkuInfo(Long userId) throws InterruptedException {
        Thread.sleep(300);//模拟调用耗时  一般是查数据库,或者远程调用返回的结果
    }
}

public class GoodsSpuService {

    public GoodsSpuInfo getGoodsSpu(long userId) throws InterruptedException {
        Thread.sleep(500); //模拟调用耗时
    }
}

接下来,我们来演示下,在主线程中是如何使用Future来进行异步调用的。

public class FutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //调用用户服务sku基本信息获取
        FutureTask<GoodsSkuInfo> goodsSkuInfo= new FutureTask<>(new Callable<GoodsSkuInfo>() {
            @Override
            public GoodsSkuInfocall() throws Exception {
                return goodsSkuInfoService.getGoodsSkuInfo(goodsId);
            }
        });
        executorService.submit(userInfoFutureTask);
        Thread.sleep(300); //模拟主线程其它操作耗时
        FutureTask<GoodsSpuInfo> goodsSpuInfo= new FutureTask<>(new Callable<GoodsSpuInfo>() {
            @Override
            public GoodsSkuInfocall() throws Exception {
                return goodsSpuInfoService.getGoodsSpuInfo(goodsId);
            }
        });
        executorService.submit(medalInfoFutureTask);
        GoodsSkuInfo goodsSkuInfo= userInfoFutureTask.get();//sku基本信息获取
        GoodsSpuInfo goodsSpuInfo= medalInfoFutureTask.get();//获取spu的销售属性组合。
    }
}
    

future+线程池异步配合,可以提高程序的执行效率,但是Future对于结果的获取,不是很友好,只能通过阻塞或者轮询的方式得到任务的结果。

  • Future.get() 就是阻塞调用,在线程获取结果之前get方法会一直阻塞。
  • Future提供了一个isDone方法,可以在程序中轮询这个方法查询执行结果。

阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。因此,JDK8设计出CompletableFuture。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

对Future的改进

CompletableFuture

  • 在java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处埋计算结果,也提供了转换和组合CompletableFuture的方法。
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。

上述例子可以改写成:

ublic class FutureTest {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        //调用用户服务sku基本信息获取
       FutureTask<GoodsSkuInfo> completablegoodsSkuInfo = CompletableFuture.supplyAsync(() ->  goodsSkuInfoService.getGoodsSkuInfo(goodsId));

        Thread.sleep(300); //模拟主线程其它操作耗时

        FutureTask<GoodsSpuInfo> completablegoodsSpuInfo = CompletableFuture.supplyAsync(() -> goodsSpuInfoService.getGoodsSpuInfo(goodsId)); 
		GoodsSkuInfo goodsSkuInfo= completablegoodsSkuInfo Future.get(2,TimeUnit.SECONDS);;//sku基本信息获取
        GoodsSpuInfo goodsSpuInfo= completablegoodsSpuInfo Future.get();//获取spu的销售属性组合。
    }
}

可以发现,使用CompletableFuture,代码简洁了很多。CompletableFuture的supplyAsync方法,提供了异步执行的功能,线程池也不用单独创建了。实际上,它CompletableFuture使用了默认线程池是ForkJoinPool.commonPool。

CompletionStage(不做详细叙述)

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function,Consumer或者Runnable。.比如:stage.thenApply(X -> square(x).thenAccept( x -System.out.print(x) ).thenRun(() -> System.out.printIn())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

CompletableFuture的优点

异步任务结束时,会自动回调某个对象的方法;
异步任务出错时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行。

核心的四个静态方法

CompletableFuture提供了几十种方法,辅助我们的异步任务场景。这些方法包括创建异步任务、任务异步回调、多个任务组合处理等方面。再次仅仅比较重要的四个核心的静态方法。
1、runAsync 无 返回值

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)  

代码示例

public class CompletableFutureDemo3{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().getName()+"\t"+"-----come in");
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----task is over");
        });
        System.out.println(future.get());
    }
}

2、supplyAsync 有 返回值

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

代码示例

public class CompletableFutureDemo3{
    public static void main(String[] args) throws ExecutionException, InterruptedException{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            //暂停几秒钟线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextInt(100);
        });
        System.out.println(completableFuture.get());
    }
}

上述Executor executor参数说明

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

计算完成时回调方法

public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whencompleteAsync(BiConsumer<? super T,? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,super Throwable> action,Executor executor);
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);

whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。
whenComplete和whenCompleteAsync的区别:

  • whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务。
  • whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行。

方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

减少阻塞和轮询

从Java8开始引入了CompletableFuture,它是Future的功能增强版,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

public class CompletableFutureDemo3{
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "\t" + "-----come in");
            int result = ThreadLocalRandom.current().nextInt(10);
            //暂停几秒钟线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println("-----计算结束耗时1秒钟,result: "+result);
            if(result > 6){
                int age = 10/0;
            }
            return result;
        }).whenComplete((v,e) ->{
            if(e == null){
                System.out.println("-----result: "+v);
            }
        }).exceptionally(e -> {
            System.out.println("-----exception: "+e.getCause()+"\t"+e.getMessage());
            return -44;
        });

        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
    }
}

线程串行化方法

public <U> CompletableFuture<U> thenApply(Function<? super T,extends U> fn)
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
任务的返回值。

public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)
public Completionstage<Void> thenAccept(Consumer<? super T> action);
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。

public Completionstage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
public CompletionStage<Void> thenRun(Runnable action);
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行
thenRun 的后续操

public Completionstage<Void> thenRunAsync(Runnable action);
public Completionstage<Void> thenRunAsync(Runnable action,Executor executor )

带有 Async 默认是异步执行的。

多任务组合

public static CompletableFuture<Void>allof(CompletableFuture<?>... cfs);
public static CompletableFuture<Object>anyof(CompletableFuture<?>... cfs);

allOf:等待所有任务完成
anyOf:只要有一个任务完成

实战案例

业务场景:
查询商品详情页的逻辑比较复杂,有些数据还需要远程调用,必然需要花费更多的时间。

//1,获取sku的基本信息				0.5s

//2.获取sku的图片信息				0.5s

//3.获取sku的促销信息				1s

//4.获取spu的所有销售属性			1s

//5.获取规格参数组及组下的规格参数	1.5s

//6.spu详情						1s

假如商品详情页的每个查询,需要如下标注的时间才能完成那么,用户需要 5.5s 后才能看到商品详情页的内容。很显然是不能接受的。如果有多个线程同时完成这 6 步操作,也许只需要 1.5s。

以下是实战项目中利用线程池加CompletableFuture 异步编排来优化代码一定程度上增大了接口的QPS。

@Override
    public SkuItemVo item(Long skuId) throws ExecutionException, InterruptedException {
        SkuItemVo skuItemVo = new SkuItemVo();
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
            //1、sku基本信息获取 pms_sku_info
            SkuInfoEntity info = getById(skuId);
            skuItemVo.setInfo(info);
            return info;
        }, executor);
        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync(res -> {
            //3、获取spu的销售属性组合
            List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBySkuId(res.getSkuId());
            skuItemVo.setSaleAttr(saleAttrVos);
        }, executor);
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync(res -> {
            //4、获取spu的介绍 pms_sku_info_desc
            SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
            skuItemVo.setDesc(spuInfoDescEntity);
        }, executor);
        CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync(res -> {
            //5、获取spu的规格参数信息
            List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
            skuItemVo.setGroupAttrs(attrGroupVos);
        }, executor);

        //2、sku的图片信息 pms_sku_images
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
            List<SkuImagesEntity> images = imagesService.getIamImagesBySkuId(skuId);
            skuItemVo.setImages(images);
        }, executor);

        CompletableFuture<Void> seckillFuture = CompletableFuture.runAsync(() -> {
            //3、远程调用查询当前sku是否参与秒杀优惠活动
            R skuSeckilInfo = seckillFeignService.getSkuSeckilInfo(skuId);
            if (skuSeckilInfo.getCode() == 0) {
                //查询成功
                SeckillSkuVo seckilInfoData = skuSeckilInfo.getData("data", new TypeReference<SeckillSkuVo>() {
                });
                skuItemVo.setSeckillSkuVo(seckilInfoData);
                if (seckilInfoData != null) {
                    long currentTime = System.currentTimeMillis();
                    if (currentTime > seckilInfoData.getEndTime()) {
                        skuItemVo.setSeckillSkuVo(null);
                    }
                }
            }
        }, executor);
        //等到所有任务都完成
        CompletableFuture.allOf(saleAttrFuture,descFuture,baseAttrFuture,imageFuture,seckillFuture).get();

        return skuItemVo;
    }

上述代码是本此业务的核心代码,涉及微服务的远程调用等与CompletableFuture 无关的代码就给予忽视了

相关文章:

  • 免费注册网站域名可以用吗/河南郑州最新消息今天
  • 溧水做网站价格/守游网络推广平台
  • 政府网站外文版建设/百度一下进入首页
  • 做短视频网站/谷歌seo营销
  • 做网站需要人员/大数据营销系统多少钱
  • 厦门建模培训/优化模型数学建模
  • Java简系 - Java入门「一」
  • 清华学姐三年的测试成长经历,到最后的喜提高薪offer
  • 【软考】-- 多媒体基础知识
  • 关于ETL的两种架构(ETL架构和ELT架构)
  • ClickHouse 挺快,esProc SPL 更快
  • 【Error: error:0308010C:digital envelope routines::unsupported】
  • Spring Boot中Spring MVC的基本配置讲解与实战(包括静态资源配置,拦截器配置,文件上传配置及实战 附源码)
  • 第八章、ansible基于清单管理大项目
  • C语言学习笔记
  • 嵌入式硬件笔记——flash
  • 顺序表-c语言实现
  • 【数据结构基础】之树的介绍,生动形象,通俗易懂,算法入门必看