阅读 16 SEO

Java 线程池监控

对 ThreadPoolExecutor 各个属性解释得挺全的 Java并发(六)线程池监控

背景

业务使用线程池的时候,出现了问题,影响线上业务,由于没有线程池监控,导致问题难以发现和排查。于是需要这么一个线程池监控组件,用来监控线程池执状态,任务执行状态等。

实现方式

ThreadPoolExecutor 提供了以下几个方法可以监控线程池的使用情况:

方法 含义
getActiveCount() 线程池中正在执行任务的线程数量
getCompletedTaskCount() 线程池已完成的任务数量,该值小于等于taskCount
getCorePoolSize() 线程池的核心线程数量
getLargestPoolSize() 线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize
getMaximumPoolSize() 线程池的最大线程数量
getPoolSize() 线程池当前的线程数量
getTaskCount() 线程池已经执行的和未执行的任务总数

通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法, afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。极简示例如下,hello~~

@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        log.info("init");
    }
    @Override
    public void shutdown() {
        log.info("shutdown");
        super.shutdown();
    }
    @Override
    public List<Runnable> shutdownNow() {
        log.info("shutdownNow");
        return super.shutdownNow();
    }
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        log.info("beforeExecute");
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        log.info("afterExecute");
    }
}

实战应用

上面是已经说明该组件的实现方式,但是在生产环境中,面对业务的复杂度高、变数大,我们应该如何实现一个高可拓展的线程池监控组件呢?这是这一小节的内容主题。

1. 使用示例

先来个 hello world 演示效果

@Test
public void helloWorld() throws InterruptedException {
    //创建线程池对象
    MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTime(100).setTaskSlowTimeThreshold(100));
    monitoredThreadPoolExecutor.execute(() -> {
        log.info("任务1开始……");
        try {
            Thread.sleep(RandomUtils.nextInt(1000, 2000));
        } catch (InterruptedException ignore) {
        }
        log.info("任务1完成……");
    });
    monitoredThreadPoolExecutor.execute(() -> {
        log.info("任务2开始……");
        try {
            Thread.sleep(RandomUtils.nextInt(0, 100));
        } catch (InterruptedException ignore) {
        }
        log.info("任务2完成……");
    });
    Thread.currentThread().join(5 * 1000);
}

hello world 程序运行日志如下:

