掘金 后端 ( ) • 2024-06-30 17:58

highlight: androidstudio theme: github

1 前言

我有个朋友(懂的都懂),八股文很溜,往往能跟面试官打的有来有回,尤其是应对高频面试题时,那熟练的操作,像极了一个大师分段的玩家在打黄金分段局时的那副嘴脸,总以为能够在确保击杀对方的同时,还要秀一波操作,在心态上彻底击垮对手。

可常在河边站哪有不湿鞋,有次面试某一大厂,就遇到了同样是炸鱼的硬茬。

当对方面试官抛出线程池高频面试题时,我的朋友熟练地输出了线程池的工作流程,包括七大核心参数在内,上至阻塞队列,下至拒绝策略,回答堪称完美,这套小连招这足以应对黄金局玩家。可不料对方也是个炸鱼的高端局玩家,对方仅深入问了一句:你刚才说的先判断工作线程数量与所设定核心线程数进行比对,那你能讲讲线程池中是如何进行状态维护的吗?是每个状态都设置一个变量吗?

我的朋友顿时一脸懵逼,八股文只顾着背流程了,于是支支吾吾回答:难道... 不是每个状态都维护一个变量吗?我们平时设计状态值都是这样玩儿的,常量、枚举...

结果可想而知,面试官摇摇头,朋友也回家等通知到现在...

2 概述

作为JDK底层,同时也是Java.util.concurrent包下的工具,线程池里的设计是很精妙的,Doug Lea 老爷子亲手指导的实现自然考虑的是面面俱到,不仅要支持高并发,同时对内存、cpu计算性能也同样考虑的很到位。

前面朋友说的没错,线程池一上来,开局除了判断提交的Runnable任务是否为空以外,就是将工作线程数量与所设定核心线程数进行比对,如果小于定义的核心线程数,则添加任务并使用核心线程进行执行

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
     // 获取ctl
    int c = ctl.get();
    // 根据ctl计算判断当前正在执行的任务数量,也就是线程池中正在运行的线程,与核心线程数corePoolSize比对
    if (workerCountOf(c) < corePoolSize) {
        // 如果没超过核心线程数,则添加任务,并使用核心线程执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

参考上面线程池execute方法节选中,中文注释的部分

至于面试官提到的状态维护,其核心就在于这个ctl。

3 线程池的ctl

首先,要弄明白源码中一个类、函数,甚至是变量的含义,一定要从官方解释入手,最好的输入就是原注释。在JDK8中,关于ctl的源码注释如下:

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to
 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 * billion) otherwise representable. If this is ever an issue in
 * the future, the variable can be changed to be an AtomicLong,
 * and the shift/mask constants below adjusted. But until the need
 * arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been
 * permitted to start and not permitted to stop.  The value may be
 * transiently different from the actual number of live threads,
 * for example when a ThreadFactory fails to create a thread when
 * asked, and when exiting threads are still performing
 * bookkeeping before terminating. The user-visible pool size is
 * reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero,
 *             the thread transitioning to state TIDYING
 *             will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less
 * straightforward than you'd like because the queue may become
 * empty after non-empty and vice versa during SHUTDOWN state, but
 * we can only terminate if, after seeing that it is empty, we see
 * that workerCount is 0 (which sometimes entails a recheck -- see
 * below).
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

注释中写的很明确,开篇就说了,这是一个控制状态变量,本质是用AtomicInteger进行包装的,并且这一个Integer变量涵盖了两个概念的变量(two conceptual fields)。

有的同学就有疑问了,我平时业务开发时,定义状态变量,不管是使用常量还是枚举亦或是直接业务代码里写死,都是一个整形或者String来定义一个状态值,它为啥能用一个整形变量来定义“两个概念的变量”,并且同时还能支持多种状态值呢?我的朋友同样发出了惊人的疑惑,仅仅一个Integer,定义了2个变量,并且针对其中的runState变量,就多达以下几种:

* The runState provides the main lifecycle control, taking on values:
*
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed

3.1 内存的极致利用:一个整形变量按bit位拆分

其实越是底层的源码,对计算机资源的利用的考虑就越多越深。经常阅读源码的同学,尤其是阅读底层实现源码的同学,一定不会觉得奇怪,尤其是JDK本身自带的这些实现,都是经过Java大佬们精打细算、仔细琢磨出来的成果,在当初发布之际,一定是将性能、内存等权衡到极致的。

