【Java 多线程并发】 Java线程池的核心参数
【Java 多线程并发】 Java线程池的核心参数
Metadata
title: 【Java 多线程并发】 Java线程池的核心参数
date: 2023-07-05 10:33
tags:
- 行动阶段/完成
- 主题场景/程序
- 笔记空间/KnowladgeSpace/ProgramSpace/BasicsSpace
- 细化主题/Java
categories:
- Java
keywords:
- Java
description: 【Java 多线程并发】 Java线程池的核心参数
简介
ThreadPoolExecutor的构造方法是创建线程池的入口,虽然比较简单,但是信息量很大,由此也能引发一系列的问题,构造方法中有很多核心参数,这也是面试中经常被问到的问题。
Executors创建线程池也是基于ThreadPoolExecutor的构造方法实现的。定时任务线程池还基于ScheduledThreadPoolExecutor的构造方法,但归根结底还是基于ThreadPoolExecutor,所以下面我们对线程池核心参数的讲解都是基于ThreadPoolExecutor来分析的。
构造方法
ThreadPoolExecutor有四个构造方法,其中前三个最终都是调用最后一个,它有7个参数,分别为corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。
// 构造方法有不同的参数版本,以满足不同的需求
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// 使用默认线程工厂和默认拒绝策略 Executors.defaultThreadFactory()和defaultHandler
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
// 使用默认拒绝策略
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
// 使用默认线程工厂
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
* 所有的构造方法最终都会调用这个构造方法,都是基于这个方法实现的
*/
public ThreadPoolExecutor( int corePoolSize, // 线程池的核心线程数量
int maximumPoolSize, // 线程池的最大线程数
long keepAliveTime, // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit, // 存活时间的时间单位
BlockingQueue<Runnable> workQueue, // 任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory, // 线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor 3 个最重要的参数:
- corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
- maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
- workQueue:新任务来的时候会先判断当前运行(正在执行任务的线程)的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。
ThreadPoolExecutor其他常见参数 :
- keepAliveTime:线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁;
- unit: keepAliveTime 参数的时间单位。
- threadFactory:executor(线程池)创建新线程的时候会用到。
- handler:饱和策略。关于饱和策略下面单独介绍一下。
下面这张图可以加深你对线程池中各个参数的相互关系的理解(图片来源:《Java 性能调优实战》):
线程池的核心参数
参数说明
参数名 | 说明 |
---|---|
corePoolSize | 线程池维护线程的最少数量。线程池至少会保持改数量的线程存在,即使没有任务可以处理。(注意:这里说的至少是指线程达到这个数量后,即使有空闲的线程也不会释放,而不是说线程池创建好之后就会初始化这么多线程) |
maximumPoolSize | 线程池维护线程的最大数量。线程池最多可创建的线程数,即使队列中的任务满了线程数量也不会超过 maximumPoolSize |
keepAliveTime | 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于 corePoolSize 时,超过 corePoolSize 的线程如果空闲时间超过 keepAliveTime,线程将被终止 |
unit | 线程池维护线程所允许的空闲时间的单位,和 keepAliveTime 配合使用 |
workQueue | 线程池所使用的缓冲队列。ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue,PriorityBlockingQueue |
handler | 线程池对拒绝任务的处理策略。AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,自定义 |
corePoolSize 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处于空闲状态,它们也不会被销毁,除非设置了allowCoreThreadTimeOut为true。所以默认情况下核心线程数量只会增多,不会减少,因为核心线程是不会被销毁的,除非显式的将线程销毁。
- 当正在运行的线程数小于核心线程数时,来一个任务就创建一个核心线程;
- 当正在运行的线程数大于或等于核心线程数时,任务来了先不创建线程而是丢到任务队列中。
maximumPoolSize 线程池最大线程数量
当正在运行的线程大于等于核心线程数的时,一个任务被提交到线程池后会被加入到任务队列中,在队列中等待核心线程将其取出并执行处理。但是如果任务提交的速度大于核心线程处理任务的速度,有界的任务队列就会被填满,当任务队列满了时,再来一个任务才会创建一个非核心线程,但线程池中核心线程数量 + 非核心线程数量 不能超过最大线程数。这个最大线程数量即由maximunPoolSize来指定。如果是无界的任务队列这个参数就没用了
keepAliveTime 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定。内部主要是通过阻塞队列带超时的poll(timeout, unit)方法实现的。
默认情况下,此参数仅当正在运行的线程数大于核心线程数时才有效,即只针对非核心线程。
但是,如果allowCoreThreadTimeOut被设置成了true,针对核心线程也有效。
unit 空间线程存活时间单位
keepAliveTime的计量单位。可选的单位有Days、HOURS、MINUTES、MILLISECONDS、MICROSECONDS、NANOSECONDS。
workQueue 工作队列
当正在运行的线程数大于或等于核心线程数时,任务来了是先进入任务队列中的。
这个队列必须是阻塞队列,所以像ConcurrentLinkedQueue这种就不能作为参数,因为它虽然是并发安全的队列,但是它不是阻塞队列,因为它并没有实现BlockingQueue接口。
当任务进入到工作队列后,在任务调度时再从队列中取出任务。线程池中比较常用的四种工作队列(阻塞队列):
ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务(不会将该任务暂时寄存在队列中等待),如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
SynchronousQuene同步队列没有任何内部容量,甚至连一个队列的容量都没有。
感觉ArrayBlockingQueue就像一个隧道,而SynchronousQueue就像一个门框(门框上不可以站人),进出队列就是穿门而过。
Executors提供的newCachedThreadPool线程池就是用了SynchronousQueue做任务队列,而它的核心线程数为0,最大线程数为无限大。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因为核心线程数为0,所以任务来时,只能新建线程池(如果没有空闲的线程),因为SynchronousQueue队列没有容量,不能存放任务,有了工作线程之后通过SynchronousQueue队列获取任务(让任务穿门而过,前一个任务不被线程领取,后边的任务不能过门)
所以这个线程池不能用于执行耗时的任务,因为他的最大线程数为无限大,很可能会建很多的线程。
PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。比较器(Comparable、Comparator)
threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程(守护线程)、查看创建线程数、给线程设置是否是后台运行、设置线程优先级等等。
默认使用的是Executors工具类中的DefaultThreadFactory类,这个类有个缺点,创建的线程的名称是自动生成的,无法自定义以区分不同的线程池,且它们都是非守护线程。
源码:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
handler 拒绝策略
RejectedExecutionHandler | 特性及效果 |
---|---|
AbortPolicy | 线程池默认的策略,如果元素添加到线程池失败,会抛出 RejectedExecutionException 异常 |
DiscardPolicy | 如果添加失败,则放弃,并且不会抛出任何异常 |
DiscardOldestPolicy | 如果添加到线程池失败,会将队列中最早添加的元素移除,再尝试添加,如果失败则按该策略不断重试 |
CallerRunsPolicy | 如果添加失败,那么主线程会自己调用执行器中的 execute 方法来执行改任务 |
自定义 | 如果觉得以上几种策略都不合适,那么可以自定义符合场景的拒绝策略。需要实现 RejectedExecutionHandler 接口,并将自己的逻辑写在 rejectedExecution 方法内。 |
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。
拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。
常用的拒绝策略有丢弃当前任务、丢弃最老的任务、抛出异常、调用者自己处理等待。
默认的拒绝策略是抛出异常(AbortPolicy),即线程池无法承载了,调用者再往里面添加任务会抛出异常。
默认的拒绝策略虽然比较简单粗暴,但是相对于丢弃任务策略明显要好很多,最起码调用者自己可以捕获这个异常再进行二次处理。
jdk中提供了四种拒绝策略(policy:政策,策略):
AbortPolicy
该策略是线程池的默认策略。使用该策略时,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//不做任何处理,直接抛出异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
DiscardPolicy
这个策略和AbortPolicy的slient版本,如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//就是一个空的方法
}
DiscardOldestPolicy
该策略下,抛弃进入队列最早的那个任务,然后尝试把这次拒绝的任务放入队列
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//移除队头元素
e.getQueue().poll();
//再尝试入队
e.execute(r);
}
}
CallerRunsPolicy
使用此策略,如果添加到线程池失败,那么主线程(调用线程池执行任务的线程)会自己去执行该任务,不会等待线程池中的线程去执行。就像是个急脾气的人,我等不到别人来做这件事就干脆自己干。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
//直接执行run方法
r.run();
}
}
自定义
如果以上策略都不符合业务场景,那么可以自己定义一个拒绝策略,只要实现RejectedExecutionHandler接口,并且实现rejectedExecution方法就可以了。具体的逻辑就在rejectedExecution方法里去定义就OK了。
public class MyRejectPolicy implements RejectedExecutionHandler{
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//Sender是我的Runnable类,里面有message字段
if (r instanceof Sender) {
Sender sender = (Sender) r;
//直接打印
System.out.println(sender.getMessage());
}
}
}
执行拒绝策略
以ThreadPoolExecutor为例,它是通过自己的reject()方法来执行拒绝策略的。
// 要针对任务command执行拒绝策略
final void reject(Runnable command) {
// handler就是核心参数中传入的拒绝策略
// 如果创建线程池时没有向构造方法中传入拒绝策略,那么默认的拒绝策略就是抛出异常
handler.rejectedExecution(command, this);
}