阅读 124

AQS——AbstractQueuedSynchronizer 同步框架(基于JDK14)

AQS——AbstractQueuedSynchronizer 同步框架(基于JDK14)

1、结构说明

AbstractQueuedSynchronizer 继承 AbstractOwnableSynchronizer,含有五个内部类,其中比较重要的是Node,ConditionNode及ConditionObject。下面是具体类图(由于AbstractQueuedSynchronizer 方法太多,此处已屏蔽大多数方法):

AQS类图.png

AbstractOwnableSynchronizer:

AbstractQueuedSynchronizer 的父类,主要是设置当前独占锁的线程:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    /**
     * Empty constructor for use by subclasses.
     */
    protected AbstractOwnableSynchronizer() { }

    /**
     * The current owner of exclusive mode synchronization.
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * Sets the thread that currently owns exclusive access.
     * A {@code null} argument indicates that no thread owns access.
     * This method does not otherwise impose any synchronization or
     * {@code volatile} field accesses.
     * @param thread the owner thread
     * 设置独占锁
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * Returns the thread last set by {@code setExclusiveOwnerThread},
     * or {@code null} if never set.  This method does not otherwise
     * impose any synchronization or {@code volatile} field accesses.
     * @return the owner thread
     * 获取独占锁
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

Node:节点类,AQS队列的节点,包含prev,next,由此可以看出,AQS队列是个基于链表实现的双向队列。waiter为在该节点的线程,既等待获取锁的线程。status用来标识当前节点线程的状态,有以下值:

表示节点的状态。其中包含的状态有(此处有待):

  1. CANCELLED,值小于0,表示当前的线程被取消;
  2. COND,值为 2,表示当前节点在等待condition,也就是在condition队列中(此时Node的实际值是3);
  3. WAITING,值为1,表示当前节点在sync队列中,等待着获取锁;

ExclusiveNode:独占锁节点,继承Node,无扩展;

SharedNode:共享锁节点,继承Node,无扩展;

ConditionNode:继承Node,与Node相比,多了nextWaiter属性(),为Condition队列的节点;

// Node status bits, also used as argument and return values
static final int WAITING   = 1;          // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND      = 2;          // in a condition wait

static final class ConditionNode extends Node
 implements ForkJoinPool.ManagedBlocker {
 ConditionNode nextWaiter;            // link to next waiting node

 /**
 * Allows Conditions to be used in ForkJoinPools without
 * risking fixed pool exhaustion. This is usable only for
 * untimed Condition waits, not timed versions.
 */
 public final boolean isReleasable() {
 return status <= 1 || Thread.currentThread().isInterrupted();
 }

 public final boolean block() {
 while (!isReleasable()) LockSupport.park();
 return true;
 }
}

ConditionObject:实现了Condition的接口,提供操作Condition队列方法,主要包含Condition类型的firstWaiter及lastWaiter属性(队列头及队列尾);这个类是为了让子类支持独占模式的。await()、sign()方法就是让线程阻塞、加入队列、唤醒线程。AQS框架下基本各种独占的加锁,解锁等操作到最后都是基于这个类实现的。

该类是提供给子类去使用的,在Reentrantlock有相关的使用。有人可能觉得为什么实现这个内部类,又不用,而是给子类去用,那为什么不放到子类去呢?其实答案,很简单,抽象加模板模式

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient ConditionNode firstWaiter;
    /** Last node of condition queue. */
    private transient ConditionNode lastWaiter;
    ...
 }

从以上的结构可以看出,AQS中包含两个队列,一个是由Node组成的双向队列,也就是同步队列,另一个则是由ConditionNode组成的单向队列,即Condition队列。Condition队列由ConditionObject操作,ConditionObject实现Condition接口。

两个队列的关系如下图所示(此图来自网络):

同步队列及Condition队列.png

2、同步队列

AQS是一个基础的同步框架,只是定义了部分基础的方法,具体实现部分交由子类去实现,以便扩展不同的功能。

获取锁的流程如下:


aqs.png

可以看出来,在线程未暂停之前,每进行一次循环,都会调用tryAcquire/tryAcquireShared方法。

加入同步队列(同时,离队也在该方法中实现):

//子类进入同步队列时,调用此方法
public final void acquire(int arg) {
    //tryAcquire方法交由子类去实现
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}