其中最典型的,就是根据bit位对数据进行存储,要知道,计算机最原始的存储结构就是bit位,即0和1的高低电平。

image.png

而之所以计算机语言要对变量划分long、int、short、byte,无非就是限制了bit位长度而已,就像线程池中的ctl变量,其本质就是一个int的包装类Integer,也就是一个32个bit位大小的数字而已。

因此,我们只需要合理地分配这32个bit位,并且评估线程池中预先定义好的哪些概念变量,每个变量有哪些枚举值即可。

先不卖关子,在线程池中,ctl这一个32个bit位大小的int值,正如该变量注释所说,按照下图进行了针对runState和workerCount的划分

image.png

其中:

  • 高3位:也就是从左往右数第0到第2个bit位,这三个bit用来表示runState
  • 低29位,也就是从左往右数第3到第31个bit位,这二十九个bit用来表示workerCount

3.1.1 ctl低29位工作线程数-workerCount

低29位的workerCount,全部被用来表示当前线程池中正在运行的线程数,也就是运行中的任务数。在线程池中该静态常量的长度定义如下:

private static final int COUNT_BITS = Integer.SIZE - 3;

可以看到,其大小就是用一个int值的size(32)减去3,得到的一个大小为29的int变量。这只是一个长度变量,而对于workerCount本身来说,感兴趣的小伙伴可以拿计算器算一下,29个bit位也就是2的29次幂,足足有536870911这么多

image.png

3.1.2 ctl高3位状态值-runState

至于高3位的runState,其枚举值就会复杂一些,正如ctl变量注释所说,有5种枚举值,但3个bit位也就是2的3次幂,足足有8个独立的值可以取,因此完全够用。

在线程池源码中,定义枚举值的静态常量,是这样表示的:

// 
private static final int COUNT_BITS = Integer.SIZE - 3;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

通过源码可以看到,其就是利用COUNT_BITS(大小为29),通过bit位左移的操作,在类初始化的时候就计算好了。源码中的5种状态值,通过计算得到如下

  • RUNNING(运行中):-1 << 29,将32个全是1的bit位左移29位,得到一个高3位全是1,低位全是0的int值 image.png
  • SHUTDOWN(关闭中,此时不会接收新任务,但会处理剩余任务):0 << 29,将32个全是0的bit位左移29位,得到的自然还是一个全是0的int值 image.png
  • STOP(停止中,此时既不会接收新任务,也不会处理剩余任务):1 << 29,将从左往右第31个bit位是1,其余全是0的左移29位,得到如下的int值 image.png
  • TIDYING(整理中,所有的任务均被关闭,并且池中任务数为0):2 << 29,将从左往右第30个bit位是1,其余全是0的左移29位,得到如下的int值 image.png
  • TERMINATED(已结束,线程池被彻底停止):3 << 29,将从左往右第30和第31个bit位是1,其余全是0的左移29位,得到如下的int值 image.png

这些常量值被static修饰,是在线程池类初始化时放到云空间中独一份儿,不会随着线程池对象的增多而增多。

3.2 线程池运行时计算状态的方式

有的同学可能会问,我平时业务开发过程中,涉及到枚举值或常量值亦或是局部变量的比对,要么是通过 == ,要么是通过equals进行比较,你前面说一个int值内部进行了这么多的拆分,我要怎么拿到这个ctl去进行比较呢?

其实,线程池运行过程中经常会涉及到状态的比对,比如,在addWorker方法(实际往线程池添加任务的方法)中,就会判断线程池的状态:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        // 获取ctl的int值
        int c = ctl.get();
        // 通过ctl的int值提取高3bit位,获取runState
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 判断线程池runState状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 判断线程池runState状态
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

关注上述源码中,中文注释的部分,那时我自己加的,帮助大家追踪定位

可以看到,源码中会将ctl通过runStateOf函数进行提取后在通过比较运算符与这些常量值进行比对,runStateOf函数定义如下:

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static int runStateOf(int c)     { return c & ~CAPACITY; }

可以看到,入参就是ctl的int值,而算法是通过将CAPACITY通过取反运算符~进行取反后,再与c进行按位与操作,得到的值即是高三位有值,而低29位全是0的runState的int值。

