【Java 多线程并发】 ThreadPoolExecutor

Metadata

title: 【Java 多线程并发】 ThreadPoolExecutor
date: 2023-07-05 12:34
tags:
  - 行动阶段/完成
  - 主题场景/程序
  - 笔记空间/KnowladgeSpace/ProgramSpace/BasicsSpace
  - 细化主题/Java
categories:
  - Java
keywords:
  - Java
description: 【Java 多线程并发】 ThreadPoolExecutor

线程池框架

第一层结构

  1. sun.nio.ch.AsynchronousChannelGroupImpl(Iocp) 异步channel –AIO相关实现
  2. java.util.concurrent.CompletableFuture.ThreadPerTaskExecutor (启动一个线程执行)
  3. sun.net.httpserver.ServerImpl.DefaultExecutor (more执行器,直接执行)
  4. com.sun.jmx.remote.internal.ClientNotifForwarder.LinearExecutor (线性执行器)
  5. java.util.concurrent.ExecutorService (核心执行器服务)

接口简介

  1. java.util.concurrent.Executor (执行器,执行方法)
  2. java.util.concurrent.ExecutorService (执行服务) 包含服务的生命周期
  3. java.util.concurrent.ScheduledExecutorService (调度相关的服务)

核心实现类

  1. java.util.concurrent.ThreadPoolExecutor (普通的的线程池实现类)
  2. java.util.concurrent.ScheduledThreadPoolExecutor (调度的核心实现类)

辅助类

  1. java.util.concurrent.Executors

完成服务

java.util.concurrent.CompletionService
java.util.concurrent.ExecutorCompletionService

核心的话,我们的Java自带的线程池就是由两种类型(那些Executors提供的固定类型只是对这两种类型改了一些参数而已):

  1. 普通线程池 ThreadPoolExecutor
  2. 定时线程池 ScheduledThreadPoolExecutor

下面我们就先来分析普通线程池 ThreadPoolExecutor 的源码。

二、ThreadPoolExecutor的成员属性和内部类

主要成员属性以及工具方法

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 表示低29位的值,用于表示线程池中工作线程的数量
    private static final int COUNT_BITS = Integer.SIZE - 3; // =29
    // 用于表示线程池中工作线程的数量的最大值,等于2^29-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // =000 11111...
    // 线程池的五种状态,高3位表示    下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
    private static final int RUNNING    = -1 << COUNT_BITS; // 111 00000...
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 00000...
    private static final int STOP       =  1 << COUNT_BITS; // 001 00000...
    private static final int TIDYING    =  2 << COUNT_BITS; // 010 00000...
    private static final int TERMINATED =  3 << COUNT_BITS; // 011 00000...
 
    // 传入ctl,计算返回线程池的状态
    // 通过与运算,将高3位的值保留,低29位的值置为0
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 传入ctl,计算返回线程池中工作线程的数量
    // 通过与运算,将高3位的值置为0,低29位的值保留
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 计算ctl的值,等于运行状态“加上”线程数量
    // 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
    private static int ctlOf(int rs, int wc) { return rs | wc; }
 
 
    // runStateLessThan方法用于判断线程池的状态是否小于某个状态
    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }
    // runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
    // isRunning方法用于判断线程池的状态是否是RUNNING状态
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
    // 该方法用于增加工作线程的数量,如果增加成功则返回true,否则返回false
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 该方法用于减少工作线程的数量,如果减少成功则返回true,否则返回false
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    // 该方法用于设置线程池的状态,如果设置成功则返回true,否则返回false
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
    // 用于保存工作线程的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 线程池的状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 线程池的条件锁,用mainLock创建的,在线程池的状态发生改变时,会使用这个条件锁来通知所有等待的线程
    private final Condition termination = mainLock.newCondition();
 
 
    // 核心参数:任务阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 核心参数:线程池的线程工厂,用于创建线程
    private volatile ThreadFactory threadFactory;
    // 核心参数:拒绝策略,用于在线程池已经关闭或者线程数已经达到最大值的情况下,对新任务的处理策略
    private volatile RejectedExecutionHandler handler;
    // 核心参数:当线程数大于核心线程数时,多余的空闲线程存活的最长时间
    private volatile long keepAliveTime;
    // 是否允许核心线程空闲退出,默认值为false
    private volatile boolean allowCoreThreadTimeOut;
    // 核心参数:线程池的核心线程数
    private volatile int corePoolSize;
    // 核心参数:线程池的最大线程数
    private volatile int maximumPoolSize;
 
    // 这是一个动态的变量。
    // largestPoolSize表示线程池中曾经出现过的最大线程数,即在线程池的整个生命周期中,曾经有多少个线程同时处于活动状态。所以说largestPoolSize是小于等于maximumPoolSize的。
    // 它的值可以通过ThreadPoolExecutor的getLargestPoolSize()方法获取。
    private int largestPoolSize;
    // 表示已经完成的任务数量
    private long completedTaskCount;
    // 默认的拒绝策略,默认的拒绝策略就是直接抛出异常。如果构造方法中没有转入指定的拒绝策略,就会将defaultHandler赋值给handler
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    ...
}

