掘金 后端 ( ) • 2024-06-17 17:51

通常开发者都是利用 Executors 提供的通用线程池创建方法,去创建不同配置的线程池,主要区别在于不同的 ExecutorService 类型或者不同的初始参数。

Executor的基本组成

image.png

Executor

它是一个基础的接口 设计初衷是将任务提交和任务执行解耦

void execute(Runnable command);
ExecutorService

它是Executor的完善版本 提供service的管理功能(shutdown等方法)、更加全面的任务提交机制(如返回Future而不是void的submit方法)

<T> Future<T> submit(Callable<T> task);

这里输入是Callable,它解决了Runnable无法返回结果的困扰

ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool

这些线程池的设计特点在于其高度的可调节性和灵活性,以尽量满足复杂多变的实际应用场景

Executors

它从简化的使用角度,提供了各种方便的静态工厂方法

Executors目前提供了五种不同的线程池创建配置

newCachedThreadPool()

用于处理大量短时间工作任务的线程池,它会是缓存线程并重用,当无缓存线程可用时,会创建新的工作线程;如果线程闲置时间超过60秒,就会被终止并移出缓存;长时间闲置时,这种线程不会消耗什么资源;其内部是使用SynchronousQueue作为工作队列

适用场景:

任务数量动态变化:当任务数量随着时间变化较大,且无法预测时,newCachedThreadPool 可以自动调整线程数量来处理这些任务。

短生命周期的任务:如果任务执行时间短,那么创建和销毁线程的开销相对较小,此时使用 newCachedThreadPool 可以减少线程的创建和销毁次数,提高系统性能。

资源利用率:由于线程池中的线程会在60秒(默认值)内回收,所以它适合于需要高资源利用率的场景

image.png

newFixedThreadPool(int nThreads)

指定数目的线程,使用的是无界队列,任何时候最多有nThreads个线程是活动的;任务多余线程数,则会等待空闲线程,如果有工作线程退出或终止了,将会创建新的线程,补足nThreads数量;

适用场景:

固定数量的任务:当你预先知道需要处理固定数量的任务时,使用newFixedThreadPool可以创建一个固定大小的线程池,从而避免资源的浪费。

资源限制:由于newFixedThreadPool使用的是固定大小的线程数组,它可以避免因为创建过多线程而导致的资源耗尽问题。

顺序执行:如果任务需要顺序执行或者需要一定的执行顺序,固定大小的线程池可以保证这一点。

简化同步:使用固定大小的线程池可以简化同步逻辑,因为你可以确信在任何给定时间只有固定数量的线程在执行。

长时间任务:对于一些可能需要较长时间才能完成的任务,使用固定线程池可以在任务执行期间保持线程数量稳定。

image.png

newSingleThreadExecutor()

它的特点是工作线程的数目限制为1,操作一个无界工作队列;所有任务都是顺序执行,最多只会有一个任务处于活动状态,并且不允许使用者修改线程实例,因此可以避免改变其线程数目

这里创建线程出来 corePoolSize和maximumPoolSize都是1,通过FinalizableDelegatedExecutorService包装保证线程池无法被修改。这个工作模式下ThreadFactory会被使用一次 用于创建线程池中的唯一线程,如果线程因为异常终止,ThreadFactory可能会被再次调用创建一个新的线程

newSingleThreadExecutor的应用场景主要适合于以下情况:

顺序执行任务:由于newSingleThreadExecutor限制了工作线程数为1,因此它能保证所有提交的任务都按照提交顺序串行执行,这对于需要顺序处理任务的场景非常有用。

简化同步:在单线程的上下文中,不需要考虑线程同步的问题,因为不会有多个线程同时访问共享资源。

资源限制:当你需要限制某个操作或任务的最大并发数量时,使用newSingleThreadExecutor可以确保同一时刻只有一个任务在执行。

定时任务:尽管newSingleThreadExecutor本身不直接提供定时任务的功能,但它可以结合其他工具来实现简单的定时任务处理。

package com.chinairi.redisson.redisson.util.lock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {

    public static void main(String[] args) {
        // 创建一个单线程线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        // 提交多个任务
        for(int index = 0; index < 5; index++) {
            final int taskId = index;
            executorService.submit(() -> {
                System.out.println("任务ID:" + taskId + ";线程名称:" + Thread.currentThread().getName());
            });
        }
        // 关闭线程池
        executorService.shutdown();
    }

}        

image.png

newSingleThreadScheduledExecutor()和newScheduledThreadPool(int corePoolSize)

创建的是个ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程

适用场景:

需要定时执行的任务:当你有一些需要按计划在将来的某个时间点执行的任务时,例如定时发送消息、定时更新状态等。

周期性执行的任务:当你需要周期性地执行某个任务,例如每隔一段时间进行数据统计、检查服务等。

需要顺序执行的任务:由于它只使用一个工作线程,因此它可以保证所有提交的任务都会按照提交顺序串行执行。

package com.chinairi.redisson.redisson.util.threads;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SingleThreadScheduledExample {
    public static void main(String[] args) {

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // 延迟2秒后执行一次任务
        executor.schedule(() -> System.out.println("执行任务了!"), 2, TimeUnit.SECONDS);

        // 延迟2秒后,每隔3秒执行一次任务
        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
            // 任务内容
            System.out.println("这里是延迟两秒,每隔三秒执行一次!");
        }, 2, 3, TimeUnit.SECONDS);
    }
}

image.png

newWorkStealingPool(int parallelism)

