线程池的工作原理

Java线程池主要用于主要用于管理线程组及其运行状态,以便java虚拟机更好地利用CPU资源。Java线程池的工作原理为:JVM先根据用户的参数创建一定数量的可运行的线程任务,并将其放入队列中,在线程创建这些任务,如果正在运行的线程数量,超过了最大线程数量(用户设置的线程池大小),则超出数量的线程派对等候,在任务执行完毕后,线程池调度器会发现有可用的线程,进而再次从队列中取出任务并执行。

线程池相关概念

  • 核心线程CorePool:当有新任务提交时,如果核心线程都在工作,并且数量已经达到最大核心线程数,则不会新建核心线程,而是把任务放入等待队列。
  • 阻塞队列 workQueue: 等待队列是一个线程安全的阻塞队列。当线程都在忙时,阻塞队列用于存放新增的任务。核心线程完成当前任务回去等待队列中拉取新出的任务。
  • 非核心线程: 当等待队列满时,若当前总线程线程数没有查过最大线程数,则会创建新的非核心线程。
    • 核心线程与非核心线程没有区别,只会在比较线程池中线程数目时,区分核心线程与非核心线程。
  • 线程活动保持时间keepAliveTime: 当一个线程空闲下来之后,其保持继续存活的时间。超过该时间则线程销毁。默认情况下,核心线程数量会一直保持,即使他空闲下来了;当设置threadPoolExecutor.allowCoreThreadTimeOut(true) 时,则对核心线程也执行销毁。
  • 饱和策略 rejectedExecutionHandler : 当等待队列满,且总线程数达到最大线程数时,会执行饱和策略。默认饱和策略是抛弃新的任务请求。

img

其中,ThreadPoolExecutor 是构建线程的核心方法,该方法的定义如下:

1
2
3
4
5
6
7
8
9
10
ThreadPoolExecutor{
int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程的存活时间
TimeUnit unit, // keepAliveTime的时间单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂,用于创建线程
RejectedExecutionHandler handler); // 饱和策略

}

image-20210428163209103

这个花瓶由 瓶口 、 瓶颈 、 瓶身 三个部分组成。

这三个部分分别对应着线程池的三个参数:maximumPoolSize, workQueue,corePoolSize。

Executor框架实例

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolDemo1 {
public static void main(String[] args) {

ExecutorService executorService = Executors.newFixedThreadPool(4);

for (int i = 0; i < 10; i++) {
int index = i;
executorService.submit(() -> System.out.println("i:" + index +
" executorService"));
}
executorService.shutdown();
}
}

submit(Runnable task)方法提交一个线程。

但是使用最新的“阿里巴巴编码规范插件”检测一下会发现:

1
2
3
4
5
6
7
8
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,
这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors各个方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:
  主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
  主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {

ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());

for (int i = 0; i < 10; i++) {
int index = i;
executorService.submit(() -> System.out.println("i:" + index +
" executorService"));
}
executorService.shutdown();
}

线程复用

线程复用的核心是,我们知道,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。则要达到复用的目的,则必须从Runnable接口的run()方法上入手,可以这样设计这个Runnable.run()方法(就叫外面的run()方法):
它本质上是个无限循环,跑的过程中不断检查我们是否有新加入的子Runnable对象(就叫内部的runnable:run()吧,它就是用来实现我们自己的任务),有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,…给串联起来了,基本原理就这么简单。不停地处理我们提交的Runnable任务。

1
2
3
4
5
6
7
8
9
10
public void run() {
while(true) {
if(tasks available) {
Runnable task = taskqueue.dequeue();
task.run();
} else {
// wait or whatever
}
}
}

为什么需要线程池

  • 降低资源消耗:通过池化技术复用已创建的线程,降低线程创建和销毁造成的损耗。

  • 提高响应速度:任务到达时,无需等待线程创建即可立即执行。

  • 提高线程的可管理性:使用线程池可以进行统一的分配、调优和监控,避免线程被无限制地创建。

  • 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池 ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

线程池的工作流程

图4 任务调度流程

Java 线程池 ThreadPoolExecutor

继承关系

线程池在内部实际上构建了一个生产者-消费者模型,利用阻塞队列,将线程和任务解耦,并不直接关联,从而良好的缓冲任务,复用线程。

线程池的运行主要分成两部分:任务管理、线程管理

  • 任务管理部分充当生产者,当任务提交后,线程池会判断该任务后续的流转:
    • 直接申请线程执行该任务
    • 缓冲到队列中等待线程执行
    • 拒绝该任务
  • 线程管理部分是消费者,线程被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会获取新的任务去执行,最终当线程获取不到任务的时候,线程会被回收。

线程池运行状态-runState

运行状态 状态描述
RUNNING 运行状态 接收新任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 停工状态 不接收新任务,但是却可以继续处理阻塞队列中的任务。
STOP停止状态 不接收新任务,同时也不处理队列任务,并且中断正在进行的任务。
TIDYING清空状态 所有任务都已终止,workercount(有效线程数)为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。
TERMINATED终止状态 terminated() 方法调用完成后变成此状态。

线程池生命周期状态转化:

img

线程池任务执行机制

线程池的任务调度 execute()

当用户提交了一个任务,所有任务的调度都是由 execute() 方法完成的。包括:检查现在线程池的运行状态、运行线程数、运行策略,并决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。具体的执行过程如下:

  • 首先检测线程池运行状态 runState,如果不是 RUNNING,则直接拒绝,线程池要保证在 RUNNING 的状态下执行任务。

  • 如果 workerCount < corePoolSize,即核心线程还没创建满,则创建并启动一个核心线程来执行新提交的任务。

  • 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。

  • 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个非核心线程来执行新提交的任务。

  • 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

