掘金 后端 ( ) • 2024-04-24 10:15

highlight: arduino-light theme: channing-cyan

线程池(ThreadPool): 是一种基于池化思想管理线程的工具,简单理解就是预先创建多个线程放在池子里,等到需要使用线程时就从池子里拿,用完后再放回去。在Java中,线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。

1、Executors 中常用线程池

image.png

Java中的Executors类中提供了以下几种常用的线程池:

  1. newFixedThreadPool:创建一个固定数目的,可重用的线程池。适用于为了满足资源管理的需求,需要限制当前线程数量的应用场景,它可控制线程最大并发数,超出的线程会在队列中等待。
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is running.");
        }
    });
}
executorService.shutdown();
  1. newCachedThreadPool:创建一个可根据需要创建新线程的线程池。适用于执行很多短期异步的小程序或者负载较轻的服务器。
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is running.");
        }
    });
}
executorService.shutdown();
  1. newSingleThreadExecutor:创建一个只有一个线程的线程池,它可以保证先进先出的执行顺序。适用于需要保证顺序执行的场景,并且只有一个后台任务时使用。
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    executorService.execute(new Runnable() {
        public void run() {
            System.out.println(Thread.currentThread().getName() + " is running.");
        }
    });
}
executorService.shutdown();
  1. newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。适用于需要进行定时/周期性任务的场景。
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running.");
    }
}, 1, 3, TimeUnit.SECONDS);

除了上面几种常见的线程池外,springboot框架中还有特殊的线程池

2、springboot框架中的线程池

springboot 框架中有特殊的线程池 ThreadPoolTaskScheduler 和 ThreadPoolTaskExecutor

ThreadPoolTaskScheduler 线程池

在SpringBoot项目中使用@Scheduled注解创建的定时任务默认是在单一线程中顺序执行的。这意味着即使前一个任务还没有执行完,下一个任务也不会开始。这可能会导致一些任务的执行时间比预定的时间要晚。 我们可以创建一个线程池(ThreadPoolTaskScheduler)来让多个线程执行定时任务.具体代码实现如下:

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {
  @Override
  public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
      ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
      taskScheduler.setPoolSize(10); // 设置线程池大小
      taskScheduler.initialize();
      taskRegistrar.setTaskScheduler(taskScheduler);
  }
}

或者直接定义一个名称为 taskScheduler 的Bean,Spring会自动使用那个 taskScheduler 来执行所有的@Scheduled任务。 代码如下:

@Configuration
public class SchedulerConfig {
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.setThreadNamePrefix("my-scheduled-task-pool-");
        taskScheduler.initialize();
        return taskScheduler;
    }
}

ThreadPoolTaskExecutor 线程池

SpringBoot项目中的异步任务默认使用的是SimpleAsyncTaskExecutor线程池。这个线程池并不是真正意义上的线程池,因为它不会重用线程,每次调用都会创建一个新的线程。这样可能会导致当任务很多时,创建大量的线程,最后导致OOM。所以当任务很多时我们要自定义线程池。代码如下:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class CustomAsyncConfigurer implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5); //设置核心线程数
        executor.setMaxPoolSize(10); //设置最大线程数
        executor.setQueueCapacity(20); //设置队列容量
        executor.setKeepAliveSeconds(60); //设置线程空闲时间
        executor.setThreadNamePrefix("MyAsync-"); //设置线程名字前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //设置拒绝策略

        //初始化线程池
        executor.initialize();
        return executor;
    }

    // 打印异步线程池中捕获的异常
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
                System.out.println("Exception message - " + throwable.getMessage());
                System.out.println("Method name - " + method.getName());
                for (Object param : obj) {
                    System.out.println("Parameter value - " + param);
                }
            }
        };
    }
}

或者直接定义 taskExecutor 的 Bean, 然后在 @Async 注解中指定使用哪一个线程池 (不指定默认使用 taskExecutor)

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class AppConfig {

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(20);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("MyAsync-");
        executor.initialize();
        return executor;
    }

}