掘金 后端 ( ) • 2024-04-21 16:29

theme: smartblue

  • 👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师,CSDN博客专家
  • 📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列
  • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
  • 🍂博主正在努力完成2024计划中:以梦为马,扬帆起航,2024追梦人
  • 📝联系方式:smallyellow521,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬👀

一、背景

大家好呀,好久没有更新博文了

最近,我有一个朋友写了一个 bug,导致线上损失了几百万

说是bug,也不算bug

罪魁祸首就是 @Scheduler 这个注解

如果你的项目中用到了这个注解,我真的建议你花几分钟看一下这个问题

说不定下一个踩坑被裁的就是你

废话不多说,我们一起来看看这个注解有啥隐藏的问题

二、架构

朋友线上的架构如下:

简单介绍下业务逻辑:

  • 调度引擎将一些SQL封装成SQL任务提交至大数据平台
  • 大数据平台执行任务返回结果于调度引擎执行后续逻辑
  • 监控系统启动定时任务定时扫描SQL Task,若超过90分钟还未返回结果,重新提交该任务

三、问题

当天朋友正乐呵呵的工作,想着下班后去哪搞点好吃的

突然,下游找上门来,问:你们怎么回事,离线表已经好久没有产出了,客户都投诉了!

前面还风淡云轻,听到投诉,立马跳了起来:我靠,怎么回事

朋友排查之后发现:线上SQL Task全部超时导致离线表未产出

奇怪,按照上述的描述,我们的监控系统应该能识别到超时的任务并将其重新启动

当然,这种危机关头也不是查询原因的时候,赶紧先重新将这些任务启动再说

后面排查之后发现:监控系统的超时扫描定时任务未启动,从而导致线上超时离线任务未重新提交

这里先埋一个伏笔:监控系统不止一个定时任务

四、原理

OK~终于到了解密了时刻了

1、业务逻辑

我们先看下业务代码如何写的

@SpringBootApplication
@EnableScheduling
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

定时任务代码:

@Component
public class SchedulerTest {

    // 初始化5秒,之后每隔5秒执行一次
    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled1() {
        System.out.println("I am scheduled1, thread = " + Thread.currentThread());
    }

    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled2() {
        System.out.println("I am scheduled2, thread = " + Thread.currentThread());
    }

    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled3() {
        System.out.println("I am scheduled3, thread = " + Thread.currentThread());
    }
}

启动程序输出:

I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled2, thread = Thread[scheduling-1,5,main]
I am scheduled3, thread = Thread[scheduling-1,5,main]

通过输出你能看出来什么原因嘛?

2、报错逻辑

我们将上述的 SchedulerTest稍微修改一下:将第一个定时任务改为死循环

@Component
public class SchedulerTest {

    // 初始化5秒,之后每隔5秒执行一次
    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled1() throws InterruptedException {
        while (true) {
            System.out.println("I am scheduled1, thread = " + Thread.currentThread());
            Thread.sleep(5000);
        }
    }

    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled2() throws InterruptedException {
        while (true) {
            System.out.println("I am scheduled2, thread = " + Thread.currentThread());
            Thread.sleep(5000);
        }
    }

    @Scheduled(initialDelay = 5000, fixedRate = 5000)
    public void scheduled3() throws InterruptedException {
        while (true) {
            System.out.println("I am scheduled3, thread = " + Thread.currentThread());
            Thread.sleep(5000);
        }
        
    }
}

输出数据:

I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]
I am scheduled1, thread = Thread[scheduling-1,5,main]

有没有发现什么问题?

3、原理揭露

通过上面两个案例,我们很明显的可以看出来:**我们 **@Scheduled是单线程的!!!

所以,当我们存在三个定时任务时,其中一个定时任务卡死,就会导致其余定时任务无法启动 从而造成线上故障

那问题来了,整个 Spring是如何对 @Scheduled解析和运行的呢,底层又是如何结合实现的

黄哥主打的就是源码,接下来开始源码揭露

4、源码揭露

我们可以直接看其源码:

4.1 扫描注解

我们先从 ScheduledAnnotationBeanPostProcessor这个类看起

相信看过之前Spring源码解析系列的人应该认识这个这个类吧

对的,经典的后置处理器

主要的作用:Bean初始化过程中扫描目标Bean中的@Scheduled注解,为其创建相应的调度任务

浅挖下源码:

public Object postProcessAfterInitialization(Object bean, String beanName) {
    
    // 获取当前类描述
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);

    // 检查目标类targetClass是否已经存在于nonAnnotatedClasses集合中
    // targetClass是否是Scheduled.class或Schedules.class的候选类
    if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {

        // 获取当前带有Scheduled注解的方法
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                                                                                        (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                                                                                            Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                                                                                method, Scheduled.class, Schedules.class);
                                                                                            return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                                                                                        });

        // 如果注解方法为空的,则将类加入Set中,避免下一次判断
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
        }else {
            // 反之遍历其带有注解的方法,封装成
            annotatedMethods.forEach((method, scheduledAnnotations) ->
                                     scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                             "': " + annotatedMethods);
            }
        }
    }
}

比如上述例子: image.png

4.2 封装任务

在上面的 processScheduled(scheduled, method, bean)方法里面,封装了我们的任务

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    // 将当前的bean和方法封装成Runable
    Runnable runnable = createRunnable(bean, method);

    // 获取初始化延时时间
    long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
    // 获取cron命令
    String cron = scheduled.cron();

    // 根据模式不同注册不同的定时任务
    
    long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
    if (fixedDelay >= 0) {
        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
    }

    long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
    if (fixedRate >= 0) {
        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
    }
}

