阅读 97

AQS源码分析

锁的分类
悲观锁和乐观锁

在Java里使用的各种锁,几乎全都是悲观锁。synchronized从偏向锁、轻量级锁到重量级锁,全是悲观锁。JDK提供的Lock实现类全是悲观锁。其实只要有“锁对象”出现,那么就一定是悲观锁。因为乐观锁不是锁,而是一个在循环里尝试CAS的算法。

乐观锁是atomic包下的原子类。

公平锁、非公平锁

多个线程申请一把公平锁,那么当锁释放的时候,先申请的先得到,非常公平。显然如果是非公平锁,后申请的线程可能先获取到锁,是随机或者按照其他优先级排序的。

公平锁与非公平锁的区别在代码中的区别为:

final void lock() {
    if (this.compareAndSetState(0, 1)) {
        this.setExclusiveOwnerThread(Thread.currentThread());
    } else {
        this.acquire(1);
    }
}

就是非公平锁在调用lock()时,先尝试插队直接去拿锁,更改state状态为1,如果成功则把Owner线程设置为当前线程,则表示成功获得锁,没获得锁则acquire,而公平锁直接acquire。

偏向锁 → 轻量级锁 → 重量级锁

初次执行到synchronized代码块的时候,锁对象变成偏向锁(通过CAS修改对象头里的锁标志位),字面意思是“偏向于第一个获得它的线程”的锁。执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程ID也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。

一旦有第二个线程加入锁竞争,偏向锁就升级为轻量级锁(自旋锁)。在轻量级锁状态下继续锁竞争,没有抢到锁的线程将自旋,即不停地循环判断锁是否能够被成功获取。获取锁的操作,其实就是通过CAS修改对象头里的锁标志位。先比较当前锁标志位是否为“释放”,如果是则将其设置为“锁定”,比较并设置是原子性发生的。这就算抢到锁了,然后线程将当前锁的持有者信息修改为自己。

长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗CPU,执行不了任何有效的任务,这种现象叫做忙等(busy-waiting)。如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么synchronized就用轻量级锁,允许短时间的忙等现象。这是一种折衷的想法,短时间的忙等,换取线程在用户态和内核态之间切换的开销。

此忙等是有限度的(有个计数器记录自旋次数,默认允许循环10次,可以通过虚拟机参数更改)。如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁(依然是CAS修改锁标志位,但不修改持有锁的线程ID)。当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则直接将自己挂起(而不是忙等),等待将来被唤醒。

独占锁与共享锁

独占锁:队首持锁,唤醒队二后tryAcquire尝试拿锁,队三及以后休眠

共享锁:相较于独占模式只唤醒队二 ,共享模式还唤醒所有mode=shared节点

独占和共享是对于加锁而言(能否多线程同时获锁),释放锁时没有独占和共享的概念。

怎么实现同步

根据参考文章1,提出了4中方式实现同步:

1:自旋实现同步

缺点:耗费cpu资源。没有竞争到锁的线程会一直占用cpu资源进行cas操作

2:yield+自旋实现同步

优点:要解决自旋锁的性能问题必须让竞争锁失败的线程不空转,而是在获取不到锁的时候能把cpu资源给让出来,yield()方法就能让出cpu资源,当线程竞争锁失败时,会调用yield方法让出cpu。

缺点:自旋+yield的方式并没有完全解决问题,当系统只有两个线程竞争锁时,yield是有效的。需要注意的是该方法只是当前让出cpu,有可能操作系统下次还是选择运行该线程

3:sleep+自旋方式实现同步

缺点:sleep的时间怎设置

4:park+自旋方式实现同步

volatile int status=0;
Queue parkQueue;//集合 数组  list

void lock(){
    while(!compareAndSet(0,1)){
        //
        park();
    }
    //lock    10分钟
   unlock()
}

void unlock(){
    lock_notify();
}

void park(){
    //将当期线程加入到等待队列
    parkQueue.add(currentThread);
    //将当期线程释放cpu  阻塞
    releaseCpu();
}
void lock_notify(){
    //得到要唤醒的线程头部线程
    Thread t=parkQueue.header();
    //唤醒等待线程
    unpark(t);
}
ReentrantLock源码分析

而我们的ReentrantLock就是采用第四种方法实现的。

首先来看一段代码

public class MyRunnable implements Runnable {
    private int num = 0;
    private ReentrantLock lock = new ReentrantLock(true);
    @Override
    public void run() {
        while (num < 20){
            lock.lock();
            try{
                num++;
                Log.e("zzf",Thread.currentThread().getName() + "获得锁,num is"+ num);
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                lock.unlock();
            }
        }
    }
}

初始化锁实例

首先来看构造函数,

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

有两种方式,无参的默认new 一个非公平锁NonfairSync。

在我们上面的例子中,调用了第二个构造函数,并且传入的true,所以使用的是公平锁。(后续都以公平锁进行分析)

lock.lock()
public void lock() {
    sync.lock();
}