Connected to the target VM, address: '127.0.0.1:62724', transport: 'socket'
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 提交任务数+1
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2开始……
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1开始……
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutorTest - 任务2完成……
[被监控的线程池1_1] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 1, 任务执行时间 = 86
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 1, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 2
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutorTest - 任务1完成……
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 任务排队时间 = 2, 任务执行时间 = 1452
[被监控的线程池1_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池1, 执行慢任务数+1
[ThreadPoolMonitor_1] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
[ThreadPoolMonitor_2] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池1, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 2, 最大线程数 = 2, 执行的任务总数 = 0
from the target VM, address: '127.0.0.1:62724', transport: 'socket'

2. 详细使用示例

监控方式

线程池的监控分为2种类型,一种是在执行任务前后全量统计任务排队时间和执行时间,另外一种是通过定时任务,定时获取活跃线程数,队列中的任务数,核心线程数,最大线程数等数据。

MonitoredThreadPoolExecutor 会同时统计这两种类型的数据。如果您不想统计全量任务执行和排队的监控数据,可以使用 ThreadPoolMonitor.monitor(String name, ThreadPoolExecutor threadPoolExecutor) 方法,该方法只使用定时任务来监控线程数据。其中,name 需要唯一,threadPoolExecutor 不能是MonitoredThreadPoolExecutor 类型,否则会抛出异常。

监控参数

  • poolName :线程池名称。必须为每个线程池创建不同的名称,否则会抛出异常。可以将其作为监控平台的id,通过名称找到对应的监控数据。

  • monitorConfig :监控配置参数。其中可以设置两个参数,taskSlowTimeThreshold和 queueSlowTimeThreshold。如果taskSlowTime指定为100,则表示任务执行时间大于100ms的任务会统计为慢任务,在监控中可以看到慢任务的数量。同样的,queueSlowTime指定为100,表示排队时间大于100ms的任务统计为排队慢任务,可以在监控中看到排队慢任务的数量。

其他参数和JDK中线程池参数意义相同。

使用示例代码

@Slf4j
public class MonitoredThreadPoolExecutorTest {

    @Test
    public void helloWorld() throws InterruptedException {
        //创建线程池对象
        MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor("被监控的线程池1", 2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100));
        monitoredThreadPoolExecutor.execute(() -> {
            log.info("任务1开始……");
            try {
                Thread.sleep(RandomUtils.nextInt(1000, 2000));
            } catch (InterruptedException ignore) {
            }
            log.info("任务1完成……");
        });
        monitoredThreadPoolExecutor.execute(() -> {
            log.info("任务2开始……");
            try {
                Thread.sleep(RandomUtils.nextInt(0, 100));
            } catch (InterruptedException ignore) {
            }
            log.info("任务2完成……");
        });
        Thread.currentThread().join(5 * 1000);
    }

    @Test
    public void ThreadPoolExecutorTest() throws InterruptedException {
        //线程池需要指定唯一的线程池名称,否则会抛出异常
        String uniqPoolName = "被监控的线程池2";
        MonitoredThreadPoolExecutor monitoredThreadPoolExecutor = new MonitoredThreadPoolExecutor(uniqPoolName, 1, 4, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(256), new MonitorConfig().setQueueSlowTimeThreshold(100).setTaskSlowTimeThreshold(100)) {
            //如果想在任务执行开始或者执行结束时,执行一些操作,覆盖afterExecute0(Runnable r, Throwable t)和beforeExecute0(Thread t, Runnable r),注意方法名称后面有0
            @Override
            public void afterExecute0(Runnable r, Throwable t) {
                log.info("增强afterExecute0");
            }

            @Override
            public void beforeExecute0(Thread t, Runnable r) {
                log.info("增强beforeExecute0");
            }
        };

        //使用方式和ThreadPoolExecutor完全相同
        for (int i = 0; i < 3; i++) {
            int taskId = i;
            monitoredThreadPoolExecutor.submit(() -> log.info("任务{}", taskId));
        }

        for (int i = 0; i < 3; i++) {
            monitoredThreadPoolExecutor.submit(() -> {
                int id = RandomUtils.nextInt();
                log.info("生成id = {}", id);
                return id;
            });
        }
        //使用结束后,如果需要再创建相同名称的线程池,则需要调用remove方法移除定时任务。
        ThreadPoolMonitor.remove(uniqPoolName);
        //关闭线程池,在一段时间内会关闭所有监控的定时任务
        monitoredThreadPoolExecutor.shutdown();
        Thread.currentThread().join(2 * 1000);
    }

}

示例代码运行日志

[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[ThreadPoolMonitor_0] INFO PoolMonitorTask - 线程池名称 = 被监控的线程池2, 活跃线程数峰值 = 0, 队列任务数峰值 = 0, 核心线程数 = 1, 最大线程数 = 4, 执行的任务总数 = 0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务1
[main] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 提交任务数+1
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 任务2
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 488282372
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 0, 任务执行时间 = 3
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 1176017668
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强beforeExecute0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 生成id = 463520743
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutor - 线程池名称 = 被监控的线程池2, 任务排队时间 = 4, 任务执行时间 = 0
[被监控的线程池2_0] INFO MonitoredThreadPoolExecutorTest - 增强afterExecute0

3. 主要实现

详情代码看仓库吧~~

  • ThreadPoolMonitor 负责线程池与监控方法的管理;
public class ThreadPoolMonitor {

    private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
    private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

    private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

    public ThreadPoolMonitor() {
    }

    public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {
            throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");
        } else {
            monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
        }
    }

    public static void remove(String name) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
        if (futureWrapper != null) {
            futureWrapper.future.cancel(false);
        }

    }

    public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
        if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
            POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
            futureWrapper.future.cancel(false);
        }

    }

    static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {
        monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
    }

    private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
        PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
        POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
            if (v == null) {
                return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
            } else {
                throw new IllegalStateException("duplicate pool name: " + name);
            }
        });
    }

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
    }

    static class FutureWrapper {
        private final Future<?> future;
        private final ThreadPoolExecutor threadPoolExecutor;

        public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
            this.future = future;
            this.threadPoolExecutor = threadPoolExecutor;
        }
    }
}
  • 给异步任务Runnable套一个壳,让他该任务可监控;