任务缓冲与阻塞队列 BlockingQueue

线程池维护了一个 BlockingQueue<Runnable> 阻塞队列,用于存放等待执行的线程,队列中的所有线程都处于 Runnable 状态。队列的维护相当于一个 producer-consumer 模型,producer 把线程放入阻塞队列,consumer 从阻塞队列中拿出线程来执行。

  • 当阻塞队列满时,生产者线程会等待队列可用;

  • 当阻塞队列空时,消费者线程会等待队列变成非空。

为什么用阻塞队列不用普通队列?

阻塞队列是为了实现生产者-消费者模型。当消费者从空的队列中取元素时,线程会被阻塞直到队列非空,然后消费者会被自动唤醒。

如果使用普通队列的话,需要我们自己去实现这样的同步和互斥机制,以及一些线程阻塞与唤醒的策略。

线程池的拒绝策略

若线程池中的核心线程数被用完且阻塞队列已拍满,则此时线程数的线程资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。

  • AbortPolicy

    • 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * AbortPolicy 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
    * <p>
    * 结果说明:
    * 将"线程池的拒绝策略"由DiscardPolicy修改为AbortPolicy之后,当有任务添加到线程池被拒绝时,会抛出RejectedExecutionException。
    *
    */
    public class TestAbortPolicy {
    private static final int THREADS_SIZE = 1;
    private static final int TASK_MAX = 10;
    private static final int CAPACITY = 1;

    public static void main(String[] args) {

    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();

    // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    // 新建10个任务,并将它们添加到线程池中。
    for (int i = 0; i < TASK_MAX; i++) {
    Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
    try {
    pool.execute(myRunnable);
    } catch (RejectedExecutionException e) {
    System.out.println(e.toString());
    }
    }

    // 关闭线程池
    pool.shutdown();

    }
    }

    image-20210429104351291

    只有0,1两个任务运行OK,其它的8个任务,在往线程池丢的时候,被线程池拒绝了,而且还抛了异常,被catch住了,catch了8次。

  • CallerRunsPolicy

    • 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
    * CallerRunsPolicy 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务
    * 翻译一下:就是不进入线程池执行,在这种方式(CallerRunsPolicy)中,任务将有调用者线程去执行
    */
    public class TestCallerRunsPolicy {

    private static final int THREADS_SIZE = 1;
    private static final int TASK_MAX = 10;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();

    // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
    // 设置线程池的拒绝策略为"CallerRunsPolicy"
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

    // 新建10个任务,并将它们添加到线程池中。
    for (int i = 0; i < TASK_MAX; i++) {
    Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
    try {
    pool.execute(myRunnable);
    } catch (Exception e) {
    System.out.println(e.toString());
    }
    }

    // 关闭线程池
    pool.shutdown();
    }
    }

    image-20210429104711573

    在任务往线程池丢的时候,发现线程池已经装不下了,那么这个时候,就让往线程池丢任务丢这个线程来执行这个任务,在此例子就是main线程了,从结果图可见线程池里面的一个线程和main一起干活,把10个任务给搞完了。

  • DiscardOldestPolicy

    • 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /**
    * DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
    * 结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
    * 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
    * 根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
    *
    */
    public class TestDiscardOldestPolicy {


    private static final int THREADS_SIZE = 1;
    private static final int TASK_MAX = 10;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();

    // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
    // 设置线程池的拒绝策略为"DiscardOldestPolicy"
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

    // 新建10个任务,并将它们添加到线程池中。
    for (int i = 0; i < TASK_MAX; i++) {
    Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
    try {
    pool.execute(myRunnable);
    } catch (Exception e) {
    System.out.println(e.toString());
    }
    }
    // 关闭线程池
    pool.shutdown();
    }
    }


    image-20210429104901856

    可见0任务到池子之后,运行,剩下1-9在来池子的时候,没位置了,都的排队,但位置就1个,那每次新来的都会不客气但把旧时代的给挤掉。也就是这个策略的名字的由来。

  • DiscardPolicy

    • 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
    * 结果说明:线程池pool的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),这意味着"线程池能同时运行的任务数量最大只能是1"。
    * 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
    * 根据""中分析的execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到Worker中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
    */
    public class TestDiscardPolicy {


    private static final int THREADS_SIZE = 1;
    private static final int TASK_MAX = 10;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {
    ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("Test-Rejected-Policy-Pool-%d").build();

    // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(CAPACITY), namedThreadFactory);
    // 设置线程池的拒绝策略为"丢弃"
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

    // 新建10个任务,并将它们添加到线程池中。
    for (int i = 0; i < TASK_MAX; i++) {
    Runnable myRunnable = new MyRunnable("task [ " + i + " ]");
    try {
    pool.execute(myRunnable);
    } catch (Exception e) {
    System.out.println(e.toString());
    }
    }
    // 关闭线程池
    pool.shutdown();
    }
    }

    image-20210428212820271

任务0来了池子,先抢占了线程,可以执行,之后来的都的在队列里排队,但队列就一个位置,先来的占着位置,后面的来只能看着,被无情的抛弃,所以,输出结果就0,1两个任务执行,其它的都消失了。

Reference

美团技术团队-Java线程池

redspider-线程池部分

线程池工作原理

线程池源码分析:

掘金-线程池源码分析

掘金-Java 线程池

segmentfault-线程池源码分析