阅读 124

ExecutorService的API详解

ExecutorService的终止方法

package com.dwz.executors;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * Demo of the shutdown-related API of {@link java.util.concurrent.ExecutorService}.
 *
 * <p>Question: once {@code shutdown()} has been invoked, can a new runnable still be executed?<br>
 * Answer: no - the ExecutorService rejects every task submitted after {@code shutdown()}.
 */
public class ExecutorServiceExample1 {

    /**
     * Prints {@code isShutdown()} before and after {@code shutdown()}, then submits one more
     * task to show that it is rejected.
     */
    private static void isShutDown() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> sleepSeconds(5));

        System.out.println(executorService.isShutdown());
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        try {
            executorService.execute(() -> System.out.println("I will be executed after shutDown ???"));
        } catch (java.util.concurrent.RejectedExecutionException rejected) {
            // The rejection is the point of this demo. It is reported here instead of being
            // propagated, otherwise main() would abort before the remaining demos run.
            System.out.println("Rejected after shutDown: " + rejected);
        }
    }

    /**
     * Prints the shutdown / terminated / terminating flags right after {@code shutdown()},
     * while a two-second task is still running.
     *
     * {@link ExecutorService#isTerminated()}
     * {@link ThreadPoolExecutor#isTerminating()}
     */
    private static void isTerminated() {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.execute(() -> sleepSeconds(2));
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        System.out.println(executorService.isTerminated());
        System.out.println(((ThreadPoolExecutor) executorService).isTerminating());
    }

    /**
     * Runs ten tasks that all fail with an {@link ArithmeticException}; the failures are
     * reported by the uncaught-exception handler installed by {@link MyThreadFactory}.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    private static void executeRunnableError() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10, new MyThreadFactory());
        IntStream.range(0, 10).forEach(i -> executorService.execute(() -> System.out.println(1 / 0)));
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("=====================================");
    }

    /**
     * Sleeps for the given number of seconds. On interruption the stack trace is printed and
     * the interrupt flag is restored so the calling thread can still observe it.
     */
    private static void sleepSeconds(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /** Names its threads {@code My-Thread-N} and reports uncaught task failures on stdout. */
    private static class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger SEQ = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("My-Thread-" + SEQ.getAndIncrement());
            t.setUncaughtExceptionHandler((failed, cause) -> {
                System.out.println("The thread " + failed.getName() + " execute failed.");
                cause.printStackTrace();
                System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            });
            return t;
        }
    }

    /**
     * A Runnable whose outcome can be observed: {@link #run()} drives the
     * init / execute / done steps and routes any failure to {@link #error(Throwable)}.
     */
    private abstract static class MyTask implements Runnable {
        protected final int no;

        public MyTask(int no) {
            this.no = no;
        }

        @Override
        public void run() {
            try {
                this.doInit();
                this.doExecute();
                this.done();
            } catch (Throwable cause) {
                // Anything thrown by a step is turned into an error callback, not an uncaught exception.
                this.error(cause);
            }
        }

        protected abstract void error(Throwable cause);

        protected abstract void done();

        protected abstract void doExecute();

        protected abstract void doInit();
    }

    /*
     *
     *                               |---> operation 1
     *                               |---> operation 2
     * send request-->store db-->10->|---> operation 3 --> report a different result for each
     *                               |---> operation 4
     *                               |---> operation 5
     *
     */
    /**
     * Runs ten {@link MyTask}s; every task whose number is a multiple of three fails and is
     * reported through {@code error}, the others through {@code done}.
     *
     * @throws InterruptedException if interrupted while waiting for the pool to terminate
     */
    private static void executeRunnableTask() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10, new MyThreadFactory());
        IntStream.range(0, 10).forEach(i -> executorService.execute(new MyTask(i) {
            @Override
            protected void error(Throwable cause) {
                System.out.println("The no:" + no + " failed , update status to ERROR.");
            }

            @Override
            protected void done() {
                System.out.println("The no:" + no + " successfully , update status to DONE.");
            }

            @Override
            protected void doInit() {
                // do nothing
            }

            @Override
            protected void doExecute() {
                if (no % 3 == 0) {
                    // Same failure the original produced with 1 / 0.
                    throw new ArithmeticException("/ by zero");
                }
            }
        }));
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.MINUTES);
        System.out.println("=====================================");
    }

    public static void main(String[] args) throws InterruptedException {
        isShutDown();
        isTerminated();
        executeRunnableError();
        executeRunnableTask();
    }
}

