Condition

版本 JDK7

1. 对 Condition 大概的了解

本篇讲到的 Condition 是指 java.util.concurrent.locks.Condition

来看一下它的构造:

原来它只是一个接口,仔细观察其内部结构,有没有一种似曾相识的感觉?对,它和 Object 中提供的一下方法有点相像,实际上他们的功能在一定程度上还是一样的。

1.1 Condition 是什么?

所以 Condition 到底是什么样的存在呢?

Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.

根据上面官方对 Condition 的描述,我的理解就是:Condition 是将 Object monitor methods(比如:wait, notify, notifyAll)分解到每个不同的对象中,通过将这些新方法与任意实现了 Lock(java.util.concurrent.locks.Lock) 接口的锁(比如:ReentrantLock,ReentrantReadWriteLock等)结合使用,来实现每个对象都有多个 wait-sets(等待队列)的效果。如果 Lock 代替了 synchronized 方法和语句的使用,那么 Condition 将代替 Object monitor methods。

简单点理解就是,Condition 细化并且丰富了原来的 Object monitor methods。

1.2 Condition 与 Object Monitor Methods 的区别?

既然 Condition 是 Object Monitor Methods 的替代品,那么它们之间有什么区别呢?

  1. Condition 提供了不响应中断的等待方法 - void awaitUninterruptibly();,而这个在 Object 中是没有的。

  2. Condition 是配合 Lock 使用,而 Object Monitor Methods 是配合 synchronized 使用的。

引用《Java 并发编程的艺术》

Condition 与 Object Monitor Methods 的异同:

对比项 Object Monitor Methods Condition
前置条件 获取对象的锁 - synchronized 调用 Lock.lock() 获取锁
调用 Lock.newCondition() 获取 Condition 对象
调用方式 直接调用
如:object.wait()
直接调用
如:condition.wait()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态,在等待状态中 不响应 中断 不支持 调用 condition.awaitUninterruptibly()
当前线程释放锁并进入 超时等待状态 支持 支持
当前线程释放锁并进入 等待状态 到将来的某个时间 不支持 调用 condition.awaitUntil(Date deadline)
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

2. 源码解析

下面的目前实现了 Condition 接口的所有实现类:

下面我将以 java.util.concurrent.locks.AbstractQueuedSynchronizer 的内部静态类 - ConditionObject 为例,来解析其实现 Condition 接口的细节。

2.1 ConditionObject 的结构

ConditionObject 中有 2 个重要的成员变量:firstWaiter 和 lastWaiter。这两个成员变量和同步器(AQS)中的节点都是同一类型 —— AbstractQueuedSynchronizer.Node

ConditionObject 中维护着等待队列。等待队列与同步队列一样都是遵循 FIFO 规则。

2.1.1 ConditionObject 的成员变量

/** 等待队列的头结点 */
private transient Node firstWaiter;
/** 等待队列的尾节点 */
private transient Node lastWaiter;
/** 中断模式之一:REINTERRUPT */
private static final int REINTERRUPT =  1;
/** 中断模式之一:THROW_IE */
private static final int THROW_IE    = -1;

解释下 interruptMode,interruptMode 可以取值为 REINTERRUPT(1)、THROW_IE(-1)、0。

2.1.2 等待队列的基本结构

一个 Condition 包含一个等待队列,Condition 与同步器(AQS)一样都有首节点(firstWaiter)和尾节点(lastWaiter)。

与同步器不同的地方是:等待队列中的每个节点只存有下一个等待节点(即nextWaiter),而没有保存前驱节点的地址。

如上如所示,Condition 的实现是同步器的内部类,因此每个 Condition 实例都能访问同步器提供的方法,相当于每个 Condition 都拥有所属同步器的引用。

2.2.1 await()

await() 这个方法是响应中断。

作用:使调用该方法的线程会进入等待队列,然后释放对象锁。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        // 当前线程被标记为中断状态,需要抛出中断异常
        throw new InterruptedException();
    // 将当前线程“加工”成节点加入到等待队列中,具体实现后面会介绍
    Node node = addConditionWaiter();
    // 释放对象锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// 当前线程节点所在的队列不是同步队列
        // 阻塞当前线程
        LockSupport.park(this);

        // 当前线程退出阻塞状态,先判断一下当前线程是否因为中断而退出的
        // 如果是中断而退出的,退出 while 循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }

    /** 
     * 代码执行到这里有两种情况:
     * 1. 当前节点已经在同步队列中
     * 2. 当前节点在等待队列中被标记为中断状态
     */

    // 下面开始获取对象锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        /**
         * 获取对象锁失败,并且此时线程已经被打断了
         * 注意:acquireQueued 返回的是当前线程是否是因为中断而退出的
         * 如果 acquireQueued 返回 false,说明线程成功获取对象锁
         * 如果 acquireQueued 返回 true,说明线程因为被中断而获取对象锁失败
         */
        interruptMode = REINTERRUPT;
    
    // 如果当前节点还未从等待队列中清除出去,那么进行清除
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    
    if (interruptMode != 0)
        // 线程被中断,需要根据 interruptMode 来判断是否是抛出中断异常,还是重新将当前线程标记被中断状态
        reportInterruptAfterWait(interruptMode);
}

