掘金 后端 ( ) • 2024-04-26 10:35

前言

我们在项目中经常使用线程池,但是我们很少注意到线程池要如何安全关闭,比如应用停了有没有正常优雅的关闭线程池,常见的注意点:

  • 线程在执行中当前任务 任务有是否可以等任务执行完再结束
  • 队列有没有堆积任务,如果有是否可以存储起来,或者等待执行完成(不在接受新的任务)在关闭

常见的线程池

  1. ExecutorService接口:这是一个执行提交的RunnableCallable任务的对象服务。它关闭和管理线程的生命周期。
  2. Executors类:这个类为ExecutorService的创建提供了工厂方法。它可以用来创建单线程的执行器、固定线程数量的执行器、可缓存的执行器等。
  3. ThreadPoolExecutor类:这是ExecutorService的一个实现,它使用可能的几个池中线程来执行每个提交的任务,通常用于执行大量异步任务。
  4. ScheduledExecutorService接口:这是ExecutorService的子接口,能够安排命令在给定的延迟后运行,或者定期执行。
  5. ThreadPoolTaskExecutor 是对java.util.concurrent.ThreadPoolExecutor的Spring封装,提供了与Spring生命周期集成的便利。

线程池优雅关闭核心方法介绍

shutDown

  • ShutDown 会首先将线程设置成 SHUTDOWN 状态,然后中断所有没有正在运行的线程
  • 正在执行的线程和已经在队列中的线程并不会被中断,说白了就是使用shutDown 方法其实就是要等待所有任务正常全部结束以后才会关闭线程池
  • 调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

shutDownNow

执行shutDownNow 方法后,全部正在运行的队列通知中断,正在运行的线程接收到中断信号后选择处理,而在队列中的全部取消执行转移到一个list队列中返回

awaitTermination(long timeout, TimeUnit unit) throws InterruptedException

  • 这个方法的作用是,调用后等待timeout时间后,反馈线程池的状态,
  • 等待期间(包括进入等待状态之前)线程池已关闭并且所有已提交的任务(包括正在执行的和队列中等待的)都执行完毕,相当于线程池已经“终结”了,方法便会返回 true
  • 等待超时时间到后,第一种线程池“终结”的情况始终未发生,方法返回 false
  • 等待期间线程被中断,方法会抛出 InterruptedException 异常。

isShutDown

  • isShutDown 方法正如名字,判断线程池是否停止,返回的是 Boolean 类型,如果已经开始停止线程池则返回 true 否则放回false
  • 当调用了shutDown 或shutDownNow 时之后,会返回 true 不过需要注意,这时候只是代表线程池关闭流程的开始,并不是说线程池已经停止了

isTerminated

  • 这个方法与上面的方法的区别就是这是正真检测线程池是否真的终结了
  • 这不仅代表线程池已关闭,同时代表线程池中的所有任务都已经都执行完毕了,因为在调用 shutdown 方法之后,线程池会继续执行里面未完成的任务,包括正在执行的任务和在任务队列中等待的任务。
  • 如果调用了 shutdown 方法,但是有一个线程依然在执行任务,那么此时调用 isShutdown 方法返回的是 true,而调用 isTerminated方法返回的便是 false,因为线程池中还有任务正在在被执行,线程池并没有真正“终结”。
  • 直到所有任务都执行完毕了,调用 isTerminated() 方法才会返回 true,这表示线程池已关闭并且线程池内部是空的,所有剩余的任务都执行完毕了。

案例一 应用停止后如何保证任务被执行完成后才结束

模拟定时任务通过线程池并发处理业务,为了演示效果线程池数量设置1,任务执行更符合真实场景 加了redis 和 sleep 因为有的业务场景可能时间很长

@Configuration
public class ThreadPoolConfig {
    @Bean
    public ThreadPoolTaskExecutor createThreadPoolTaskExecutor () {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setQueueCapacity(100);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("xxx");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        executor.initialize();
        return executor;
    }

}
@Service
public class OrderService {

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

   public void saveOrder() {
     System.out.println("开始保存订单信息:"+Thread.currentThread().getName());
    stringRedisTemplate.opsForValue().set("No2021081501", "test" + System.nanoTime(), 60, TimeUnit.HOURS);
    System.out.println("保存订单信息成功");
    try {
        Thread.sleep(400);
    } catch (InterruptedException e) {
    }
    String orderInfo = stringRedisTemplate.opsForValue().get("No2021081501");
    System.out.println("orderInfo:" + orderInfo);

    System.out.println("剩余任务数量" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());

  }