public class MonitoredRunnable implements Runnable, Monitored {

    private final Runnable runnable;
    private final long inQueueNanoTime;

    public MonitoredRunnable(Runnable runnable) {
        this.runnable = runnable;
        this.inQueueNanoTime = System.nanoTime();
    }

    @Override
    public long inQueueNanoTime() {
        return this.inQueueNanoTime;
    }

    @Override
    public void run() {
        this.runnable.run();
    }
}
  • MonitoredThreadPoolExecutor 继承 ThreadPoolExecutor 覆盖其方法做监控统计增强;
@Slf4j
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadLocal<Long> executeStartTimeThreadLocal;
    protected String poolName;
    private final int slowTaskThreshold;
    private final int queueTimeThreshold;
    private static final int DEFAULT_SLOW_TASK_TIME = 5000;
    private static final int DEFAULT_QUEUE_TIME = 100;


    @Override
    public void execute(Runnable command) {
        log.info("线程池名称 = {}, 提交任务数+1", this.poolName());
        super.execute(new MonitoredRunnable(command));
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), new MonitorConfig());
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, new MonitorConfig());
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, MonitorConfig monitorConfig) {
        this(poolName, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new NamedThreadFactory(poolName), new AbortPolicy(), monitorConfig);
    }

    public MonitoredThreadPoolExecutor(String poolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, MonitorConfig monitorConfig) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new MonitorRejectedExecutionHandler(handler, poolName));
        this.executeStartTimeThreadLocal = new ThreadLocal<>();
        this.poolName = poolName;
        this.slowTaskThreshold = monitorConfig.getTaskSlowTimeThreshold() > 0 ? monitorConfig.getTaskSlowTimeThreshold() : DEFAULT_SLOW_TASK_TIME;
        this.queueTimeThreshold = monitorConfig.getQueueSlowTimeThreshold() > 0 ? monitorConfig.getQueueSlowTimeThreshold() : DEFAULT_QUEUE_TIME;
        ThreadPoolMonitor.monitor(this);
    }

    @Override
    protected final void beforeExecute(Thread t, Runnable r) {
        try {
            this.beforeExecute0(t, r);
        } finally {
            this.executeStartTimeThreadLocal.set(System.nanoTime());
        }
    }

    @Override
    protected final void afterExecute(Runnable r, Throwable t) {
        this.afterExecuteMonitor(r, t);
        this.afterExecute0(r, t);
    }

    private void afterExecuteMonitor(Runnable r, Throwable t) {
        try {
            long executeEndNano = System.nanoTime();
            Long executeStartTime = this.executeStartTimeThreadLocal.get();
            Monitored monitored = (Monitored) r;
            long queueNanoTime = monitored.inQueueNanoTime();
            int queueTime = (int) ((executeStartTime - queueNanoTime) / 1000000L);
            int executeTime = (int) ((executeEndNano - executeStartTime) / 1000000L);
            log.info("线程池名称 = {}, 任务排队时间 = {}, 任务执行时间 = {}",
                    this.poolName(), queueTime, executeTime);
            if (executeTime > this.slowTaskThreshold) {
                log.info("线程池名称 = {}, 执行慢任务数+1", this.poolName());
            }
            if (queueTime > this.queueTimeThreshold) {
                log.info("线程池名称 = {}, 排队慢任务数+1", this.poolName());
            }

            if (t != null) {
                log.info("线程池名称 = {}, 执行异常的任务数+1", this.poolName());
            }
        } catch (Exception ignore) {
        } finally {
            executeStartTimeThreadLocal.remove();
        }
    }

    protected void beforeExecute0(Thread t, Runnable r) {
    }

    protected void afterExecute0(Runnable r, Throwable t) {
    }

    @Override
    protected final void terminated() {
        ThreadPoolMonitor.remove(this.poolName(), this);
    }

    public String poolName() {
        return this.poolName;
    }
}
  • PoolMonitorTask 定时收集线程池监控项的任务实现;
@Slf4j
@Getter
public class PoolMonitorTask implements Runnable {

    private final ThreadPoolExecutor monitoredThreadPool;
    private final String poolName;
    private volatile long lastTaskCount = 0L;