java8新建的方法 内部会构建ForkJoinPool 利用Work-Stealing算法 并行处理任务 不保证处理顺序

适用的场景:

并行计算:当你有一组任务可以并行处理,并且任务之间没有依赖关系时,可以使用 newWorkStealingPool。这种线程池能够动态地分配工作线程,提高 CPU 利用率。

大量短生命周期的任务:对于那些生成很多短生命周期的子任务的工作负载,工作窃取算法可以帮助重用线程,减少线程创建和销毁的开销。

减少线程上下文切换:由于工作窃取算法倾向于在有空闲线程的处理器上执行任务,这样可以减少跨多个处理器的线程上下文切换。

无界队列任务处理:由于 ForkJoinPool 默认使用无界队列,适合于处理大量任务而不会因为队列满而阻塞。

package com.chinairi.redisson.redisson.util.threads;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkStealingPoolDemo {

    public static void main(String[] args) {
        // 创建具有特定并行度的线程池
        int parallelism = Runtime.getRuntime().availableProcessors();
        ExecutorService executor = Executors.newWorkStealingPool(parallelism);
        // 提交任务
        executor.submit(() -> {
            // 这里执行任务代码
            System.out.println("任务执行了!");
        });
        executor.shutdown();
    }
}

分析下线程池的设计与实现

image.png

工作队列

工作队列的任务是存储用户提交的任务,这个队列可以是容量为0的SyschronousQueue(使用newCachedThreadPool), 也可以是固定大小的LinkedBlockingQueue(使用newFixedThreadPool)。

private final BlockingQueue<Runnable> wordQueue;
内部线程池-工作线程管理

线程池中管理线程的创建、销毁。例如对于带缓存的线程池,当任务压力大的时候会创建新的线程,当任务压力退去的时候又会闲置一些线程(闲置线程60秒后会结束)

private final HashSet<Work> workers = new HashSet<>();

线程池的工作线程被抽象为静态内部类Worker 基于AQS算法实现

ThreadFactory

提供工作线程管理中的所需要的创建线程的逻辑 如果从应用逻辑提交任务被拒绝 比如线程池已经处于shutdown状态,需要为其提供处理逻辑,java标准库提供了类似ThreadPoolExecutor.AbortPolicy(终止策略)等默认实现

从上面可以看到线程池的几个基本组成部分,都能在线程池的构造函数中提现,具体的构造函数参数有:
corePoolSize

核心线程数,也就是长期驻留的线程数目 不同的线程池 值的差距很大 newFixedThreadPool的核心线程数就是nThreads,newCacheThreadPool则是0

maximumPoolSize

线程不够的时候能够创建的最大线程数。newFixedThreadPool的最大线程数依然是nThreads;newCacheThreadPool这是Integer.MAX_VALUE。

keepAliveTime和TimeUnit

指定额外的线程能够闲置多久 一个是时间一个是时间单位

workQueue

指定工作队列 必须是BlockingQueue

ThreadFactory

用于创建新线程的工厂

RejectedExecutionHandler

当任务无法被线程池执行的时候的拒绝策略

例如:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit timeUnit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler
                          )

那么 线程的生命周期又是如何的呢?

image.png

execute的工作方式

public void execute(Runnable command) {
…
  int c = ctl.get();
// 检查工作线程数目,低于corePoolSize则添加Worker
  if (workerCountOf(c) < corePoolSize) {
      if (addWorker(command, true))
          return;
      c = ctl.get();
  }
// isRunning就是检查线程池是否被shutdown
// 工作队列可能是有界的,offer是比较友好的入队方式
  if (isRunning(c) && workQueue.offer(command)) {
      int recheck = ctl.get();
// 再次进行防御性检查
      if (! isRunning(recheck) && remove(command))
          reject(command);
      else if (workerCountOf(recheck) == 0)
          addWorker(null, false);
  }
// 尝试添加一个worker,如果失败意味着已经饱和或者被shutdown了
  else if (!addWorker(command, false))
      reject(command);
}

使用线程池 应该注意:

避免任务堆积

如使用newFixedThreadPool,其创建线程的数量是指定的,但是其默认工作队列是无界的,如果工作线程太少,但是又有大量的任务入队,导致处理速度跟不上入队的速度则可能会占用大量的系统内存,甚至内存溢出(当然 也可以修改线程池的工作队列为有界的ArrayBlockingQueue)。诊断时,使用jmap之类的工具查看是否有大量对象入队

避免过度扩展线程

如果可以预料到任务压力,可以适当的指定线程数量,但是在实际运用中,很难预料任务压力,通常都是用缓存线程池(newCacheThreadPool);在最新的HTTP/2 client API中,目前的默认实现就是是用的缓存线程池

线程数目不断增长问题

线程数目不断增长 可以使用jstack等工具来检查,也有可能是线程泄露,这种往往是任务逻辑问题导致工作线程不能被释放

避免死锁问题

注意线程不要相互调用 当A持有锁1 B持有锁2的时候A去获取2 B去获取1 这种场景要避免出现

线程池使用中不操作ThreadLocal 工作线程的生命周期通常会超过任务的生命周期

线程池大小的选择策略

任务主要是计算

当任务主要是用于计算逻辑 那么会比较占用CPU的资源 不能使用太多的线程 线程太多会增加上下文切换的开销 这种通常建议按照cpu的核数N或者N+1

较多等待的任务

I/O操作较多 可以参考Brain Goetz推荐的计算方法

线程数 = CPU核数 * 目标CPU利用率 * (1 + 平均等待时间/平均工作时间)