ExecutorService的各种策略

package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Demonstrates the four built-in rejection policies of {@link ThreadPoolExecutor}.
 *
 * <p>Every demo uses a pool with 1 core thread, at most 2 threads and a queue of capacity 1,
 * so it can hold three tasks at once (two running, one queued); the fourth submission
 * triggers the rejection policy under test.
 */
public class ExecutorServiceExample2 {

    /** Builds the 1-core / 2-max / 1-slot-queue pool shared by all demos. */
    private static ExecutorService newSmallPool(java.util.concurrent.RejectedExecutionHandler handler) {
        return new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), Thread::new, handler);
    }

    /**
     * Submits the three tasks that saturate the pool. Each one sleeps for {@code seconds} and
     * then runs {@code afterSleep}; if interrupted, the stack trace is printed, the interrupt
     * flag is restored and {@code afterSleep} is skipped.
     */
    private static void submitSleepers(ExecutorService pool, long seconds, Runnable afterSleep) {
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(seconds);
                    afterSleep.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    /** AbortPolicy: the fourth task is rejected with a RejectedExecutionException. */
    private static void testAbortPolicy() throws InterruptedException {
        ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.AbortPolicy());
        try {
            submitSleepers(executorService, 100, () -> { });

            // Give the workers a moment to pick up their tasks.
            TimeUnit.SECONDS.sleep(1);
            try {
                // Two threads are busy and the single queue slot is taken, so this one is rejected.
                executorService.execute(() -> System.out.println("xxxxxxxxxx"));
            } catch (java.util.concurrent.RejectedExecutionException rejected) {
                // Reported instead of propagated so that main() can run the remaining demos.
                System.out.println("Fourth task rejected: " + rejected);
            }
        } finally {
            // The sleepers would otherwise keep the JVM alive for minutes; cancel them.
            executorService.shutdownNow();
        }
    }

    /** DiscardPolicy: the fourth task is dropped silently, no exception is thrown. */
    private static void testDiscardPolicy() throws InterruptedException {
        ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.DiscardPolicy());
        try {
            submitSleepers(executorService, 5, () -> { });

            // Give the workers a moment to pick up their tasks.
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> System.out.println("xxxxxxxxxx"));
            System.out.println("======================");
        } finally {
            // Let accepted tasks finish, then release the threads so the JVM can exit.
            executorService.shutdown();
        }
    }

    /** DiscardOldestPolicy: the oldest queued task is dropped and the new task takes its place. */
    private static void testDiscardOldestPolicy() throws InterruptedException {
        ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.DiscardOldestPolicy());
        try {
            submitSleepers(executorService, 5, () -> System.out.println("I come from lambda."));

            // Give the workers a moment to pick up their tasks.
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> {
                System.out.println("xxxxxxxxxxxxxxxxxxxxx");
                System.out.println(Thread.currentThread().getName());
            });
            System.out.println("======================");
        } finally {
            // Let accepted tasks finish, then release the threads so the JVM can exit.
            executorService.shutdown();
        }
    }

    /** CallerRunsPolicy: the task that would be rejected runs in the submitting thread instead. */
    private static void testCallerRunsPolicy() throws InterruptedException {
        ExecutorService executorService = newSmallPool(new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            submitSleepers(executorService, 5, () -> { });

            // Give the workers a moment to pick up their tasks.
            TimeUnit.SECONDS.sleep(1);
            executorService.execute(() -> {
                System.out.println("xxxxxxxxxxxxxxxxxxxxx");
                System.out.println(Thread.currentThread().getName());
            });
            System.out.println("======================");
        } finally {
            // Let accepted tasks finish, then release the threads so the JVM can exit.
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        testAbortPolicy();
        testDiscardPolicy();
        testDiscardOldestPolicy();
        testCallerRunsPolicy();
    }
}

ThreadPoolExecutor对CoreThread的操作

package com.dwz.executors;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * Demonstrates how {@link ThreadPoolExecutor} manages its core threads: active count,
 * core-thread time-out, removing queued tasks, pre-starting core threads and the
 * {@code beforeExecute} / {@code afterExecute} hooks.
 */
public class ExecutorServiceExample3 {

