掘金 后端 ( ) • 2024-07-02 09:19

引言

最近在写公共线程池工具类时,发现 new ThreadPoolExecutor时候 总得给出一个线程池的拒绝策略。 故此针对任务需要:给出一下拒绝策略。 - getSingleThreadPoolgetSingleCall方法使用了DiscardOldestPolicy策略,当任务被拒绝时,它会丢弃队列中等待时间最长的任务,然后尝试提交当前任务。
getAssign方法使用了DiscardPolicy策略,当任务被拒绝时,它会默默地丢弃被拒绝的任务,不做任何处理。

    private static final ConcurrentHashMap<String, ThreadPoolExecutor> ASSIGN = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, ThreadPoolExecutor> POOL_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, ThreadPoolExecutor> CALL = new ConcurrentHashMap<>();
public static ThreadPoolExecutor getSingleThreadPool(String s) {
    synchronized (POOL_MAP) {
        ThreadPoolExecutor threadPoolExecutor = POOL_MAP.get(s);
        if (threadPoolExecutor == null) {

            threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1),
                    new CustomizableThreadFactory("ARRIVE-" + s),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            POOL_MAP.put(s, threadPoolExecutor);
        }
        return threadPoolExecutor;
    }
}

public static ThreadPoolExecutor getSingleCall(String s) {
    synchronized (CALL) {
        ThreadPoolExecutor threadPoolExecutor = CALL.get(s);
        if (threadPoolExecutor == null) {
            threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1),
                    new CustomizableThreadFactory("AUTO-" + s),
                    new ThreadPoolExecutor.DiscardOldestPolicy());
            CALL.put(s, threadPoolExecutor);
        }
        return threadPoolExecutor;
    }
}

public static ThreadPoolExecutor getAssign(String s) {
    synchronized (ASSIGN) {
        ThreadPoolExecutor threadPoolExecutor = ASSIGN.get(s);
        if (threadPoolExecutor == null) {
            threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(1),
                    new CustomizableThreadFactory("ASSIGN-" + s),
                    new ThreadPoolExecutor.DiscardPolicy());
            ASSIGN.put(s, threadPoolExecutor);
        }
        return threadPoolExecutor;
    }
}

企业微信截图_17198814124576.png

正文

针对以上使用,故此想学习了解一下所有拒绝策略:

在高并发的Java应用开发中,线程池是提升性能和资源利用率的重要工具。它能够有效管理线程生命周期,控制并发数,以及减少线程创建和销毁的开销。然而,当系统负载急剧增加,超出线程池处理能力时,如何优雅地处理多余的任务就显得尤为关键。这便引出了一个高级话题——线程池的拒绝策略。本文将深入探讨Java线程池中拒绝策略的内涵、种类及其在实际开发中的应用技巧,帮助开发者在面对极端情况时,能够巧妙地设计和选择合适的策略,确保应用的鲁棒性和稳定性

Java中的线程池ThreadPoolExecutor提供了四种内置的拒绝策略,当任务被提交到线程池,但线程池无法接受任务时,这些策略会被触发。以下是这四种拒绝策略的详细介绍和分析:

1. AbortPolicy(中止策略)

  • 描述:这是默认的拒绝策略。当任务被拒绝时,AbortPolicy策略会抛出一个RejectedExecutionException异常。
  • 分析:这种策略直接给予反馈,明确告知任务无法被执行。它适用于那些希望立即知道任务未能成功提交的场景。然而,这种策略可能会导致系统不稳定,因为异常需要被上层逻辑适当处理。
  • 适用场景:当任务提交者需要立即知道任务被拒绝,并且愿意处理这种异常情况时。通常用于快速失败的场景,比如启动时的初始化任务。

企业微信截图_1719880468473.png

2. CallerRunsPolicy(调用者运行策略)

  • 描述:这种策略不会抛出异常。相反,当任务被拒绝时,任务会被提交给正在执行execute方法的线程来运行。
  • 分析:这种策略提供了一种简单的反馈机制,可以减缓新任务的提交速度。它允许调用者线程直接运行任务,从而提供了一种减压机制。但是,这可能会影响到调用者线程原本的执行流程,尤其是当调用者线程本身是UI线程或者其他关键线程时。 - 适用场景:适合于降低新任务提交速度的场合。例如,当任务是由某些限制并发的控制器提交时,可以让控制器线程自己执行任务,从而减缓任务提交速度。

3. DiscardPolicy(丢弃策略)

  • 描述:这种策略默默地丢弃无法处理的任务,不做任何处理也不抛出异常。
  • 分析:这是一种最为简单的策略,因为它不提供任何反馈。该策略可能会导致任务的丢失,因为提交的任务没有得到任何处理。在某些场景下,如果任务的丢失不会造成严重后果,可以使用此策略。 - 适用场景:当任务可以被丢弃且不影响系统整体功能时,例如在某些非关键的日志记录或统计信息更新上。 企业微信截图_17198805405917.png