这样说有点拗口,可以支部进行拆解:

  1. 先看CAPACITY的定义,就是将1左移COUNT_BITS(29)位得到一个0010000...的数值,然后-1,那自然就得到了一个高3位是0,低29位全是1的int值,像这样:00011111...
  2. 然后通过取反运算符~,将CAPACITY取反,即:bit位中所有0变1,1变0,因此得到了一个高3位全是1,低29位全是0的int值,像这样:1110000000...
  3. 再与c(ctl的int值)进行按位与运算,尽管ctl高3位与低29位代表的是两个不同概念的变量,但由于此时~CAPACITY是一个低29位全是0的值,因此可以完全忽略低29位,因为&运算得到的始终都是0,而~CAPACITY高3位又都是1,因此&运算得到的值就完全取决于c的值。

    所以这种算法利用了最本质最原始也是最能被计算机快速执行的算法,得到了只有高3位有值的int变量

是不是很精妙,仅仅是一个int值,通过最原始的运算符,就能玩出这么多花样,并且用最小的计算成本,最小的存储成本,就满足了我们需求,再次给Doug Lea老爷子点个赞,学到了。

4 疑问:(高)并发添加任务时,如何精准维护workerCount并与核心线程数进行精确比对呢

其实这也是一个JUC并发工具包中的一个常规问题,用到的解决方案也是常规方案,各位同学不妨先开动你们的小脑袋瓜想想,如果让你通过常规的业务手段,在程序运行时对某一个变量进行累加,你会怎么做呢?

想象一个简单的场景吧,如果你已经是一个资深玩家,请想象一下你刚开始进行CRUD编程时的心态,哈哈。比如,在一个单机或者说单节点单pod应用中,让你基于内存变量统计一下某http接口的访问次数(只是举个例子,实际上肯定不会这么去做),你是不是会不假思索的写出下面的代码:

private static int count = 0;

public void count() {
    // do your business
    this.count ++;
}

然后你会发现,程序里统计出来的count值,总是比实际要少。慢慢地,随着你的成长,你懂得了一个叫做“原子性”的东西,所谓原子性,就是一个操作就像原子一样不可再被拆分,上面这样的写法,针对count++的操作,其实是不满足原子性的,因为它会先将count加载到虚拟机栈中的某个栈帧中的操作数栈(有点拗口,还这么多“栈”,关于jvm相关的知识点不在此赘述,可搜索其它文章进行脑补),然后进行+1操作,再对count进行修改,单线程执行还好,多线程并发场景下,多个线程可能加载到本地栈帧中的count都是同一个数值,本来3个线程应该都+1,最终变成count+3,但实际上是3个线程都自己+1,然后都将count+1的数值修改回了count,最终少了2个数值。

有点啰嗦了,不过看上去总算马马虎虎完成了一波铺垫。

那要怎样保证并发场景下的原子性呢?

4.1 cas操作

比较常用的是cas操作,即:compare and swap(比较后交换),在内存层面,将该对象某一变量在内存中的偏移量offset作为基础,输入原始值和期望值,比较该偏移量中的数据是否与原始值一致,如果一致才与期望值交换,否则返回false

上述的例子可以通过这种方式进行修正:

private static AtomicInteger count = new AtomicInteger(0);

public void count() {
    // do your business
    this.count.incrementAndGet();
}

直接使用JDK自带的原子包装类AtomicInteger即可,其内部就是通过cas操作保证并发场景下针对数值累加的原子性。

/**
 * Atomically increments by one the current value.
 *
 * @return the updated value
 */
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 追踪unsafe.getAndAddInt函数实现,底层调用的this.compareAndSwapInt
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

于是你可以惊奇地回顾本文最开始提到的ctl变量,这就解释了它为什么也是一个AtomicInteger包装类,就是为了解决在并发场景下的累加问题。

在addWorker函数源码中:

for (;;) {
    int wc = workerCountOf(c);
    if (wc >= CAPACITY ||
        wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
    // 累加workerCount
    if (compareAndIncrementWorkerCount(c))
        break retry;
    c = ctl.get();  // Re-read ctl
    if (runStateOf(c) != rs)
        continue retry;
    // else CAS failed due to workerCount change; retry inner loop
}

通过compareAndIncrementWorkerCount(c)函数进行累加workerCount。其内部实现如下:

/**
 * Attempts to CAS-increment the workerCount field of ctl.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

同样用到了cas操作,完成原子累加操作。

5 写在最后

经过这一顿分析,我的朋友总算真正的“初窥门径”,从此虚心修炼内功,八股文背的滚瓜烂熟,终归只是招式,内功还得一步一步慢慢练。

本文只讲述了线程池设计及原理的一丢丢部分,后续会持续更新其它原理及精妙的设计。