线程池相关功能实现--权限后台管理系统
在使用某一个技术的时候,我觉得我们可以思考几个问题:
(1)这个技术是什么?
(2)这个技术是为了解决什么问题?
(3)这个技术和同类型的对比,为什么我们选择了它?
引言
线程池是什么
线程池(Thread Pool
)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如 MySQL
。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。
线程池维护多个线程,等待监督管理者分配可并发执行的任务。
这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
线程池作为一种池化思想的的实现,在 Java
中的实现是 TreadPoolExecutor
。
当然,使用线程池可以带来一系列好处:
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池
ScheduledThreadPoolExecutor
,就允许任务延期执行或定期执行。
线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。
在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
系统无法合理管理内部的资源分布,会降低系统的稳定性。
为解决资源分配这个问题,线程池采用了“池化”(Pooling
)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。
比较典型的几种使用策略包括:
内存池(
Memory Pooling
):预先申请内存,提升申请内存速度,减少内存碎片。连接池(
Connection Pooling
):预先申请数据库连接,提升申请连接的速度,降低系统的开销。如阿里的数据库连接池:Druid
。实例池(
Object Pooling
):循环使用对象,减少资源在初始化和释放时的昂贵损耗。Spring
的IOC
也是这种思想。
为什么我们选择了它
综合考虑,我们还是得选择 ThreadPoolExecutor。
功能
动态调参
通过 ThreadPoolExecutor
线程池类提供的 set
方法设置核心线程数、最大线程数、存活时间等进行动态调整,线程池内部做了处理,线程池内部会处理好当前状态做到平滑修改。
我们需要知道线程池内部是如何进行调整的,以便出现问题时能快速的发现并解决。
实时监控
通过 ThreadPoolExecutor
线程池类提供的一些方法,可以读取到当前线程池的运行状态以及参数。
任务监控
自定义线程池类继承 ThreadPoolExecutor
类,重写它的 beforeExecute()
、afterExecute()
方法,在对应的方法中写相应的代码逻辑进行线程池任务执行的状况日志记录。
负载告警
(1)事前,在 beforeExecutor()
方法中统计线程池的活跃度,如果超过一定的阈值就进行告警,发送消息给相应的开发负责人。
(2)事后,在 afterExecute()
方法中统计队列中等待任务的长度,如果超过一定的阈值就进行告警,还有就是通过捕获 RejectedExecutionException
异常,如果捕获到了,就进行告警,发送消息给相应的开发人员。
代码实现
线程池配置类:
/** * 线程池配置 * @ClassName TreadPoolConfig * @Author YH * @Date 2021/12/4 * @Version 1.0 */ @Configuration public class TreadPoolConfig { /** * 线程池核心线程数 */ private int coreSize = 2 * Runtime.getRuntime().availableProcessors(); /** * 线程池最大线程数 */ private int maxPoolSize = 25 * Runtime.getRuntime().availableProcessors(); /** * 队列最大长度 */ private int queueCapacity = 100; /** * 线程池维护线程所允许的空闲时间 */ private int keepAliveSeconds = 300; /** * 执行周期性或定时任务 */ @Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { /** * ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。 * 它主要用来在给定的延迟之后运行任务,或者定期执行任务。ScheduledThreadPoolExecutor 的 * 功能和 Timer 类似,但 ScheduledThreadPoolExecutor 功能更强大、更灵活。Timer 对应的是单个后台线程, * 而 ScheduledThreadPoolExecutor 可以在构造函数中指定多个对应的后台线程数。 * new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d") * 给线程池工厂取个名字 */ return new ScheduledThreadPoolExecutor(coreSize, new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d") .daemon(true).build()); } /** * 自定义线程池 * @return */ @Bean(name = "threadPoolExecutor") protected ThreadPoolExecutor threadPoolExecutor() { return new MyThreadPoolExecutor( this.coreSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<>(this.queueCapacity), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); } } 复制代码
动态调参
接口设计:
/** * 线程池参数动态化 * 修改线程池参数 * @param threadPoolUpdateVO 修改线程池参数视图类 * @return */ @ApiOperation("修改线程池参数") @PutMapping("/thread_pool") public Result updateThreadPool( @ApiParam("线程池名称") @Validated @RequestBody ThreadPoolUpdateVO threadPoolUpdateVO) { ThreadPoolExecutor threadPoolExecutor = null; try { // 根据名称从 Spring 容器中获取线程池实例 threadPoolExecutor = SpringUtil.getBean(threadPoolUpdateVO.getThreadPoolName()); } catch (NoSuchBeanDefinitionException e) { return Result.failure().message("无法通过线程池名称找到对应的线程池实例"); } // 修改线程池核心线程数 threadPoolExecutor.setCorePoolSize(threadPoolUpdateVO.getCorePoolSize()); // 修改线程池最大线程数 threadPoolExecutor.setMaximumPoolSize(threadPoolUpdateVO.getMaximumPoolSize()); // 修改线程池存活时间和单位 switch (threadPoolUpdateVO.getUnit()) { case 0: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.NANOSECONDS); } break; case 1: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MICROSECONDS); } case 2: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MILLISECONDS); } case 3: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.SECONDS); } case 4: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.MINUTES); } case 5: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.HOURS); } case 6: { threadPoolExecutor.setKeepAliveTime( threadPoolUpdateVO.getKeepAliveTime(), TimeUnit.DAYS); } } return Result.success().message("修改线程池参数成功"); } 复制代码
线程池参数修改视图类:
/** * 线程池参数修改视图类 * @ClassName ThreadPoolUpdateVO * @Author YH * @Date 2021/12/14 * @Version 1.0 */ @Data public class ThreadPoolUpdateVO { private static final long serialVersionUID = 1L; /** * 线程池名称 */ @NotBlank(groups = {}, message = "线程池名称不能为空") private String threadPoolName; /** * 核心线程数 */ private Integer corePoolSize; /** * 最大线程数 */ private Integer maximumPoolSize; /** * 存活时间 */ private Long keepAliveTime; /** * 存活时间单位,根据对应的索引转换为对应的枚举类型 * 0 NANOSECONDS;1 MICROSECONDS;2 MILLISECONDS; * 3 SECONDS;4 MINUTES;5 HOURS;6 DAYS */ private Integer unit; } 复制代码
实时监控
接口设计:
/** * 获取线程池状态 * @param threadName 线程池名称 * @return */ @ApiOperation("获取线程池状态") @GetMapping("/thread_pool") public Result getThreadPoolStatus( @ApiParam("线程池名称") @RequestParam("threadPoolName") String threadName ) { ThreadPoolExecutor threadPoolExecutor = null; try { threadPoolExecutor = SpringUtil.getBean(threadName); } catch (NoSuchBeanDefinitionException e) { return Result.failure().message("无法通过线程池名称找到对应的线程池实例") .errorMessage("无法通过线程池名称找到对应的线程池实例"); } // 线程池核心线程数 int corePoolSize = threadPoolExecutor.getCorePoolSize(); // 线程池允许的最大线程数量 int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); // 线程池中活动线程的数量 int activeCount = threadPoolExecutor.getActiveCount(); // 线程池的拒绝策略 RejectedExecutionHandler handler = threadPoolExecutor.getRejectedExecutionHandler(); String handlerName = handler.getClass().getSimpleName(); // 线程池中当前线程的数量 int poolSize = threadPoolExecutor.getPoolSize(); // 线程池的队列类型 BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue(); String queueName = queue.getClass().getSimpleName(); // 线程池执行的任务总数 long taskCount = threadPoolExecutor.getTaskCount(); // 线程池的线程工厂 ThreadFactory threadFactory = threadPoolExecutor.getThreadFactory(); String threadFactoryName = threadFactory.getClass().getSimpleName(); // 线程池曾经最大数量的线程,可以通过这个参数判断线程池是否在某个时间点满过 int largestPoolSize = threadPoolExecutor.getLargestPoolSize(); // 线程池已经完成的任务总数 long completedTaskCount = threadPoolExecutor.getCompletedTaskCount(); // 线程池中线程的存活时间,以秒为单位 long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS); // 线程池是否关闭 boolean shutdown = threadPoolExecutor.isShutdown(); // 线程池是否终止 boolean terminated = threadPoolExecutor.isTerminated(); // 线程池是否结束 boolean terminating = threadPoolExecutor.isTerminating(); Map<String, Object> map = new HashMap<>(); map.put("corePoolSize", corePoolSize); map.put("maximumPoolSize", maximumPoolSize); map.put("activeCount", activeCount); map.put("poolSize", poolSize); map.put("handler", handlerName); map.put("queue", queueName); map.put("taskCount", taskCount); map.put("threadFactory", threadFactoryName); map.put("largestPoolSize", largestPoolSize); map.put("completedTaskCount", completedTaskCount); map.put("keepAliveTime", keepAliveTime); map.put("shutdown", shutdown); map.put("terminated", terminated); map.put("terminating", terminating); return Result.success(map).message("获取线程池状态成功"); } 复制代码
任务监控
自定义线程池:
/** * 基础线程池类进行功能扩展 * @ClassName MyThreadPoolExecutor * @Author YH * @Date 2021/12/15 * @Version 1.0 */ @Slf4j public class MyThreadPoolExecutor extends ThreadPoolExecutor { /** * 任务开始执行的时间点 */ private ConcurrentHashMap<String, Date> startTimes; /** * 线程池名称 * 通过构造方法加入 */ private String threadPoolName; public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // super 方法必须放在第一行代码 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.startTimes = new ConcurrentHashMap<>(); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.startTimes = new ConcurrentHashMap<>(); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); this.startTimes = new ConcurrentHashMap<>(); } public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.startTimes = new ConcurrentHashMap<>(); } /** * 任务执行之前要执行的方法 * 记录开始时间 * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); double activity = (this.getActiveCount() * 1.0) / this.getMaximumPoolSize() * 100; // 如果线程池活跃度超过指定阈值,发送消息给开发负责人 if (activity > 0.8) { // 发送邮件给对应的负责人 } log.info("线程池活跃度为(activeCount/maximumPoolSize):" + activity + "%"); // 如果队列长度超过指定阈值,发送消息给开发负责人 log.info("等待队列长度:" + this.getQueue().size()); } /** * 任务执行之后要执行的方法 * 记录结束时间,以及通过开始时间计算出总的时间 * @param r * @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { // 任务开始时间 Date startDate = startTimes.remove(String.valueOf(r.hashCode())); // 任务结束时间 Date endDate = new Date(); // 任务执行时长 long executionTime = endDate.getTime() - startDate.getTime(); // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、已完成任务数量、任务总数、 // 队列里缓存的任务数量、池中存在的最大线程数、最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止 log.info(String.format("线程池监控:" + this.threadPoolName + "-pool-monitor: Duration: %d ms, PoolSize: %d, CorePoolSize: " + "%d, Active: %d, Completed: %d, Task: %d, Queue: %d, LargestPoolSize: " + "%d, MaximumPoolSize: %d,KeepAliveTime: %d, isShutdown: %s, isTerminated: %s", executionTime, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated())); } } 复制代码
负载告警
/** * 任务执行之前要执行的方法 * 记录开始时间 * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); double activity = (this.getActiveCount() * 1.0) / this.getMaximumPoolSize() * 100; // 如果线程池活跃度超过指定阈值,发送消息给开发负责人 if (activity > 0.8) { // 发送邮件给对应的负责人 } log.info("线程池活跃度为(activeCount/maximumPoolSize):" + activity + "%"); // 如果队列长度超过指定阈值,发送消息给开发负责人 log.info("等待队列长度:" + this.getQueue().size()); } 复制代码
背后的原理
运行状态
ThreadPoolExecutor
的运行状态有 5 种,分别为:
任务调度过程
execute()
方法为入口。其调度过程如下:
首先检测线程池运行状态,如果不是
RUNNING
,则直接拒绝,线程池要保证在RUNNING
的状态下执行任务。如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务。如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务缓冲
任务缓冲模块是线程池能够管理任务的核心部分。
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。
线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:
(1)在队列为空时,获取元素的线程会等待队列变为非空。
(2)当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
使用不同的队列可以实现不一样的任务存取策略。
任务申请
任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。
线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由 getTask()
方法实现,其执行流程如下图所示:
getTask()
这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。
任务拒绝
任务拒绝模块是线程池的保护部分,线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize
时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
拒绝策略是一个接口,其设计如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); } 复制代码
用户可以通过实现这个接口去定制拒绝策略,也可以选择 JDK
提供的四种已有拒绝策略,其特点如下:
作者:热爱可抵漫长岁月
链接:https://juejin.cn/post/7059000265092890661