【Java 多线程并发】 ThreadPoolExecutor
【Java 多线程并发】 ThreadPoolExecutor
Metadata
title: 【Java 多线程并发】 ThreadPoolExecutor
date: 2023-07-05 12:34
tags:
- 行动阶段/完成
- 主题场景/程序
- 笔记空间/KnowladgeSpace/ProgramSpace/BasicsSpace
- 细化主题/Java
categories:
- Java
keywords:
- Java
description: 【Java 多线程并发】 ThreadPoolExecutor
线程池框架
第一层结构
- sun.nio.ch.AsynchronousChannelGroupImpl(Iocp) 异步channel –AIO相关实现
- java.util.concurrent.CompletableFuture.ThreadPerTaskExecutor (启动一个线程执行)
- sun.net.httpserver.ServerImpl.DefaultExecutor (more执行器,直接执行)
- com.sun.jmx.remote.internal.ClientNotifForwarder.LinearExecutor (线性执行器)
- java.util.concurrent.ExecutorService (核心执行器服务)
接口简介
- java.util.concurrent.Executor (执行器,执行方法)
- java.util.concurrent.ExecutorService (执行服务) 包含服务的生命周期
- java.util.concurrent.ScheduledExecutorService (调度相关的服务)
核心实现类
- java.util.concurrent.ThreadPoolExecutor (普通的的线程池实现类)
- java.util.concurrent.ScheduledThreadPoolExecutor (调度的核心实现类)
辅助类
- java.util.concurrent.Executors
完成服务
java.util.concurrent.CompletionService
java.util.concurrent.ExecutorCompletionService
核心的话,我们的Java自带的线程池就是由两种类型(那些Executors提供的固定类型只是对这两种类型改了一些参数而已):
- 普通线程池 ThreadPoolExecutor
- 定时线程池 ScheduledThreadPoolExecutor
下面我们就先来分析普通线程池 ThreadPoolExecutor 的源码。
二、ThreadPoolExecutor的成员属性和内部类
主要成员属性以及工具方法
public class ThreadPoolExecutor extends AbstractExecutorService {
// 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低29位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3; // =29
// 用于表示线程池中工作线程的数量的最大值,等于2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // =000 11111...
// 线程池的五种状态,高3位表示 下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
private static final int RUNNING = -1 << COUNT_BITS; // 111 00000...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000...
private static final int STOP = 1 << COUNT_BITS; // 001 00000...
private static final int TIDYING = 2 << COUNT_BITS; // 010 00000...
private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000...
// 传入ctl,计算返回线程池的状态
// 通过与运算,将高3位的值保留,低29位的值置为0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 传入ctl,计算返回线程池中工作线程的数量
// 通过与运算,将高3位的值置为0,低29位的值保留
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl的值,等于运行状态“加上”线程数量
// 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }
// runStateLessThan方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// isRunning方法用于判断线程池的状态是否是RUNNING状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 该方法用于增加工作线程的数量,如果增加成功则返回true,否则返回false
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 该方法用于减少工作线程的数量,如果减少成功则返回true,否则返回false
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 该方法用于设置线程池的状态,如果设置成功则返回true,否则返回false
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
// 用于保存工作线程的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 线程池的状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
private final ReentrantLock mainLock = new ReentrantLock();
// 线程池的条件锁,用mainLock创建的,在线程池的状态发生改变时,会使用这个条件锁来通知所有等待的线程
private final Condition termination = mainLock.newCondition();
// 核心参数:任务阻塞队列
private final BlockingQueue<Runnable> workQueue;
// 核心参数:线程池的线程工厂,用于创建线程
private volatile ThreadFactory threadFactory;
// 核心参数:拒绝策略,用于在线程池已经关闭或者线程数已经达到最大值的情况下,对新任务的处理策略
private volatile RejectedExecutionHandler handler;
// 核心参数:当线程数大于核心线程数时,多余的空闲线程存活的最长时间
private volatile long keepAliveTime;
// 是否允许核心线程空闲退出,默认值为false
private volatile boolean allowCoreThreadTimeOut;
// 核心参数:线程池的核心线程数
private volatile int corePoolSize;
// 核心参数:线程池的最大线程数
private volatile int maximumPoolSize;
// 这是一个动态的变量。
// largestPoolSize表示线程池中曾经出现过的最大线程数,即在线程池的整个生命周期中,曾经有多少个线程同时处于活动状态。所以说largestPoolSize是小于等于maximumPoolSize的。
// 它的值可以通过ThreadPoolExecutor的getLargestPoolSize()方法获取。
private int largestPoolSize;
// 表示已经完成的任务数量
private long completedTaskCount;
// 默认的拒绝策略,默认的拒绝策略就是直接抛出异常。如果构造方法中没有转入指定的拒绝策略,就会将defaultHandler赋值给handler
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
...
}
五种内部类
ThreadPoolExecutor中有五个内部类,总的来说其实就两类:1、拒绝策略(Policy) 2、工作线程类(Worker)
拒绝策略内部类(Policy)
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。
拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。
工作线程内部类(Worker)
Worker类,每一个Worker类就会绑定一个线程(Worker类有一个成员属性持有一个线程对象),可以将Worker理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker里面还会有一些统计信息,存储一些相关的数据。
源码原理解析(执行普通任务)
下面我们以执行普通任务为例,讲解ThreadPoolExecutor的源码。
线程池的生命周期
之前我们学过线程是有自己的生命周期的,其实线程池也是存在生命周期的。在我们讲线程池体系结构的时候,讲了一些方法,比如shutDown()/shutDownNow(),它们都是与线程池的生命周期相关联的。
我们先来看一下线程池ThreadPoolExecutor中定义的生命周期中的状态及相关方法:
// 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低29位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3; // =29
// 用于表示线程池中工作线程的数量的最大值,等于2^29-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // =000 11111...
// 线程池的五种状态,高3位表示 下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
private static final int RUNNING = -1 << COUNT_BITS; // 111 00000...
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000 00000...
private static final int STOP = 1 << COUNT_BITS; // 001 00000...
private static final int TIDYING = 2 << COUNT_BITS; // 010 00000...
private static final int TERMINATED = 3 << COUNT_BITS; // 011 00000...
// 传入ctl,计算返回线程池的状态
// 通过与运算,将高3位的值保留,低29位的值置为0
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 传入ctl,计算返回线程池中工作线程的数量
// 通过与运算,将高3位的值置为0,低29位的值保留
private static int workerCountOf(int c) { return c & CAPACITY; }
// 计算ctl的值,等于运行状态“加上”线程数量
// 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }
// runStateLessThan方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
// runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
// isRunning方法用于判断线程池的状态是否是RUNNING状态
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
从上面这段代码,我们可以得出:
(1)线程池的状态和工作线程的数量共同保存在控制变量ctl中,类似于AQS中的state变量,不过这里是直接使用的AtomicInteger,这里换成unsafe+volatile也是可以的;
(2)ctl的高三位保存运行状态,低29位保存工作线程的数量,也就是说线程的数量最多只能有(2^29-1)个,也就是上面的CAPACITY;关于ctl的详细讲解见:【线程池】线程池的ctl属性详解
(3)线程池的状态一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED;
(4)RUNNING,表示可接受新任务,且可执行队列中的任务;
(5)SHUTDOWN,表示不接受新任务,但可执行队列中的任务;
(6)STOP,表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
(7)TIDYING,所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
(8)TERMINATED,中止状态,已经执行完terminated()钩子方法;
下面我们再通过流程图来看看这些状态之间是怎么流转的:
(1)新建线程池时,它的初始状态为RUNNING,这个在上面定义ctl的时候可以看到;
(2)RUNNING->SHUTDOWN,执行shutdown()方法时;
(3)RUNNING->STOP,执行shutdownNow()方法时;
(4)SHUTDOWN->STOP,执行shutdownNow()方法时;
(5)STOP->TIDYING,执行了shutdown()或者shutdownNow()后,所有任务已中止,且工作线程数量为0时,此时会执行terminated()方法;
(6)TIDYING->TERMINATED,执行完terminated()方法后;
下面让我们一起来看看源码中是怎么控制线程池状态的。
(1)RUNNING初始化状态
RUNNING,比较简单,创建线程池的时候就会初始化ctl,而ctl初始化为RUNNING状态,所以线程池的初始状态就为RUNNING状态。
// 初始状态为RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
(2)SHUTDOWN()方法
执行shutdown()方法时把状态修改为SHUTDOWN,这里肯定会成功,因为advanceRunState()方法中是个自旋,不成功不会退出。
public void shutdown() {
// 获取线程池锁
final ReentrantLock mainLock = this.mainLock;
// 加锁,修改线程池状态
mainLock.lock();
try {
// 检查是否有权限关闭线程池
checkShutdownAccess();
// 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回
advanceRunState(SHUTDOWN);
// 标记空闲线程为中断状态
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void advanceRunState(int targetState) {
// 自旋修改线程池状态
for (;;) {
// 获取ctl
int c = ctl.get();
// 如果状态大于SHUTDOWN(因为只有RUNNING状态才能转化为SHUTDOWN状态,而只有RUNNING状态是小于SHUTDOWN状态的),
// 或者修改为SHUTDOWN成功了,才会break跳出自旋
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
(3)STOP
执行shutdownNow()方法时,会把线程池状态修改为STOP状态,同时标记所有线程为中断状态。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
// 获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 加锁,进行状态修改操作
mainLock.lock();
try {
// 检查是否有权限关闭线程池
checkShutdownAccess();
// 修改为STOP状态
advanceRunState(STOP);
// 标记所有线程为中断状态
interruptWorkers();
// 将队列中的任务全部移除,并返回
tasks = drainQueue();
} finally {
// 解锁
mainLock.unlock();
}
// 尝试终止线程池
tryTerminate();
// 返回队列中的任务
return tasks;
}
至于线程是否响应中断其实是在队列的take()或poll()方法中响应的,最后会到AQS中,它们检测到线程中断了会抛出一个InterruptedException异常,然后getTask()中捕获这个异常,并且在下一次的自旋时退出当前线程并减少工作线程的数量。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态为STOP了,这里会直接退出循环,且减少工作线程数量
// 退出循环了也就相当于这个线程的生命周期结束了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 真正响应中断是在poll()方法或者take()方法中
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
// 这里捕获中断异常
timedOut = false;
}
}
}
这里有一个问题,就是已经通过getTask()取出来且返回的任务怎么办?
实际上它们会正常执行完毕,这个知识点涉及到runWorker()这个方法,我们在后面会分析这个方法。
(4)TIDYING
当执行shutdown()或shutdownNow()之后,如果所有任务已中止,且工作线程数量为0,就会进入这个状态。
final void tryTerminate() {
// 自旋修改状态
for (;;) {
int c = ctl.get();
// 下面几种情况不会执行后续代码
// 1. 运行中
// 2. 状态的值比TIDYING还大,也就是TERMINATED
// 3. SHUTDOWN状态且任务队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 工作线程数量不为0,也不会执行后续代码
if (workerCountOf(c) != 0) {
// 尝试中断空闲的线程
interruptIdleWorkers(ONLY_ONE);
return;
}
// 获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// CAS修改状态为TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 更新成功,执行terminated钩子方法
terminated();
// 确保terminated钩子方法执行完毕后,再次修改状态为TERMINATED(最终的终止状态),线程池彻底关闭
} finally {
// 强制更新状态为TERMINATED,这里不需要CAS了
ctl.set(ctlOf(TERMINATED, 0));
// 通知所有等待线程
termination.signalAll();
}
return;
}
} finally {
// 解锁
mainLock.unlock();
}
// else retry on failed CAS
}
}
实际更新状态为TIDYING和TERMINATED状态的代码都在tryTerminate()方法中,实际上tryTerminated()方法在很多地方都有调用,比如shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用tryTerminate()方法,但最后只会有一个线程真正执行到修改状态为TIDYING的地方。
修改状态为TIDYING后执行terminated()方法,最后修改状态为TERMINATED,标志着线程池真正消亡了。
(5)TERMINATED
见上面TIDYING中分析。
线程池执行原理
本章节我们讲的是普通线程池,以固定线程池为例,下面就是线程池提交任务的基本用法。我们可以使用Executors框架来直接创建线程池,但是本质它还是通过ThreadPoolExecutor的构造方法创建的线程池。
// 固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交单个任务,无返回值
executorService.execute(new Runnable() {
public void run() {
}
});
// 提交单个任务,有返回值,返回Future
executorService.submit(new Callable<String>() {
public String call() throws Exception {
return "abc";
}
});
初始化构造器
ThreadPoolExecutor的多个构造方法,都是调用的下面这个构造方法来实现的。
/**
* 用给定的初始参数创建一个新的ThreadPoolExecutor。
* 所有的构造方法最终都会调用这个构造方法,都是基于这个方法实现的
*/
public ThreadPoolExecutor( int corePoolSize, // 线程池的核心线程数量
int maximumPoolSize, // 线程池的最大线程数
long keepAliveTime, // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
TimeUnit unit, // 存活时间的时间单位
BlockingQueue<Runnable> workQueue, // 任务队列,用来储存等待执行任务的队列
ThreadFactory threadFactory, // 线程工厂,用来创建线程,一般默认即可
RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面我们通过这个构造方法创建一个线程池,它的核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话。如果使用它运行20个任务,会是什么结果呢?
public class ThreadPoolTest01 {
public static void main(String[] args) {
// 通过构造方法新建一个线程池
// 核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话
ExecutorService threadPool = new ThreadPoolExecutor(5, 10,
1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
Executors.defaultThreadFactory(),
new RejectedExecutionHandler() { // 自定义拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(currentThreadName() + ", discard task");
}
}
);
// 循环提交20个任务,注意观察num
for (int i = 0; i < 20; i++) {
int num = i;
threadPool.execute(()->{
try {
// 打印执行任务的线程信息 和 num的大小 以及执行的时间
System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
// 打印当前线程的名字
private static String currentThreadName() {
return Thread.currentThread().getName();
}
}
我们一起来看看一次运行的结果:
pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412
main, discard task
main, discard task
main, discard task
main, discard task
main, discard task
pool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412
注意,观察num值的打印信息,先是打印了04,再打印了1014,最后打印了5~9,竟然不是按顺序打印的,为什么呢?
下面我们基于上面这段代码示例,一步一步debug进去查看ThreadPoolExecutor中地相关源码。
execute()方法——提交任务
execute()方法是线程池提交任务的方法之一,也是最核心的方法。该方法是在Executor线程池顶层接口中定义的,用于提交Runnable无返回值任务到线程池中执行,该方法也没有返回值。该方法在ThreadPoolExecutor中实现。
java.util.concurrent.ThreadPoolExecutor#execute
// 提交任务,任务并非立即执行,所以翻译成执行任务似乎不太合适,其实是将任务提交到任务队列中,然后线程池再从任务队列中取出任务执行
public void execute(Runnable command) {
// 任务不能为空
if (command == null)
throw new NullPointerException();
// 获取ctl(高3位存储状态,低29位存储工作线程的数量)
int c = ctl.get();
// 1. 如果工作线程数量小于核心数量
if (workerCountOf(c) < corePoolSize) {
// 就添加一个工作线程(核心) 并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回
if (addWorker(command, true))
return;
// 如果创建工作线程失败,则重新获取下ctl
c = ctl.get();
}
// 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列(workQueue是构造方法的核心参数之一)
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取ctl
int recheck = ctl.get();
// 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起
else if (!addWorker(command, false))
// 非核心工作线程创建失败,执行拒绝策略
reject(command);
}
提交任务的过程大致如下:
(1)工作线程数量小于核心数量,创建核心线程,并将任务交给该核心线程去执行;
(2)达到核心数量,将任务加入任务队列;
(3)任务队列满了,创建非核心线程,并将任务交给该非核心线程去执行;
(4)线程数达到最大数量,队列也满了,则执行拒绝策略;
其实,就是三道坎——核心数量、任务队列、最大数量,这样就比较好记了。
流程图大致如下:
任务流转的过程我们知道了,但是任务是在哪里执行的呢?继续往下看。
addWorker()方法——添加工作线程并启动
retry:标记
讲解这个方法之前先了解下 retry: 因为源码中有这个retry标记
先看一个简单的例子
public void testRetry() {
int i = 0;
retry: //①
while (true) {
i++;
System.out.println("i=" + i);
int j = 0;
// retry: //②
for (; ; ) {
j++;
System.out.println("j=" + j);
if (j == 2) {
break retry;
}
}
}
}
首先需要说明的是,retry:可以理解为java中的一种特殊的标记,其中retry可以换成任何合法的命名。
a:、b:、A13: 等等都是可以的。该标记其实就是”:”,符号前面是自己定义的任意合法名字。
打开①,关闭② 打印结果
i=1
j=1
j=2
这里有一点需要注意,这里和C语言的goto不一样,如果break到了循环外面(就是说再也没有循环套在retry外面了),那么这个程序就永远不会再进入到这个循环了,而C语言的goto只是重新调整了程序的执行位置,程序会接着按照代码向下执行,进入循环。如果是用的continue retry就还可以再进入循环。
打开②,关闭①,打印结果
....
j=1
j=2
i=132348
j=1
j=2
i=132349
j=1
j=2
i=132350
j=1
j=2
...一直循环打印
retry相当于一个标记,只用在循环里面,很像goto语句,break到retry字符处。如果retry没有在循环(for,while)里面,在执行到retry时,就会跳出整个循环。如果retry在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。
注意:retry:需要放在for,whlie,do…while的前面声明,变量只跟在break和continue后面。
addWorker()方法源码
如果工作线程数小于核心线程数的话,会调用 addWorker,顾名思义,这个方法主要用来创建一个工作线程,并启动之,其中会做线程池状态、工作线程数量等各种检测。
源码很长,但其实就做了两件事:
- 使用循环 CAS 操作来将线程数加 1;
- 新建一个线程并启用。
java.util.concurrent.ThreadPoolExecutor#addWorker
// 该方法用于创建新的工作线程添加到线程池中,如果添加成功则启动线程
// 参数firstTask是一个Runnable对象,表示要给该线程的第一个任务
// 参数core表示创建的是否是核心线程
// 返回值表示是否成功创建并启动了工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
// 判断有没有资格创建新的工作线程
// 主要是一些状态/数量的检查等等
retry:
for (;;) {
// 获取ctl
int c = ctl.get();
// 获取当前线程池运行状态
int rs = runStateOf(c);
// 线程池状态检查
if (rs >= SHUTDOWN && // 大于等于SHUTDOWN说明线程池不是RUNNING状态
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 两种情况会直接返回false 1.如果非运行状态 2.不是【线程池是停止状态并且传入的任务是null并且workQueue不等于空】这种情况
return false;
// 工作线程数量检查
for (;;) {
// 获取当前线程数
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
// 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回false
return false;
// 完成了上面的判断后,说明现在线程池可以创建新线程,则将线程数量加1,因为后面要创建线程了,并跳出循环
if (compareAndIncrementWorkerCount(c))
// 因为外面的retry:已经不在循环内了,跳到retry:位置后也不会再进入到该循环了,其实就相当于跳出这两层循环并且不再进入这两层循环,执行后续的流程
break retry;
// 如果上面的compareAndIncrementWorkerCount(c)方法返回false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取ctl
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态已经和最开始获取的状态不一样了
if (runStateOf(c) != rs)
// 回到retry:,但是因为这里用的是continue,所以程序往后执行还是会再次进入到该循环继续执行上面的判断
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作
// 标记是否启动了工作线程
boolean workerStarted = false;
// 标记是否成功添加了工作线程
boolean workerAdded = false;
// 要创建的Worker对象
Worker w = null;
try {
// 创建工作线程
// 增加一个worker 这个firstTask就是一个线程(任务线程,每一个Worker都和一个任务线程绑定),Runable,具体可以看这Worker源码
w = new Worker(firstTask);
// 这个是绑定在Worker上的工作线程,并不是任务线程,工作线程是用来执行任务的,
final Thread t = w.thread;
// 判断t是否为null
if (t != null) {
// 获取线程池的锁
final ReentrantLock mainLock = this.mainLock;
// 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致
mainLock.lock();
try {
// 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭
// 获取线程池运行状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加到工作线程队列。 workers是线程池中存放工作线程的集合
// 增加work 每一个线程池都持有一个workers集合,里面存储着线程池的所有worker,也就是所有线程
workers.add(w);
// 还在池子中的线程数量(只能在mainLock中使用)
int s = workers.size();
// 不能超过线程池的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
// 标记线程添加成功
workerAdded = true;
}
} finally {
// 解锁
mainLock.unlock();
}
if (workerAdded) {
// 线程添加成功之后启动线程
// 启动绑定在worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了
t.start();
// 标记工作线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)
if (! workerStarted)
addWorkerFailed(w);
}
// 返回新创建的工作线程是否启动成功
return workerStarted;
}
这里其实还没到任务执行的地方,上面我们可以看到线程是包含在Worker这个类中的,我们可以推测出其实执行任务的操作是在内部类Worker中完成的。
通过源码,可以总结出创建 Worker 失败的原因:
- 在添加时线程池被停止了
- 添加核心线程池时,超过核心线程池数量
- 添加非核心线程池时,超过最大线程池数量
- Worker 对象初始化未完成时就被使用,即 thread 对象还未完全创建
- 当前线程正在被运行(防止出现重复运行,抛出异常)
- 线程创建过多,导致 OOM
addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// 移除添加失败的worker
if (w != null)
workers.remove(w);
// 减少worker数量
decrementWorkerCount();
// 尝试终止线程池
tryTerminate();
} finally {
// 解锁
mainLock.unlock();
}
这个方法主要做三件事
- 如果 worker 已经构造好了,则从 workers 集合中移除这个 worker
- 原子递减核心线程数(因为在 addWorker 方法中先做了原子增加)
- 尝试结束线程池
线程池添加线程流程图:
runWorker()方法——工作线程执行任务
其实真正执行任务的操作,是在runWorker()方法中完成的。该方法是ThreadPoolExecutor中的核心方法之一,但是调用该方法确实通过内部类Worker实现的。
上面在addWorker()方法的源码中,ThreadPoolExecutor线程池调用了绑定在Worker对象上的t工作线程的start()方法,也就相当于启动了Worker工作线程。Worker实现了Runnable方法,所以它其中存在run()方法,当启动了工作线程之后,就会自动执行run()方法,而run()方法其实调用的就是ThreadPoolExecutor的runWorker()方法,该方法就会让工作线程持续地去队列中拿任务执行。
至于ThreadPoolExecutor如何执行任务和对其源码的分析,详见:Worker源码分析。这里就不再赘述了。
总结
通过上面对线程池的重要方法地分析,我们了解了线程池中普通任务执行的流程。
(1)execute(),提交任务的方法,根据核心数量、任务队列大小、最大数量,分成四种情况判断任务应该往哪去;
(2)addWorker(),添加工作线程的方法,通过Worker内部类封装一个Thread实例维护工作线程的执行;
(3)runWorker(),真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;
(4)getTask(),真正从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的poll()还是take()方法,keepAliveTime参数也是在这里使用的。
线程池运作的总流程图:
- 如果当前运行的线程数(当前池大小)poolSize 小于 corePoolSize ,则创建新线程执行任务
- 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列
- 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务
- 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。
- 线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行
- 如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出,默认只有超过核心数的线程空闲时间超过keepAliveTime才会被销毁。但是也可以通过allowCoreThreadTimeOut(true)来设置,核心数以内的线程空闲超过keepAliveTime后也会关闭
下面这个流程图我感觉可能更能直观的看出ThreadPoolExecutor每一部分的分工:
下面这个流程我我感觉能更直观展现每一个方法的调用流程:
至此,我们再看最开始给出的那个使用例子带来的问题:
观察num值的打印信息,先是打印了04,再打印了1014,最后打印了5~9,竟然不是按顺序打印的,为什么呢?
线程池的参数:核心数量5个,最大数量10个,任务队列5个。
答:执行前5个任务执行时,正好还不到核心数量,所以新建核心线程并执行了他们;
执行中间的5个任务时,已达到核心数量,所以他们先入队列;
执行后面5个任务时,已达核心数量且队列已满,所以新建非核心线程并执行了他们;
再执行最后5个任务时,线程池已达到满负荷状态,所以执行了拒绝策略。
最后再来一个ThreadPoolExecutor运行机制图示:
总结
- 线程池的本质就是一种池化的思想,复用线程,减少线程的创建和销毁的开销。常量池和连接池也都是这样的思想。
- 线程池的设计思路是生产者和消费者模型,通过队列进行解耦,而使用阻塞队列则是为了并发安全和不必要的自旋。其实消息队列也是生产者和消费者模型,因为消息队列和阻塞队列都是队列。
- 线程池如果用不好可能会死锁,即尽量不要在线程池内部使用 execute 方法提交任务,因为可能会出现循环等待的情况。而且线程池使用 submit() 方法要注意选择合适的拒绝策略,因为 submit() 会将任务包装成 FutureTask 对象,如果任务被拒绝,即没有调用 run() 方法,那么调用 get() 方法的线程会被阻塞住。
面试问题
核心线程和非核心线程有什么区别?
答:实际上并没有什么区别,主要是根据corePoolSize来判断任务该去哪里,两者在执行任务的过程中并没有任何区别。有可能新建的时候是核心线程,而keepAliveTime时间到了结束了的也可能是刚开始创建的核心线程。
Worker继承自AQS有何意义?
前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?
答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。
一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得shutdown()和shutdownNow()方法吗?
观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法来中断线程地,而且interruptIdleWorkers()方法中就用到了tryLock(),只有获取到锁了才能中断线程,如果没有获取到锁则不中断。而调用tryLock()后没获取到锁只有一种原因,那就是lock()所在的地方runWorker()方法中,有任务正在执行。这样shutdown()方法就实现了只中断空闲线程,不会中断正在执行任务的线程。
而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。
所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制调用shutdown()时不要中断正在执行任务的线程。
那么为什么Worker使用AQS实现锁,而不直接用ReentrantLock呢?我们可以看到Worker的tryAcquire 方法,它是不允许重入的,而 ReentrantLock是允许重入的。所以这是为了实现不可重入的特性去反应线程现在的执行状态。
execute() 和 submit() 的区别
很多同学应该关注到了。线程池的执行任务有两种方法,一种是 submit、一种是 execute; 这两个方法是有区别的。
execute:
- execute 只可以接收一个 Runnable 的参数
- execute 如果出现异常会抛出
- execute 没有返回值
submit:
- submit 可以接收 Runable 和 Callable 这两种类型的参数,
- 对于 submit 方法,如果传入一个 Callable,可以得到一个 Future 的返回值
- submit 方法调用不会抛异常,除非调用 Future.get()才会在调用get()方法时抛出异常。
RPC框架中异步调用是怎么实现的?
答:RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的。
一般地,通过一个线程(我们叫作远程线程)去调用远程接口,如果是同步调用,则直接让调用者线程阻塞着等待远程线程调用的结果,待结果返回了再返回;如果是异步调用,则先返回一个未来可以获取到远程结果的东西FutureXxx,当然,如果这个FutureXxx在远程结果返回之前调用了get()方法一样会阻塞着调用者线程。
有兴趣的同学可以先去预习一下dubbo的异步调用(它是把Future扔到RpcContext中的)。