掘金 后端 ( ) • 2024-05-15 14:41

一:线程池是什么

本质上是一种池化技术,将创建好的线程放入线程池中,有任务时,直接提交给已经存在的线程,避免频繁创建线程和消亡线程对系统性能的损耗。从某种意义上来说,是空间换时间来提升性能的一种方式。

二:线程池的工作原理

核心参数

private volatile ThreadFactory threadFactory;

/**
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

/**
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

/**
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

/**
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 *
 * Since the worker count is actually stored in COUNT_BITS bits,
 * the effective limit is {@code corePoolSize & COUNT_MASK}.
 */
private volatile int corePoolSize;

/**
 * Maximum pool size.
 *
 * Since the worker count is actually stored in COUNT_BITS bits,
 * the effective limit is {@code maximumPoolSize & COUNT_MASK}.
 */
private volatile int maximumPoolSize;

/**
 * The default rejected execution handler.
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
参数 描述 corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 空闲线程存活时间 unit 时间单位 workQueue 工作队列 threadFactory 线程工厂 handler 拒绝策略

工作原理

一图胜千言,通过下面的图就能清晰的理解其工作原理。

image.png

有几点需要注意:

1、并不是线程池创建后,立马就创建核心线程,而是要等任务到来后,以懒汉式的方式进行创建

2、任务拒绝对应具体的拒绝策略,如果线程池不是运行状态,那么如果想让任务还是要被执行,那么可以使用当前线程去执行

3、当队列中的任务都执行完毕之后,非核心线程消亡,仅存活核心线程,优先消亡等待时间更长的线程

三:如何创建线程池

1、使用默认提供的线程池

FixThreadPool

public class Executors {

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue, using the provided
     * ThreadFactory to create new threads when needed.  At any point,
     * at most {@code nThreads} threads will be active processing
     * tasks.  If additional tasks are submitted when all threads are
     * active, they will wait in the queue until a thread is
     * available.  If any thread terminates due to a failure during
     * execution prior to shutdown, a new one will take its place if
     * needed to execute subsequent tasks.  The threads in the pool will
     * exist until it is explicitly {@link ExecutorService#shutdown
     * shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created thread pool
     * @throws NullPointerException if threadFactory is null
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory{
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
}

通过上面的代码,可以发现:

1、核心线程数corePoolSize和最大线程数maximumPoolSize是相等的

2、工作队列workQueue采用的LinkedBlockingQueue这种无界队列,链表的长度设置为Integer.MAX_VALUE=2147483647

3、线程空闲存活时间keepAliveTime为0,因为没有空闲时间,全是核心线程,也就不存在线程消亡问题

SingleThreadPool

public class Executors {

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue, and uses the provided ThreadFactory to
     * create a new thread when needed. Unlike the otherwise
     * equivalent {@code newFixedThreadPool(1, threadFactory)} the
     * returned executor is guaranteed not to be reconfigurable to use
     * additional threads.
     *
     * @param threadFactory the factory to use when creating new threads
     * @return the newly created single-threaded Executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
}

通过上面的代码,可以发现:

1、是一种特殊的FixThreadPool

2、核心线程数corePoolSize和最大线程数maximumPoolSize是相等的,都为1

3、工作队列workQueue也采用的LinkedBlockingQueue这种无界队列

CachedThreadPool

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available, and uses the provided
 * ThreadFactory to create new threads when needed.
 *
 * @param threadFactory the factory to use when creating new threads
 * @return the newly created thread pool
 * @throws NullPointerException if threadFactory is null
 */
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}

通过上面的代码,可以发现:

1、核心线程数corePoolSize为0,最大线程数maximumPoolSize为Integer.MAX_VALUE

2、工作队列workQueue采用的SynchronousQueue这种同步队列,支持公平或者非公平的从队列中取任务

ScedureThreadPool

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