    @SneakyThrows
    @Scheduled(fixedRate = 100)
    public void init() {
        threadPoolTaskExecutor.execute(() -> saveOrder());
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadPoolTest {
    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Resource
    private OrderService orderService;
    @Test
    public void test(){

        threadPoolTaskExecutor.execute(new Runnable() {
            @Override
            public void run() {
                orderService.saveOrder();
            }
        });
        System.out.println("保存订单信息成功");
    }
}

运行结果

orderInfo:test30411734489600
剩余任务数量93
开始保存订单信息:
保存订单信息成功
orderInfo:test30412152864100
剩余任务数量96
开始保存订单信息:
保存订单信息成功
orderInfo:test30412575444700

spring boot 优雅停机

剩余任务数量92
开始保存订单信息:
保存订单信息成功
与目标 VM 断开连接, 地址为: ''127.0.0.1:62841',传输: '套接字''
2024-04-24 20:44:17.266  INFO 168232 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
orderInfo:test30732458121700
剩余任务数量95
开始保存订单信息:
保存订单信息成功
2024-04-24 20:44:17.345  INFO 168232 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
Exception in thread "xxx1" java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.assertInitialized(LettuceConnectionFactory.java:1263)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:414)
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193)
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144)
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:191)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
	at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:54)
	at com.tuanzhang.service.OrderService.saveOrder(OrderService.java:37)
	at com.tuanzhang.service.OrderService.lambda$init$0(OrderService.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

在系统关闭的时候 当前任务其实并没有被执行完成 redis 连接就被关闭了。所以此时是线程池是非安全关闭的即便做了spring boot优雅停机也是无效果的

解决方案一 设置线程池参数

  1. setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) :
    这个方法设置了一个布尔值,用来指定Spring容器关闭时是否等待当前正在执行的任务完成。如果设置为true,那么容器将会等待所有任务执行完毕后才会完全关闭;如果设置为false,那么容器在关闭时不会等待这些任务,会立即尝试停止所有正在执行的任务。
  2. setAwaitTerminationSeconds(int awaitTerminationSeconds) :
    这个方法设置了一个整数值,代表Spring容器在关闭时,等待所有任务完成的最大时间(以秒为单位)。如果设置了setWaitForTasksToCompleteOnShutdown(true),那么这个方法定义了一个最长等待时间。容器会等待指定的时间让所有任务完成。如果超过这个时间,那么未完成的任务可能会被强制终止。如果所有任务在这个时间内完成,容器则会继续关闭流程。
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);

spring boot 优雅停机 通过日志发现 堆积的100个任务虽然执行完了 但是redis连接被关闭其实还是无法正常的优雅停机,每次执行任务就会触发异常导致每次都会新建线程去执行,这个参数可用但是条件有限要保证执行的bean 没有被销毁

