上篇我们说到Synchronized的原理,那么接下来我们谈谈并发相关的其他东西。在这一篇我们会说到如下内容:
1.wait/notify的使用(生产者/消费者)以及原理
2.为什么Thread类中的一些方法会抛出InterruptedException
3.join的原理
话不多说,开整。
0x1 wait/notify
- wait/notify初认识(也就是J.U.C下的Condition类)
- wait/notify是两个方法, 定义下Object类中。
- 两个方法的调用都需要放在Synchronized同步代码块中,不然会抛出
IllegalMonitorStateException
- wait方法会释放锁,然后把当前线程放在等待队列中阻塞。直到notify/notifyAll方法执行后,才被唤醒
- notify、notifyAll都是唤醒在等待队列中的线程。notify是唤醒一个,notifyAll是唤醒所有。
- wait/notify经典案例:生产者/消费者
- Provider:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public class Provider implements Runnable{
/**
* 容器
*/
private Queue<String> queue;
/**
* 容器最大的容量
*/
private int maxSize;
public Provider(Queue<String> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
public void run() {
//死循环往queue中生成数据
while(true){
//wait/notify需要加锁
synchronized (queue){
if(queue.size() == maxSize){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//休息几秒
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//往queue里面生产数据
String data = UUID.randomUUID().toString();
System.out.println("生产:"+data);
queue.add(data);
//因为生产了一个数据,所以需要唤醒下消费者线程去消费
queue.notify();
}
}
}
} - Comsumer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42public class Comsumer implements Runnable {
/**
* 容器
*/
private Queue<String> queue;
/**
* 容器最大的容量
*/
private int maxSize;
public Comsumer(Queue<String> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}
public void run() {
//死循环,不断往容器中消费数据
while(true){
//wait/notify需要加锁
synchronized(queue){
if(queue.isEmpty()){
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//休息几秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//这里说明queue里面不为空。则取数据
String remove = queue.remove();
System.out.println("消费:"+remove);
//因为我们这里消费了一个,已经不是满的情况了,所以需要唤醒生产者去生产
queue.notify();
}
}
}
} - Main:
1
2
3
4
5
6
7
8
9
10public class Main {
public static void main(String[] args) {
Queue<String> queue = new LinkedList<>();
int maxSize = 10;
Thread provider = new Thread(new Provider(queue,maxSize));
Thread comsumer = new Thread(new Comsumer(queue,maxSize));
provider.start();
comsumer.start();
}
}
- Provider:
- 下面我们来看看
wait/notify
底层的代码- wait
- java层面 简单粗暴,直接调JVM虚拟机里面的代码。下面我们去一探究竟。
1
2
3
4public final void wait() throws InterruptedException {
wait(0);
}
public final native void wait(long timeout) throws InterruptedException; - JVM层面
- 上篇我们说Synchronized锁升级时,有说过一个同步队列的定义,是在
objectMonitor.hpp
中。然后在讲Condition的时候,提到了两个链表,一个是等待队列链表,一个是同步队列。当时也说,Condition的实现原理和wait/notify是差不多的。既然这样,那他肯定也有一个等待队列,定义同样是在objectMonitor.cpp
中。如下:
- 继续往下说,我们的wait、notify方法的实现,也是在这个类中。
- 我们挑着几个地方来说,毕竟C++我也不太会。。。
- 为什么wait需要获取到锁?如果没有同步代码块包裹还会抛出异常?,这段在源代码层面有体现,如下:
- 包装成Node对象,加入到等待队列中的代码:
- 好了,后面的代码看不懂了。原谅我菜。。。
- 为什么wait需要获取到锁?如果没有同步代码块包裹还会抛出异常?,这段在源代码层面有体现,如下:
- 上篇我们说Synchronized锁升级时,有说过一个同步队列的定义,是在
- java层面
- notify
- java层面
1
public final native void notify();
- JVM层面
- 和wait一样,我们直入主题,稍微浏览一下,C++实在是看不懂,请见谅。
- 开局也是直接检查是否获取锁
- 然后从等待队列中获取node
- 最后加入到同步队列(只是一部分代码)
- 开局也是直接检查是否获取锁
- 和wait一样,我们直入主题,稍微浏览一下,C++实在是看不懂,请见谅。
- java层面
- wait
- 总结一下
- 开局来张图
- 剩下全靠编
- 拿上面那个生产者消费者模型作为例子来说吧。简化一下,Provider的线程叫P,Consumer的线程叫C.
- 假设P先获取锁,于是进入Synchronized体,判断queue是否以满,不为空就往里面加东西。然后释放锁。假设线程P的运气好,一直是他抢先获取锁。于是就一直往里面add东西,直到到了maxSize的限制。于是会调用wait()方法,进行等待。在wait方法内部,就会把这个线程加入到queue对象的等待队列中(因为调用的是
queue.wait()
),线程挂起,释放锁,等待唤醒。 - 这时候,就没线程来和线程C来竞争了。C顺理成章的会拿到锁,进入Synchronized体执行代码。线程C从queue里面拿出一个元素后,调用了
queue.notify()
,注意,这里和Condition
有点不一样,Condition
把等待队列中的第一个节点移到queue对象的同步队列,排队获取锁,而notify
是在等待队列中随机挑一个移到queue对象的同步队列,然后线程C执行完代码后,会释放锁。这时候线程P和线程C又各凭本事来竞争锁了(竞争失败还是会在同步队列中等待)。
- wait/notify和sleep的区别
- wait/notify会释放锁,而sleep不会。这是他们最大的区别
- 开局来张图
0x02 interrupted()和interrupt()方法
- 先提2个问题,然后我们根据这2个问题来展开了解下。
- interrupted()和interrupt()方法的区别与联系
InterruptedException
是什么?为什么涉及等待的方法,如sleep、wait方法都会抛出这个异常。碰到这个异常应该怎么做?
- 首先来看第一个问题,interrupted()和interrupt()方法的区别与联系;
- 简介下interrupt吧。interrupt是一个标记,标记该线程是否在等待的过程中被中断过,也就是是否被打断过,和唤醒不同,唤醒是正常流程,而中断是异常流程。举个例子说,你睡觉睡到自然醒,恩这个是正常的,但是你睡到中途,被人叫醒,这就是中断。
- 那么来说一下interrupted()和interrupt()方法分别是什么吧。
- interrupted()方法
- 借鉴下jdk doc上面的说法
Tests whether the current thread has been interrupted.
,翻译过来就是调用这个方法,可以获取当前线程是否被中断,并且重置为未中断状态。那么它是怎么判断是否被中断呢?这个方法签名是这样的:1
2
3
4
5
6public static boolean interrupted() {
//如果被中断过,则返回true,如果没被中断过,则返回false
//这里传了true。所以每次获取完后,都会重置interrupted为false
return currentThread().isInterrupted(true);
}
private native boolean isInterrupted(boolean ClearInterrupted); - 于是我们又得去翻JVM源码了。源码是在
thread.cpp
中。代码如下:1
2
3
4
5
6
7bool Thread::is_interrupted(Thread* thread, bool clear_interrupted) {
trace("is_interrupted", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
// Note: If clear_interrupted==false, this simply fetches and
// returns the value of the field osthread()->interrupted().
return os::is_interrupted(thread, clear_interrupted);
} - 因为各个系统的差别,所以
os::is_interrupted()
有多种实现,我们就拿在Linux下面的实现来说事吧,代码是在os_linux.cpp
中。
- 从上面看,最终的值是指向osthread里面的interrupted,点过去看一下,代码如下:
1
volatile bool interrupted() const { return _interrupted != 0; }
- 从这里就看出来了,最终获取到的是
_interrupted
这个属性的值。 - 然后重置interrupted的值,也可以从
os_linux.cpp
中的代码看的出来。如果被中断过,并且clear_interrupted=true
的话,会设置_interrupted
为false
- 借鉴下jdk doc上面的说法
- interrup()方法
- 还是先从java层面看一眼源码,然后再去JVM的源码里瞅瞅吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); //设置中断
b.interrupt(this);
return;
}
}
interrupt0();
}
private native void interrupt0(); - 现在我们去JVM源码里看看interrupt0()是怎么实现的吧。
- 刚开始也是先从
thread.cpp
中看起,代码如下:1
2
3
4
5
6void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debug_only(check_for_dangling_thread_pointer(thread);)
//调用各个系统的interrupt方法
os::interrupt(thread);
} - 接下来我们还是以
os_linux.cpp
为例,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threads_lock->owned_by_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
//如果线程没有被中断,就执行中断
if (!osthread->interrupted()) {
//设置_interrputed标志位的值为1
osthread->set_interrupted(true);
OrderAccess::fence();
//唤醒sleep的线程
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
// For JSR166. Unpark even if interrupt status already was set
//如果是java的线程,则唤醒同步代码块和wait()
if (thread->is_Java_thread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
//设置标识
void set_interrupted(bool z){ _interrupted = z ? 1 : 0; }
- 刚开始也是先从
- 还是先从java层面看一眼源码,然后再去JVM的源码里瞅瞅吧。
- 总结一下:
- interrupted()方法是获取该线程是否被中断过,获取后,会把中断状态置为非中断。
- interrupt()方法是置该线程的中断标识为中断。并且会唤醒所有处于阻塞状态的线程。
- 两者的关系显而易见了,可以简单的这样理解,一个是get,一个是set。
- interrupted()方法
InterruptedException
简单介绍InterruptedException
是所有的涉及阻塞的方法都会抛出的一个异常。那么为什么会抛出这个异常呢?这个异常和上面所说的interrupted()
以及interrupt()
有什么联系呢?- 假设有两个线程,P1,P2。首先我们应该知道,如果在P2中调用P1的
interrupt()
方法,这时候就会把P1的_interrupted
标识置为1,并且会唤醒P1线程中正在阻塞的方法。 - 那么为什么呢?首先我们知道,这些抛异常的方法都是阻塞的,而阻塞方法的释放会取决于一些外部的事件,但是阻塞方法可能因为等不到外部的触发事件而导致无法终止,所以它允许一个线程请求自己来停止它正在做的事情。当一个方法抛出
InterruptedException
时,它是在告诉调用者如果执行该方法的线程被中断,它会尝试停止正在做的事情并且通过抛出InterruptedException
表示提前返回。所以,这个异常的意思是表示一个阻塞被其他线程中断了。然后,由于线程调用interrupt()
中断方法,那么Object.wait
、Thread.sleep
等被阻塞的线程被唤醒以后会通过is_interrupted
方法判断中断标识的状态变化,如果发现中断标识为true
,则先清除中断标识,然后抛出InterruptedException
。 - 那么如何处理呢?这里需要注意,InterruptedException 异常的抛出并不意味着线程必须终止!所以,你可以做以下操作:
- 直接捕获异常不做任何处理
- 将异常往外抛出
- 停止当前线程,并打印异常信息
- 下面我们以Thread.sleep为例,看一下他是怎么抛出异常的。源码位置是在
share\vm\prims\jvm.cpp
1
2
3
4
5
6
7
8
9
10
11JVM_ENTRY(void, JVM_Sleep(JNIEnv* env, jclass threadClass, jlong millis))
JVMWrapper("JVM_Sleep");
if (millis < 0) {
THROW_MSG(vmSymbols::java_lang_IllegalArgumentException(), "timeout value is negative");
}
//这里判断是否被中断
if (Thread::is_interrupted (THREAD, true) && !HAS_PENDING_EXCEPTION) {
THROW_MSG(vmSymbols::java_lang_InterruptedException(), "sleep interrupted");
}
....
- 整个interrupt就分析到这里了。有错误的地方,还望不吝指教。
0x03 join简析
- join方法简介
- 首先join方法是Thread类中的方法。
- join方法的作用是等待该线程结束,再往下执行。这样说法可能有点不清晰,引用
Java 7 Concurrency Cookbook
上的定义:join() method suspends the execution of the calling thread until the object called finishes its execution.
也就是说,join方法会暂停调用该方法的线程,直到被调用方执行完成后才会继续往下执行。这样说好像也有点抽象,下面我们以一个例子来说明。 - 一个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27public class JoinTest {
public static void main(String[] args) {
//首先创建1个线程.
Thread t1 = new Thread(new Runnable(){
public void run() {
System.out.println("我是线程T1,接下来我会睡2秒钟,让出CPU的执行权");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是线程T1,我已经睡醒了,继续往下执行");
}
});
t1.start();
System.out.println("我是主线程,我下面不知道要干啥事。");
try {
//调用t1的join方法。
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是主线程,我啥事也没干,一脸懵逼的走了。");
}
} - 结果如下:
1
2
3
4
5
6
7
8
9我是主线程,我下面不知道要干啥事。
我是线程T1,接下来我会睡2秒钟,让出CPU的执行权
我是线程T1,我已经睡醒了,继续往下执行
我是主线程,我啥事也没干,一脸懵逼的走了。
-----------------------下面是把join方法注释的结果----------------
我是主线程,我下面不知道要干啥事。
我是主线程,我啥事也没干,一脸懵逼的走了。
我是线程T1,接下来我会睡2秒钟,让出CPU的执行权
我是线程T1,我已经睡醒了,继续往下执行 - 对比可见,线程T1睡觉的2秒钟内,是不会执行主线程的。因为在主线程中调用T1的join方法,是需要等待T1线程执行完后,才会去执行主线程。那么为什么会这样?下面就让我们一起来简单分析下。
- join方法实现简单分析
Thread.join()
方法,我们还是拿上面的例子来说,源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public final void join() throws InterruptedException {
join(0);
}
//这里需要注意一下 synchronized
public final synchronized void join(long millis) throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
//自旋判断当前线程T1是否还存活。如果存活就调用wait方法。
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}- 下面我们来整理下需要注意的点:
- 首先是join方法的签名,是有带
synchronized
的。我们知道Synchronized一定会有个对象保存锁状态,那么这里的synchronized的对象是谁?而加在方法上的,他的对象是this,那么这个this又是谁?恩,这个this就是T1,因为我们调用的t1.join()
。 - 然后自旋判断是否存活,注意,这里判断的也是T1线程是否存活,然后调用
wait()
方法。此时需要注意一下,wait()
方法是Object类中的方法。原理我们上面也分析了。总得来说,wait()
方法会释放锁,并且把当前线程加入到等待队列里。这里又得注意了,当前线程指的不是T1哦,而是我们调用t1.join()
方法的线程,也就是主线程!所以执行到wait()
,我们的主线程自然而然的会加入到T1线程对象的等待队列中,也就是主线程阻塞了。 - 那么既然有
wait()
,肯定会在一个地方进行notify()
,不然主线程就会一直阻塞在那里,就让我们找找notify()
吧。其实这个notify是会在线程退出的时候进行调用的。不信,看源码,thread.cpp
有如下方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void JavaThread::exit(bool destroy_vm, ExitType exit_type) {
assert(this == JavaThread::current(), "thread consistency check");
....
//就是在这里执行的notify()
ensure_join(this);
assert(!this->has_pending_exception(), "ensure_join should have cleared");
.....
}
static void ensure_join(JavaThread* thread) {
Handle threadObj(thread, thread->threadObj());
assert(threadObj.not_null(), "java thread object must exist");
ObjectLocker lock(threadObj, thread);
thread->clear_pending_exception();
java_lang_Thread::set_thread_status(threadObj(),java_lang_Thread::TERMINATED);
java_lang_Thread::set_thread(threadObj(), NULL);
//在这里调用notifyAll
lock.notify_all(thread);
thread->clear_pending_exception();
} - 从上面可以知道,在T1线程退出的时候,会唤醒T1的等待队列中所有的node。从而在T1执行完后,主线程才能继续往下执行。
- 首先是join方法的签名,是有带
04.总结
- 今天这篇篇幅还是挺长的,但是总得来说,我们说了如下几点:
- wait/notify的原理,可以说实现原理和Condition是差不多的。
- interrupted、interrupt、interruptException的联系,通过分析我们知道了,线程中断是通过底层有个
_interrupted
的字段来标识的。 - 简单分析了join的实现原理。
感谢大家观看,才疏学浅,不当之处,请不吝赐教。谢谢~
评论