4. DiscardOldestPolicy(丢弃最旧策略)

  • 描述:这种策略将丢弃最早的未处理任务(队列中等待最久的任务),然后尝试重新提交新任务。
  • 分析:这种策略试图为新提交的任务腾出空间,但在高负载的情况下可能会丢弃大量的任务。它适用于那些任务时效性较高,可以接受放弃一些旧任务的场景。 - 适用场景:适用于需要处理更多新鲜数据,而不是积压数据的场景。这种策略通常用于实时性要求较高的任务执行。

企业微信截图_17198806013207.png

在选择合适的拒绝策略时,需要根据实际应用的需求和特点来决定。例如,如果任务对时效性要求不高,可以选择CallerRunsPolicy策略;如果系统对稳定性要求极高,不希望因为任务拒绝而出现异常,可以选择DiscardOldestPolicyDiscardPolicy策略。无论选择哪种策略,都应该在系统设计阶段就考虑好任务拒绝的可能性,并确保系统能够以适当的方式处理这种情况。

以上是ThreadPoolExecutor 自带四种拒绝策略,如果觉得不合适可以自己实现一个:

5.自定义拒绝策略

需要创建一个实现了RejectedExecutionHandler接口的类。这个接口定义了一个方法rejectedExecution(Runnable r, ThreadPoolExecutor executor), 需要在这个方法中定义当任务被拒绝时的行为。 下面是一个自定义拒绝策略的示例,该策略会记录被拒绝任务的信息,并尝试在一定时间后重新提交任务到线程池:

import java.util.concurrent.RejectedExecutionHandler;  
import java.util.concurrent.ThreadPoolExecutor;  
import java.util.concurrent.TimeUnit;  
  
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {  
    @Override  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
        // 记录被拒绝的任务信息  
        System.err.println("任务被拒绝: " + r.toString());  
  
        // 可以在这里实现更复杂的逻辑,例如记录日志、发送通知等  
  
        // 如果线程池未关闭,尝试在一定时间后重新提交任务  
        if (!executor.isShutdown()) {  
            try {  
                // 等待一段时间  
                TimeUnit.SECONDS.sleep(1);  
                // 重新提交任务  
                executor.execute(r);  
            } catch (InterruptedException e) {  
                // 线程被中断的处理  
                Thread.currentThread().interrupt();  
            }  
        }  
    }  
}  

要使用这个自定义的拒绝策略,需要在创建ThreadPoolExecutor时将其作为参数传入:

import java.util.concurrent.ArrayBlockingQueue;  
import java.util.concurrent.ThreadPoolExecutor;  
import java.util.concurrent.TimeUnit;  
  
public class ThreadPoolDemo {  
    public static void main(String[] args) {  
        // 创建线程池  
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(  
                1, // 核心线程数  
                1, // 最大线程数  
                0L, // 保持活动时间  
                TimeUnit.MILLISECONDS, // 时间单位  
                new ArrayBlockingQueue<>(1), // 工作队列  
                new MyRejectedExecutionHandler() // 自定义拒绝策略  
        );  
  
        // 提交任务  
        for (int i = 0; i < 10; i++) {  
            final int index = i;  
            threadPoolExecutor.execute(() -> {  
                try {  
                    // 假设每个任务执行需要一段时间  
                    TimeUnit.SECONDS.sleep(2);  
                    System.out.println("任务执行: " + index);  
                } catch (InterruptedException e) {  
                    Thread.currentThread().interrupt();  
                }  
            });  
        }  
  
        // 关闭线程池  
        threadPoolExecutor.shutdown();  
    }  
}  

在这个例子中,当任务被拒绝时,自定义的拒绝策略会首先打印出被拒绝任务的信息,然后等待1秒钟,尝试重新将任务提交到线程池中。当然,可以根据实际需求定制更加复杂的拒绝策略,例如将任务存入持久化队列、限流、降级处理等。

总结:

通过本文的深入分析,了解了线程池拒绝策略的重要性和实践意义。从AbortPolicy的快速失败,到CallerRunsPolicy的调用者运行,再到DiscardPolicy与DiscardOldestPolicy的不同丢弃方式,每种内置策略都有其适用场景和潜在影响。此外,还探讨了自定义拒绝策略的设计思路,包括日志记录、任务持久化、等待队列、优先级调整和流量控制等方案。这些自定义策略为处理任务溢出提供了更多的灵活性和可控性。线程池和拒绝策略的合理应用,不仅能够提升系统的并发处理能力,还能在高负载情况下保障系统的稳定运行,是高质量Java应用不可或缺的一部分。希望读者能够根据本文的内容,结合自己的业务需求,设计出既合理又高效的线程池拒绝策略,为构建高可用的Java应用奠定坚实的基础。