开始保存订单信息:xxx1
保存订单信息成功
orderInfo:test31228267258300
剩余任务数量90
开始保存订单信息:xxx1
保存订单信息成功
orderInfo:test31228685038500
剩余任务数量93
开始保存订单信息:xxx1
保存订单信息成功
orderInfo:test31229104728500
剩余任务数量96
开始保存订单信息:xxx1
保存订单信息成功
与目标 VM 断开连接, 地址为: ''127.0.0.1:63891',传输: '套接字''
2024-04-24 20:52:34.372  INFO 162592 --- [ionShutdownHook] o.s.b.w.e.tomcat.GracefulShutdown        : Commencing graceful shutdown. Waiting for active requests to complete
orderInfo:test31229524026900
剩余任务数量99
开始保存订单信息:xxx1
保存订单信息成功
2024-04-24 20:52:34.459  INFO 162592 --- [tomcat-shutdown] o.s.b.w.e.tomcat.GracefulShutdown        : Graceful shutdown complete
Exception in thread "xxx1" java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.assertInitialized(LettuceConnectionFactory.java:1263)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:414)
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193)
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144)
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:191)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
	at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:54)
	at com.tuanzhang.service.OrderService.saveOrder(OrderService.java:37)
	at com.tuanzhang.service.OrderService.lambda$init$0(OrderService.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Exception in thread "xxx2" java.lang.IllegalStateException: LettuceConnectionFactory was 
Exception in thread "xxx15" java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
	at org.springframework.util.Assert.state(Assert.java:76)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.assertInitialized(LettuceConnectionFactory.java:1263)
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:414)
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193)
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144)
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:211)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:191)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
	at org.springframework.data.redis.core.DefaultValueOperations.set(DefaultValueOperations.java:325)
	at com.tuanzhang.service.OrderService.saveOrder(OrderService.java:31)
	at com.tuanzhang.service.OrderService.lambda$init$0(OrderService.java:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

开始保存订单信息:xxx2
开始保存订单信息:xxx3
。。。

修改代码,这个时候把redis 移除 只有本地代码 看下线程池的效果


public void saveOrder() {
    System.out.println("开始保存订单信息:"+Thread.currentThread().getName());
    try {
        Thread.sleep(400);
    } catch (InterruptedException e) {
    }
    System.out.println("开始保存订单信息完成"+Thread.currentThread().getName());
}

优雅停机,这样系统会等待60s 在停止。如果任务堆积过多 要适当调整 等待时间 setWaitForTasksToCompleteOnShutdown

剩余任务数量212
开始保存订单信息:xxx1
开始保存订单信息完成xxx1
剩余任务数量211
开始保存订单信息:xxx1
开始保存订单信息完成xxx1
剩余任务数量210
开始保存订单信息:xxx1
2024-04-25 11:04:37.393  WARN 295832 --- [ionShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Timed out while waiting for executor 'createThreadPoolTaskExecutor' to terminate
开始保存订单信息完成xxx1
剩余任务数量209
开始保存订单信息:xxx1

解决方案二 shutdown 方法

刚才方案一 有个问题 虽然能保证线程等待 60s后在关闭,但是被spring托管的redis连接被关闭了 导致任务虽然会被执行但是无法正常执行,我们知道spring 关闭的时候 会执行 @PreDestroyor 实现DisposableBean接口 这个时候我们把 spring 关闭流程给阻塞住 那么redis链接就不会被关闭。

@Service
public class OrderService {

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public void saveOrder() {
        System.out.println("开始保存订单信息:" + Thread.currentThread().getName());
        stringRedisTemplate.opsForValue().set("No2021081501", "test" + System.nanoTime(), 60, TimeUnit.HOURS);
        System.out.println("保存订单信息成功");
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
        }
        String orderInfo = stringRedisTemplate.opsForValue().get("No2021081501");
        System.out.println("orderInfo:" + orderInfo);

        System.out.println("剩余任务数量" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());

    }

    @SneakyThrows
    @Scheduled(fixedRate = 100)
    public void init() {
        threadPoolTaskExecutor.execute(() -> saveOrder());
    }

    @PreDestroy
    public void shutdown() {
        long l = System.currentTimeMillis();
        System.out.println("线程池优雅停机");
        threadPoolTaskExecutor.shutdown();

        System.out.println("线程池优雅停机结束,耗时:" + (System.currentTimeMillis() - l));
    }
}

优雅停机 可以正常结束的

开始保存订单信息:xxx1
保存订单信息成功
orderInfo:test83341983233700
剩余任务数量1
开始保存订单信息:xxx1
保存订单信息成功
orderInfo:test83342419193800
剩余任务数量0
线程池优雅停机结束,耗时:16440

如果线程池在指定的时间 无法执行完所有任务怎么办?

@Service
public class OrderService {

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Resource
    private StringRedisTemplate stringRedisTemplate;



    volatile Integer x = 1;

    @SneakyThrows
    @Scheduled(fixedRate = 100)
    public void init() {
        // threadPoolTaskExecutor.execute(() -> saveOrder());
        threadPoolTaskExecutor.execute(new OrderRunnable(x.toString()));
        x += 1;
    }

    @PreDestroy
    public void shutdown() {
        long l = System.currentTimeMillis();
        out.println("线程池优雅停机");
        ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor();
        threadPoolExecutor.shutdown();
        List<Runnable> runnables = threadPoolExecutor.shutdownNow();
        out.println("未执行任务size:"+runnables.size());
        runnables.stream().forEach(obj -> {
                    if (obj instanceof Future) {
                        ((Future<?>) obj).cancel(true);
                    }
                    out.println("未执行的任务" + ((OrderRunnable) obj).getOrderNo());
                }
        );

        try {
            threadPoolExecutor.awaitTermination(6000, TimeUnit.MILLISECONDS);
            out.println("线程池被优雅的关闭:"+threadPoolExecutor.isTerminated());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        out.println("线程池优雅停机结束,耗时:" + (System.currentTimeMillis() - l));
    }


    public class OrderRunnable implements Runnable {

        @Getter
        private String orderNo;

        OrderRunnable(String orderNo) {
            this.orderNo = orderNo;
        }

        @Override
        public void run() {
            out.println("开始保存订单信息:" + Thread.currentThread().getName());
            stringRedisTemplate.opsForValue().set("No2021081501", "test" + System.nanoTime(), 60, TimeUnit.HOURS);
            out.println("保存订单信息成功:" + orderNo);
            try {
                Thread.sleep(400);
            } catch (InterruptedException e) {
                out.println("线程被中断"+ Thread.currentThread().getName());
            }
            String orderInfo = stringRedisTemplate.opsForValue().get("No2021081501");
            out.println("orderInfo:" + orderNo);

            out.println("剩余任务数量" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size());

        }
    }
}

1、通过 shutdown 把状态设置为 shutdown 不再接受新的任务 执行完当前的任务 2、执行 shutdownNow 获取队列中的任务 存储到mq 文件 或者db 等待下次执行 3、执行 awaitTermination 等待返回的任务都处理完毕 判断线程池是否真正的关闭