    /**
     * Prints the active count before and while a task runs, then shows that
     * {@code allowCoreThreadTimeOut(true)} is refused on a fixed pool whose keep-alive
     * time has not been set to a positive value.
     */
    private static void test() throws InterruptedException {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
        try {
            System.out.println(executorService.getActiveCount());

            executorService.execute(() -> sleepSeconds(5));

            TimeUnit.MILLISECONDS.sleep(20);
            System.out.println(executorService.getActiveCount());

            try {
                executorService.allowCoreThreadTimeOut(true);
            } catch (IllegalArgumentException refused) {
                // newFixedThreadPool uses a zero keep-alive time, which is not allowed together
                // with core-thread time-out. Reported here so main() can run the other demos.
                System.out.println("allowCoreThreadTimeOut(true) refused: " + refused.getMessage());
            }
        } finally {
            // Core threads never time out in this pool; shut it down so the JVM can exit.
            executorService.shutdown();
        }
    }

    /** With a positive keep-alive time, idle core threads are allowed to time out. */
    private static void testAllowCoreThreadTimeOut() {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
        executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
        // keepAliveTime must be greater than zero before core-thread time-out is enabled
        executorService.allowCoreThreadTimeOut(true);
        IntStream.range(0, 5).forEach(i -> executorService.execute(() -> sleepSeconds(5)));
    }

    /** Queues a third task behind two busy workers and removes it before it can run. */
    private static void testRemove() throws InterruptedException {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        executorService.setKeepAliveTime(10, TimeUnit.SECONDS);
        // keepAliveTime must be greater than zero before core-thread time-out is enabled
        executorService.allowCoreThreadTimeOut(true);
        IntStream.range(0, 2).forEach(i -> executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(5);
                System.out.println("================ I am finished.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }));

        TimeUnit.MILLISECONDS.sleep(20);
        Runnable r = () -> System.out.println("I will never be executed.");
        executorService.execute(r);
        TimeUnit.MILLISECONDS.sleep(20);
        // remove the task from the work queue
        executorService.remove(r);
    }

    /** Prints active counts and the results of prestartAllCoreThreads / prestartCoreThread. */
    private static void testPrestartCoreThread() {
        ThreadPoolExecutor executorService = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            executorService.setMaximumPoolSize(3);
            System.out.println(executorService.getActiveCount());
            int num = executorService.prestartAllCoreThreads();
            System.out.println(num);
            System.out.println(executorService.getActiveCount());

            System.out.println(executorService.prestartCoreThread());

            System.out.println(executorService.getActiveCount());
            System.out.println(executorService.prestartCoreThread());

            executorService.execute(() -> sleepSeconds(1));
            executorService.execute(() -> sleepSeconds(1));

            System.out.println(executorService.getActiveCount());
            System.out.println(executorService.prestartCoreThread());

            System.out.println(executorService.getActiveCount());
        } finally {
            // Core threads never time out in this pool; shut it down so the JVM can exit.
            executorService.shutdown();
        }
    }

    /** beforeExecute and afterExecute work like "around" advice for every task. */
    private static void testThreadPoolAdvice() {
        ThreadPoolExecutor executorService = new MyThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());

        executorService.execute(new MyRunnable(1) {
            @Override
            public void run() {
                System.out.println("=================");
            }
        });

        // Wait for the task and both hooks to finish, and release the core thread.
        shutdownAndAwait(executorService);
    }