/**
 * Creates a thread pool that can schedule commands to run after a
 * given delay, or to execute periodically.
 * @param corePoolSize the number of threads to keep in the pool,
 * even if they are idle
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @return the newly created scheduled thread pool
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if threadFactory is null
 */
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

/**
 * Creates a new {@code ScheduledThreadPoolExecutor} with the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

通过上面的代码,可以发现:

1、核心线程数corePoolSize可以自定义,最大线程数maximumPoolSize为Integer.MAX_VALUE

2、工作队列workQueue采用的DelayedWorkQueue这种延时队列

弊端

不知道通过上面几种系统自带的线程池,有没有发现一些共同的弊端。

FixThreadPool 和 SingleThreadPool的workQueue使用的是无界的LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,有导致系统OOM的风险

CachedThreadPool:使用的是同步队列SynchronousQueue,不持有任务,收到任务后就立马交给线程去执行。但是允许创建的非核心线程数量为Integer.MAX_VALUE,那么极端情况下就有两个弊端

1、线程频繁的创建和消亡,系统开销大

2、大量的线程被创建,有导致系统OOM的风险

ScedureThreadPool使用的无界的延迟阻塞队列DelayedWorkQueue,非核心线程数量为Integer.MAX_VALUE,有导致系统OOM的风险

另外赋一张阻塞队列的图,加深下了解。 image.png

2、自定义线程

基于系统自带的线程池有导致系统OOM的弊端,程序中建议自定义线程池。

规划

那么在自定义线程池的时候,有几点建议:

1、核心原则是使用有界队列,控制线程数量

2、线程数量也需要考虑业务是属于CPU密集型还是IO密集型。如果是IO密集型的话,可以稍微加大线程数,避免CPU长时间等待;如果CPU密集型的话,就默认设置成系统的核数就好。如果想调整到最佳状态,还需压测及观测性能

3、不要所有业务都使用同一个线程池,最好根据业务隔离

4、给各自的线程池设置有意义的名字,方便区分

5、根据业务特性,选取合适的工作队列,如果业务需要快速响应,那么也可以酌情选用同步队列

6、根据业务特性,设置合适的拒绝策略(不处理、立即处理、延时处理、记日志......)

实施

使用ThreadPoolExcutor来自定义线程池。

ThreadPoolExecutor pool 
    // 核心线程数和最大线程,默认都设置成系统的核数
    = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,
       Runtime.getRuntime().availableProcessors() + 1,
       15,
       TimeUnit.SECONDS,
       // 使用了有界的ArrayBlockingQueue队列,同时设置非公平访问
       new ArrayBlockingQueue<>(512, false),
       // 线程工厂使用google guava的ThreadFactoryBuilder
       new ThreadFactoryBuilder().setNameFormat("业务1").setDaemon(true).build(),
       // 自定义拒绝策略
       new RejectedExecutionHandler() {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
             // 拒绝执行暂停30秒再尝试提交
             try {
                synchronized (lock) {
                   lock.wait(30000);
                }
             } catch (InterruptedException e) {
                e.printStackTrace();
             }
             pool.submit(r);
          }
       });

四:如何使用和监控线程池

// 将任务提交给线程池
pool.execute(new Runnable() {
    @Override
    public void run() {
       System.out.println("这是一个线程");
    }
});

使用的建议:

1、尽量避免将耗时任务提交到线程池

2、尽量使用全局的线程池,如果一定要在局部方法里面定义线程池,使用完毕后,记得要执行shutdown方法

如何去监控线程池

方法 描述 监控方面 getActiveCount() 获取正在工作的线程数 监控线程的变化 getPoolSize() 获取当前存在的线程数 监控线程的变化 getLargestPoolSize() 获取历史最大的线程数 监控线程的变化 getTaskCount() 获取计划执行的任务总数 监控任务的变化 getCompletedTaskCount() 获取已完成的任务数 监控任务的变化 getQueue() 获取任务队列 监控任务的变化

另外,也可以集成美团的Hippo4j。 https://hippo4j.cn/zh/