所以,我们的重点就在这一句了:tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)))

public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    // 等到后续应用事件初始化后,才会正式运行该任务
    if (this.taskScheduler != null) {
        if (task.getInitialDelay() > 0) {
            Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());
            scheduledTask.future =
            this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
        }
        else {
            scheduledTask.future =
            this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
        }
    }
    else {
        // 第一次进入会走这里添加对应的定时任务
        addFixedRateTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

我们在第一次运行时,该函数只会向fixedRateTasks中添加对应的task

4.3 任务运行

我们继续回到 ScheduledAnnotationBeanPostProcessor中,找到 onApplicationEvent这个方法

onApplicationEvent是Spring框架中的一个方法,用于处理应用程序事件。它是ApplicationListener接口的方法之一,用于响应特定类型的应用程序事件。

public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext() == this.applicationContext) {
			// Running in an ApplicationContext -> register tasks this late...
			// giving other ContextRefreshedEvent listeners a chance to perform
			// their work at the same time (e.g. Spring Batch's job registration).
			finishRegistration();
		}
	}
private void finishRegistration() {
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
    }

    // ........
    this.registrar.afterPropertiesSet();
}

OK,这一句也就是我们的重点:this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));

// 这里将我们的taskScheduler进行初始化
public void setTaskScheduler(TaskScheduler taskScheduler) {
    this.taskScheduler = taskScheduler;
}

最后执行 afterPropertiesSet方法

public void afterPropertiesSet() {
    scheduleTasks();
}

protected void scheduleTasks() {
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
}

public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
    // 如果当前的taskScheduler不为null的话
    if (this.taskScheduler != null) {
        if (task.getInitialDelay() > 0) {
            Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay());
            // 提交我们的定时任务
            scheduledTask.future =
            this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval());
        }
        else {
            scheduledTask.future =
            this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
        }
    }
}

那这里是如何提交任务运行的呢

image.png

继续往下追代码:

ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) {
	// 获取ScheduledExecutorService执行类
    ScheduledExecutorService executor = getScheduledExecutor();
		long initialDelay = startTime.getTime() - this.clock.millis();
		try {
            // 调用JUC包进行任务运行
			return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}

OK,到这里我们基本梳理清楚了

但还有最后一个问题,那就是 ThreadPoolTaskScheduler是如何来的?

回到我们一开始的this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));

我们可以看到 resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)中使用 TaskScheduler.class

继续往下看源码

private <T> T resolveSchedulerBean(BeanFactory beanFactory, Class<T> schedulerType, boolean byName) {
    if (beanFactory instanceof AutowireCapableBeanFactory) {
        // 这里使用TaskScheduler.class获取bean
        NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) beanFactory).resolveNamedBean(schedulerType);
        if (this.beanName != null && beanFactory instanceof ConfigurableBeanFactory) {
            ((ConfigurableBeanFactory) beanFactory).registerDependentBean(holder.getBeanName(), this.beanName);
        }
        return holder.getBeanInstance();
    }
    else {
        return beanFactory.getBean(schedulerType);
    }
}

在往下追的话:

image.png

这个类应该不用介绍了吧,讲了几百遍了

那为什么我们通过 TaskScheduler.class可以得到 ThreadPoolTaskScheduler呢?

很简单,因为这哥们实现了TaskScheduler接口

public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
		implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {

但这哥们有个问题:

public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler {
    private volatile int poolSize = 1;
    
    /**
	 * Set the ScheduledExecutorService's pool size.
	 * Default is 1.
	 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
	 */
    public void setPoolSize(int poolSize) {
        Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
        if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor) {
            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setCorePoolSize(poolSize);
        }
        this.poolSize = poolSize;
    }
}

是的,这坑爹的ThreadPoolTaskScheduler线程池默认为1

然后所有使用 @Scheduled的定时任务共有这一个注解

所以,我们任务如果一个运行不完,其余任务都在阻塞着

这就是问题的源码根因

4、解决方式

有两种解决方式:

  • 增加配置类
  • 配置文件新增配置

4.1 增加配置类

@Configuration
public class ScheduleConfig {
    /**
     * 修复同一时间无法执行多个定时任务问题。
     * @Scheduled默认是单线程的
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        //核心线程池数量,方法: 返回可用处理器的Java虚拟机的数量。
        taskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        return taskScheduler;
    }
}

4.2 配置文件

spring.task.scheduling.pool.size=10

五. 总结

鲁迅先生曾说:独行难,众行易,和志同道合的人一起进步。彼此毫无保留的分享经验,才是对抗互联网寒冬的最佳选择。

其实很多时候,并不是我们不够努力,很可能就是自己努力的方向不对,如果有一个人能稍微指点你一下,你真的可能会少走几年弯路。

如果你也对 后端架构中间件源码 有兴趣,欢迎添加博主微信:smallyellow521,一起学习,一起成长

我是爱敲代码的小黄,阿里巴巴淘天集团核心事业部Java高级开发工程师,双非二本,培训班出身

通过两年努力,成功拿下阿里、百度、美团、滴滴、快手、拼多多等大厂,想通过自己的事迹告诉大家,努力是会有收获的!

双非本两年经验,我是如何拿下阿里、百度、美团、滴滴、快手、拼多多等大厂offer的?

我们下期再见。

从清晨走过,也拥抱夜晚的星辰,人生没有捷径,你我皆平凡,你好,陌生人,一起共勉。