前面我们稍微的过了一下线程池的原理,但是没有深入去解读它的源码,所以这次就让我们深入一下,一起看看线程池内部的实现。
在并发这块,停留的时间有点久,所以想把这一篇作为并发的终章,但终章并不是终止。
后面再有涉及并发编程这块的内容,会以番外的形式提供。
好了,话不多说,开整吧。
注意:该篇文字较多,可能会有点恶心~
0x01 回顾线程池原理
- 这里我们简单回顾下线程池的原理。具体见Java并发之线程池&Callable
- 先来张图,然后对着图来说事,后面我们分析源码的时候也会按照这个图来解说。
- 从上面看,我们调用ThreadPoolExecutor.execute()时,会传递一个Runnable的实例,这个实例就是上图说的一个任务。
- 当我们提交一个任务时,首先会判断当前工作的线程数是不是大于核心线程数(corePoolSize),如果是false,则说明还可以继续创建核心线程数,所以就会新建一个线程,执行我们的任务,相反,如果是true,则说明现在核心线程数已经满了,不能再创建核心线程数了,所以只能把该任务放到队列。
- 既然要把任务放到队列,那么这时候又会有两种情况,一种是队列已满,一种是队列未满。对于这两种情况,会做如下处理
- 队列已满:
- 判断当前工作线程是否达到最大线程数。
- 队列未满:
- 加入队列
- 队列已满:
- 加入队列这种情况我们就不说了,我们说下队列已满的情况。看图是说,会判断当前工作线程是否达到最大线程数。这里也有两种情况:
- 工作线程<最大线程数
- 创建新线程,只不过这里的线程是临时工,任务执行完后,如果在一定时间内(keepAliveTime)没有被使用,就会被销毁。
- 工作线程>最大线程数
- 直接拒绝该任务。
- 工作线程<最大线程数
- 以上就是整个线程池当一个新任务来的时候执行的流程。下面我们在源码中来一一验证。
0x02 从ThreadPoolExecutor的局部变量说起
- 我们点开
ThreadPoolExecutor
这个类,在类的开头,会发现有如下的定义(可能会有点晕。如果晕的可以直接看结论):我们一个一个来说,顺序可能和上面不一样。1
2
3
4
5
6
7
8
9
10
11
12
13private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; } - 先看字段定义
private static final int COUNT_BITS = Integer.SIZE - 3
- 首先,这里的结果是29.也就是32-3.那么这里为什么要用29呢?为啥不直接用32?后面我们再说。
private static final int CAPACITY = (1 << COUNT_BITS) - 1
- 从字段名来看,这个变量代表的是最多支持的线程数目,也就是我们创建的线程不能超过这个数字。
- 这里一顿操作后,结果是536870911。我们单纯看10进制,是发现不了什么鬼的。但是我们搞成二进制看看,二进制为:
0001 1111 1111 1111 1111 1111 1111 1111
。 - 我们可以猜想下,它后面会用二进制来计算线程的数目。至于是怎么做的,我们后面分析。
private static final int RUNNING = -1 << COUNT_BITS
- 操作后的结果是-536870912。这里我们也需要看他的二进制.二进制补码表示如下:
1110 0000 0000 0000 0000 0000 0000 0000
- 这里就是定义线程池RUNNING状态的常量。
- 操作后的结果是-536870912。这里我们也需要看他的二进制.二进制补码表示如下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))
- 这里是初始化一个原子类,初始值是通过一个ctlOf()方法计算出来的。等后面说到这个方法时在聊。
private static final int SHUTDOWN = 0 << COUNT_BITS
- 没错啦,这是定义线程池SHUTDOWN状态的常量
private static final int STOP = 1 << COUNT_BITS
- 定义线程池STOP的常量
private static final int TIDYING = 2 << COUNT_BITS
- 定义线程池TIDYING的常量
private static final int TERMINATED = 3 << COUNT_BITS
- 定义线程池TERMINATED的常量。
- 从上面可以看到,状态的常量定义中只有RUNNING常量是<0的,其他状态都是>0的。这点在后面会用到。
- 再看方法声明
private static int ctlOf(int rs, int wc) { return rs | wc; }
- 这里的操作就是按位或了一下。
- 然后我们来看一下上面遗留的问题。在初始化原子Integer类时,调用了此方法,
ctlOf(RUNNING, 0)
。结果是-536870912。即RUNNING状态的常量,转成二进制也就是1110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c) { return c & ~CAPACITY; }
- 首先我们可以从名字得出,这里是返回线程池的运行状态。
- 然后我们看他是怎么得到线程池状态的。上面我们有说过CAPACITY的值,那么c的值是什么?我们往后翻翻代码,可以得出c是ctl的值。那么初始状态下,我们知道ctl的值就是
1110 0000 0000 0000 0000 0000 0000 0000
。进行runStateOf()方法操作后,得出的值是1110 0000 0000 0000 0000 0000 0000 0000
。也就是RUNNING状态 - 那么我们在最前面也说了,COUNT_BITS为什么要取32-3呢?是因为要用高3位作为线程池的状态,为什么是3呢?因为现在线程池已知有5种状态,而如果取2的话,只能最大表示2^2=4种,所以只能取3,这样就能最大表示2^3=8种状态。
private static int workerCountOf(int c) { return c & CAPACITY; }
- 根据名称可以得出,这是获取工作线程的数量。也就是当前正在跑的线程的数量。
- 上面我们知道高3位是表示线程池状态,那么剩下的29位呢?没错啦,就是表示工作线程的数量。new一个线程,就会+1.而用这个方法,是可以得出当前工作的线程。这里的c的取值也是原子类ctl的值。在初始状态在,ctl的值是
1110 0000 0000 0000 0000 0000 0000 0000
,那么经过运算后,得到的结果就是0.后面会对ctl的值进行自增、自减,然后再通过该方法,就可以得到线程池中运行的线程数量。
- 好了,最开头的定义和声明就说到这里了。总结下:
- 翻译了下ThreadPoolExecutor开篇的注释。也就是对这些字段的解释,加工解释如下:
- 线程池控制状态ctl是一个整合了两个概念的字段,即指示线程的有效数量的原子整数:workerCount,和指示线程池是否运行,关闭等状态的runState。(解释了ctl变量)
- 为了将它们打包为一个int,我们将workerCount限制为(2 ^ 29 )-1(约5亿)个线程,而不是(2 ^ 31)-1(20亿)可以表示的线程。如果将来有问题,可以将变量更改为AtomicLong,并在以下调整shift / mask常数。但是在需要之前,使用int可以使此代码更快,更简单。 (解释了COUNT_BITS和CAPACITY)
- workerCount是已被允许启动但不允许停止的工人数。该值可能与活动线程的实际数量暂时不同,例如,当ThreadFactory在被询问时未能创建线程,并且退出线程仍在终止之前执行簿记操作时。用户可见池大小报告为工作集的当前大小。
- runState提供了 主要生命周期控制,采用以下值:
- RUNNING:接受新任务并处理排队的任务
- SHUTDOWN:不接受新任务,但处理排队的任务
- STOP:不接受新任务,不处理排队的任务并中断进行中的任务
- TIDYING:所有任务都已终止,workerCount为零,线程转换为状态TIDYING时将运行terminated()方法
- TERMINATED:terminald()方法执行完成
- 这些值之间的数字顺序很重要,可以进行有序比较。
- runState随时间增加,但不必达到每个状态。转换规律是:
- RUNNING -> SHUTDOWN:
- 在调用shutdown()时,可能隐式在finalize()中
- (RUNNING或SHUTDOWN) -> STOP:
- 在调用shutdownNow()时
- SHUTDOWN -> TIDYING:
- 当队列和池都为空时
- STOP -> TIDYING:
- 当池为空时
- TIDYING -> TERMINATED:
- 当Terminated()挂钩方法完成时
- RUNNING -> SHUTDOWN:
- 状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。检测从SHUTDOWN到TIDYING的转换比您想要的要简单,因为在SHUTDOWN状态期间队列可能在非空之后变空,反之亦然,但是只有在看到它为空后,我们看到workerCount为0时,我们才能终止 (有时需要重新检查-参见下文)。
- 用我们的大白话总结一下开头的方法和定义吧。
- 作者使用一个int值(ctl)来整合线程池状态和线程数量。其中高3位用来标识线程池状态,而低29位用来表示线程数量。后面创建线程和销毁线程都会对该值操作。
- runStateOf()方法:获取当前线程池的状态
- workerCountOf()方法:获取当前线程池中工作的线程数量
- ctlOf()方法:整合线程池状态和线程池数量。也就是合并结果。
- 翻译了下ThreadPoolExecutor开篇的注释。也就是对这些字段的解释,加工解释如下:
0x03 从ThreadPoolExecutor.execute入手
- 上面我们简单分析了下开头的定义。那么现在我们来看一下,我们构造出一个线程池后,往池中扔Runnable任务时(调用execute方法),是怎样的处理逻辑吧。记住参照开局的那张图。
- 先整体浏览一下execute方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}- 上面的代码可以对照开局图来看。图上的点都可以在代码中得到印证。下面我们简单的对照代码说一下流程
- 首先判断任务是不是null。如果是null就抛出异常。
- 然后使用
ctl.get()
获取ctl的值。ctl的含义上面也说了,就是高三位是线程池状态,低29位为工作线程的数量。 - 再然后通过
workCountOf()
计算出当前工作线程的数量。如果当前工作线程的数量<核心线程数。就会执行addWorker()
方法,添加一个Worker对象到Set中(这个后面会看到)。这里面的代码待会我们分析。 - 如果addWorker()返回true,则说明新建一个Worker对象成功了,也就是把任务添加到了线程池。
- 接下来在看如果工作线程数量>核心线程数,或者创建一个worker对象,加入Set失败了,会做什么事。
- 判断线程池是否还在运行。然后把任务加入到队列。
- 把任务加入到队列后,判断下线程池的状态,如果不是运行状态,就移除刚刚加入的节点。然后拒绝任务。
- 如果线程池中没有工作的线程,则会默认创建一个线程。
- 如果加入队列失败,也就是队列满了的话,就会调用addWorker方法,尝试另起核心线程之外的线程。如果失败则拒绝任务。
- 上面的流程总结来说,就是先判断当前工作线程是否<核心线程。如果是小于,就创建一个线程跑任务。如果大于,就会加入到阻塞队列。如果阻塞队列满了,就会判断总工作线程数是否大于最大线程数。如果大于就拒绝任务。如果小于就新开线程执行。
- 上面的代码可以对照开局图来看。图上的点都可以在代码中得到印证。下面我们简单的对照代码说一下流程
- 深入addWorker方法。
- 首先我们先看方法签名:
private boolean addWorker(Runnable firstTask, boolean core)
- 从方法名来看,是add一个Worker。而Worker是什么?我们待会分析。
- 从参数上看:
- 第一个参数是firstTask。表示这个Worker第一个执行的任务是什么。
- 第二个参数是core。表示下面做线程数的判断是判断核心线程数还是最大线程数。true表示核心线程数。
- 主体代码有点长。我们分段来说明。首先看第一段代码。分析见注释
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43//这个是类似于goto的写法我们暂时往下看。后面会说到
retry:
//一个自旋
for (;;) {
int c = ctl.get(); //获取ctl的值
//获取当前线程池的运行状态。这个方法我们最开始时有说到。
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//上面这句翻译为:仅在必要时检查队列是否为空。
//这块我们先不管,跳过。总得来说是做一些线程池状态和队列的一些合法判断。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//又一个自旋
for (;;) {
//获取当前的工作线程数。
int wc = workerCountOf(c);
//和CAPACITY比较。这个字段我们上面已经说过了。最大是(2 ^ 29 )-1
//然后根据core的值选择比较核心线程数还是最大线程数。
//总的来说,这里是判断当前工作线程数是否超过核心线程数或者最大线程数或者最大的容量数。
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
//超过了返回false
return false;
//这里是使用cas来使工作线程数+1.如果成功的话就执行break retry.
if (compareAndIncrementWorkerCount(c))
//这里也就是跳到retry:标记处。注意,break的意识是跳到retry:处,
//不需要再进最外层for循环了。
break retry;
//如果使用cas对线程数+1失败的话,就重新获取一次ctl的值。
c = ctl.get(); // Re-read ctl
//如果线程池状态改变了,就会执行continue retry
if (runStateOf(c) != rs)
//continue retry也是跳到retry标记处。但是他和break不同,
//break是不会再进最外层for循环,而continue是会重新进入最外层的for循环。
continue retry;
//如果仅仅是cas失败,则会继续自旋cas。直到成功。
}
}
.............//以下代码省略。留到下面分析- 到这,上面这一段代码已经分析完了。这里总计一下。上面代码做了一些什么事;
- 判断线程池状态,也就是线程池的合法判断
- 自旋获取工作线程数和核心线程数或者最大线程数做比较。如果小于,则会自旋使用cas为工作线程数+1。
- 以上就是上面代码的总结。
- 到这,上面这一段代码已经分析完了。这里总计一下。上面代码做了一些什么事;
- 前面总得来说就是合法性检测。如果不被return,就说明合法了。继续执行下面的代码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57//这两个变量见名知意。就不多说了
boolean workerStarted = false;
boolean workerAdded = false;
//重头戏来了,方法名是addWorker。就是添加这个Worker对象。那么对象出来了,往哪里加?
Worker w = null;
try {
//这里创建一个worker对象。把这个worker对象第一个要执行的任务放进去。
//worker的构造方法我们下面分析
w = new Worker(firstTask);
//这里是取worker的thread属性。那么可以断定,worker这个对象里面肯定有一个线程。
final Thread t = w.thread;
if (t != null) {
//获取重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
//重新获取一下线程池运行状态。不用想,下面肯定又需要合法检查。
int rs = runStateOf(ctl.get());
//如果运行状态是小于SHUTDOWN的。也就是小于0的情况,条件才会成立。
//或者状态是等于0,并且该Worker对象第一个需要执行的任务为空。
//在整个线程池状态中,只有运行状态(RUNNING)才会小于0
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//这里是判断t是不是已经启动了。如果启动了就需要抛出异常。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//上面我们说了方法是addWorker。对象有了。那么add到哪呢?
//看这里就知道了。是add到workers里面。那么workers是什么呢?
//我们点进去看发现workers的签名是这样的:
//private final HashSet<Worker> workers = new HashSet<Worker>();
//所以我在最开头说是把对象add到Set里。这里是HashSet。
workers.add(w);
//为largestPoolSize(当前池中Worker的大小)赋值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//到这,说明任务已经算是添加完了。
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker添加成功,就启动Worker对象里面的线程。置线程启动为true。
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果线程没启动。就执行addWorkerFailed。我们下面再看。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;- 这里我们还得先看一下Worker的构造函数。代码如下:
1
2
3
4
5
6
7
8
9Worker(Runnable firstTask) {
//按照官方注释,这一步是中断设置为禁止,直到runWorker方法执行
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//这里会调用ThreadFactory创建一个线程。需要注意的是,newThread传的参数是this。
//那么这个this指的是什么?指的是Worker对象。那么可以知道Worker也实现了Runnable接口。
//由此可知,线程启动的时候,执行的run方法是执行的Worker对象中重写的run方法。
this.thread = getThreadFactory().newThread(this);
} - 然后我们还看一下addWorkerFailed()方法。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void addWorkerFailed(Worker w) {
//首先获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//失败后,从workers里面移除掉这个任务
if (w != null)
workers.remove(w);
//当前工作线程数-1
decrementWorkerCount();
//尝试把线程池状态切换为Terminate
tryTerminate();
} finally {
mainLock.unlock();
}
} - 到这里,我们来总结一下这段代码的含义吧。
- 简单点就是创建一个Worker对象,然后把Worker加入到HashSet中,并且启动该对象内部创建的线程。
- 如果启动线程失败,就会执行addWorkerFailed方法,从HashSet中移除该Worker对象。并将当前工作线程数-1
- 这里我们还得先看一下Worker的构造函数。代码如下:
- 到此,我们addWorker方法已经分析完毕了。不考虑里面的合法检查(校验线程池状态)。整体代码做了如下几件事:
- 自旋取当前工作线程数和核心线程数或者最大线程数(取决于addWorker的第二个参数)比较。如果大于的话就return,小于的话就使用cas让工作线程数+1.如果这一步成功的话,就跳出自旋。往下执行。
- 创建一个Worker对象,把它加入到HashSet里。如果加入成功,就启动该Worker对象内部的线程。如果启动失败,就从HashSet中移除掉这个对象,并且让工作线程数-1。
- 首先我们先看方法签名:
- 到此,我们execute方法里一些重要的东西都分析完了。总结的话,就是开局的那张图。下面我们来看看线程池是怎么运行我们调用execute方法时传的Runnable的实现类的。
0x04 线程池是怎么运行我们的Runnable实现类的?
- 我们在上面分析addWorker方法的时候,在最后有个worker内部线程.start().而这个内部线程是在创建Worker对象时使用ThreadFactory创建的,并且传的Runnable是this。也就是Worker对象。那么我们就先从Worker对象的run方法入手。
- Worker.run()的源码如下:
1
2
3public void run() {
runWorker(this);
}- 由上可知,他里面调用了runWorker方法。传参是this。也就是Worker对象。深入一下runWorker方法把。
- runWorker方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55final void runWorker(Worker w) {
//获取当前线程,也就是当前调用runWorker方法的线程
Thread wt = Thread.currentThread();
//获取Worker里面第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁。这里为什么要这样呢?因为我们在创建Worker对象时,使用了一个setState(-1).
//看到这个state,再加上Worker实现了AQS,可以知道这个state就是锁标识。设置成-1.所有的线程都获取不到锁。
//在创建Worker对象的时候,官方也注释了。
w.unlock();
boolean completedAbruptly = true;
try {
//死循环跑任务。假设Worker的firstTask有值,则在跑完后,会不断从HashSet中取任务。也即是getTask()方法。
while (task != null || (task = getTask()) != null) {
//加锁,当我们使用shutdown方法关闭线程时会尝试获取w的锁。
//这里的目的就是为了shutdown时不中断线程。让任务跑完。
w.lock();
//一系列合法验证
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//beforeExecute是一个空方法,我们可以重写它,在线程执行前做一些事。
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用任务的run方法。注意。这里是调run方法。而不是start方法。
//start方法是会令启线程。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//和beforeExecute一样,可以重写它,在线程执行完毕后做一些事。
afterExecute(task, thrown);
}
} finally {
task = null;
//记录当前Worker执行过的任务数。
w.completedTasks++;
//释放锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//当池中的任务执行后,或者有异常情况的时候,Worker的退出处理。
processWorkerExit(w, completedAbruptly);
}
}- 从上可知,每个Worker都会先执行对象里面的任务,然后再从HashSet中获取任务执行。这里有一点需要注意,在执行的时候是直接调用Runnable的run方法,而不是Thread的start方法。
- runWorker方法从全局上大家也有了一点底。现在我们局部去分析runWorker里面调用的两个方法。
- getTask方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45//标识从阻塞队列中取任务是否超时
boolean timedOut = false;
for (;;) {
//获取线程池的状态信息
int c = ctl.get();
int rs = runStateOf(c);
//如果线程池状态是shutdown并且阻塞队列是空的。就返回null。
//因为在runWorker里,getTask是null,就不会进while直接走下面的流程结束runWorker方法。
//也就是结束线程的run方法。最终该线程会被销毁。所以这里会把工作线程数-1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//如果上面不成立,则获取工作线程的个数
int wc = workerCountOf(c);
//判断工作线程是否大于核心线程数,或者是否允许核心线程数超时销毁
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//这里是超时机制的判断,也就是如果队列空闲,并且工作线程比核心线程大的时候,就会进入if
//把工作线程-1,并且return null。同样在runWorker方法里不进while循环,直接结束方法,销毁线程
//标记为(***)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//这里timed有两种情况。一种是工作线程数>核心线程数,一种就是允许核心线程数超时销毁。
//如果这两种情况满足其中一个就用poll。也就是如果取不到,会等一段时间再取。如果还没有,
//下面就会把timeOut置true,表示获取超时了,于是会走上面那个标记为(***)的if。后续销毁线程。
//如果不是这两种情况,也就是当前的线程是算作核心线程,并且也没开启核心线程过期。
//这时候就使用take(),直到获取到了任务才会返回。不然就会一直阻塞。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}- 稍稍总结下:在getTask里面会判断当前工作线程数是否大于核心线程数。在这里实现非核心线程的超时检测和处理。除去这些,最根本的就是从队列里获取Runnable任务。
- processWorkerExit方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32//completedAbruptly只有在执行任务时出错了才会为true。如果是正常执行,completedAbruptly=false
if (completedAbruptly) {
//如果执行任务出错,则在这里把工作线程数-1.
//如果是一路执行下来没错误,那么到最后这个线程数-1会在getTask方法里执行.见上面分析
decrementWorkerCount();
}
//获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//计算一下完成的任务数。并且在HashSet里移除对应的Worker对象
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
//尝试去把线程池状态置为Terminate。我们暂时不做分析,在分析线程池的关闭时再谈
tryTerminate();
//下面的一块代码都是在做一件事:
//如果队列不为空,并且当前工作的线程在不设置核心线程超时的情况下,创建一个Worker执行任务
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果当前工作的线程小于核心线程数就创建一个Worker去消费队列中的任务。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}- 小结一下,这里做了下面几件事
- 统计下完成的任务数,从Workers里面移除当前Worker
- 在阻塞队列不为空的情况下,总是会保持核心线程数个Worker。如果开启了核心线程超时的情况下,总会保持1个Worker工作。
- 小结一下,这里做了下面几件事
- getTask方法
- 到这里,整个线程池的创建->执行任务的流程已经分析完了。下面我们简单的总结一下任务执行流程:
- 在addWorker里,创建一个Worker后,会启动Worker里面对应的线程。执行Worker里面的run方法。
- 在run方法中调用runWorker方法。使用While循环在执行完本身Worker对象中的任务后,不断的从队列中获取任务执行。
- 在while中直接调用Runnable的run方法。执行任务代码。
- 在队列里所有的任务都执行完后,销毁自身线程。如果在getTask()后,队列里又来了任务,并且工作线程池数<核心线程数时,会再新建一个Worker对象,继续跑队列里面的任务。
0x05 线程池的关闭
- 线程池的创建和执行任务的流程我们上面已经分析了,那么我们现在来分析下线程的关闭吧。一般关闭线程池我们会调用如下两个方法中的一个:
- shutdown():当前执行的任务不会被中断,也就是说会等到我们的当前正在执行的任务执行完后才关闭
- shutdownNow():不管当前是否有任务在跑,直接关闭线程池。执行的任务也会被强制中断。
- 下面我们来各自分析下。首先把他们两个的代码都贴出来。看看哪里不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
//把未执行的任务查出来保存返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}- 从代码上来看,在try代码块里。第一行是一样的,第二行也只是状态不同。但是第三行就开始不一样了。一个是
interruptIdleWorkers();
,一个是interruptWorkers();
,那么这两个方法分别是做什么呢?一起来分析一下吧。
- 从代码上来看,在try代码块里。第一行是一样的,第二行也只是状态不同。但是第三行就开始不一样了。一个是
interruptIdleWorkers()
分析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
//获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//循环遍历所有的Worker
for (Worker w : workers) {
//取出对应的线程
Thread t = w.thread;
//判断线程的中断状态,并且尝试获取锁。这里需要注意下。
//这里的tryLock。如果当前Worker正在跑任务,会在while中获取worker的锁。
//这个在分析runWorker时也提了下。
if (!t.isInterrupted() && w.tryLock()) {
try {
//中断线程
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}- 小结:
- 遍历所有的Worker,尝试获取Worker对象锁。
- 如果获取到了锁,说明当前任务已经跑完了。那么就执行中断。
- 如果未获取到锁,则说明当前任务还没跑完。则不进行任何操作。
- 遍历所有的Worker,尝试获取Worker对象锁。
- 小结:
interruptWorkers()
分析1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private void interruptWorkers() {
//获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有Worker对象
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
//直接中断线程。
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}- 上面就是两者的区别。总的来说,一个关闭得比较柔和。一个就非常粗鲁。
- 往下面看,两者都执行了
tryTerminate()
方法。上面Worker跑完所有的任务后也会执行下这个方法。下面我们来看一下这个方法把:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34final void tryTerminate() {
for (;;) {
//如果状态不符合,或者队列不为空,则不往下执行
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果工作线程>0则尝试中断一个线程。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//获取锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//把状态改为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//这个方法是一个空方法,我们可以重写这个方法做一些事。
terminated();
} finally {
//在方法执行完后,把线程池中的状态变为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}- 总的来说,改方法是尝试把线程池的状态改为
TERMINATED
,这里的TIDYING状态改为TERMINATED状态的流程也在这体现了。
- 总的来说,改方法是尝试把线程池的状态改为
0x06 总结
- 这一篇文章终于算是写完了。时间跨度大概有1-2个月。不是不想写,而是忙+懒,一直没去呈下行分析,现在趁着有空把它完善了。也算是了了一个念头。
- 因为是自己分析的,难免会有一些错误,欢迎指正。谢谢!
- 下面估计会走入IO的世界。从BIO到NIO再到Netty。已经自己在学了,笔记不知道什么时候会输出。一起加油吧!向上吧,少年!
评论