五种内部类

ThreadPoolExecutor中有五个内部类,总的来说其实就两类:1、拒绝策略(Policy) 2、工作线程类(Worker)

拒绝策略内部类(Policy)

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢?这里的拒绝策略,就是解决这个问题的。

拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了,这些新来的任务应该按什么逻辑来处理。

工作线程内部类(Worker)

Worker类,每一个Worker类就会绑定一个线程(Worker类有一个成员属性持有一个线程对象),可以将Worker理解为线程池中执行任务的线程。但是实际上它并不仅仅是一个线程,Worker里面还会有一些统计信息,存储一些相关的数据。

源码原理解析(执行普通任务)

下面我们以执行普通任务为例,讲解ThreadPoolExecutor的源码。

线程池的生命周期

之前我们学过线程是有自己的生命周期的,其实线程池也是存在生命周期的。在我们讲线程池体系结构的时候,讲了一些方法,比如shutDown()/shutDownNow(),它们都是与线程池的生命周期相关联的。

我们先来看一下线程池ThreadPoolExecutor中定义的生命周期中的状态及相关方法:

// 线程池的状态和线程数量都是通过一个原子类型AtomicInteger的变量ctl来表示的,创建线程池的时候就会将ctl初始化为RUNNING状态和0个工作线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 表示低29位的值,用于表示线程池中工作线程的数量
private static final int COUNT_BITS = Integer.SIZE - 3; // =29
// 用于表示线程池中工作线程的数量的最大值,等于2^29-1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // =000 11111...
// 线程池的五种状态,高3位表示    下面的状态值依次增高,是根据他们状态流程的顺序依次增高的
private static final int RUNNING    = -1 << COUNT_BITS; // 111 00000...
private static final int SHUTDOWN   =  0 << COUNT_BITS; // 000 00000...
private static final int STOP       =  1 << COUNT_BITS; // 001 00000...
private static final int TIDYING    =  2 << COUNT_BITS; // 010 00000...
private static final int TERMINATED =  3 << COUNT_BITS; // 011 00000...
// 传入ctl,计算返回线程池的状态
// 通过与运算,将高3位的值保留,低29位的值置为0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 传入ctl,计算返回线程池中工作线程的数量
// 通过与运算,将高3位的值置为0,低29位的值保留
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 计算ctl的值,等于运行状态“加上”线程数量
// 通过或运算,将高3位和低29位的值都保留,合并为ctl并返回
private static int ctlOf(int rs, int wc) { return rs | wc; }
 
 
// runStateLessThan方法用于判断线程池的状态是否小于某个状态
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// runStateAtLeast方法用于判断线程池的状态是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
// isRunning方法用于判断线程池的状态是否是RUNNING状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

从上面这段代码,我们可以得出:

(1)线程池的状态和工作线程的数量共同保存在控制变量ctl中,类似于AQS中的state变量,不过这里是直接使用的AtomicInteger,这里换成unsafe+volatile也是可以的;
(2)ctl的高三位保存运行状态,低29位保存工作线程的数量,也就是说线程的数量最多只能有(2^29-1)个,也就是上面的CAPACITY;关于ctl的详细讲解见:【线程池】线程池的ctl属性详解
(3)线程池的状态一共有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED;
(4)RUNNING,表示可接受新任务,且可执行队列中的任务;
(5)SHUTDOWN,表示不接受新任务,但可执行队列中的任务;
(6)STOP,表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务;
(7)TIDYING,所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法;
(8)TERMINATED,中止状态,已经执行完terminated()钩子方法;

