Future异步回调进阶使用记录一下
前言
在某些场景中,对于一些不重要的任务可以使用异步执行的方式进行处理,而有些情况下又有需要获取异步执行任务的回调结果
这时就可以使用Future接口,这次结合个人理解,记录一下关于Future接口异步回调的进阶使用,以便后续查阅。
FutureTask
在使用线程池异步执行任务时,如果需要获取执行结果,那就需要使用submit()方法提交任务,
其内部会封装一个FutureTask对象,最后通过get()方法阻塞式获取结果(其实相当于伪异步):
public static void main (String[] args) throws Exception { ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6)); // 例子一 Future<?> submit = EXECUTOR.submit(() -> { log.info("FutureTask使用,无返回值"); }); log.info("FutureTask使用,无返回值:" + submit.get()); // 例子二 Future<String> submit1 = EXECUTOR.submit(() -> { log.info("FutureTask使用,有返回值"); return "1"; }); log.info("FutureTask使用,有返回值:" + submit1.get()); // 例子三 AtomicInteger num = new AtomicInteger(1); Future<AtomicInteger> submit2 = EXECUTOR.submit(() -> { log.info("FutureTask使用,多参数"); num.set(2); }, num); log.info("FutureTask使用,多参数:" + submit2.get()); } ----------------------------- 打印结果: 18:07:17.346 - [pool-2-thread-1] - FutureTask使用,无返回值 18:07:17.348 - [main] - FutureTask使用,无返回值:null 18:07:17.349 - [pool-2-thread-2] - FutureTask使用,有返回值 18:07:17.349 - [main] - FutureTask使用,有返回值:1 18:07:17.350 - [pool-2-thread-3] - FutureTask使用,多参数 18:07:17.350 - [main] - FutureTask使用,多参数:2复制代码
使用FutureTask回调方式会使主线程阻塞,如果想要不阻塞主线程那可以使用下列几种机制复制代码
ListenableFuture
谷歌的guava包提供了一种不阻塞主线程的回调类ListenableFuture,其扩展的Future接口提供一个方法addListener()
如果想要使用这种回调机制,需要使用MoreExecutors对象对自定义线程池ThreadPoolExecutor进行了装饰
使用MoreExecutors装饰后的线程池提交任务后会返回一个ListenableFuture对象
在ListenableFuture对象调用addListener()方法,注册一个监听对象和线程池。当线程任务执行完后就会触发监听对象:
public static void main (String[] args) throws Exception { ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6)); log.info("约女神看电影"); ListeningExecutorService executorService = MoreExecutors.listeningDecorator(EXECUTOR); ListenableFuture<String> future = executorService.submit(() -> { log.info("女神开始化妆了"); try { Thread.sleep(3000); } catch (InterruptedException e) { } return "女神妆化完了"; }); future.addListener(() -> { try { log.info("正常回调:" + future.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } }, executorService); log.info("我先做其它事情"); Thread.currentThread().join(); } -------------------------- 打印结果: 18:08:23.926 - [main] - 约女神看电影 18:08:23.975 - [pool-2-thread-1] - 女神开始化妆了 18:08:23.975 - [main] - 我先做其它事情 18:08:26.989 - [pool-2-thread-2] - 正常回调:女神妆化完了复制代码
FutureCallback
若想要自定义控制提交到线程池的任务出现异常情况,可以使用FutureCallback对象
public static void main (String[] args) throws Exception { ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6)); log.info("约女神看电影"); ListeningExecutorService executorService = MoreExecutors.listeningDecorator(EXECUTOR); ListenableFuture<?> future = executorService.submit(() -> { log.info("女神开始化妆了"); try { Thread.sleep(3000); } catch (Exception e) { } // throw new RuntimeException("女神不化妆了"); return "女神妆化完了"; }); Futures.addCallback(future, new FutureCallback<Object>() { @Override public void onSuccess (Object result) { log.info("正常情况回调:" + result); } @Override public void onFailure (Throwable t) { log.info("异常情况回调:" + t.getMessage()); } }); log.info("我先做其它事情"); Thread.currentThread().join(); } ------------------------------- 打印结果: 18:12:25.848 - [main] - 约女神看电影 18:12:25.892 - [pool-2-thread-1] - 女神开始化妆了 18:12:25.903 - [main] - 我先做其它事情 18:12:28.899 - [pool-2-thread-1] - 正常情况回调:女神妆化完了复制代码
把这个demo中的异常注释打开,然后执行代码其结果如下:
18:13:26.317 - [main] - 约女神看电影 18:13:26.363 - [pool-2-thread-1] - 女神开始化妆了 18:13:26.373 - [main] - 我先做其它事情 18:13:29.368 - [pool-2-thread-1] - 异常情况回调:女神不化妆了复制代码
CompletableFuture
CompletableFuture是JDK 8之后才推出来的一种回调机制,FutureTask是JDK 1.5时发布的
CompletableFuture可以看做是对Future的增强,实现了不阻塞主线程获取回调结果(真异步)
使用CompletableFuture对象执行任务,不需要自定义线程池(其内部也提供了自定义线程池的扩展方法),默认使用ForkJoinPool.commonPool()线程池
除了不阻塞主线程获取回调结果,还可以自定义处理异常场景:
public static void main (String[] args) throws Exception { ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6)); log.info("约女神看电影"); CompletableFuture.supplyAsync(() -> { log.info("女神开始化妆了"); try { Thread.sleep(3000); } catch (InterruptedException e) { } return "女神妆化完了"; // throw new RuntimeException("女神不化妆了哈"); }).whenComplete((result, throwable) -> { if (throwable == null) { log.info("正常回调:" + result); return; } log.info("异常回调:" + throwable.getMessage()); return; }); log.info("我先做其它事情"); Thread.currentThread().join(); } ------------------------------- 打印结果: 18:15:13.797 - [main] - 约女神看电影 18:15:13.831 - [ForkJoinPool.commonPool-worker-1] - 女神开始化妆了 18:15:13.832 - [main] - 我先做其它事情 18:15:16.833 - [ForkJoinPool.commonPool-worker-1] - 正常回调:女神妆化完了复制代码
把这个demo中的异常注释打开,然后执行代码其结果如下:
18:15:42.565 - [main] - 约女神看电影 18:15:42.600 - [ForkJoinPool.commonPool-worker-1] - 女神开始化妆了 18:15:42.600 - [main] - 我先做其它事情 18:15:45.601 - [ForkJoinPool.commonPool-worker-1] - 异常回调:java.lang.RuntimeException: 女神不化妆了哈复制代码
CompletableFuture提供了很多实用方法,demo里举例了两种提交任务和任务完成时执行的回调方法。其他方法读者可以自行操作一下
作者:超级爽朗的郑
链接:https://juejin.cn/post/7018492453195874311