【Java 多线程并发】 Java线程池的核心参数

Metadata

title: 【Java 多线程并发】 Java线程池的核心参数
date: 2023-07-05 10:33
tags:
  - 行动阶段/完成
  - 主题场景/程序
  - 笔记空间/KnowladgeSpace/ProgramSpace/BasicsSpace
  - 细化主题/Java
categories:
  - Java
keywords:
  - Java
description: 【Java 多线程并发】 Java线程池的核心参数

简介

ThreadPoolExecutor的构造方法是创建线程池的入口,虽然比较简单,但是信息量很大,由此也能引发一系列的问题,构造方法中有很多核心参数,这也是面试中经常被问到的问题。

Executors创建线程池也是基于ThreadPoolExecutor的构造方法实现的。定时任务线程池还基于ScheduledThreadPoolExecutor的构造方法,但归根结底还是基于ThreadPoolExecutor,所以下面我们对线程池核心参数的讲解都是基于ThreadPoolExecutor来分析的。

构造方法

ThreadPoolExecutor有四个构造方法,其中前三个最终都是调用最后一个,它有7个参数,分别为corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。

// 构造方法有不同的参数版本,以满足不同的需求
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    // 使用默认线程工厂和默认拒绝策略  Executors.defaultThreadFactory()和defaultHandler
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
 
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    // 使用默认拒绝策略                         
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
 
 
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    // 使用默认线程工厂                        
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
 
 
/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 * 所有的构造方法最终都会调用这个构造方法,都是基于这个方法实现的
 */
public ThreadPoolExecutor(  int corePoolSize, // 线程池的核心线程数量
                            int maximumPoolSize, // 线程池的最大线程数
                            long keepAliveTime, // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                            TimeUnit unit, // 存活时间的时间单位
                            BlockingQueue<Runnable> workQueue, // 任务队列,用来储存等待执行任务的队列
                            ThreadFactory threadFactory, // 线程工厂,用来创建线程,一般默认即可
                            RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                        ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue:新任务来的时候会先判断当前运行(正在执行任务的线程)的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
  • unit: keepAliveTime 参数的时间单位。
  • threadFactory:executor(线程池)创建新线程的时候会用到。
  • handler:饱和策略。关于饱和策略下面单独介绍一下。

下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):

线程池的核心参数

参数说明

参数名 说明
corePoolSize 线程池维护线程的最少数量。线程池至少会保持改数量的线程存在,即使没有任务可以处理。(注意:这里说的至少是指线程达到这个数量后,即使有空闲的线程也不会释放,而不是说线程池创建好之后就会初始化这么多线程)
maximumPoolSize 线程池维护线程的最大数量。线程池最多可创建的线程数,即使队列中的任务满了线程数量也不会超过 maximumPoolSize
keepAliveTime 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于 corePoolSize 时,超过 corePoolSize 的线程如果空闲时间超过 keepAliveTime,线程将被终止
unit 线程池维护线程所允许的空闲时间的单位,和 keepAliveTime 配合使用
workQueue 线程池所使用的缓冲队列。ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue
handler 线程池对拒绝任务的处理策略。AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,自定义

corePoolSize 线程池核心线程大小

线程池中会维护一个最小的线程数量,即使这些线程处于空闲状态,它们也不会被销毁,除非设置了allowCoreThreadTimeOut为true。所以默认情况下核心线程数量只会增多,不会减少,因为核心线程是不会被销毁的,除非显式的将线程销毁。

  • 当正在运行的线程数小于核心线程数时,来一个任务就创建一个核心线程;
  • 当正在运行的线程数大于或等于核心线程数时,任务来了先不创建线程而是丢到任务队列中。

maximumPoolSize 线程池最大线程数量

当正在运行的线程大于等于核心线程数的时,一个任务被提交到线程池后会被加入到任务队列中,在队列中等待核心线程将其取出并执行处理。但是如果任务提交的速度大于核心线程处理任务的速度,有界的任务队列就会被填满,当任务队列满了时,再来一个任务才会创建一个非核心线程,但线程池中核心线程数量 + 非核心线程数量 不能超过最大线程数。这个最大线程数量即由maximunPoolSize来指定。如果是无界的任务队列这个参数就没用了

keepAliveTime 空闲线程存活时间

一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定。内部主要是通过阻塞队列带超时的poll(timeout, unit)方法实现的。

默认情况下,此参数仅当正在运行的线程数大于核心线程数时才有效,即只针对非核心线程。

但是,如果allowCoreThreadTimeOut被设置成了true,针对核心线程也有效。