下面我们再通过流程图来看看这些状态之间是怎么流转的:

(1)新建线程池时,它的初始状态为RUNNING,这个在上面定义ctl的时候可以看到;
(2)RUNNING->SHUTDOWN,执行shutdown()方法时;
(3)RUNNING->STOP,执行shutdownNow()方法时;
(4)SHUTDOWN->STOP,执行shutdownNow()方法时;
(5)STOP->TIDYING,执行了shutdown()或者shutdownNow()后,所有任务已中止,且工作线程数量为0时,此时会执行terminated()方法;
(6)TIDYING->TERMINATED,执行完terminated()方法后;

下面让我们一起来看看源码中是怎么控制线程池状态的。

(1)RUNNING初始化状态

RUNNING,比较简单,创建线程池的时候就会初始化ctl,而ctl初始化为RUNNING状态,所以线程池的初始状态就为RUNNING状态。

// 初始状态为RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

(2)SHUTDOWN()方法

执行shutdown()方法时把状态修改为SHUTDOWN,这里肯定会成功,因为advanceRunState()方法中是个自旋,不成功不会退出。

public void shutdown() {
    // 获取线程池锁
    final ReentrantLock mainLock = this.mainLock;
    // 加锁,修改线程池状态
    mainLock.lock();
    try {
          // 检查是否有权限关闭线程池
        checkShutdownAccess();
        // 修改状态为SHUTDOWN,自旋操作,只有状态修改成功才会返回
        advanceRunState(SHUTDOWN);
        // 标记空闲线程为中断状态
        interruptIdleWorkers();
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
 
private void advanceRunState(int targetState) {
    // 自旋修改线程池状态
    for (;;) {
        // 获取ctl
        int c = ctl.get();
        // 如果状态大于SHUTDOWN(因为只有RUNNING状态才能转化为SHUTDOWN状态,而只有RUNNING状态是小于SHUTDOWN状态的),
        // 或者修改为SHUTDOWN成功了,才会break跳出自旋
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

(3)STOP

执行shutdownNow()方法时,会把线程池状态修改为STOP状态,同时标记所有线程为中断状态。

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    // 获取线程池的锁
    final ReentrantLock mainLock = this.mainLock;
    // 加锁,进行状态修改操作
    mainLock.lock();
    try {
        // 检查是否有权限关闭线程池
        checkShutdownAccess();
        // 修改为STOP状态
        advanceRunState(STOP);
        // 标记所有线程为中断状态
        interruptWorkers();
        // 将队列中的任务全部移除,并返回
        tasks = drainQueue();
    } finally {
        // 解锁
        mainLock.unlock();
    }
    // 尝试终止线程池
    tryTerminate();
    // 返回队列中的任务
    return tasks;
}

至于线程是否响应中断其实是在队列的take()或poll()方法中响应的,最后会到AQS中,它们检测到线程中断了会抛出一个InterruptedException异常,然后getTask()中捕获这个异常,并且在下一次的自旋时退出当前线程并减少工作线程的数量。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 如果状态为STOP了,这里会直接退出循环,且减少工作线程数量
        // 退出循环了也就相当于这个线程的生命周期结束了
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            // 真正响应中断是在poll()方法或者take()方法中
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            // 这里捕获中断异常
            timedOut = false;
        }
    }
}

这里有一个问题,就是已经通过getTask()取出来且返回的任务怎么办?

实际上它们会正常执行完毕,这个知识点涉及到runWorker()这个方法,我们在后面会分析这个方法。

(4)TIDYING

当执行shutdown()或shutdownNow()之后,如果所有任务已中止,且工作线程数量为0,就会进入这个状态。

