线程池的工作原理
Java线程池主要用于主要用于管理线程组及其运行状态,以便java虚拟机更好地利用CPU资源。Java线程池的工作原理为:JVM先根据用户的参数创建一定数量的可运行的线程任务,并将其放入队列中,在线程创建这些任务,如果正在运行的线程数量,超过了最大线程数量(用户设置的线程池大小),则超出数量的线程派对等候,在任务执行完毕后,线程池调度器会发现有可用的线程,进而再次从队列中取出任务并执行。
线程池相关概念
- 核心线程CorePool:当有新任务提交时,如果核心线程都在工作,并且数量已经达到最大核心线程数,则不会新建核心线程,而是把任务放入等待队列。
- 阻塞队列 workQueue: 等待队列是一个线程安全的阻塞队列。当线程都在忙时,阻塞队列用于存放新增的任务。核心线程完成当前任务回去等待队列中拉取新出的任务。
- 非核心线程: 当等待队列满时,若当前总线程线程数没有查过最大线程数,则会创建新的非核心线程。
- 核心线程与非核心线程没有区别,只会在比较线程池中线程数目时,区分核心线程与非核心线程。
- 线程活动保持时间keepAliveTime: 当一个线程空闲下来之后,其保持继续存活的时间。超过该时间则线程销毁。默认情况下,核心线程数量会一直保持,即使他空闲下来了;当设置
threadPoolExecutor.allowCoreThreadTimeOut(true)
时,则对核心线程也执行销毁。 - 饱和策略 rejectedExecutionHandler : 当等待队列满,且总线程数达到最大线程数时,会执行饱和策略。默认饱和策略是抛弃新的任务请求。
其中,ThreadPoolExecutor
是构建线程的核心方法,该方法的定义如下:
1 | ThreadPoolExecutor{ |
这个花瓶由 瓶口 、 瓶颈 、 瓶身 三个部分组成。
这三个部分分别对应着线程池的三个参数:maximumPoolSize, workQueue,corePoolSize。
Executor框架实例
1 | public class ThreadPoolDemo1 { |
submit(Runnable task)方法提交一个线程。
但是使用最新的“阿里巴巴编码规范插件”检测一下会发现:
1 | 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式, |
1 | public static void main(String[] args) { |
线程复用
线程复用的核心是,我们知道,Thread.start()
只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。则要达到复用的目的,则必须从Runnable
接口的run()
方法上入手,可以这样设计这个Runnable.run()方法(就叫外面的run()方法):
它本质上是个无限循环,跑的过程中不断检查我们是否有新加入的子Runnable对象(就叫内部的runnable:run()吧,它就是用来实现我们自己的任务),有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,…给串联起来了,基本原理就这么简单。不停地处理我们提交的Runnable任务。
1 | public void run() { |
为什么需要线程池
降低资源消耗:通过池化技术复用已创建的线程,降低线程创建和销毁造成的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控,避免线程被无限制地创建。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池
ScheduledThreadPoolExecutor
,就允许任务延期执行或定期执行。
线程池的工作流程
Java 线程池 ThreadPoolExecutor
继承关系
线程池在内部实际上构建了一个生产者-消费者模型,利用阻塞队列,将线程和任务解耦,并不直接关联,从而良好的缓冲任务,复用线程。
线程池的运行主要分成两部分:任务管理、线程管理
- 任务管理部分充当生产者,当任务提交后,线程池会判断该任务后续的流转:
- 直接申请线程执行该任务
- 缓冲到队列中等待线程执行
- 拒绝该任务
- 线程管理部分是消费者,线程被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会获取新的任务去执行,最终当线程获取不到任务的时候,线程会被回收。
线程池运行状态-runState
运行状态 | 状态描述 |
---|---|
RUNNING 运行状态 | 接收新任务,并且也能处理阻塞队列中的任务。 |
SHUTDOWN 停工状态 | 不接收新任务,但是却可以继续处理阻塞队列中的任务。 |
STOP停止状态 | 不接收新任务,同时也不处理队列任务,并且中断正在进行的任务。 |
TIDYING清空状态 | 所有任务都已终止,workercount(有效线程数)为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。 |
TERMINATED终止状态 | terminated() 方法调用完成后变成此状态。 |
线程池生命周期状态转化:
线程池任务执行机制
线程池的任务调度 execute()
当用户提交了一个任务,所有任务的调度都是由 execute()
方法完成的。包括:检查现在线程池的运行状态、运行线程数、运行策略,并决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。具体的执行过程如下:
首先检测线程池运行状态
runState
,如果不是RUNNING
,则直接拒绝,线程池要保证在RUNNING
的状态下执行任务。如果
workerCount < corePoolSize
,即核心线程还没创建满,则创建并启动一个核心线程来执行新提交的任务。如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个非核心线程来执行新提交的任务。如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
任务缓冲与阻塞队列 BlockingQueue
线程池维护了一个 BlockingQueue<Runnable>
阻塞队列,用于存放等待执行的线程,队列中的所有线程都处于 Runnable
状态。队列的维护相当于一个 producer-consumer 模型,producer 把线程放入阻塞队列,consumer 从阻塞队列中拿出线程来执行。
当阻塞队列满时,生产者线程会等待队列可用;
当阻塞队列空时,消费者线程会等待队列变成非空。
为什么用阻塞队列不用普通队列?
阻塞队列是为了实现生产者-消费者模型。当消费者从空的队列中取元素时,线程会被阻塞直到队列非空,然后消费者会被自动唤醒。
如果使用普通队列的话,需要我们自己去实现这样的同步和互斥机制,以及一些线程阻塞与唤醒的策略。
线程池的拒绝策略
若线程池中的核心线程数被用完且阻塞队列已拍满,则此时线程数的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。
AbortPolicy
- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* AbortPolicy 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
* <p>
* 结果说明:
* 将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。
*
*/
public class TestAbortPolicy {
private static final int THREADS_SIZE = 1;
private static final int TASK_MAX = 10;
private static final int CAPACITY = 1;
public static void main(String[] args) {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < TASK_MAX; i++) {
Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
try {
pool.execute(myRunnable);
} catch (RejectedExecutionException e) {
System.out.println(e.toString());
}
}
// 关闭线程池
pool.shutdown();
}
}只有0,1两个任务运行OK,其它的8个任务,在往线程池丢的时候,被线程池拒绝了,而且还抛了异常,被catch住了,catch了8次。
CallerRunsPolicy
- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/**
* CallerRunsPolicy 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务
* 翻译一下:就是不进入线程池执行,在这种方式(CallerRunsPolicy)中,任务将有调用者线程去执行
*/
public class TestCallerRunsPolicy {
private static final int THREADS_SIZE = 1;
private static final int TASK_MAX = 10;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
// 设置线程池的拒绝策略为"CallerRunsPolicy"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < TASK_MAX; i++) {
Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
try {
pool.execute(myRunnable);
} catch (Exception e) {
System.out.println(e.toString());
}
}
// 关闭线程池
pool.shutdown();
}
}在任务往线程池丢的时候,发现线程池已经装不下了,那么这个时候,就让往线程池丢任务丢这个线程来执行这个任务,在此例子就是main线程了,从结果图可见线程池里面的一个线程和main一起干活,把10个任务给搞完了。
DiscardOldestPolicy
- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
* 结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
* 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
* 根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
*
*/
public class TestDiscardOldestPolicy {
private static final int THREADS_SIZE = 1;
private static final int TASK_MAX = 10;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
// 设置线程池的拒绝策略为"DiscardOldestPolicy"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < TASK_MAX; i++) {
Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
try {
pool.execute(myRunnable);
} catch (Exception e) {
System.out.println(e.toString());
}
}
// 关闭线程池
pool.shutdown();
}
}可见0任务到池子之后,运行,剩下1-9在来池子的时候,没位置了,都的排队,但位置就1个,那每次新来的都会不客气但把旧时代的给挤掉。也就是这个策略的名字的由来。
DiscardPolicy
- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
* 结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
* 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
* 根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
*/
public class TestDiscardPolicy {
private static final int THREADS_SIZE = 1;
private static final int TASK_MAX = 10;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
// 设置线程池的拒绝策略为"丢弃"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < TASK_MAX; i++) {
Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
try {
pool.execute(myRunnable);
} catch (Exception e) {
System.out.println(e.toString());
}
}
// 关闭线程池
pool.shutdown();
}
}
任务0来了池子,先抢占了线程,可以执行,之后来的都的在队列里排队,但队列就一个位置,先来的占着位置,后面的来只能看着,被无情的抛弃,所以,输出结果就0,1两个任务执行,其它的都消失了。