unit 空间线程存活时间单位

keepAliveTime的计量单位。可选的单位有Days、HOURS、MINUTES、MILLISECONDS、MICROSECONDS、NANOSECONDS。

workQueue 工作队列

当正在运行的线程数大于或等于核心线程数时,任务来了是先进入任务队列中的。

这个队列必须是阻塞队列,所以像ConcurrentLinkedQueue这种就不能作为参数,因为它虽然是并发安全的队列,但是它不是阻塞队列,因为它并没有实现BlockingQueue接口。

当任务进入到工作队列后,在任务调度时再从队列中取出任务。线程池中比较常用的四种工作队列(阻塞队列):

ArrayBlockingQueue

基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

LinkedBlockingQuene

基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

SynchronousQuene

一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务(不会将该任务暂时寄存在队列中等待),如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

SynchronousQuene同步队列没有任何内部容量,甚至连一个队列的容量都没有。

感觉ArrayBlockingQueue就像一个隧道,而SynchronousQueue就像一个门框(门框上不可以站人),进出队列就是穿门而过。

Executors提供的newCachedThreadPool线程池就是用了SynchronousQueue做任务队列,而它的核心线程数为0,最大线程数为无限大。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因为核心线程数为0,所以任务来时,只能新建线程池(如果没有空闲的线程),因为SynchronousQueue队列没有容量,不能存放任务,有了工作线程之后通过SynchronousQueue队列获取任务(让任务穿门而过,前一个任务不被线程领取,后边的任务不能过门)

所以这个线程池不能用于执行耗时的任务,因为他的最大线程数为无限大,很可能会建很多的线程。

PriorityBlockingQueue

具有优先级的无界阻塞队列,优先级通过参数Comparator实现。比较器(Comparable、Comparator)

threadFactory 线程工厂

创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程(守护线程)、查看创建线程数、给线程设置是否是后台运行、设置线程优先级等等。

默认使用的是Executors工具类中的DefaultThreadFactory类,这个类有个缺点,创建的线程的名称是自动生成的,无法自定义以区分不同的线程池,且它们都是非守护线程。

源码:

static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

handler 拒绝策略

RejectedExecutionHandler 特性及效果
AbortPolicy 线程池默认的策略,如果元素添加到线程池失败,会抛出 RejectedExecutionException 异常
DiscardPolicy 如果添加失败,则放弃,并且不会抛出任何异常
DiscardOldestPolicy 如果添加到线程池失败,会将队列中最早添加的元素移除,再尝试添加,如果失败则按该策略不断重试
CallerRunsPolicy 如果添加失败,那么主线程会自己调用执行器中的 execute 方法来执行改任务
自定义 如果觉得以上几种策略都不合适,那么可以自定义符合场景的拒绝策略。需要实现 RejectedExecutionHandler 接口,并将自己的逻辑写在 rejectedExecution 方法内。

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。

拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。

常用的拒绝策略有丢弃当前任务、丢弃最老的任务、抛出异常、调用者自己处理等待。

默认的拒绝策略是抛出异常(AbortPolicy),即线程池无法承载了,调用者再往里面添加任务会抛出异常。

默认的拒绝策略虽然比较简单粗暴,但是相对于丢弃任务策略明显要好很多,最起码调用者自己可以捕获这个异常再进行二次处理。

jdk中提供了四种拒绝策略(policy:政策,策略):

AbortPolicy

该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    //不做任何处理,直接抛出异常
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

DiscardPolicy

这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    //就是一个空的方法
}

DiscardOldestPolicy

该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        //移除队头元素
        e.getQueue().poll();
        //再尝试入队
        e.execute(r);
    }
}

CallerRunsPolicy

使用此策略,如果添加到线程池失败,那么主线程(调用线程池执行任务的线程)会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        //直接执行run方法
        r.run();
    }
}

自定义

如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。

public class MyRejectPolicy implements RejectedExecutionHandler{
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        //Sender是我的Runnable类,里面有message字段
        if (r instanceof Sender) {
            Sender sender = (Sender) r;
            //直接打印
            System.out.println(sender.getMessage());
        }
    }
}

执行拒绝策略

以ThreadPoolExecutor为例,它是通过自己的reject()方法来执行拒绝策略的。

// 要针对任务command执行拒绝策略
final void reject(Runnable command) {
    // handler就是核心参数中传入的拒绝策略
    // 如果创建线程池时没有向构造方法中传入拒绝策略,那么默认的拒绝策略就是抛出异常
    handler.rejectedExecution(command, this);
}