final void tryTerminate() {
    // 自旋修改状态
    for (;;) {
        int c = ctl.get();
        // 下面几种情况不会执行后续代码
        // 1. 运行中
        // 2. 状态的值比TIDYING还大,也就是TERMINATED
        // 3. SHUTDOWN状态且任务队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        // 工作线程数量不为0,也不会执行后续代码
        if (workerCountOf(c) != 0) {
            // 尝试中断空闲的线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        
        // 获取线程池的锁
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            // CAS修改状态为TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 更新成功,执行terminated钩子方法
                    terminated();
                // 确保terminated钩子方法执行完毕后,再次修改状态为TERMINATED(最终的终止状态),线程池彻底关闭
                } finally {
                    // 强制更新状态为TERMINATED,这里不需要CAS了
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 通知所有等待线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 解锁
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

实际更新状态为TIDYING和TERMINATED状态的代码都在tryTerminate()方法中,实际上tryTerminated()方法在很多地方都有调用,比如shutdown()、shutdownNow()、线程退出时,所以说几乎每个线程最后消亡的时候都会调用tryTerminate()方法,但最后只会有一个线程真正执行到修改状态为TIDYING的地方。

修改状态为TIDYING后执行terminated()方法,最后修改状态为TERMINATED,标志着线程池真正消亡了。

(5)TERMINATED

见上面TIDYING中分析。

线程池执行原理

本章节我们讲的是普通线程池,以固定线程池为例,下面就是线程池提交任务的基本用法。我们可以使用Executors框架来直接创建线程池,但是本质它还是通过ThreadPoolExecutor的构造方法创建的线程池。

// 固定线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 提交单个任务,无返回值
executorService.execute(new Runnable() {
    public void run() {
    }
});
// 提交单个任务,有返回值,返回Future
executorService.submit(new Callable<String>() {
    public String call() throws Exception {
        return "abc";
    }
});

初始化构造器

ThreadPoolExecutor的多个构造方法,都是调用的下面这个构造方法来实现的。

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 * 所有的构造方法最终都会调用这个构造方法,都是基于这个方法实现的
 */
public ThreadPoolExecutor(  int corePoolSize, // 线程池的核心线程数量
                            int maximumPoolSize, // 线程池的最大线程数
                            long keepAliveTime, // 当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                            TimeUnit unit, // 存活时间的时间单位
                            BlockingQueue<Runnable> workQueue, // 任务队列,用来储存等待执行任务的队列
                            ThreadFactory threadFactory, // 线程工厂,用来创建线程,一般默认即可
                            RejectedExecutionHandler handler // 拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                        ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
 
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
 
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

下面我们通过这个构造方法创建一个线程池,它的核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话。如果使用它运行20个任务,会是什么结果呢?

public class ThreadPoolTest01 {
    public static void main(String[] args) {
        // 通过构造方法新建一个线程池
        // 核心数量为5,最大数量为10,空闲时间为1秒,队列长度为5,拒绝策略打印一句话
        ExecutorService threadPool = new ThreadPoolExecutor(5, 10,
                1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5),
                Executors.defaultThreadFactory(), 
                new RejectedExecutionHandler() { // 自定义拒绝策略
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(currentThreadName() + ", discard task");
                    }
                }
        );
        // 循环提交20个任务,注意观察num
        for (int i = 0; i < 20; i++) {
            int num = i;
            threadPool.execute(()->{
                try {
                    // 打印执行任务的线程信息 和 num的大小  以及执行的时间
                    System.out.println(currentThreadName() + ", "+ num + " running, " + System.currentTimeMillis());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    
    // 打印当前线程的名字
    private static String currentThreadName() {
        return Thread.currentThread().getName();
    }
}

我们一起来看看一次运行的结果:

pool-1-thread-1, 0 running, 1572678434411
pool-1-thread-3, 2 running, 1572678434411
pool-1-thread-2, 1 running, 1572678434411
pool-1-thread-4, 3 running, 1572678434411
pool-1-thread-5, 4 running, 1572678434411
pool-1-thread-6, 10 running, 1572678434412
pool-1-thread-7, 11 running, 1572678434412
pool-1-thread-8, 12 running, 1572678434412

main, discard task
main, discard task
main, discard task
main, discard task
main, discard task

pool-1-thread-9, 13 running, 1572678434412
pool-1-thread-10, 14 running, 1572678434412
pool-1-thread-3, 5 running, 1572678436411
pool-1-thread-1, 6 running, 1572678436411
pool-1-thread-6, 7 running, 1572678436412
pool-1-thread-2, 8 running, 1572678436412
pool-1-thread-7, 9 running, 1572678436412

注意,观察num值的打印信息,先是打印了04,再打印了1014,最后打印了5~9,竟然不是按顺序打印的,为什么呢?

下面我们基于上面这段代码示例,一步一步debug进去查看ThreadPoolExecutor中地相关源码。

execute()方法——提交任务

execute()方法是线程池提交任务的方法之一,也是最核心的方法。该方法是在Executor线程池顶层接口中定义的,用于提交Runnable无返回值任务到线程池中执行,该方法也没有返回值。该方法在ThreadPoolExecutor中实现。

java.util.concurrent.ThreadPoolExecutor#execute

// 提交任务,任务并非立即执行,所以翻译成执行任务似乎不太合适,其实是将任务提交到任务队列中,然后线程池再从任务队列中取出任务执行
public void execute(Runnable command) {
    // 任务不能为空
    if (command == null)
        throw new NullPointerException();
        
    // 获取ctl(高3位存储状态,低29位存储工作线程的数量)
    int c = ctl.get();
    // 1. 如果工作线程数量小于核心数量
    if (workerCountOf(c) < corePoolSize) {
        // 就添加一个工作线程(核心)  并将该任务作为该工作线程的第一个任务,创建线程完成后直接返回
        if (addWorker(command, true))
            return;
        // 如果创建工作线程失败,则重新获取下ctl
        c = ctl.get();
    }
    // 2. 如果达到了核心数量且线程池是运行状态,则将任务加入队列(workQueue是构造方法的核心参数之一)
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次获取ctl
        int recheck = ctl.get();
        // 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 容错检查工作线程数量是否为0,如果为0就创建一个,此时就不用为工作线程绑定任务了,因为任务已经加入到队列中了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 任务入队列失败(例如队列满了),尝试创建非核心工作线程,将该任务和非核心工作线程绑定在一起
    else if (!addWorker(command, false))
        // 非核心工作线程创建失败,执行拒绝策略
        reject(command);
}

提交任务的过程大致如下:

(1)工作线程数量小于核心数量,创建核心线程,并将任务交给该核心线程去执行;
(2)达到核心数量,将任务加入任务队列;
(3)任务队列满了,创建非核心线程,并将任务交给该非核心线程去执行;
(4)线程数达到最大数量,队列也满了,则执行拒绝策略;

其实,就是三道坎——核心数量、任务队列、最大数量,这样就比较好记了。

流程图大致如下:

任务流转的过程我们知道了,但是任务是在哪里执行的呢?继续往下看。

addWorker()方法——添加工作线程并启动

retry:标记

讲解这个方法之前先了解下 retry: 因为源码中有这个retry标记

先看一个简单的例子

public void testRetry() {
    int i = 0;
    retry:  //①
    while (true) {
        i++;
        System.out.println("i=" + i);
        int j = 0;
               // retry:   //②
        for (; ; ) {
            j++;
            System.out.println("j=" + j);
            if (j == 2) {
                break retry;
            }
        }
    }
}

首先需要说明的是,retry:可以理解为java中的一种特殊的标记,其中retry可以换成任何合法的命名。

a:、b:、A13: 等等都是可以的。该标记其实就是”:”,符号前面是自己定义的任意合法名字。

打开①,关闭② 打印结果

i=1
j=1
j=2

这里有一点需要注意,这里和C语言的goto不一样,如果break到了循环外面(就是说再也没有循环套在retry外面了),那么这个程序就永远不会再进入到这个循环了,而C语言的goto只是重新调整了程序的执行位置,程序会接着按照代码向下执行,进入循环。如果是用的continue retry就还可以再进入循环。

打开②,关闭①,打印结果

....

j=1
j=2
i=132348

j=1
j=2
i=132349

j=1
j=2
i=132350

j=1
j=2
...一直循环打印

retry相当于一个标记,只用在循环里面,很像goto语句,break到retry字符处。如果retry没有在循环(for,while)里面,在执行到retry时,就会跳出整个循环。如果retry在循环里面,可以理解为跳到了关键字处执行,不管几层循环。continue理解也是一样。

注意:retry:需要放在for,whlie,do…while的前面声明,变量只跟在break和continue后面。

addWorker()方法源码

如果工作线程数小于核心线程数的话,会调用 addWorker,顾名思义,这个方法主要用来创建一个工作线程,并启动之,其中会做线程池状态、工作线程数量等各种检测。

源码很长,但其实就做了两件事:

  1. 使用循环 CAS 操作来将线程数加 1;
  2. 新建一个线程并启用。

java.util.concurrent.ThreadPoolExecutor#addWorker

// 该方法用于创建新的工作线程添加到线程池中,如果添加成功则启动线程
// 参数firstTask是一个Runnable对象,表示要给该线程的第一个任务
// 参数core表示创建的是否是核心线程
// 返回值表示是否成功创建并启动了工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
    // 判断有没有资格创建新的工作线程
    // 主要是一些状态/数量的检查等等
    retry:
    for (;;) {
        // 获取ctl
        int c = ctl.get();
        // 获取当前线程池运行状态
        int rs = runStateOf(c);
        // 线程池状态检查
        if (rs >= SHUTDOWN && // 大于等于SHUTDOWN说明线程池不是RUNNING状态
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 两种情况会直接返回false  1.如果非运行状态  2.不是【线程池是停止状态并且传入的任务是null并且workQueue不等于空】这种情况   
            return false;
        // 工作线程数量检查
        for (;;) {
            // 获取当前线程数
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 判断是否饱和容量了,如果已经达到最大线程数则不能再新建线程,直接返回false
                return false;
            // 完成了上面的判断后,说明现在线程池可以创建新线程,则将线程数量加1,因为后面要创建线程了,并跳出循环
            if (compareAndIncrementWorkerCount(c))
                // 因为外面的retry:已经不在循环内了,跳到retry:位置后也不会再进入到该循环了,其实就相当于跳出这两层循环并且不再进入这两层循环,执行后续的流程
                break retry;
            // 如果上面的compareAndIncrementWorkerCount(c)方法返回false,则说明有其他线程在操作线程池的线程数量,所以需要重新获取ctl
            c = ctl.get();  // Re-read ctl
            // 如果当前的运行状态已经和最开始获取的状态不一样了
            if (runStateOf(c) != rs)
                // 回到retry:,但是因为这里用的是continue,所以程序往后执行还是会再次进入到该循环继续执行上面的判断
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
    // 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作
    // 标记是否启动了工作线程
    boolean workerStarted = false;
    // 标记是否成功添加了工作线程
    boolean workerAdded = false;
    // 要创建的Worker对象
    Worker w = null;
    try {
        // 创建工作线程
        // 增加一个worker   这个firstTask就是一个线程(任务线程,每一个Worker都和一个任务线程绑定),Runable,具体可以看这Worker源码
        w = new Worker(firstTask);
        // 这个是绑定在Worker上的工作线程,并不是任务线程,工作线程是用来执行任务的,
        final Thread t = w.thread;
        // 判断t是否为null
        if (t != null) {
            // 获取线程池的锁
            final ReentrantLock mainLock = this.mainLock;
            // 在读取线程池状态的时候就应该上锁,防止有并发操作破坏程序的一致性,上下不一致
            mainLock.lock();
            try {
                // 锁定后并重新检查线程池的状态,查看是否存在线程工厂的失败或者锁定前的关闭
                // 获取线程池运行状态
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    
                    // 添加到工作线程队列。 workers是线程池中存放工作线程的集合
                    // 增加work  每一个线程池都持有一个workers集合,里面存储着线程池的所有worker,也就是所有线程
                    workers.add(w);
                    // 还在池子中的线程数量(只能在mainLock中使用)
                    int s = workers.size();
                    // 不能超过线程池的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    
                    // 标记线程添加成功
                    workerAdded = true;
                }
            } finally {
                // 解锁
                mainLock.unlock();
            }
            if (workerAdded) {
                // 线程添加成功之后启动线程
                // 启动绑定在worker上的线程。启动了之后该工作线程就会开始从任务队列中拿任务去执行了
                t.start();
                // 标记工作线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回新创建的工作线程是否启动成功
    return workerStarted;
}

这里其实还没到任务执行的地方,上面我们可以看到线程是包含在Worker这个类中的,我们可以推测出其实执行任务的操作是在内部类Worker中完成的。

通过源码,可以总结出创建 Worker 失败的原因:

  1. 在添加时线程池被停止了
  2. 添加核心线程池时,超过核心线程池数量
  3. 添加非核心线程池时,超过最大线程池数量
  4. Worker 对象初始化未完成时就被使用,即 thread 对象还未完全创建
  5. 当前线程正在被运行(防止出现重复运行,抛出异常)
  6. 线程创建过多,导致 OOM

addWorker 方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理:

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
        // 移除添加失败的worker
        if (w != null)
            workers.remove(w);
        // 减少worker数量
        decrementWorkerCount();
        // 尝试终止线程池
        tryTerminate();
    } finally {
        // 解锁
        mainLock.unlock();
}

这个方法主要做三件事

  1. 如果 worker 已经构造好了,则从 workers 集合中移除这个 worker
  2. 原子递减核心线程数(因为在 addWorker 方法中先做了原子增加)
  3. 尝试结束线程池

线程池添加线程流程图:

runWorker()方法——工作线程执行任务

其实真正执行任务的操作,是在runWorker()方法中完成的。该方法是ThreadPoolExecutor中的核心方法之一,但是调用该方法确实通过内部类Worker实现的。

上面在addWorker()方法的源码中,ThreadPoolExecutor线程池调用了绑定在Worker对象上的t工作线程的start()方法,也就相当于启动了Worker工作线程。Worker实现了Runnable方法,所以它其中存在run()方法,当启动了工作线程之后,就会自动执行run()方法,而run()方法其实调用的就是ThreadPoolExecutor的runWorker()方法,该方法就会让工作线程持续地去队列中拿任务执行。

至于ThreadPoolExecutor如何执行任务和对其源码的分析,详见:Worker源码分析。这里就不再赘述了。

总结

通过上面对线程池的重要方法地分析,我们了解了线程池中普通任务执行的流程。

(1)execute(),提交任务的方法,根据核心数量、任务队列大小、最大数量,分成四种情况判断任务应该往哪去;
(2)addWorker(),添加工作线程的方法,通过Worker内部类封装一个Thread实例维护工作线程的执行;
(3)runWorker(),真正执行任务的地方,先执行第一个任务,再源源不断从任务队列中取任务来执行;
(4)getTask(),真正从队列取任务的地方,默认情况下,根据工作线程数量与核心数量的关系判断使用队列的poll()还是take()方法,keepAliveTime参数也是在这里使用的。

线程池运作的总流程图:

  1. 如果当前运行的线程数(当前池大小)poolSize 小于 corePoolSize ,则创建新线程执行任务
  2. 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列
  3. 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务
  4. 如果当前运行的线程数(当前池大小)poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。
  5. 线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行
  6. 如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出,默认只有超过核心数的线程空闲时间超过keepAliveTime才会被销毁。但是也可以通过allowCoreThreadTimeOut(true)来设置,核心数以内的线程空闲超过keepAliveTime后也会关闭

下面这个流程图我感觉可能更能直观的看出ThreadPoolExecutor每一部分的分工:

下面这个流程我我感觉能更直观展现每一个方法的调用流程:

至此,我们再看最开始给出的那个使用例子带来的问题:

观察num值的打印信息,先是打印了04,再打印了1014,最后打印了5~9,竟然不是按顺序打印的,为什么呢?

线程池的参数:核心数量5个,最大数量10个,任务队列5个。

答:执行前5个任务执行时,正好还不到核心数量,所以新建核心线程并执行了他们;
执行中间的5个任务时,已达到核心数量,所以他们先入队列;
执行后面5个任务时,已达核心数量且队列已满,所以新建非核心线程并执行了他们;
再执行最后5个任务时,线程池已达到满负荷状态,所以执行了拒绝策略。
最后再来一个ThreadPoolExecutor运行机制图示:

总结

  1. 线程池的本质就是一种池化的思想,复用线程,减少线程的创建和销毁的开销。常量池和连接池也都是这样的思想。
  2. 线程池的设计思路是生产者和消费者模型,通过队列进行解耦,而使用阻塞队列则是为了并发安全和不必要的自旋。其实消息队列也是生产者和消费者模型,因为消息队列和阻塞队列都是队列。
  3. 线程池如果用不好可能会死锁,即尽量不要在线程池内部使用 execute 方法提交任务,因为可能会出现循环等待的情况。而且线程池使用 submit() 方法要注意选择合适的拒绝策略,因为 submit() 会将任务包装成 FutureTask 对象,如果任务被拒绝,即没有调用 run() 方法,那么调用 get() 方法的线程会被阻塞住。

面试问题

核心线程和非核心线程有什么区别?

答:实际上并没有什么区别,主要是根据corePoolSize来判断任务该去哪里,两者在执行任务的过程中并没有任何区别。有可能新建的时候是核心线程,而keepAliveTime时间到了结束了的也可能是刚开始创建的核心线程。

Worker继承自AQS有何意义?

前面我们看了Worker内部类的定义,它继承自AQS,天生自带锁的特性,那么,它的锁是用来干什么的呢?跟任务的执行有关系吗?

答:既然是跟锁(同步)有关,说明Worker类跨线程使用了,此时我们查看它的lock()方法发现只在runWorker()方法中使用了,但是其tryLock()却是在interruptIdleWorkers()方法中使用的。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

interruptIdleWorkers()方法的意思是中断空闲线程的意思,它只会中断BlockingQueue的poll()或take()方法,而不会中断正在执行的任务。

一般来说,interruptIdleWorkers()方法的调用不是在本工作线程,而是在主线程中调用的,还记得shutdown()和shutdownNow()方法吗?

观察两个方法中中断线程的方法,shutdown()中就是调用了interruptIdleWorkers()方法来中断线程地,而且interruptIdleWorkers()方法中就用到了tryLock(),只有获取到锁了才能中断线程,如果没有获取到锁则不中断。而调用tryLock()后没获取到锁只有一种原因,那就是lock()所在的地方runWorker()方法中,有任务正在执行。这样shutdown()方法就实现了只中断空闲线程,不会中断正在执行任务的线程。

而shutdownNow()中中断线程则很暴力,并没有tryLock(),而是直接中断了线程,所以调用shutdownNow()可能会中断正在执行的任务。

所以,Worker继承自AQS实际是要使用其锁的能力,这个锁主要是用来控制调用shutdown()时不要中断正在执行任务的线程

那么为什么Worker使用AQS实现锁,而不直接用ReentrantLock呢?我们可以看到Worker的tryAcquire 方法,它是不允许重入的,而 ReentrantLock是允许重入的。所以这是为了实现不可重入的特性去反应线程现在的执行状态。

execute() 和 submit() 的区别

很多同学应该关注到了。线程池的执行任务有两种方法,一种是 submit、一种是 execute; 这两个方法是有区别的。

execute:

  1. execute 只可以接收一个 Runnable 的参数
  2. execute 如果出现异常会抛出
  3. execute 没有返回值

submit:

  1. submit 可以接收 Runable 和 Callable 这两种类型的参数,
  2. 对于 submit 方法,如果传入一个 Callable,可以得到一个 Future 的返回值
  3. submit 方法调用不会抛异常,除非调用 Future.get()才会在调用get()方法时抛出异常。

RPC框架中异步调用是怎么实现的?

答:RPC框架常用的调用方式有同步调用、异步调用,其实它们本质上都是异步调用,它们就是用FutureTask的方式来实现的。

一般地,通过一个线程(我们叫作远程线程)去调用远程接口,如果是同步调用,则直接让调用者线程阻塞着等待远程线程调用的结果,待结果返回了再返回;如果是异步调用,则先返回一个未来可以获取到远程结果的东西FutureXxx,当然,如果这个FutureXxx在远程结果返回之前调用了get()方法一样会阻塞着调用者线程。

有兴趣的同学可以先去预习一下dubbo的异步调用(它是把Future扔到RpcContext中的)。