这几天自己分析了下J.U.C包下面的Condition,有点收获,所以特来记录一翻。在这篇中会学到如下知识点:
1. Condition的简单介绍和演示
2. Condition原理解析
注意,该篇需要先了解AQS的原理。如需了解,请参考之前的帖子:Java并发之ReentrantLock及AQS简析
0x01. Condition接口简介
-
什么是Condition?它在什么场景下可以使用?
- 在之前,我有篇文章讲的是通过
ReentrantLock
来分析J.U.C并发包下面的锁,即LOCK
接口。既然并发包提供了Lock锁,那么他会不会提供其他的和并发相关的类和方法?于是Condition就来了。 - 我们知道并发包里面的Lock是一种锁的实现机制,那么有没有线程之间通信(
wait/notify
)的实现呢?估计你也猜到了,就是Condition - 线程中通信,最典型的场景就是生产者消费者了。这里我也不再赘述,本文章重点在于分析Condition源码。
- 在之前,我有篇文章讲的是通过
-
那么如何使用Condition呢?我用一段代码来描述。
-
CondtionWait
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27/**
* @author zyzling
*/
public class ConditionWait extends Thread{
private Lock lock;
private Condition condition;
public ConditionWait(Lock lock, Condition condtion){
this.lock = lock;
this.condition = condtion;
}
public void run() {
try{
lock.lock();
System.out.println("[ConditionWait] 获取到锁,即将调用wait方法...");
//通过调用condition.await实现阻塞等待。期间会释放锁
condition.await();
System.out.println("[ConditionWait] 被唤醒,继续执行....");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
} -
ConditionNotify
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* @author zyzling
*/
public class ConditionNotify extends Thread{
private Lock lock;
private Condition condition;
public ConditionNotify(Lock lock,Condition condition){
this.lock = lock;
this.condition = condition;
}
public void run() {
try{
lock.lock();
System.out.println("[ConditionNotify] 获取到锁,准备唤醒等待的线程。");
condition.signal();
System.out.println("[ConditionNotify] 执行完signal代码,继续往下执行。");
}finally {
lock.unlock();
}
}
} -
conditionMain
1
2
3
4
5
6
7
8
9
10
11
12
13
14/**
* @author :zyzling
*/
public class ConditionMain {
public static void main(String[] args) {
//创建重入锁
Lock lock = new ReentrantLock();
//通过Lock创建Condition实例
Condition condition = lock.newCondition();
//创建两线程
new ConditionWait(lock,condition).start();
new ConditionNotify(lock, condition).start();
}
} -
运行结果如下:
1
2
3
4[ConditionWait] 获取到锁,即将调用wait方法...
[ConditionNotify] 获取到锁,准备唤醒等待的线程。
[ConditionNotify] 执行完signal代码,继续往下执行。
[ConditionWait] 被唤醒,继续执行.... -
还是解释下吧。
- 上面这个案例简单的演示了下Condition的使用,在
CondtionWait
类中调用Condition.await()
使线程进行阻塞,而通过ConditionNotify
类中的调用的Condition.signal()
唤醒使用await()
阻塞的线程。 - 我们知道
wait/notify
需要在Synchronized
同步代码块中执行,相应的,Condition.await()
和condition.signal()
也需要获取Lock
锁。 - 注意,
Condition.signal()
方法执行完后,并不会立马释放锁,而是需要等到代码执行完后,才会在finally释放锁。
- 上面这个案例简单的演示了下Condition的使用,在
-
-
在我们一起走入
Condition
的内心世界前,我们先看一下Condition的方法结构图和实现类,如下:- Condition方法结构
- 实现类
0x03.分析前戏—回顾AQS
- 我们上面也说过,在执行Condition.await/signal方法之前,需要获取锁。也就是两个线程(ThreadA/ThreadB)同时都会执行
lock.lock()
方法来竞争锁。图如下,那么在竞争锁的时候会发生什么事呢?一起再来回顾下吧。
-
首先我们应该明确的是,在AQS内部维护了一个双向链表的队列,用于存放阻塞的线程。假设,我们的ThreadA率先拿到锁,那么ThreadB就会阻塞在该队列,等待ThreadA线程释放锁后,唤醒ThreadB。图如下:
-
前戏就到这里了。稍微回顾了下,到这里,至少知道有个AQS队列。里面是个双向链表。在下面深入Condition的时候会用到。为了方便,下面我将把执行
Condition.await()
的线程叫ThreadA。同理,调用Condition.signal()
的线程叫ThreadB
0x02.Condition原理分析之await()—阻塞前部分
-
因为Condition是个接口,会有不同的实现类,不同实现类中的实现不同,而今天我们分析的是
AbstractQueuedSynchronizer
这个类中的实现。先来个await方法的全貌吧。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}下面我们开始分析,我这里会把代码拆分,一步一步的看。
-
到这里,我们假设ThreadA率先拿到锁,那么ThreadB就会在AQS同步队列中等待ThreadA释放锁。
-
ThreadA拿到锁后,下面就会走await()方法,重头戏来了。我们点进去,看看await()里面做了什么。
1
2
3
4
5
6
7
8public final void await() throws InterruptedException {
//一上来就判断是否被中断过,如果被中断过,就会抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//接下来,创建了个Node节点。
Node node = addConditionWaiter();
....//后面的代码省略,一步一步往下走。
}上面的一段代码看上去还是挺容易理解的,先判断是否被中断过,然后就直接创建了个condition节点。(关于interrupted()方法,我想后面应该会和Synchronized分析放在一起用一篇文章记录下学习过程。)
addConditionWaiter()
方法里做了些什么事呢?代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24//头节点
private transient Node firstWaiter;
//尾节点
private transient Node lastWaiter;
private Node addConditionWaiter() {
//初始时,lastWaiter=null
Node t = lastWaiter;
//如果最后一个节点的waitStatus不是CONDITION,则需要移除,
//直到最后一个waitStatus=CONDITION的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个节点。 Node.CONDITION = -2
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//构造单向链表结构
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
} 根据上面代码来看,Condition也会维护一个链表,只不过它是单向的。那么执行完后,抽象图如下:
-
addConditionWaiter()
执行完后,继续往下走1
2
3
4
5
6
7
8
9
10public final void await() throws InterruptedException {
//一上来就判断是否被中断过,如果被中断过,就会抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//接下来,创建了个Node节点。
Node node = addConditionWaiter(); //上面我们刚分析这里。继续往下面走
//这里调用fullyRelease()方法,它是干什么呢?我们一起来分析下
int savedState = fullyRelease(node);
....//后面的代码省略,一步一步往下走。
}-
fullyRelease()
方法内部代码1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18final int fullyRelease(Node node) {
boolean failed = true;
try {
//获取锁的重入次数(这里如果不清楚,请回过头看一下AQS源码)
int savedState = getState();
//release也在AQS中说过。作用是释放锁,并且唤醒AQS同步队列中阻塞的线程
if (release(savedState)) {
failed = false;
//返回重入次数,为什么需要返回?
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}getState()
是获取锁的重入次数,我们线程去竞争锁的时候,就是使用CAS修改这个字段为1。如果修改成功,则说明获取锁成功。同理,如果有重入的情况,是会把该值进行+1.release()
释放锁,这里不管重入了多少层,一次性释放掉,并唤醒AQS同步队列中的阻塞线程。那么,在这里为什么要释放锁呢?因为如果不释放锁,那么另外一个调用signal方法的线程,是永远获取不了锁,既然拿不到锁,更别说唤醒await的线程了。- 上面有个问题,为什么需要返回重入次数呢?这个问题先记着,我们在后面的分析中给出解答。
-
-
fullyRelease()
分析完了,简单来说,就是释放锁,也就是说,到这里ThreadB就可以去竞争锁了,并且按照现在的例子来说,一定会抢到锁。然后去执行Condition.signal()
方法。我们继续往下走。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public final void await() throws InterruptedException {
//一上来就判断是否被中断过,如果被中断过,就会抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//接下来,创建了个Node节点。
Node node = addConditionWaiter();
//这里调用fullyRelease()方法释放锁,并唤醒阻塞的线程
int savedState = fullyRelease(node);//上面我们刚分析这里。继续往下面走
//这里判断了这个节点是否在同步队列中,也就是AQS队列中。如果不在,就挂起。
//那么根据这个判断来说,我们可以知道一些信息,比如:这个节点会在某个地方放到AQS同步队列中。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
....
}
....//后面的代码省略,一步一步往下走。
}- 来看看它是怎么判断节点是否在同步队列吧。
1
2
3
4
5
6
7
8final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}-
我们知道,这个节点,刚开始加入Condition队列时,它的waitStatus是CONDITION,那么如果
waitStatus=CONDITION
毫无疑问,他是在Condition队列,而不会在AQS队列中。 -
node.prev==null,因为AQS队列中是有前驱和后继的。刚好存放这两个的字段名一个为prev,一个为next。并且我们知道,AQS中只有第一个节点(Head指向的节点)的prev值才是null,但是如果是head指向的节点,说明已经获取了锁。对于我们来说,确实是不在AQS队列中阻塞。
-
结合上面的条件来看,如果到了执行node.next==null这个if的时候,说明node.prev一定是不等于null的。同理,node.next!=null的情况也只有在AQS中才会成立。
-
如果上面的条件都不满足,那只能使用笨办法,直接去AQS队列中寻找这个节点。也就是
findNodeFromTail()
1
2
3
4
5
6
7
8
9
10private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}- 为什么要从尾部找起呢?因为如果从头部找起,会有并发问题。
- 自旋寻找。如果找到则返回true,未找到则返回false。
-
判断分析完了,下面就是挂起线程了。我们
Condition.await()
方法阻塞前的部分就已经分析完了,下面我们开始分析signal里面是怎么唤醒现在这个线程的。Condition.await()
唤醒后的操作,我们分析完signal
后继续分析。这里先让他阻塞等待下:sleeping:
-
简单做个总结吧。
- 先获取到锁
- 创建一个Node对象,
waitStatus=-2
,放到Condition队列中。Condition队列是一个单向链表结构。 - 释放锁,唤醒在AQS队列中阻塞的线程。
- 判断该节点是否在AQS队列中,从而采取不同的操作。如果不在AQS队列中,就会阻塞。
0x03.Condition原理分析之signal
-
从上面的分析来看,我们可以得到个信息:在某个地方是会把await方法中创建的节点放到AQS队列中。那么这个某个地方是指哪个地方呢?await是不能了,前面的代码也没有看到有这个操作,所以唯一的解释就是在signal方法中。所以一起来看看吧。这里ThreadB是一定获取到了锁的。
1
2
3
4
5
6
7
8
9public final void signal() {
//判断当前获取锁的线程是不是自己。这也就是为什么在执行signal方法前要lock.lock()的原因。
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//这里firstWaiter就是指向await()方法创建的node节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}这里没啥复杂的代码,我们直接看
doSignal()
方法把。 -
doSignal()
方法代码如下:1
2
3
4
5
6
7private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}方法很简短,这里我们先不看do…while()循环体内做了些啥,我们先看下他的判断条件。
-
transferForSignal()
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15final boolean transferForSignal(Node node) {
//首先进来就是通过CAS把node的waitStatus置0.
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//这里就是把该节点加入到AQS同步队列了。里面我就不展开了,在讲AQS的时候已经分析了。
//这里返回的是该节点加入到AQS队列后的前一个节点。也就是这个节点在AQS队列中的前驱。
Node p = enq(node);
//获取前驱的waitStatus
int ws = p.waitStatus;
//判断条件,如果最终结果为true。则会去唤醒阻塞的线程ThreadA。
//但一般不会走这里直接唤醒。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}- 这里首先进来就是用CAS改变waitStatus的值,这也是为什么在await方法里面调用
isOnSyncQueue
方法判断waitStatus的值是不是Condition的原因。 - 接下来就是调用
enq()
方法把node节点加入到AQS同步队列中,并且返回这个节点的前驱节点,因为我们后面要修改该前驱节点的waitStatus为Signal(-1)。只有前驱节点为这个值时,当前节点才会去竞争锁。 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
这个条件得细说下,不然会容易晕。ws>0
: 我们可以查看下Node的状态取值,发现ws>0的情况只有CANCELLED(1)
这个状态,如果节点的waitStatus等于这个,则说明前驱已经取消获取锁了,那么这个时候,对于当前这个场景来说,是不是需要去唤醒还在阻塞的线程?!compareAndSetWaitStatus(p, ws, Node.SIGNAL)
:这里会先尝试使用CAS把前驱的状态变为-1,也就是SIGNAL状态,如果失败的话,则说明这个前驱的节点的waitStatus已经变了,所以需要唤醒正在阻塞的线程。- 总体来说,这个条件如果为true,那么则说明中间出现了什么问题,也就是出现了不是我们预料之外的东西,所以这里直接唤醒阻塞的线程。
- 这里首先进来就是用CAS改变waitStatus的值,这也是为什么在await方法里面调用
-
transferForSignal
整体我们已经分析完毕了,如果在正常的情况下,只会做如下操作:- 改变节点的状态
- 加入AQS队列,拿到它的前驱节点
- 把前驱节点的waitStatus改为-1
操作完后,我们的两个队列抽象图就变成如下样子:
接下来,我们来整体看下之前略过的do…while()循环中的代码。
-
do…while()循环
1
2
3
4
5
6
7do {
//判断当前节点是否还有下一个节点。如果没有,就把lastWaiter置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//把下一个节点置为null
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);那么到这里,这个循环在做什么事大致都理清了。它做的事如下:
- 把当前节点加入到AQS队列中
- 从Condition队列中移除当前节点。即消除前后关系,让GC回收。
-
-
那么到这里,我们
Condition.Signal()
方法已经分析完了,小小的总结下,在线程获得锁后,做了如下事情:- 判断当前获得锁的线程是不是当前线程。
- 获取当前节点的下一个节点
- 消除当前节点和下个节点的关系。即这句代码:
first.nextWaiter = null;
- 在
transferForSignal()
方法里先把当前节点的waitStatus置为0,然后加入到AQS同步队列中,最后把当前节点在AQS同步队列的前驱节点的waitStatus设置成SIGNAL(-1)
-
这里有个问题,调用
await()
方法的线程ThreadA,会在什么时候unpark(唤醒)?- 我们知道,AQS同步队列是有个唤醒机制的。即当ThreadB释放锁后,就会去唤醒队列中的下一个节点,具体的代码可以去参考下
release()
方法。这也可以说明,为什么我们上面demo的执行结果是需要等ThreadB线程执行完后,才会继续去执行ThreadA中await()
方法后面的代码。 - 所以在AQS队列中的节点,是需要等前面获取锁的节点释放锁后,才能被唤醒,然后再去竞争锁,抢到锁后,才会去执行。
- 那么,唤醒是在ThreadB unlock锁后发生,竞争锁的代码是在哪执行呢?没错,就是在
await()
方法中。下面一起来看ThreadA park后的代码把。
- 我们知道,AQS同步队列是有个唤醒机制的。即当ThreadB释放锁后,就会去唤醒队列中的下一个节点,具体的代码可以去参考下
0x04.Condition原理分析之await()—阻塞后部分
-
接着来看
await()
方法的代码。我们上次是看到了这。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //上面是看到了这,当前线程会阻塞在这里,等待唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
} -
继续往下走,我们假设现在被唤醒了,这时候就会执行if里面的代码。即
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
。我们先看checkInterruptWhileWaiting()
这个方法做了些什么,首先根据方法名来看,这个方法应该是检查我们这次唤醒是正常唤醒,还是被中断唤醒的(这部分会在后面的文章中描述)。进去瞅一眼吧。-
checkInterruptWhileWaiting(node)
源码如下:1
2
3
4
5private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}从上面这段代码来说,它首先会判断是否被中断过,如果在这期间被中断过的话,就通过
transferAfterCancelledWait()
方法来判断是抛异常还是重新中断。继续深入进去。transferAfterCancelledWait()
代码如下:1
2
3
4
5
6
7
8
9
10final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}分析这段代码前,我们先来分析下,当前会在什么地方进行中断。列举如下:
- park后,ThreadB还没执行
condition.signal()
方法,就已经被中断。 - park后,ThreadB执行了
condition.signal()
方法,然后ThreadA被中断。
针对这两种情况,这里进行了判断。是这样玩的。
-
通过
compareAndSetWaitStatus(node, Node.CONDITION, 0)
来判断中断是发生在执行signal前还是执行signal后。如果条件成立,那么肯定是在signal执行前进行的中断,更加准确的来说,是在signal执行CAS把waitStatus成功置0前发生的中断。那么如果是这种情况,就会在这里把waitStatus置0,然后把该节点加入AQS队列,以便后面的acquireQueued()
方法的执行。然后返回true;- 如果上面 条件不满足,则说明一定是在signal执行CAS成功后,才被中断的。这个时候,只要循环判断当前节点是否存在AQS队列就行了。最后返回false。
内部分析完了,那么返回true和false有什么区别呢?可以看到这个三目运算符,当返回true的时候,就会返回THROW_IE,也就是对中断的处理采取抛异常的形式,而false就是返回REINTERRUPT,重新中断。
- park后,ThreadB还没执行
-
-
while循环分析完了,接下来我们走到了这。
1
2
3
4
5
6public final void await() throws InterruptedException {
.... //已分析完
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //正在分析
interruptMode = REINTERRUPT;
....//待分析
}当我们看到
acquireQueued()
方法,是不是觉得很熟悉?没错,这个方法就是AQS竞争锁的代码。这里我也就不深入了。然后这里会有个参数为:saveState
。这个就是我们刚开始的时候获取的锁的重入次数,这里就是相当于恢复之前加锁的状态。 -
后面就剩两个if了。我们放一起分析下。
1
2
3
4
5
6
7
8public final void await() throws InterruptedException {
.... //已分析完
if (node.nextWaiter != null) //正在分析
unlinkCancelledWaiters();
if (interruptMode != 0) //正在分析
reportInterruptAfterWait(interruptMode);
....//待分析
}首先看第一个if。根据if体里面的方法,我们可以知道,该方法会找出
waitStatus=1(CANCELL)
的节点,把他们从单向链表中删除。然后第二个if,就是针对中断的处理了。我们看一眼
reportInterruptAfterWait
方法把。1
2
3
4
5
6
7
8
9
10private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}是不是一目了然,非常之简单。
-
好了,到这里,我们已经分析了Condition.await/signal的实现机制了。下面我们总结下吧。
0x05.总结
-
简单再大体的概述下Condition的实现
-
首先不管是先执行await还是signal,都需要先获取锁。
-
await方法做了如下几件事:
- 创建一个节点,并且加入到Condition队列中。
- 释放锁
- 判断当前节点是否在AQS队列中。如果不在,则阻塞。
- 被唤醒后,首先判断是否是正常被唤醒的,如果是被中断唤醒的,则视是否把节点加入了AQS队列而采取不同的操作。
- 调用
acquireQueued()
竞争锁。 - 移除
waitStatus=CANCELL
的节点。 - 处理中断
-
signal方法做了如下几个事:
- 判断当前获取锁的线程是否为当前线程。
- 修改Condition队列中的节点的waitStatus为0
- 加入到AQS队列
- 修改节点在AQS队列中的前驱的waitStatus为-1(SIGNAL)
- 从Condition队列中移除当前节点
-
注意:await方法阻塞后唤醒,是通过AQS中的唤醒机制。也就是当上一个获取锁的线程释放锁的时候才会被唤醒,也不是执行完signal后立马就会被唤醒,而是按照AQS的机制一个节点接着另一个节点唤醒,然后竞争锁成功后才会继续执行。
-
-
Condition与wait/notify的区别
-
Condition是并发包下面的,而wait/notify是java中的。
-
Condition基于Lock,而wait/notify基于Synchronized。
-
Condition比较轻量,性能比wait/notify要高。
…
-
-
当你掌握了Condition,你再去看wait/notify的实现,会发现两者之间思想的共同之处。
-
才疏学浅,如有不当之处,望指出,感激不尽。