CompleteFuture实现简单的任务编排实践
一:前言
CompleteFuture是java8 新提供的API,是对函数式编程思想的体现,提供了很多的对于函数式编程支持。不止有同步处理功能,还有异步处理能力。
通过函数式编程可以实现线程的简单任务编排。高效,整洁实现多线程异步编程。
二:详细介绍
CompleteFuture
提供的API中以ansy
结尾的都是异步处理的。
异步执行任务,并返回结果:
supplyAsync
异步处理,并返回结果,默认使用ForkJoinPool.commonPool()
线程池,同时提供支持自定义线程池的API。CompletableFuture.supplyAsync(() -> "HELLO");// 自定义线程池CompletableFuture.supplyAsync(()->"hello",ES);
CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !")); CompletableFuture.runAsync(() -> System.out.println("HELLO WORLD !"),ES);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "HELLO") .thenApply(a -> return a + " lili!"; });
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy"));// 执行结果: =====> hello lili lucy// mian线程下同步执行。
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello") .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lili")) .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " lucy")) .thenCombineAsync(CompletableFuture.supplyAsync(() -> " how are you!"), (a, b) -> a + b); log.info("=====> {}", f1.get()); // 执行结果: =====> hello lili lucy how are you!
CompletableFuture<String> voidCompletableFuture = CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace();} return "lucy"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();} return "lili"; }), a -> "hello " + a); log.info("ret ====> {}",voidCompletableFuture.get());// 执行结果: ret ====> hello lili 如果下面sleep改成3s,执行结果:ret ====> hello lucy
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> "hello") .thenAcceptAsync(a -> { a = a + " lucy !"; log.info("ret ======> {}", a); }); log.info(" ======== end ========================");// 执行结果:ret ======> hello lucy ! 而且是异步的,不会阻塞主线程,下面的end是先打印出来的
CompletableFuture.supplyAsync(() -> "hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " lili"), (a, b) -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("=======>{}", a + b); });// 执行结果:=======>hello lili
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "lucy"; }).acceptEither(CompletableFuture.supplyAsync(() -> "lili"), a -> { log.info("hello {}", a); }); // 执行结果:hello lili
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (ThreadLocalRandom.current().nextInt(2) < 2) { throw new RuntimeException("error"); } return "hello"; }).whenComplete((a, e) -> { log.info("ret -> {}", a + " lili!"); log.error("error", e); }); log.info("future.get()-->{}", future.get()); // 执行结果:ret -> null lili! 而且打印两次错误日志,一次是log打印,一次是get的时候。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello") .handle((a, e) -> a + " lili!"); log.info("ret ==> {}", future.get());// 执行结果:ret ==> hello lili!
CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello") .thenApplyAsync(res -> res + " World") .thenApplyAsync( res -> { throw new RuntimeException(" test has error"); // return res + "!"; }) .exceptionally( e -> { log.error("exceptionally exception",e); return "出异常了。。"; }); log.info("ret ====> {}", f.get());// 执行结果:ret ====> 出异常了。。// 假如不抛出异常,执行结果:ret ====> Hello World!
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> "world"); CompletableFuture<String> f5 = CompletableFuture.supplyAsync( () -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "!"; }); // 使用allOf方法 f3 f4 f5 都执行结束之前一直阻塞 CompletableFuture.allOf(f3, f4, f5).join(); System.out.println(f3.get()); System.out.println(f4.get()); System.out.println(f5.get()); List<String> r = Stream.of(f3, f4, f5).map(CompletableFuture::join).collect(Collectors.toList()); System.out.println(r); // 执行结果:hello// world// !// [hello, world, !]// 而且要等f1,f2,f3 三个任务都结束,不然会一直阻塞。
这个类中的大部分方法上面都做了介绍,下面可以结合具体场景做一次演示。
无关性任务,互相依赖,
allOf
产出型,异常时候进行处理,并产出,有点像try-catch(),
exceptionally()
产出型,无论正常还是异常都是处理,并返回结果。
handle
,handleAsync
消费型,无论正常,还是异常都会消费处理,而且不会吞掉异常
whenComplete()
,whenCompleteAsync()
消费型:
acceptEither()
依赖两个任务中先执行结束的那个消费型,依赖两个任务都完成:
thenAcceptBoth()
,thenAcceptBothAsync()
消费型,依赖单阶段:
thenAccept()
,thenAcceptAsync()
依赖两个任务中的一个:
applyToEither()
,那个任务先结束,就依赖那个任务。组合与撰写:
thenCompose()
,thenCombine()
,thenCombineAsync
.依赖单一阶段:
thenApply thenApplyAsync
异步执行任务,不返回结果:
runAsync
三:DEMO
场景1:需要查询一个订单信息,首先需要查询商品信息,然后查询支付信息,最后汇总成一个对象返回。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "商品信息") .thenCombineAsync(CompletableFuture.supplyAsync(() -> "支付信息"), (a, b) -> { // 组装信息 return a + b; }); log.info("ret =========>{}",future.get());
场景2:用户注册,首先需要校验用户信息,然后生成账号信息,最后保存到数据库。这三个操作互相依赖。
// A -> B-> C CompletableFuture<String> future = CompletableFuture.runAsync(() -> { if (ThreadLocalRandom.current().nextBoolean()){ return; } throw new RuntimeException("该手机号码已经注册"); }).thenCompose(ret -> CompletableFuture.supplyAsync(() -> { if (ThreadLocalRandom.current().nextBoolean()) { // 生成账号信息 return "账号信息: 16289"; } throw new RuntimeException("账号信息生成失败。。"); })).thenApplyAsync(ret -> { // 保存账号信息 log.info("保存账号信息->{}", ret); return "注册成功"; }).exceptionally(e -> "注册失败" + e.getMessage()); log.info("最终返回结果:===》 {}",future.get());
来源https://www.cnblogs.com/simple-flw/p/15422611.html