阅读 121

Java 并发编程之 Condition 接口

Java 并发编程之 Condition 接口

任意一个 Java 对象,都拥有一个监视器方法,主要包括 wait()、wait(long timeout)、notify() 以及 notifyAll() 方法,这些方法与 synchronized 同步关键字配合,可以实现等待 - 通知模式。Condition 接口也提供了类似 Object 的监视器方法,与 Lock 配合可以实现等待 - 通知模式

Object 的监视器方法与 Condition 接口的对比:

对比项Object 监视器方法Condition
前置条件获取对象的监视器锁调用 Lock.lock() 获取锁调用 Lock.newCondition() 获取 Condition 对象
调用方法直接调用如:object.wait()直接调用如:condition.await()
等待队列个数一个多个
当前线程释放锁并进入等待队列支持支持
当前线程释放锁并进入等待队列,在等待状态中不响应中断不支持支持
当前线程释放锁并进入超时等待状态支持支持
当前线程释放锁并进入等待状态到将来的某个时间不支持支持
唤醒等待队列中的一个线程支持支持
唤醒等待队列中的全部线程支持支持


2|0接口示例

Condition 定义了等待 - 通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition 对象关联的锁。Condition 对象是由 Lock 对象(调用 Lock 对象的 newCondition() 方法)创建,换句话说,Condition 是依赖 Lock 对象的

public class ConditionUserCase {

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();    public void conditionWait() throws InterruptedException {
        lock.lock();        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }    public void conditionSignal() {
        lock.lock();        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
}

当调用 await() 方法后,当前线程会释放锁并在此等待,而其他线程调用 Condition 对象的 signal() 方法,通知当前线程后,当前线程才从 await() 方法返回,并且在返回前已经获取了锁

Condition 的部分方法以及描述:

方法名称描 述
void await() throws InterruptedException当前线程进入等待状态直到被通知(signal)或中断。
void awaitUninterruptibly()当前线程进入等待状态直到被通知,该方法不响应中断。
long awaitNanos(long nanosTimeout) throws InterruptedException当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余超时时间。
boolean awaitUntil(Date deadline) throws InterruptedException当前线程进入等待状态直到被通知、中断或者到某个时间。如果没有到指定时间就被通知,方法返回 true,否则,表示到了指定时间,返回 false。
void signal()唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁。
void signalAll()唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁。

下面通过一个有界队列的示例来深入理解 Condition 的使用方式

public class BoundedQueue<T> {    private Object[] items;    // 添加的下标,删除的下标和数据当前数量
    private int addIndex, removeIndex, count;    private Lock lock = new ReentrantLock();    private Condition notEmpty = lock.newCondition();    private Condition notFull = lock.newCondition();    public BoundedQueue(int size) {
        items = new Object[size];
    }    /**
     * 添加一个元素,如果数组满,则添加线程进入等待状态,直到有空位
     */
    public void add(T t) throws InterruptedException {
        lock.lock();        try {            while (count == items.length) {
                notFull.await();
            }
            items[addIndex] = t;            if (++addIndex == items.length) {
                addIndex = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }    /**
     * 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新元素添加
     */
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();        try {            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[removeIndex];            if (++removeIndex == items.length) {
                removeIndex = 0;
            }
            --count;
            notFull.signal();            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}


3|0实现分析

ConditionObject 是同步器 AbstractQueuedSynchronizer 的内部类,每个 Condition 对象都包含着一个队列(等待队列),该队列是 Condition 对象实现等待 - 通知功能的关键

1. 等待队列

等待队列是一个 FIFO 队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程,如果一个线程调用了 Condition.await() 方法,那么该线程就会释放锁,构造成节点并加入等待队列并进入等待状态

一个 Condition 包含一个等待队列,Condition 拥有首尾节点的引用,新增节点只需要将原有的尾节点 nextWaiter 指向它,并更新尾节点即可。节点引用更新的过程并没有使用 CAS 来保证,原因在于调用 await() 方法的线程必定是获取了锁的线程,也就是该过程是由锁来保证线程安全的

在 Object 的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的 Lock 拥有一个同步队列和多个等待队列,其对应关系如图所示:

2. 等待

调用 Condition 的 await() 方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await() 方法返回时,当前线程一定获取了 Condition 相关联的锁

Condition 的 await() 方法如下所示:

public final void await() throws InterruptedException {    // 检测线程中断状态
    if (Thread.interrupted())        throw new InterruptedException();    // 当前线程包装为 Node 并加入等待队列
    Node node = addConditionWaiter();    // 释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);    int interruptMode = 0;    // 检测该节点是否在同步队列中,如果不在,则继续等待
    while (!isOnSyncQueue(node)) {        // 挂起线程
        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;
    }    // 竞争同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;    // 清理条件队列中的不是在等待条件的节点
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();    // 对等待线程中断,会抛出异常
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3. 通知

调用 Condition 的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中

Condition 的 signal() 方法代码如下所示:

public final void signal() {    // 检查当前线程是否获取了锁
    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 获取等待队列首节点,移动到同步队列并唤醒
    Node first = firstWaiter;    if (first != null)
        doSignal(first);
}

Condition 的 signAll() 方法,相当于对等待队列中的每个结点均执行一个 signal() 方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