final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    //first表示当前node是否是head的下一个节点,在出队时需要
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    /*
     * Repeatedly:
     *  Check if node now first
     *    if so, ensure head stable, else ensure valid predecessor
     *  if node is first or not yet enqueued, try acquiring
     *  else if node not yet created, create it
     *  else if not yet enqueued, try once to enqueue
     *  else if woken from park, retry (up to postSpins times)
     *  else if WAITING status not set, set and retry
     *  else park and clear WAITING status, and check cancellation
     */
    //第一次调用时,此时的node必定为null(conditionNode获取锁时,此时node不为null)
    //当初始化node之后,node的prev、next等都是null
    for (;;) {
        //first == false 且 node不为null,node的前置节点不为null,node的前置节点不是head节点
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            //node 的status < 0
            if (pred.status < 0) {
                //清除已取消的节点
                //这个方法暂时没看懂
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                //进入此步的情景:同步队列只有一个节点(即head的prev才会等于null),但是if条件中
                //限制了这种情景
                //这步没看懂
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                //此处,调用子类的 tryAcquire/tryAcquireShared 的实现。如果成功,则获取锁,不进入同步队列(cas修改status变量)
                //相比JDK1.8,此处是一个小优化,当线程在进入同步队列期间,线程还没暂停之前,在执行方法进入同步方法时,不时直接去获取锁,
                //在并发较低的场景下,省去了进入队列、暂停线程的操作;但是在并发较高的情况下,tryAcquire基本是失败的,多了tryAcquire的消耗。
                //线程被唤醒之后,也会通过此处获取锁,走出循环
                //共享锁
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    //独占锁
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            //获取锁成功
            if (acquired) {
                if (first) {
                    //离队
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        //根据是否是独占锁,创建对应的节点
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            //初始化节点
            node.waiter = current;
            Node t = tail;
            //尾节点
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                //初始化头尾节点(head == tail)
                tryInitializeHead();
            //cas修改尾节点和新节点(即移动tail节点的位置)
            //此处是节点入队列的关键操作
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            //非公平锁时会进入该步骤
            //即线程被唤醒,却没有获取到锁
            --spins;                        // reduce unfairness on rewaits
            //jdk9新方法,优化自旋 改善自旋等待循环中的响应时间
            Thread.onSpinWait();
        } else if (node.status == 0) {
            //初始化status
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                //暂停线程
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            //被唤醒,此时清除状态
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

唤醒线程,通知线程离队:

public final boolean release(int arg) {
    //调用子类方法
    if (tryRelease(arg)) {
        //唤醒下一个节点
        signalNext(head);
        return true;
    }
    return false;
}

private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        //设置等待状态(此时该节点的线程将要被唤醒)
        s.getAndUnsetStatus(WAITING);
        //唤醒线程
        LockSupport.unpark(s.waiter);
    }
}

3、Condition队列

Condition队列是独立与同步队列的队列,一个同步队列可以对应多个Condition队列。

入队:

/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
*     throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
*     {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //构造ConditionNode
    ConditionNode node = new ConditionNode();
    //填充conditionNode,并且condition队列入队,同步队列离队
    int savedState = enableWait(node);
    //暂存线程 jdk14新方法
    LockSupport.setCurrentBlocker(this); // for back-compatibility
    boolean interrupted = false, cancelled = false;
    //当node不在condition队列时,当线程被唤醒时,node已不在condition队列,而是在同步队列
    while (!canReacquire(node)) {
        if (interrupted |= Thread.interrupted()) {
            //如果线程被中断并且当前状态值为3
            //COND 按位非得 mask = -(COND+1),status = status & mask,
            //mask == -3,以status的取值,只有当status == 3时,才退出
            if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                break;              // else interrupted after signal
        } else if ((node.status & COND) != 0) {
            try {
                //挂起线程,调用 ConditionNode 的block方法
                // 唤醒条件:
                // status <= 1 || Thread.currentThread().isInterrupted()
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                interrupted = true;
            }
        } else
            Thread.onSpinWait();    // awoke while enqueuing
    }
    LockSupport.setCurrentBlocker(null);
    node.clearStatus();
    //重新竞争锁
    acquire(node, savedState, false, false, false, 0L);
    if (interrupted) {
        if (cancelled) {
            unlinkCancelledWaiters(node);
            throw new InterruptedException();
        }
        Thread.currentThread().interrupt();
    }
}


/**
 * Adds node to condition list and releases lock.
 *
 * @param node the node
 * @return savedState to reacquire after wait
 */
private int enableWait(ConditionNode node) {
    //是否持有独占锁
    if (isHeldExclusively()) {
        //设置node的线程
        node.waiter = Thread.currentThread();
        //coonditionNode的status 为2|1 = 3
        node.setStatusRelaxed(COND | WAITING);
        ConditionNode last = lastWaiter;
        //进入condition队列
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        //获取status
        int savedState = getState();
        //释放锁,注意,进入condition队列将释放锁,且离开同步队列
        if (release(savedState))
            return savedState;
    }
    node.status = CANCELLED; // lock not held or inconsistent
    throw new IllegalMonitorStateException();
}

出队:

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 *         returns {@code false}
 */
public final void signal() {
    ConditionNode first = firstWaiter;
    //如果当前线程没有独占锁的话,抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    if (first != null)
        //condition队列还有节点
        doSignal(first, false);
}

/**
 * Removes and transfers one or all waiters to sync queue.
 */
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        if ((firstWaiter = next) == null)
            lastWaiter = null;
        //此处判断status是否等于3,即是否处于等待中
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            //重新进入同步队列队尾
            enqueue(first);
            if (!all)
                break;
        }
        first = next;
    }
}
        
/**
 * Enqueues the node unless null. (Currently used only for
 * ConditionNodes; other cases are interleaved with acquires.)
 */
final void enqueue(Node node) {
    if (node != null) {
        for (;;) {
            Node t = tail;
            node.setPrevRelaxed(t);        // avoid unnecessary fence
            if (t == null)                 // initialize
                tryInitializeHead();
            else if (casTail(t, node)) {
                t.next = node;
                if (t.status < 0)          // wake up to clean link
                    //唤醒线程
                    LockSupport.unpark(node.waiter);
                break;
            }
        }
    }
}

作者:summerbring

原文链接:https://www.jianshu.com/p/390e0d4c3da9

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