阅读 172

Future异步回调进阶使用记录一下

前言

  • 在某些场景中,对于一些不重要的任务可以使用异步执行的方式进行处理,而有些情况下又有需要获取异步执行任务的回调结果

  • 这时就可以使用Future接口,这次结合个人理解,记录一下关于Future接口异步回调的进阶使用,以便后续查阅。

FutureTask

  • 在使用线程池异步执行任务时,如果需要获取执行结果,那就需要使用submit()方法提交任务,

  • 其内部会封装一个FutureTask对象,最后通过get()方法阻塞式获取结果(其实相当于伪异步):

public static void main (String[] args) throws Exception {

    ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6));
    // 例子一
    Future<?> submit = EXECUTOR.submit(() -> {
        log.info("FutureTask使用,无返回值");
    });
    log.info("FutureTask使用,无返回值:" + submit.get());

    // 例子二
    Future<String> submit1 = EXECUTOR.submit(() -> {
        log.info("FutureTask使用,有返回值");
        return "1";
    });
    log.info("FutureTask使用,有返回值:" + submit1.get());

    // 例子三
    AtomicInteger num = new AtomicInteger(1);
    Future<AtomicInteger> submit2 = EXECUTOR.submit(() -> {
        log.info("FutureTask使用,多参数");
        num.set(2);
    }, num);
    log.info("FutureTask使用,多参数:" + submit2.get());
}
-----------------------------
打印结果:
18:07:17.346 - [pool-2-thread-1] - FutureTask使用,无返回值
18:07:17.348 - [main] - FutureTask使用,无返回值:null
18:07:17.349 - [pool-2-thread-2] - FutureTask使用,有返回值
18:07:17.349 - [main] - FutureTask使用,有返回值:1
18:07:17.350 - [pool-2-thread-3] - FutureTask使用,多参数
18:07:17.350 - [main] - FutureTask使用,多参数:2复制代码
使用FutureTask回调方式会使主线程阻塞,如果想要不阻塞主线程那可以使用下列几种机制复制代码

ListenableFuture

  • 谷歌的guava包提供了一种不阻塞主线程的回调类ListenableFuture,其扩展的Future接口提供一个方法addListener()

  • 如果想要使用这种回调机制,需要使用MoreExecutors对象对自定义线程池ThreadPoolExecutor进行了装饰

  • 使用MoreExecutors装饰后的线程池提交任务后会返回一个ListenableFuture对象

  • 在ListenableFuture对象调用addListener()方法,注册一个监听对象和线程池。当线程任务执行完后就会触发监听对象:

public static void main (String[] args) throws Exception {

    ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6));
    log.info("约女神看电影");
    
    ListeningExecutorService executorService = MoreExecutors.listeningDecorator(EXECUTOR);
    ListenableFuture<String> future = executorService.submit(() -> {
        log.info("女神开始化妆了");
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }
        return "女神妆化完了";
    });

    future.addListener(() -> {
        try {
            log.info("正常回调:" + future.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }, executorService);

    log.info("我先做其它事情");
    Thread.currentThread().join();
}
--------------------------
打印结果:
18:08:23.926 - [main] - 约女神看电影
18:08:23.975 - [pool-2-thread-1] - 女神开始化妆了
18:08:23.975 - [main] - 我先做其它事情
18:08:26.989 - [pool-2-thread-2] - 正常回调:女神妆化完了复制代码

FutureCallback

  • 若想要自定义控制提交到线程池的任务出现异常情况,可以使用FutureCallback对象

    public static void main (String[] args) throws Exception {
    
        ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6));
        log.info("约女神看电影");

        ListeningExecutorService executorService = MoreExecutors.listeningDecorator(EXECUTOR);
        ListenableFuture<?> future = executorService.submit(() -> {
            log.info("女神开始化妆了");
            try {
                Thread.sleep(3000);
            } catch (Exception e) {
            }
//            throw new RuntimeException("女神不化妆了");
            return "女神妆化完了";
        });

        Futures.addCallback(future, new FutureCallback<Object>() {
            @Override
            public void onSuccess (Object result) {
                log.info("正常情况回调:" + result);
            }

            @Override
            public void onFailure (Throwable t) {
                log.info("异常情况回调:" + t.getMessage());
            }
        });

        log.info("我先做其它事情");
        Thread.currentThread().join();
    }
-------------------------------
打印结果:
18:12:25.848 - [main] - 约女神看电影
18:12:25.892 - [pool-2-thread-1] - 女神开始化妆了
18:12:25.903 - [main] - 我先做其它事情
18:12:28.899 - [pool-2-thread-1] - 正常情况回调:女神妆化完了复制代码
  • 把这个demo中的异常注释打开,然后执行代码其结果如下:

18:13:26.317 - [main] - 约女神看电影
18:13:26.363 - [pool-2-thread-1] - 女神开始化妆了
18:13:26.373 - [main] - 我先做其它事情
18:13:29.368 - [pool-2-thread-1] - 异常情况回调:女神不化妆了复制代码

CompletableFuture

  • CompletableFuture是JDK 8之后才推出来的一种回调机制,FutureTask是JDK 1.5时发布的

  • CompletableFuture可以看做是对Future的增强,实现了不阻塞主线程获取回调结果(真异步)

  • 使用CompletableFuture对象执行任务,不需要自定义线程池(其内部也提供了自定义线程池的扩展方法),默认使用ForkJoinPool.commonPool()线程池

  • 除了不阻塞主线程获取回调结果,还可以自定义处理异常场景:

    public static void main (String[] args) throws Exception {
    
        ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(4, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(6));
        log.info("约女神看电影");

        CompletableFuture.supplyAsync(() -> {
            log.info("女神开始化妆了");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
            }
            return "女神妆化完了";
//            throw new RuntimeException("女神不化妆了哈");
        }).whenComplete((result, throwable) -> {
            if (throwable == null) {
                log.info("正常回调:" + result);
                return;
            }
            log.info("异常回调:" + throwable.getMessage());
            return;
        });

        log.info("我先做其它事情");
        Thread.currentThread().join();
    }
-------------------------------
打印结果:
18:15:13.797 - [main] - 约女神看电影
18:15:13.831 - [ForkJoinPool.commonPool-worker-1] - 女神开始化妆了
18:15:13.832 - [main] - 我先做其它事情
18:15:16.833 - [ForkJoinPool.commonPool-worker-1] - 正常回调:女神妆化完了复制代码
  • 把这个demo中的异常注释打开,然后执行代码其结果如下:

18:15:42.565 - [main] - 约女神看电影
18:15:42.600 - [ForkJoinPool.commonPool-worker-1] - 女神开始化妆了
18:15:42.600 - [main] - 我先做其它事情
18:15:45.601 - [ForkJoinPool.commonPool-worker-1] - 异常回调:java.lang.RuntimeException: 女神不化妆了哈复制代码
  • CompletableFuture提供了很多实用方法,demo里举例了两种提交任务和任务完成时执行的回调方法。其他方法读者可以自行操作一下


作者:超级爽朗的郑
链接:https://juejin.cn/post/7018492453195874311

文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