    public PoolMonitorTask(ThreadPoolExecutor monitoredThreadPool, String poolName) {
        this.monitoredThreadPool = monitoredThreadPool;
        this.poolName = poolName;
    }

    @Override
    public void run() {
        int activeCount = this.monitoredThreadPool.getActiveCount();
        int corePoolSize = this.monitoredThreadPool.getCorePoolSize();
        int maximumPoolSize = this.monitoredThreadPool.getMaximumPoolSize();
        int queueTaskSize = this.monitoredThreadPool.getQueue().size();
        long taskCount = this.monitoredThreadPool.getTaskCount();
        int executedTask = (int) (taskCount - this.lastTaskCount);
        log.info("线程池名称 = {}, 活跃线程数峰值 = {}, 队列任务数峰值 = {}, 核心线程数 = {}, 最大线程数 = {}, 执行的任务总数 = {}",
                this.poolName, activeCount, queueTaskSize, corePoolSize, maximumPoolSize, executedTask);
        this.lastTaskCount = taskCount;
        if (this.monitoredThreadPool.isTerminated()) {
            ThreadPoolMonitor.remove(this.poolName, this.monitoredThreadPool);
        }
    }

}
  • ThreadPoolMonitor 线程池监控者,负责线程池与监控方法的管理,定时采集任务的执行者;
public class ThreadPoolMonitor {

    private static final Map<String, FutureWrapper> POOL_TASK_FUTURE_MAP = new ConcurrentHashMap<>();
    private static final ScheduledThreadPoolExecutor SCHEDULE_THREAD_POOL = new ScheduledThreadPoolExecutor(8, new NamedThreadFactory("ThreadPoolMonitor"));

    private static final Long DEFAULT_MONITOR_PERIOD_TIME_MILLS = 1000L;

    public ThreadPoolMonitor() {
    }

    public static void monitor(String name, ThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor instanceof MonitoredThreadPoolExecutor) {
            throw new IllegalArgumentException("MonitoredThreadPoolExecutor is already monitored.");
        } else {
            monitor0(name, threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
        }
    }

    public static void remove(String name) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.remove(name);
        if (futureWrapper != null) {
            futureWrapper.future.cancel(false);
        }

    }

    public static void remove(String name, ThreadPoolExecutor threadPoolExecutor) {
        ThreadPoolMonitor.FutureWrapper futureWrapper = POOL_TASK_FUTURE_MAP.get(name);
        if (futureWrapper != null && futureWrapper.threadPoolExecutor == threadPoolExecutor) {
            POOL_TASK_FUTURE_MAP.remove(name, futureWrapper);
            futureWrapper.future.cancel(false);
        }

    }

    static void monitor(MonitoredThreadPoolExecutor threadPoolExecutor) {
        monitor0(threadPoolExecutor.poolName(), threadPoolExecutor, DEFAULT_MONITOR_PERIOD_TIME_MILLS);
    }

    private static void monitor0(String name, ThreadPoolExecutor threadPoolExecutor, long monitorPeriodTimeMills) {
        PoolMonitorTask poolMonitorTask = new PoolMonitorTask(threadPoolExecutor, name);
        POOL_TASK_FUTURE_MAP.compute(name, (k, v) -> {
            if (v == null) {
                return new ThreadPoolMonitor.FutureWrapper(SCHEDULE_THREAD_POOL.scheduleWithFixedDelay(poolMonitorTask, 0L, monitorPeriodTimeMills, TimeUnit.MILLISECONDS), threadPoolExecutor);
            } else {
                throw new IllegalStateException("duplicate pool name: " + name);
            }
        });
    }

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(ThreadPoolMonitor.SCHEDULE_THREAD_POOL::shutdown));
    }

    static class FutureWrapper {
        private final Future<?> future;
        private final ThreadPoolExecutor threadPoolExecutor;

        public FutureWrapper(Future<?> future, ThreadPoolExecutor threadPoolExecutor) {
            this.future = future;
            this.threadPoolExecutor = threadPoolExecutor;
        }
    }
}

作者:欢乐的弹跳机

原文链接:https://www.jianshu.com/p/fd7b235eeb23

文章分类
后端
文章标签
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 gxwowoo@163.com 举报,一经查实,本站将立刻删除。
相关推荐