小结一下,await() 这个方法只有两种情况下才能够返回方法:

  1. 当前线程被唤醒
  2. 当前线程被中断

2.2.2 awaitNanos(long)

响应中断

public final long awaitNanos(long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    // 获取当前JVM系统的时间
    long lastTime = System.nanoTime();
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// 当前节点在等待队列
        if (nanosTimeout <= 0L) {// 时间超时
            // 将当前线程加入到同步队列中
            transferAfterCancelledWait(node);
            break;
        }

        // 阻塞当前线程,最长不超过nanos纳秒,返回条件在park()的基础上增加了超时返回。
        LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;

        // 获取当前JVM时间
        long now = System.nanoTime();
        // 重新计算超时时间
        nanosTimeout -= now - lastTime;
        // 更新最后执行时间
        lastTime = now;
    }
    // 这里和 await() 一样就不再赘述
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    
    // 返回本方法执行所花的所有时间
    return nanosTimeout - (System.nanoTime() - lastTime);
}

await() 与 awaitNanos(long) 的区别:

2.2.3 awaitUntil(Date)

/**
 * 当前线程等待时间超过 deadline 的时候,将自动将线程唤醒
 * @param deadline, 超时时间点
 */
public final boolean awaitUntil(Date deadline) throws InterruptedException {
    if (deadline == null)
        throw new NullPointerException();
    // 获取 deadline 的时间长度
    long abstime = deadline.getTime();
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {// 当前节点在等待队列中
        if (System.currentTimeMillis() > abstime) { // 超时唤醒
            /**
             * 注意:transferAfterCancelledWait 无论返回的是 true 或者 false,当前线程最终都将进入同步队列。
             * transferAfterCancelledWait,如果返回 true。说明当前线程是因为超时而自己唤醒并且将当前线程加入同步队列。
             * 如果返回 false。说明当前线程是被主动唤醒加入同步队列的。
             */
            timedout = transferAfterCancelledWait(node);
            break;
        }
        LockSupport.parkUntil(this, abstime);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

看完 awaitUntil(Date) 的具体实现,有没有感觉与 awaitNanos(long) 的功能是差不多的,只不过表现的方式不一样。

2.2.4 await(long, TimeUnit)

await(long, TimeUnit) 可以理解为是 awaitNanos(long) 的加强版。为什么这么说呢?awaitNanos(long) 只支持等待纳秒级的时间。如果我们想让这个方法支持其他时间等级的等待就不能支持了。因此,引入了 await(long, TimeUnit) 这个重载方法。代码基本一致,这里就不多赘述。

2.2.5 awaitUninterruptibly()

忽略中断。

代码很简单,这里就不介绍了。

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

2.2.6 signal()

signal(),将等待时间最长的线程(如果存在)从这个条件的等待队列移动到拥有锁的等待队列(也就是同步队列)。

如果等待时间最长的节点没有成功转移到同步队列,那么头节点的第一个后继节点将尝试加入到同步队列中,以此类推。

public final void signal() {
    // 判断当前唤醒的线程是否是被阻塞的线程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 唤醒头节点(但是最终唤醒的不一定是头节点,请往下看)
        doSignal(first);
}

因为条件等待队列也是遵循 FIFO,所以头节点自然是等待时间最长的节点。

doSignal(Node)

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            // 进到这里说明此时条件等待队列中只有一个节点
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
    // 这里 while 循环,如果 first 转移不成功,那么选择 first 后面的第一个节点进行转移,依此类推
}

transferForSignal(Node)

final boolean transferForSignal(Node node) {
    
    // 如果 CAS 失败,说明当前节点被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 加入到同步队列,返回的节点 p 是 node 的前驱节点
    Node p = enq(node);
    int ws = p.waitStatus;
    // ws > 0,说明节点 p 取消了等待锁
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
正常情况下,ws > 0   !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这句中,ws <= 0,而且 compareAndSetWaitStatus(p, ws, Node.SIGNAL) 会返回 true,所以一般也不会进去 if 语句块中唤醒 node 对应的线程。然后这个方法返回 true,也就意味着 signal 方法结束了,节点进入了阻塞队列。

注意:此时节点已经成功加入到同步队列,但是原先线程因为调用了 await() 而进入阻塞状态还没有被唤醒。那什么时候会被唤醒呢?只有当其他线程进行了 release(int) 释放了对象锁之后,才有可能将当前线程唤醒。

2.2.7 signalAll()

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

doSignalAll(node)

private void doSignalAll(Node first) {
    // 清空条件等待队列中的所有节点
    lastWaiter = firstWaiter = null;
    do {// 遍历所有的节点,将其加入到同步队列中
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);// 直到遍历完所有的节点才退出循环。
}

2.3 ConditionObject 的私有方法和 AQS 的一些方法解析

看了上面重写的方法可能还有一些疑惑的地方,可以看看其内部的调用方法实现,所剩的疑惑应该就能消除了。

2.3.1 addConditionWaiter (ConditionObject 私有)

addConditionWaiter,将当前线程构造成一个等待队列中的节点。

private Node addConditionWaiter() {
    // 获取等待队列中的尾节点
    Node t = lastWaiter;

    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {

        /**
         * 这个方法会遍历整个等待队列 并且 将所有已经取消的节点清除出去
         * 当然这个时候的 lastWaiter 肯定已经发生的改变。
         */
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    // 构造节点 并且 将节点放到等待队列的末尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

2.3.2 isOnSyncQueue (AQS的方法)

isOnSyncQueue 用来检查当前线程是否在 同步队列 中。

final boolean isOnSyncQueue(Node node) {

    // 如果当前节点的状态为 CODITION 或者 当前节点的前驱节点为null,都能说明当前节点不在同步队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;

    // 如果当前节点存在后继节点,说明已经在同步队列中
    if (node.next != null) // If has successor, it must be on queue
        return true;
    
    /**
     * 执行到这里说明此时的节点状态:
     * (node.waitStatus != Node.CONDITION || node.prev != null) && node.next == null
     * 这个时候,还差尾节点没有判断,所以调用 findNodeTail 来检测
     */
    return findNodeFromTail(node);
}
/**
 * 从尾节点开始遍历同步队列来检查当前节点是否存在于同步队列中。
 * 存在,返回 true;否则,返回false。
 */
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

2.3.3 checkInterruptWhileWaiting (CondtionObject 私有)

这个方法是用来检查当前线程是否被 标记为中断状态,线程被中断返回 1 或 -1;线程如果没有被中断,返回 0 。

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

2.3.4 unlinkCancelledWaiters (CondtionObject 私有)

unlinkCancelledWaiters,遍历整个等待队列,将等待队列中被取消的节点清理出队列。

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;

    // 遍历整个等待队列
    while (t != null) {
        Node next = t.nextWaiter;

        if (t.waitStatus != Node.CONDITION) {
            /** t节点需要被清除出等待队列 **/
            
            t.nextWaiter = null;
            // 更新 firstWaiter 或 lasterWaiter 所指向的节点
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else // t节点仍然在等待队列中
            trail = t;

        // 检查下一个节点
        t = next;
    }
}

2.3.5 reportInterruptAfterWait (CondtionObject 私有)

reportInterruptAfterWait(int),根据 interruptedMode 的值,来决定当前线程到底是直接抛出异常还是将当前线程标记为中断状态。

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    // -1,抛出中断异常
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    // 1,将当前线程标记为中断状态
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

2.3.6 transferAfterCancelledWait (AQS 的方法)

transferAfterCancelledWait(Node) 在不同的使用环境下有不同的含义,请根据不同的环境来分析。

transferAfterCancelledWait(Node):当前等待节点从等待队列中移除时,如果有必要,将其传输到同步队列中。

为什么说是“有必要”?因为,在将当前线程加入到同步队列这个操作之前,可能当前线程已经被唤醒(执行signal()),这个时候的节点的状态可能也已经不再是 CONDITION了。

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 成功改变当前节点的状态,将其放入同步队列中
        enq(node);
        return true;
    }

    /**
     * 执行到这里,说明,在进行 enq 之前就已经执行了 signal()
     */
    while (!isOnSyncQueue(node))
        // 让步,让cpu调度器决定先执行那个线程
        Thread.yield();
    return false;
}

参考

BACK