    /**
     * Sleeps for the given number of seconds. On interruption the stack trace is printed and
     * the interrupt flag is restored.
     */
    private static void sleepSeconds(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /** Shuts the pool down and waits up to one minute for accepted tasks to finish. */
    private static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** A Runnable that carries a number which the executor hooks can read. */
    private abstract static class MyRunnable implements Runnable {
        private final int no;

        public MyRunnable(int no) {
            this.no = no;
        }

        protected int getData() {
            return this.no;
        }
    }

    /** Executor that prints a line before and after every {@link MyRunnable} it runs. */
    private static class MyThreadPoolExecutor extends ThreadPoolExecutor {

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            // Tasks handed in through submit() are wrapped, so only unwrap what really is a MyRunnable.
            if (r instanceof MyRunnable) {
                System.out.println("init the " + ((MyRunnable) r).getData());
            }
            super.beforeExecute(t, r);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (null != t) {
                t.printStackTrace();
                return;
            }
            if (r instanceof MyRunnable) {
                System.out.println("successful " + ((MyRunnable) r).getData());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        test();
        testAllowCoreThreadTimeOut();
        testRemove();
        testPrestartCoreThread();
        testThreadPoolAdvice();
    }
}

ExecutorService的invoke和submit调用

package com.dwz.executors;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Demonstrates {@code invokeAny}, {@code invokeAll} and {@code submit} of
 * {@link ExecutorService}.
 */
public class ExecutorServiceExample4 {

    /**
     * Builds five callables; each sleeps a random 0-19 seconds, prints the worker thread
     * name with its index and returns the index.
     */
    private static List<Callable<Integer>> randomSleepers() {
        return IntStream.range(0, 5).boxed().map(
                i -> (Callable<Integer>) () -> {
                    TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(20));
                    System.out.println(Thread.currentThread().getName() + " :" + i);
                    return i;
                }
        ).collect(Collectors.toList());
    }

    /**
     * Renders the outcome of a finished future: its value, or {@code "cancelled"} when the
     * task was cancelled (for example by an {@code invokeAll} time-out).
     *
     * @throws RuntimeException wrapping the original failure if the task threw or the wait
     *                          was interrupted
     */
    private static String resultOf(Future<Integer> future) {
        if (future.isCancelled()) {
            return "cancelled";
        }
        try {
            return String.valueOf(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Keep the cause so the real failure is not lost.
            throw new RuntimeException(e);
        }
    }

    /** Sleeps three seconds, then prints the name of the executing thread. */
    private static void sleepThenPrintThreadName() {
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Question:
     *         When the result is returned, will the other callables keep on processing?
     * Answer:
     *         The other executing callables are cancelled.
     * {@link ExecutorService#invokeAny(java.util.Collection)}
     */
    private static void testInvokeAny() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // invokeAny blocks until one task has completed successfully
            Integer value = executorService.invokeAny(randomSleepers());
            System.out.println("===================finished=====================");
            System.out.println(value);
        } finally {
            // Release the idle workers so the JVM can exit.
            executorService.shutdown();
        }
    }

    /**
     * After the time-out the tasks that are still executing are cancelled.
     * {@link ExecutorService#invokeAny(java.util.Collection, long, TimeUnit)}
     */
    private static void testInvokeAnyTimeOut() throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // invokeAny blocks until one task has completed or the time-out elapses
            Integer value = executorService.invokeAny(randomSleepers(), 3, TimeUnit.SECONDS);
            System.out.println("===================finished=====================");
            System.out.println(value);
        } catch (TimeoutException timedOut) {
            // The time-out is the expected outcome most of the time; report it so that
            // main() can carry on with the remaining demos.
            System.out.println("invokeAny timed out: " + timedOut);
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * {@link ExecutorService#invokeAll(java.util.Collection)}
     */
    private static void testInvokeAll() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // invokeAll blocks until every task has completed
            executorService.invokeAll(randomSleepers()).stream()
                    .map(ExecutorServiceExample4::resultOf)
                    .forEach(System.out::println);

            System.out.println("===================finished=====================");
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * {@link ExecutorService#invokeAll(java.util.Collection, long, TimeUnit)}
     */
    private static void testInvokeAllTimeOut() throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            // invokeAll blocks until every task has completed or the time-out elapses;
            // tasks that did not finish in time come back as cancelled futures
            executorService.invokeAll(randomSleepers(), 1, TimeUnit.SECONDS).stream()
                    .map(ExecutorServiceExample4::resultOf)
                    .forEach(System.out::println);

            System.out.println("===================finished=====================");
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * {@link ExecutorService#submit(Runnable)}
     */
    private static void testSubmitRunnable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            Runnable task = ExecutorServiceExample4::sleepThenPrintThreadName;
            Future<?> future = executorService.submit(task);
            // get() blocks until the task has finished; a Runnable yields null
            Object NULL = future.get();
            System.out.println("R:" + NULL);
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * {@link ExecutorService#submit(Runnable, Object)}
     */
    private static void testSubmitRunnableWithResult() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        try {
            Runnable task = ExecutorServiceExample4::sleepThenPrintThreadName;
            Future<String> future = executorService.submit(task, "DONE");
            String result = future.get();
            System.out.println("R:" + result);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
        testInvokeAny();
        testInvokeAnyTimeOut();
        testInvokeAll();
        testInvokeAllTimeOut();
        testSubmitRunnable();
        testSubmitRunnableWithResult();
    }
}

 

原文:https://www.cnblogs.com/zheaven/p/13468803.html

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