final void lock() {
    acquire(1);
}

public final void acquire(int arg) {
   if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       selfInterrupt();
}

调用lock()时,首先调用acquire(),对于acquire()分两部分来讲。

tryAcquire

这个方法是尝试去获取锁。

protected final boolean tryAcquire(int var1) {
    Thread var2 = Thread.currentThread();
    int var3 = this.getState();
    if (var3 == 0) {
        if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, var1)) {
            this.setExclusiveOwnerThread(var2);
                return true;
        }
    } else if (var2 == this.getExclusiveOwnerThread()) {
        int var4 = var3 + var1;
        if (var4 < 0) {
            throw new Error("Maximum lock count exceeded");
         }

        this.setState(var4);
        return true;
    }
        return false;
    }
}

要想知道上面代码的意思,我们需要先了解一下AQS的一个静态内部类Node;

static final class Node {
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();//表示共享模式
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;//表示独占模式
    static final int CANCELLED = 1;//因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;
    static final int SIGNAL = -1;//表示这个结点的继任结点被阻塞了,到时需要通知它
    static final int CONDITION = -2;//表示这个结点在条件队列中,因为等待某个条件而被阻塞;
    static final int PROPAGATE = -3;//使用在共享模式头结点有可能牌处于这种状态,表示锁的下一次获取可以无条件传播;
    volatile int waitStatus;//用来表示上面4种状态
    volatile AbstractQueuedSynchronizer.Node prev;//前一个节点
    volatile AbstractQueuedSynchronizer.Node next;//后一个节点
    volatile Thread thread;//表示线程
    Node nextWaiter;//条件队列才使用。
}

getState()得到的是一个int类型的volatile int state;被volatile修饰。默认为0,表示无锁状态,1表示上锁状态,>1表示重入。

当第一个线程进来,此时getState()为0,所以进入第一个if中。

public final boolean hasQueuedPredecessors() {
    AbstractQueuedSynchronizer.Node var1 = this.tail;
    AbstractQueuedSynchronizer.Node var2 = this.head;
    AbstractQueuedSynchronizer.Node var3;
    return var2 != var1 && ((var3 = var2.next) == null || var3.thread != Thread.currentThread());
}

该方法是判断是否需要排队。

在这需要分三种情况讨论:

1:队列没有初始化

当队列没有初始化的时候,var1 = null,var2 = null.所以hasQueuedPredecessors()返回false。在第一个if里面进行取反,则表示不需要入队列处理,直接CAS进行加锁操作。并把当前线程设置成Node的thread。然后返回true,而此时在acquire()中又是取反,则不会走if里面的代码。从这里可以看出,其实加锁就是让获取到这个锁的线程能够顺畅的往下执行逻辑。

2:队列初始化且>1个节点

在队列初始化的时候,此时会new一个Node,放在最前面,而在队列的第二才是真正排队的第一个,而new 出来的那个Node可以理解为第一个拿到锁的那个的占位。当大于一个节点的时候var2 != var1为true,因为var2最开始为null,现在>1个节点,表示var2.next是有值的,所以(var3 = var2.next) == null为false。

var3.thread != Thread.currentThread())为true时表示前面已经有人在排队,那我就必须要排队。所以hasQueuedPredecessors()返回true。此时则会跳出第一个if。tryAcquire返回flase,则需在acquire()判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg)在后面讲解。

var3.thread != Thread.currentThread()为flase时,则不需要排队,最简单的理解是前面排队的是自己的女朋友,那她买票等价于我买票了,就不需要排队了。

3:队列初始化只有一个节点

当只有一个节点的时候,相当于就只有初始化队列的时候new的那个Node。那此时队首和队尾就是它自己,但是它又是不算排队的那个,只是算正在持有锁的那个线程在队列中的一个占位而已。

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

当处于上面讲的第二种情况时,就会执行这段代码。addWaiter()是将当前线程封装成Node,然后添加到AQS队列中。

private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

将当前的线程封装成Node并且mode为独占锁。

然后进行一个无限循环,第一次oldTail为null,则进入到else中,进行初始化。第二次进来的时候,oldTail则不为null,将当前线程Node的prev节点指向tail,然后通过cas将node加入AQS队列,成功之后,把旧的tail的next指向新的tail。所以AQS并不是第一的时候就进行初始化。

final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

首先获取前一个节点,只有当前节点的前一个节点是head,表示当前节点在队列中是第二个元素。AQS是根据FIFO来的,所以当前一个释放了锁,只有队列中的第二个才能去获取锁。如果索取锁成功,则吧当前的节点设置为head。并把前一个head断开。因为为head的那个node的node.head 和node.thread都是为null,只需要把node.next置为null时,就断开了链表的链接。

如果获取锁失败,则根据waitStatus决定是否需要挂起线程。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

如果前一个节点的waitStatus是SIGNAL,表明当前的的线程需要被unpark。

