阅读 95

线程池系列三:动态修改线程池队列大小

线程池中的队列要求的是阻塞队列,作用主要是当线程池处理任务能力不足时,队列存储多余的任务,从而起到削峰和缓冲的目的。

可以选择的队列种类很多,如何选择合适的队列应用到自己的线程池中?就需要了解他们的优缺点,从而择优使用 在这里插入图片描述

1、常见阻塞队列

常见的阻塞队列都是以基于BlockingQueue的实现

ArrayBlockingQueue 一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue 一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),此队列按FIFO (先进先出) 排序元素。Executors的几个静态线程池工厂方法大部分都是使用这个队列

SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态;同理,当每个读操作的时候,同样需要一个相匹配的写操作。这里的 Synchronous 指的就是读写操作需要同步,一个读操作对应一个写操作。 注:吞吐量通常要高于LinkedBlockingQueue。静态工厂方法Executors.newCachedThreadPool使用了这个队列。

DelayQueue 是一个支持延时获取元素的无界阻塞队列。内部是基于PriorityQueue的实现。

PriorityBlockingQueue 一个具有优先级的无限阻塞队列。只能指定初始的队列大小,后面插入元素的时候,如果空间不够的话会自动扩容 注:它是无界队列,put操作不会阻塞,但take方法在队列为空的时候会阻塞队列元素不可以插入null值,同时插入队列的元素必须是可比较大小的(comparable),否则报 ClassCastException 异常

2、最常使用的两种队列

2.1、ArrayBlockingQueue 是一个基于数组结构的有界阻塞队列,底层结构是一个数组

 /** The queued items */ final Object[] items; 创建队列时,在构造器中创建的数组对象(最大的容量在创建时就确定了) /** items index for next take, poll, peek or remove */ int takeIndex; 外界下次从队列获取数据时,指定从队列的takeIndex下标获取 /** items index for next put, offer, or add */ int putIndex; 外界下次向队列存入数据时,指定从队列的putIndex下标存入 /** Number of elements in the queue */ int count; 队列中实际存储数据的数量(即数组中真正有数据的元素数量) /** Main lock guarding all access */ final ReentrantLock lock; 外界存储和读取队列数据时,存在并发情况。 这里使用一把锁同时控制读写数据。因此实际上读写是串行的。 正因如此,count的类型是int而非AtomicInteger,不需要考虑线程安全 /** Condition for waiting takes */ private final Condition notEmpty; lock的Condition控制,含义是非空。 即队列无数据时,notEmpty.await()阻塞;有数据时,notEmpty.signal()。 从而控制线程间调度 /** Condition for waiting puts */ private final Condition notFull; 同样是lock的Condition控制,含义是非满, 即队列数据数量达到最大时,notFull.await()阻塞; 有数据时,notFull.signal()。从而控制线程间调度 复制代码

在这里插入图片描述

外部存储数据时,从头开始向后遍历数组插入数据,并记录偏移量,供下次从偏移量位置再次存储;

读取数据时也是一样,从头开始向后遍历数组读取并删除数据,记录偏移量供下次从偏移量位置再次读取

另外考虑一件问题:由于数组有界,总会读写到最后一个元素。数组前部分读取后置空,如果不再使用就浪费了。而后部分到达了尾部,队列不能再插入数据了

这就引入ArrayBlockingQueue的一个设计,即到达尾部后,若头部空置,则复用头部资源。

让线性的有界数组,在逻辑上成为环形数组,从而达到资源复用的目的。 在这里插入图片描述

2.2、LinkedBlockingQueue

一个基于链表结构的有界阻塞队列(不设置大小时,默认为Integer.MAX_VALUE),底层结构是一个单向链表

/** The capacity bound, or Integer.MAX_VALUE if none */ private final int capacity; 创建队列时,在构造器中指定队列最大的容量 /**Head of linked list.Invariant: head.item == null*/ transient Node<E> head; 链表的头部节点 这里的头部节点其实并没有存储真实数据,下一个节点才开始存储。 这是链表常用的结构,通过在头部增加一个空节点,从而使所有有效节点都是 链表中间节点,增删是可以统一处理。否则就要区分头部节点和中间节点, 导致区分处理了。 /**Tail of linked list.Invariant: last.next == null*/ private transient Node<E> last; 链表的尾部节点 /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); 外部多线程存储数据时,存在并发场景。因此使用putLock和notFull加锁控制。 /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); 外部多线程读取数据时,存在并发场景。因此使用takeLock和notEmpty加锁控制。 /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); 链表中实际存储数据的数量,含义对应ArrayBlockingQueue的count。 但它的类型是AtomicInteger,由于链表读写为不同的锁,存在并发场景, 因此加减数量时要考虑并发安全问题 复制代码

在这里插入图片描述

外部存储数据时在尾部插入数据;而读取数据时则从头部读取并删除节点数据

3、ArrayBlockingQueue和LinkedBlockingQueue的选择

在这里插入图片描述


ArrayBlockingQueueLinkedBlockingQueue
底层结构数组
(逻辑上环形数组)
单向链表
是否有界有界阻塞队列
(大小必须声明)
有界阻塞队列
(不声明则默认为Integer.MAX_VALUE)
公平锁可配置是否使用公平锁
(默认非公平)
仅支持非公平锁
读写共用一把ReentrantLock锁
两个condition判断满空状态
读写分别使用一把ReentrantLock锁
使用各自的condition判断满空状态
count共用一把锁,因此队列内无并发,类型为int读写两把锁,队列内存在并发,因此类型为AtomicInteger
其他
由于是链表,因此插入的数据要多创建一个Node对象存,会对GC有影响

其他异同点:

  • LinkedBlockingQueue底层由于是链表,因此插入数据时要多创建一个Node对象,因此会对GC有影响

  • ArrayBlockingQueue从头开始遍历进行读写;而LinkedBlockingQueue则为链尾加元素,链尾取元素

  • LinkedBlockingQueue读写各加一把锁,通常比ArrayBlockingQueue具有更高的吞吐量,但是在大多数并发应用程序中,可预测的性能较差。



4、自定义队列大小

无论是ArrayBlockingQueue和LinkedBlockingQueue,他们的队列大小都是不可变的。

ArrayBlockingQueue底层是数组,大小固定。

而LinkedBlockingQueue的capacity则被final修饰,不可修改。

而我们实际项目中,往往需要根据业务实际需要调整队列大小。那么如果实现队列的大小可变?

如果我们想基于ArrayBlockingQueue进行改造,但修改大小必然要涉及到重新创建数组,以及新旧数组的数据迁移问题,有些复杂。

如果考虑基于LinkedBlockingQueue进行改造,我们只要将修饰capacity的final去掉即可实现动态调整。但有一个问题,LinkedBlockingQueue具体实现中很多是基于capacity不变进行的设计,因此我们需要将涉及的功能进行调整

调整思路: 1、capacity可修改大小:去掉修饰词final,增加set方法便可动态调整 2、梳理受影响的范围:将代码中应用capacity的功能进行梳理,通过capacity的改动使其兼容正常功能


具体进行以下调整,其他内容不变:

4.1、类名修改 将LinkedBlockingQueue的代码实现拷贝并修改类名为ResizeLinkedBlockingQueue

4.2、将capacity的修饰词final去掉,增加volatile修饰词。改动后使其立即生效。并设置capacity的set方法,并在set时限制大小

图片

4.3、梳理capacity所涉及的各个应用点,进行调整 主要是存储数据的几个方法条件判断时进行改动 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

在这里插入图片描述 在这里插入图片描述


作者:zy苦行僧
链接:https://juejin.cn/post/7023014251480481799


文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