AbstractQueuedSynchronizer(AQS)实现原理(上)- 独占锁
简介
AQS建立在CAS的基础上,实现了以FIFO等待队列的同步、阻塞、唤醒机制的简单框架。
继承体系
AbstractQueuedSynchronizer继承于AbstractOwnableSynchronizer,用于设置和获取访问权限的线程
AbstractQueuedSynchronizer的等待队列模型为CLH(Craig、Landin和Hagersten)队列的变种,为双链表
其内部类Node为CLH的节点
ConditionObject为Condition的实现类
什么是CLH队列?
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成 一个节点来实现锁的分配。
AQS属性
属性 | 说明 |
---|---|
head | 等待队列头节点 |
tail | 等待队列尾节点 |
state | 同步状态 |
AbstractQueuedSynchronizer#Node属性
属性 | 说明 |
---|---|
waitStatus | 当前节点在队列中的状态 |
prev | 前驱指针 |
next | 后继指针 |
thread | 表示处于该节点的线程 |
nextWaiter | 指向下一个处于CONDITION状态的节点 |
AQS锁模式
模式 | 说明 |
---|---|
SHARED | 共享模式,锁可以被多个线程持有,如在读写锁中,读模式下允许多个线程持有一把锁 |
EXCLUSIVE | 独占模式,锁只能被一个线程持有 |
waitStatus的状态值含义
对应字段定义 | 值 | 说明 |
---|---|---|
CANCELLED | 1 | 因超时或中断原因,线程获取锁的请求被取消 |
- | 0 | 线程Node初始化值 |
SIGNAL | -1 | 后继节点的线程要挂起进入等待状态 |
CONDITION | -2 | 节点在等待队列中,等待唤醒 |
PROPAGATE | -3 | 该线程处在共享模式下才会使用 |
AQS#CLH入队解析
public class AbstractQueuedSynchronizer { private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // 为当前线程创建新节点 // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 将尾部节点赋值给新前驱节点 if (pred != null) { // 前驱节点不为null node.prev = pred; // 将当前线程节点的前驱节点指向旧尾部节点 if (compareAndSetTail(pred, node)) { // 将当前线程节点设置为尾部节点 pred.next = node; // 将旧尾部节点的后继节点指向当前线程节点 return node; // 返回当前节点 } } enq(node); // 旧尾部节点为null,表示等待队列为null return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) // 设置头部节点 tail = head; // 设置尾部节点 } else { node.prev = t; // 将当前线程节点的前驱节点指向旧尾部节点 if (compareAndSetTail(t, node)) { // 设置尾部节点为当前线程节点 t.next = node; // 将旧尾部节点的后继节点指向当前线程接单 return t; // 返回 } } } } } 复制代码
从上面的代码可以看出,AQS的等待队列HEAD节点的pre、thread的属性是为null的,它的后继节点指向等待队列的第一个挂起线程的节点。
AQS入队图解
获取锁
独占锁
获取锁-acquire(int arg)
以独占模式获取,忽略中断(即使线程被中断,仍然会执行保护区的业务逻辑),成功返回,否则将线程挂起进入等待队列。
源码分析
public class AbstractQueuedSynchronizer { public final void acquire(int arg) { if (!tryAcquire(arg) && // 未获取到锁 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 1、将当前线程添加到等待队列 2、轮询队列等待队列,如果被中断,则返回true selfInterrupt(); // 中断线程 } } 复制代码
AbstractQueuedSynchronizer#acquire先是通过tryAcquire(int arg)方法获取锁(此方法的逻辑由具体的实现类确定),成功则直接返回,若失败则分 两步走,先是将线程通过addWaiter(Node mode)方法添加到等待队列,再通过acquireQueued(final Node node, int arg) 方法以不间断自旋的方式获取锁
public class AbstractQueuedSynchronizer { final boolean acquireQueued(final Node node, int arg) { // 当前线程Node,agr=1 boolean failed = true; // 获取锁失败标志 try { boolean interrupted = false; // 中断标志 for (;;) { // 轮询 final Node p = node.predecessor(); // 获取上一个节点 if (p == head && tryAcquire(arg)) { // 如果为头部节点,则尝试获取锁 setHead(node); // 设置头部节点 p.next = null; // help GC failed = false; // 锁失败标志设置为false return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && // 非头部节点 parkAndCheckInterrupt()) // 挂起当前线程并中断校验 interrupted = true; // 中断标志设置为true } } finally { if (failed) // 锁获取失败 cancelAcquire(node); // 取消锁的获取 } } } 复制代码
AbstractQueuedSynchronizer#acquireQueued是以独占不间断模式获取已在队列中的线程,当当前线程当前驱节点为头部节点时,获取锁并返回,当不为 头部节点时,通过shouldParkAfterFailedAcquire方法判断线程是否需要挂起,如果需要,则通过parkAndCheckInterrupt方法挂起线程并进行中断检测, 否则进入下一次循环,直到将线程挂起,最后等待前驱节点线程释放锁后进入最后一次循环并获取锁返回。 3. 锁获取过程
获取锁-acquireInterruptibly(int arg)
以独占模式获取,若线程中断则终止
源码分析
public class AbstractQueuedSynchronizer { public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) // 中断校验 throw new InterruptedException(); if (!tryAcquire(arg)) // 获取锁 doAcquireInterruptibly(arg); // 以独占可中断模式获取 } } 复制代码
AbstractQueuedSynchronizer#acquireInterruptibly首先做的就是中断检查,中断则直接抛出异常,同时擦除中断标记,不进入锁获取流程; 通过tryAcquire方法尝试获 取锁,失败的话执行doAcquireInterruptibly方法不间断的获取锁
public class AbstractQueuedSynchronizer { private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); // 添加到等待队列 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); // 获取前驱绩点 if (p == head && tryAcquire(arg)) { // 前驱节点为HEAD,再次获取锁 setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && // 校验是否需要挂起 parkAndCheckInterrupt()) // 线程挂起并中断校验 throw new InterruptedException(); // 抛出中断异常 } } finally { if (failed) // 获取锁失败-线程被中断 cancelAcquire(node); // 取消锁获取尝试 } } } 复制代码
通过addWaiter添加到等待队列,然后进入轮询,如果前驱节点为head节点,则尝试获取锁,否则将线程挂起。如果在挂起阶段线程被中 断,则抛出InterruptedException异常,并在finally块种处理cancelAcquire方法的逻辑
public class AbstractQueuedSynchronizer { private void cancelAcquire(Node node) { if (node == null) // node为null,直接忽略 return; node.thread = null; Node pred = node.prev; // 获取前驱节点 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 跳过取消到前驱节点 Node predNext = pred.next;// 未取消前驱节点的后继节点 node.waitStatus = Node.CANCELLED; // 将当前节点设置为取消 if (node == tail && compareAndSetTail(node, pred)) { // 如果当前节点为尾节点,则将跳过取消的前驱点位设置为尾节点 compareAndSetNext(pred, predNext, null); // 将尾节点的后继节点设置为null } else { int ws; if (pred != head && // 跳过取消的前驱点不为头节点 ((ws = pred.waitStatus) == Node.SIGNAL || // 跳过取消的前驱绩点的waitStatus为-1 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 跳过取消的前驱节点的waitStatus<=0并将其设置为-1 pred.thread != null) { // 跳过取消的前驱节点的thread不为null Node next = node.next; // 获取当前节点的后继节点 if (next != null && next.waitStatus <= 0) // 后继节点不为null&后继节点的waitStatus<=0 compareAndSetNext(pred, predNext, next); // 将跳过取消的前驱节点的后继节点指向当前节点的后继节点 } else { unparkSuccessor(node); // 唤醒节点的后继节点 } node.next = node; // help GC } } } 复制代码
AbstractQueuedSynchronizer#cancelAcquire方法略长,但分为三大块来执行但,首先是过滤掉前驱线程种被取消的线程,在这个过程中将当前线程的前驱 节点指向过滤后的前驱节点,如有等待线程1、2、3、4、5,线程4在获取锁的过程中被中断,在执行过滤的时候发现线程2、3也被取消,则将线程4的前驱节点指向 线程1。然后再判断当前线程节点是否为尾节点,如果是则设置尾节点为前驱节点,并将新尾节点的后继节点设置为null;如果不为尾部节点,在满足:
前驱节点不为头部节点; 前驱节点waitStatus=-1或waitStatus<=0时,设置waitStatus=-1看看是否成功; 前驱节点thread不为null
时,当当前节点的后继节点不为null,且后继节点的waitStatus<=0时,将过滤后的前驱节点指向当前节点的后继节点;上述三条件只要一个条件不满足,则唤醒 当前节点的后继节点。
锁获取过程
获取锁-tryAcquireNanos(int arg, long nanosTimeout)
以独占模式在给定到时间内获取锁,超时失败,如果被中断,锁获取终止
源码分析
public class AbstractQueuedSynchronizer { public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) // 判断线程是否中断 throw new InterruptedException(); return tryAcquire(arg) || // 先尝试获取锁,成功返回true,失败返回false doAcquireNanos(arg, nanosTimeout); } } 复制代码
首先判断线程是否中断,中断则抛出异常,同时擦除中断标记;其次通过tryAcquire尝试获取锁,成功返回true,失败返回false;最后调用doAcquireNanos 方法在设定到时间内获取锁
public class AbstractQueuedSynchronizer { private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) // 等待时间小于等于0直接返回false return false; final long deadline = System.nanoTime() + nanosTimeout; // 系统当前时间+等待时间 final Node node = addWaiter(Node.EXCLUSIVE); // 添加到等待队列 boolean failed = true; // 失败标志 try { for (;;) { final Node p = node.predecessor(); // 前驱节点 if (p == head && tryAcquire(arg)) { // 前驱节点为head,再次尝试获取锁 setHead(node); // 将当前节点设置为头部节点,也为等待队列到出队 p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); // 剩下等待时间 if (nanosTimeout <= 0L) // 小于0,返回false return false; if (shouldParkAfterFailedAcquire(p, node) && // 判断线程是否挂起 nanosTimeout > spinForTimeoutThreshold) // 剩余等待时间大于自旋时间 LockSupport.parkNanos(this, nanosTimeout); // 在等待时间内获取锁 if (Thread.interrupted()) // 判断是否中断 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); // 中断处理 } } } 复制代码
在进入自旋前获取超时时间、构造当前线程节点并添加到等待队列,然后在进入自旋。如果前驱节点为head,再次尝试获取锁,成功的话当前节点出队并获取锁, 失败则计算剩余等待时间,其次判断是否需要将线程挂起及剩余等待大于自旋时间时,则将线程挂起,否则将在下一次自旋时返回false,同时取消当前线程获取 锁。当线程被挂起后,如果因线程中断被唤醒,则抛出异常,同时取消当前线程获取锁。
锁获取过程
释放锁-release
以独占模式唤醒后继节点
源码分析
public class AbstractQueuedSynchronizer { public final boolean release(int arg) { if (tryRelease(arg)) { // 此方法有子类实现,用于判断是否可以唤醒后继节点 Node h = head; // 头部节点 if (h != null && h.waitStatus != 0) // 头部节点不为null,且waitStatus不为初始化状态 unparkSuccessor(h); // 唤醒节点的后继节点 return true; } return false; } } 复制代码
通过tryRelease方法判断后继节点是否可唤醒,不可唤醒返回false,否则判断头部节点是否为null且waitStatus不为0,都满足时,唤醒后继节点并返回true
public class AbstractQueuedSynchronizer { private void unparkSuccessor(Node node) { int ws = node.waitStatus; // waitStatus if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 将waitStatus设置为0 Node s = node.next; // 后继节点 // 后继节点为null,说明为尾部节点,waitStatus>0,说明为取消的锁获取的节点 if (s == null || s.waitStatus > 0) { // 后继节点为null或waitStatus>0 s = null; for (Node t = tail; t != null && t != node; t = t.prev) // 获取尾节点,并遍历其前驱节点 if (t.waitStatus <= 0) // 如果waitStatus<=0 s = t; // 将前驱节点赋值为t,进行下一次遍历 } if (s != null) LockSupport.unpark(s.thread); // 唤醒后继节点 } } 复制代码
如果waitStatus<0,将waitStatus设置为0,然后获取其后继节点,如果后继节点为null或后继节点的waitStatus>0,则需通过遍历的方式,找到需唤醒的 未取消的节点,随后将其唤醒 3. 唤醒过程
作者:bran_2015
链接:https://juejin.cn/post/7046940640089178142