如果waitStatus > 0,则表明是cancel状态,则从前节点开始逐步循环找到下一个没有被cancel的节点设置为当前节点的前节点。目的将cancel状态的节点踢除掉。如果是其他的状态,将前节点改成SIGNAL状态。当返回true时,继续走下面代码。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

意思就是通过park()将当前线程挂起到WATING状态。

lock.unlock()
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

执行unLock的时候,会执行release();

protected final boolean tryRelease(int var1) {
    int var2 = this.getState() - var1;
    if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
        throw new IllegalMonitorStateException();
    } else {
        boolean var3 = false;
        if (var2 == 0) {
            var3 = true;
            this.setExclusiveOwnerThread((Thread)null);
        }

        this.setState(var2);
        return var3;
    }
}

getState() -1表示可能有重入的,getState()可能>1,只有当var2 ==0时,把当前线程设置为null,并释放锁,返回true。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
    }
        if (s != null)
            LockSupport.unpark(s.thread);
}

该方法是真正的释放锁,传入的是head节点,当前线程被释放后,需要唤醒下一个节点的线程。

这里是从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点。如果从前面向后遍历会导致死循环。

如果获取到head的next节点不为空,则释放许可唤醒线程。

Condition

在上面讲的是AQS的同步队列,除了这个同步队列外,还有一个条件队列。

加入条件队列的前提是,当前线程已经拿到了锁,并处于运行状态,加入条件队列后,就需要释放锁,进入阻塞状态,同时唤醒同步队列的队二拿锁。唤醒操作是将一个node从条件队列,移动到同步队列的队尾,让它返回同步队列park,并不是随机就唤醒一个线程。

流程

1、创建节点并添加到条件队列的尾部。

2、释放线程所持有的锁。

3、判断是否在同步队列中,在的话,进行CAS拿锁操作,不在的话,唤醒进入同步队列的尾部。

public class ConditionDemo {

    private int queueSize = 10;
    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);

    private Lock lock = new ReentrantLock();
    private Condition full = lock.newCondition();
    private Condition empty = lock.newCondition();

    class Consumer implements Runnable{

        @Override
        public void run() {
            consume();
        }

        private void consume() {
            while (true){
                lock.lock();
                try {
                    while (queue.size() == 0){
                        try {
                            System.out.println("队列空,等待数据");
                            empty.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.poll();
                    full.signal();
                    System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
                }finally {
                    lock.unlock();
                }
            }
        }
    }

    class Producer implements Runnable{

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true){
                lock.lock();
                try {
                    while(queue.size()== queueSize){
                        try {
                            System.out.println("队列满,等待有空余空间");
                            full.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    queue.offer(1);
                    empty.signal();
                }finally {
                    lock.unlock();
                }
            }

        }
    }
}

上面是利用Condition实现生产者消费者模式,当队列的size为0的时候,empty调用await().

休眠
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

在ConditionObject中,包含了两个成员对象,一个表示队列的第一个node,一个表示队列的最后一个node。

private transient Node firstWaiter;
private transient Node lastWaiter;


private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

addConditionWaiter就是创建一CONDITION状态的个node,如果队列中没有,则把这个新创建的赋值给首节点,如果有,则赋值给当前节点的后一个。并且这个也是最后一个节点。条件队列也是先进先出的形式,只不过是单链表的形式。

final int fullyRelease(Node node) {
    try {
        int savedState = getState();
        if (release(savedState))
            return savedState;
        else    
            throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

首先获取当前的state,然后调用release方法进行释放锁。

在第三篇参考博客写到,fullyRelease会一次性释放所有的锁,不管重入多少次。我理解的是重入的时候,还是同一个线程,同一把锁。所以释放的只是一把锁吧。

经过上面的代码之后,节点已经放到条件队列并且释放了所持有的锁,而后需要挂起。但是在前面说了,挂起需要不在同步队列,所以需要判断是否在同步队列。

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null) // If has successor, it must be on queue
        return true;
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    for (Node p = tail;;) {
        if (p == node)
            return true;
        if (p == null)
            return false;
        p = p.prev;
    }
}

当节点的状态为Node.CONDITION或者node.prev == null时,则不在同步队列,node.prev,node.next在条件队列中没有用到。如果含有这个,则表示是在同步队列中。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
               trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
            }else
                trail = t;
                t = next;
        }
    }

如果状态不是CONDITION,就会自动删除。

唤醒
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
        }

首先判断是不是同一个线程,不是则抛出异常。然后获取等待队列的头结点,后续的操作都是基于该节点。

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            U.putObject(node, Node.PREV, oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

enq()是将该节点移入到同步队列中去。

signal是只会对等待队列的头结点进行操作,而signalAll则是对每一个节点都移入到同步队列中。

同步队列与条件队列完整配合如下图所示:

1608883350(1).png
参考文献

https://blog.csdn.net/java_lyvee/article/details/98966684

https://segmentfault.com/a/1190000017372067

https://segmentfault.com/a/1190000020345054

文章分类
后端
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