阅读 139

CompleteFuture实现简单的任务编排实践

一:前言

CompleteFuture是java8 新提供的API,是对函数式编程思想的体现,提供了很多的对于函数式编程支持。不止有同步处理功能,还有异步处理能力。

通过函数式编程可以实现线程的简单任务编排。高效,整洁实现多线程异步编程。

二:详细介绍

 CompleteFuture 提供的API中以ansy结尾的都是异步处理的。

  1. 异步执行任务,并返回结果:supplyAsync 异步处理,并返回结果,默认使用 ForkJoinPool.commonPool()线程池,同时提供支持自定义线程池的API。

    CompletableFuture.supplyAsync(() -> "HELLO");// 自定义线程池CompletableFuture.supplyAsync(()->"hello",ES);
    CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !"));
    CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !"),ES);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "HELLO")
                    .thenApply(a -> 
                        return a + " lili!";
                    });
       CompletableFuture<String> f1 =
                    CompletableFuture.supplyAsync(() -> "hello")
                            .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili"))
                            .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy"));// 执行结果: =====> hello lili lucy// mian线程下同步执行。
      CompletableFuture<String> f1 =
                    CompletableFuture.supplyAsync(() -> "hello")
                            .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili"))
                            .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy"))
                            .thenCombineAsync(CompletableFuture.supplyAsync(() -> " how are you!"), (a, b) -> a + b);
            log.info("=====> {}", f1.get());  // 执行结果: =====> hello lili lucy how are you!
            CompletableFuture<String> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {            try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();}            return "lucy";
            }).applyToEither(CompletableFuture.supplyAsync(() -> {            try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}            return "lili";
            }), a -> "hello " + a);
            log.info("ret ====> {}",voidCompletableFuture.get());// 执行结果: ret ====> hello lili 如果下面sleep改成3s,执行结果:ret ====> hello lucy
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello")
                    .thenAcceptAsync(a -> {
                        a = a + " lucy !";
                        log.info("ret ======> {}", a);
                    });
             log.info(" ======== end ========================");// 执行结果:ret ======> hello lucy ! 而且是异步的,不会阻塞主线程,下面的end是先打印出来的
         CompletableFuture.supplyAsync(() -> "hello")
                    .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " lili"), (a, b) -> {                    try {
                            TimeUnit.SECONDS.sleep(3);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        log.info("=======>{}", a + b);
                    });// 执行结果:=======>hello lili
        CompletableFuture.supplyAsync(() -> {            try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }            return "lucy";
            }).acceptEither(CompletableFuture.supplyAsync(() -> "lili"), a -> {
                log.info("hello {}", a);
            });        
     // 执行结果:hello lili
          CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {            if (ThreadLocalRandom.current().nextInt(2) < 2) {                throw new RuntimeException("error");
                }            return "hello";
            }).whenComplete((a, e) -> {
               log.info("ret -> {}", a + " lili!");
               log.error("error", e);
            });
            log.info("future.get()-->{}", future.get());        
     // 执行结果:ret -> null lili!  而且打印两次错误日志,一次是log打印,一次是get的时候。
      CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello")
                    .handle((a, e) -> a + " lili!");
    
            log.info("ret ==> {}", future.get());// 执行结果:ret ==> hello lili!
       CompletableFuture<Object> f =
                    CompletableFuture.supplyAsync(() -> "Hello")
                            .thenApplyAsync(res -> res + " World")
                            .thenApplyAsync(
                                    res -> {                                    throw new RuntimeException(" test has error");                                    //  return res + "!";
                                    })
                            .exceptionally(
                                    e -> {
                                        log.error("exceptionally exception",e);                                    return "出异常了。。";
                                    });
     log.info("ret ====> {}", f.get());// 执行结果:ret ====> 出异常了。。// 假如不抛出异常,执行结果:ret ====> Hello World!
         CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "hello");
            CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> "world");
            CompletableFuture<String> f5 =
                    CompletableFuture.supplyAsync(
                            () -> {                            try {
                                    TimeUnit.SECONDS.sleep(3);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }                            return "!";
                            });        // 使用allOf方法 f3 f4 f5 都执行结束之前一直阻塞
            CompletableFuture.allOf(f3, f4, f5).join();
    
            System.out.println(f3.get());
            System.out.println(f4.get());
            System.out.println(f5.get());
            List<String> r =
                    Stream.of(f3, f4, f5).map(CompletableFuture::join).collect(Collectors.toList());
    
            System.out.println(r);        
            
    // 执行结果:hello// world// !// [hello, world, !]// 而且要等f1,f2,f3 三个任务都结束,不然会一直阻塞。

    这个类中的大部分方法上面都做了介绍,下面可以结合具体场景做一次演示。

    1. 无关性任务,互相依赖,allOf

    2. 产出型,异常时候进行处理,并产出,有点像try-catch(),exceptionally()

    3. 产出型,无论正常还是异常都是处理,并返回结果。handlehandleAsync

    4. 消费型,无论正常,还是异常都会消费处理,而且不会吞掉异常 whenComplete()whenCompleteAsync()

    5. 消费型:acceptEither() 依赖两个任务中先执行结束的那个

    6. 消费型,依赖两个任务都完成:thenAcceptBoth()thenAcceptBothAsync()

    7. 消费型,依赖单阶段: thenAccept()thenAcceptAsync()

    8. 依赖两个任务中的一个:applyToEither() ,那个任务先结束,就依赖那个任务。

    9. 组合与撰写:thenCompose() ,thenCombine()thenCombineAsync.

    10. 依赖单一阶段:thenApply thenApplyAsync

    11. 异步执行任务,不返回结果:runAsync

三:DEMO

场景1:需要查询一个订单信息,首先需要查询商品信息,然后查询支付信息,最后汇总成一个对象返回。

     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "商品信息")
                .thenCombineAsync(CompletableFuture.supplyAsync(() -> "支付信息"), (a, b) -> {                    // 组装信息
                    return a + b;
                });
        log.info("ret =========>{}",future.get());

场景2:用户注册,首先需要校验用户信息,然后生成账号信息,最后保存到数据库。这三个操作互相依赖。

 // A -> B-> C
        CompletableFuture<String> future = CompletableFuture.runAsync(() -> {            if (ThreadLocalRandom.current().nextBoolean()){                return;
            }            throw new RuntimeException("该手机号码已经注册");
        }).thenCompose(ret -> CompletableFuture.supplyAsync(() -> {                    if (ThreadLocalRandom.current().nextBoolean()) {                        // 生成账号信息
                        return "账号信息: 16289";
                    }                    throw new RuntimeException("账号信息生成失败。。");
                })).thenApplyAsync(ret -> {                    // 保存账号信息
                    log.info("保存账号信息->{}", ret);                    return "注册成功";
                }).exceptionally(e -> "注册失败" + e.getMessage());

        log.info("最终返回结果:===》 {}",future.get());

来源https://www.cnblogs.com/simple-flw/p/15422611.html

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