掘金 后端 ( ) • 2024-03-22 11:05

执行器注册与调度中心 xxl-admin

  1. config目录下有一个配置类 XxlJobAdminConfig 实现了InitializingBean 接口 和 DisposableBean 接口 ,在实例初始化和销毁后做处理。
@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    xxlJobScheduler = new XxlJobScheduler();
    xxlJobScheduler.init();
}

@Override
public void destroy() throws Exception {
    xxlJobScheduler.destroy();
}
//xxlJobScheduler.init() 中初始化了一些线程池
public void init() throws Exception {
    // init i18n
    initI18n();

    // admin trigger pool start
    JobTriggerPoolHelper.toStart();

    // admin registry monitor run
    JobRegistryHelper.getInstance().start();

    // admin fail-monitor run
    JobFailMonitorHelper.getInstance().start();

    // admin lose-monitor run ( depend on JobTriggerPoolHelper )
    JobCompleteHelper.getInstance().start();

    // admin log report start
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}
  1. 这些线程的作用分别是:

    1. 初始化国际化配置的方法,即初始化各种语言对应的信息。

    2. 启动任务触发池(JobTriggerPoolHelper)。

      • start(): 启动方法,初始化了两个线程池,分别用于快速触发和慢速触发任务。这两个线程池分别由 fastTriggerPoolslowTriggerPool 表示。

      • stop(): 停止方法,关闭了两个线程池。

      • addTrigger(): 添加触发任务的方法,接受参数包括任务ID、触发类型、失败重试次数、执行器分片参数、执行器参数和执行器地址列表。根据任务的超时次数决定使用快速触发线程池还是慢速触发线程池。然后提交一个任务到对应的线程池中执行,在任务执行完毕后,会检查任务的执行时间,如果超过了阈值(500ms),则会记录任务的超时次数。

      • toStart()toStop(): 对外暴露的启动和停止方法,用于控制整个触发池的启动和停止。

      • trigger(): 提供了一个静态方法,用于外部调用触发任务。内部调用了 addTrigger() 方法。

    3. 启动执行器注册监控器(JobRegistryHelper),用于监控任务的注册情况。

      • start(): 启动方法,初始化了一个线程池 registryOrRemoveThreadPool 和一个监控线程 registryMonitorThread。线程池用于处理执行器的注册和移除,监控线程用于定期检查执行器注册状态。

      • toStop(): 停止方法,用于停止执行器注册监控。关闭了 registryOrRemoveThreadPool,并中断和等待 registryMonitorThread 的结束。

      • registry(): 注册执行器的方法,接受一个 RegistryParam 参数,包括注册组、注册键和注册值。异步执行注册操作,将注册信息保存到数据库,并更新任务组的注册信息。

      • registryRemove(): 移除注册任务的方法,与 registry() 相似,异步执行移除注册操作,从数据库中删除注册信息,并更新执行器的注册信息。

      • freshGroupRegistryInfo(): 刷新执行器注册信息的方法,目前为空实现,待完善。

      • 总体来说,这段代码负责管理执行器的注册信息,通过线程池处理注册和移除执行器的操作,以及通过监控线程定期刷新执行器的注册信息。

    4. 启动任务失败监控器(JobFailMonitorHelper),用于监控任务执行失败情况。

      • start(): 启动方法,初始化了一个监控线程 monitorThread。监控线程定期检查任务执行日志中的失败记录,并进行失败重试和失败告警处理。

      • toStop(): 停止方法,用于停止任务失败监控。中断和等待监控线程的结束。

        • 监控线程 monitorThread 的主要工作如下:
          1. 定期检查任务执行日志中的失败记录,每隔一段时间(此处为10秒)执行一次检查。
          2. 获取最近的失败日志记录,最多获取1000条失败记录。
          3. 对于每条失败记录,首先尝试对日志进行锁定,避免并发操作。
          4. 对于已锁定的失败日志,进行以下处理:
            • 如果该任务允许失败重试(log.getExecutorFailRetryCount() > 0),则触发失败重试,并更新日志记录的触发信息。
            • 对任务进行失败告警处理,调用 JobAlarmer 接口的 alarm() 方法进行告警处理,更新失败日志的告警状态。
          5. 在处理完所有失败记录后,等待一段时间后继续下一轮监控。
      • 监控线程 monitorThread 在启动时设置为守护线程,并命名为 "xxl-job, admin JobFailMonitorHelper"。

      • 总体来说,这段代码负责监控任务失败情况,对失败的任务进行重试和告警处理,保证任务的稳定执行。

    5. 启动任务完成监控器(JobCompleteHelper),用于监控任务的完成情况。

      • start(): 启动方法,初始化了一个监控线程 monitorThread 和一个回调线程池 callbackThreadPool。监控线程定期检查任务执行日志中的任务结果丢失情况,而回调线程池用于处理任务执行结果的回调。

      • toStop(): 停止方法,用于停止任务完成监控。停止回调线程池并等待监控线程结束。

        • 监控线程 monitorThread 的主要工作如下:

          1. 每隔一段时间(此处为60秒)检查任务执行日志中的任务结果丢失情况。
          2. 获取最近超过一定时间(此处为10分钟)但仍处于 "运行中" 状态的任务执行日志。
          3. 对于每个丢失的任务执行日志,设置任务执行失败,更新执行日志的执行结果。
        • 回调线程池 callbackThreadPool 用于处理任务执行结果的回调。当任务执行完成后,异步执行回调方法 callback() 进行回调处理。

          1. callback() 方法遍历传入的回调参数列表,对每个回调参数执行回调处理。
          2. 回调处理包括更新任务执行日志的执行结果和执行消息,并调用 XxlJobCompleter.updateHandleInfoAndFinish() 方法完成任务的执行。
      • callback() 方法用于处理任务执行结果的回调。

        1. 首先验证任务执行日志是否存在,以及任务是否已经回调过,避免重复回调。
          1. 将回调消息添加到执行日志的执行消息中,然后更新执行结果和执行消息,完成任务执行。
    6. 启动任务日志报告(JobLogReportHelper),用于上报任务执行日志。

      • start(): 启动方法,初始化了一个线程 logrThread,该线程负责定期刷新任务日志报告并清理过期的任务日志。

        1. 在一个循环中,先刷新任务日志报告,然后检查是否需要清理过期的任务日志。
        2. 刷新任务日志报告是针对最近的3天的日志进行统计。循环3次,每次统计一天的日志情况,并保存或更新至数据库中。
        3. 检查是否需要清理过期任务日志是通过比较系统配置中设置的日志保留天数和最后清理日志的时间来判断是否需要执行清理操作。
      • toStop(): 停止方法,用于停止任务日志报告处理。停止线程并等待线程结束。

        • 刷新任务日志报告的逻辑:

          1. 针对最近的三天(包括当天),分别统计任务的触发次数、运行成功次数、运行失败次数,并保存或更新至数据库中。
          2. 利用 XxlJobAdminConfig 中的数据访问对象 getXxlJobLogReportDao()getXxlJobLogDao() 进行数据库操作。
        • 清理过期任务日志的逻辑:

          1. 如果系统配置中设置了日志保留天数(logretentiondays > 0)且距离上次清理日志的时间已经超过一天(24小时)。
          2. 计算出过期时间,即当前时间减去日志保留天数得到的时间,用于标记需要清理的日志。
          3. 循环获取需要清理的日志 ID,每次获取 1000 条,然后清理这些日志。
          4. 更新最后清理日志的时间。
      • 总体来说,这段代码负责定时刷新任务日志报告并清理过期的任务日志,保持系统数据的清洁和统计的准确性。

    7. 启动任务调度器(JobScheduleHelper),用于定时触发任务执行

    • 首先是start()方法,用于启动调度器。在该方法中,创建了两个线程:scheduleThreadringThreadscheduleThread负责扫描任务并执行触发操作,ringThread负责处理时间环触发任务。这两个线程在启动后,会持续运行直到调用toStop()方法停止。

      • scheduleThread的主要工作包括:

        1. 等待初始5秒,使得调度器的执行与系统时间对齐

           try {
                              
                 TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
               } catch (InterruptedException e) {
                       if (!scheduleThreadToStop) {
                               logger.error(e.getMessage(), e);
                          }
               }
          
        2. 扫描之前加锁

          conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
          connAutoCommit = conn.getAutoCommit();
          conn.setAutoCommit(false);
          
          preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
          preparedStatement.execute();
          
        3. 定期扫描待执行的任务。扫描未来5s即将执行的任务,预读数量 =

          线程池大小(快慢线程池) * 触发的QPS (平均触发时间为50ms , 1000 / 50 = 20 ) )

          long nowTime = System.currentTimeMillis();
          List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
          
        4. 逐个遍历拿到的任务,根据任务的下次触发时间决定触发策略,执行任务触发操作。

          // 时间环跳过
          if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
              // 触发过期时间 > 5秒:跳过 && 生成下次触发时间
              logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
          
              // 1、匹配触发策略
              MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
              if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                  // FIRE_ONCE_NOW 》 触发
                  JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                  logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
              }
          
              // 2、更新下次触发时间
              refreshNextValidTime(jobInfo, new Date());
          
          } else if (nowTime > jobInfo.getTriggerNextTime()) {
              // 触发过期时间 < 5秒:直接触发 && 生成下次触发时间
          
              // 1、触发
              JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
              logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
          
              // 2、更新下次触发时间
              refreshNextValidTime(jobInfo, new Date());
          
              // 下次触发时间在5秒内,再次预读
              if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
          
                  // 1、生成时间环秒数
                  int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
          
                  // 2、推入时间环
                  pushTimeRing(ringSecond, jobInfo.getId());
          
                  // 3、更新下次触发时间
                  refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
          
              }
          
          } else {
              // 触发预读:时间环触发 && 生成下次触发时间
          
              // 1、生成时间环秒数
              int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
          
              // 2、推入时间环
              pushTimeRing(ringSecond, jobInfo.getId());
          
              // 3、更新下次触发时间
              refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
          
          }
          
        5. 更新任务的下次触发时间和触发状态。

          for (XxlJobInfo jobInfo: scheduleList) {
              XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
          }
          
        6. 使用时间环方式进行预读和触发。

          private void pushTimeRing(int ringSecond, int jobId){
              // push async ring
              List<Integer> ringItemData = ringData.get(ringSecond);
              if (ringItemData == null) {
                  ringItemData = new ArrayList<Integer>();
                  ringData.put(ringSecond, ringItemData);
              }
              ringItemData.add(jobId);
          
              logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
          }
          
      • ringThread主要负责时间环触发任务的处理:

        1. 对齐时间

          try {
              TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
          } catch (InterruptedException e) {
              if (!ringThreadToStop) {
                  logger.error(e.getMessage(), e);
              }
          }
          
        2. 定期扫描时间环。

          List<Integer> ringItemData = new ArrayList<>();
          int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   
          // 避免处理耗时太长导致对齐到下一秒,向前校验1秒;
          for (int i = 0; i < 2; i++) {
              List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
              if (tmpData != null) {
                  ringItemData.addAll(tmpData);
              }
          }
          
        3. 根据当前时间获取相应时间环的任务列表,并执行触发操作。

          if (ringItemData.size() > 0) {
              // 执行触发
              for (int jobId: ringItemData) {
                  // 执行触发
                  JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
              }
              // 清空
              ringItemData.clear();
          }
          
    • toStop()方法用于停止调度器,包括:

      1. scheduleThreadToStopringThreadToStop设置为true,以通知线程停止运行。
        1. 中断线程并等待线程结束。
        2. 最后输出日志表示调度器已停止。
  • 代码中用到了数据库连接、定时任务触发、线程控制等技术。
  • 在异常处理方面,如果出现异常,会记录错误日志,但不会停止调度器的运行,以确保调度器的稳定性。
  • 代码中使用了单例模式,通过getInstance()方法获取JobScheduleHelper的唯一实例。

任务执行器 XxlJobSpringExecutor

  1. afterSingletonsInstantiated(): 实现 SmartInitializingSingleton 接口的方法,在 Spring 单例对象初始化完成后被调用。

    public void afterSingletonsInstantiated() {
        //初始化任务处理器的方法仓库
        this.initJobHandlerMethodRepository(applicationContext);
        //刷新 Glue 实例,1 代表使用 Spring 环境下的任务处理器。
        GlueFactory.refreshInstance(1);
    
        try {
            //在启动过程中,执行器会初始化日志路径、Admin 客户端、任务日志清理线程、触发器回调线程以及嵌入式服务器。
            super.start();
        } catch (Exception var2) {
            throw new RuntimeException(var2);
        }
    }
    
  2. destroy(): 实现了 DisposableBean 接口的方法,用于销毁资源,在该方法中调用了父类的 destroy() 方法。

    public void destroy() {
        //执行器会停止嵌入式服务器,并清理任务线程和相关资源。
        super.destroy();
    }
    
  3. initJobHandlerMethodRepository(ApplicationContext applicationContext): 初始化任务处理器的方法仓库。

    1. 遍历 Spring 容器中的所有 Bean,对标记了 @XxlJob 注解的方法进行解析。

    2. 对每个 Bean,使用 MethodIntrospector.selectMethods() 方法解析出标记了 @XxlJob 注解的方法。

    3. 遍历解析出的方法,将其向任务处理器注册。

    4. 对于每个标记了 @XxlJob 注解的方法,解析出其中的属性,如任务名称、初始化方法、销毁方法等,并注册。

执行器基类 XxlJobExecutor

  1. 任务执行器的启动和销毁: start() 方法用于启动执行器,destroy() 方法用于销毁执行器。在启动过程中,执行器会初始化日志路径、Admin 客户端、任务日志清理线程、触发器回调线程以及嵌入式服务器。而在销毁过程中,执行器会停止嵌入式服务器,并清理任务线程和相关资源。
   public void start() throws Exception {
        // 初始化日志路径
        XxlJobFileAppender.initLogPath(logPath);

        // 初始化 Admin 客户端列表
        initAdminBizList(adminAddresses, accessToken);

        // 初始化任务日志清理线程
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // 初始化触发器回调线程
        TriggerCallbackThread.getInstance().start();

        // 初始化执行器服务器
        initEmbedServer(address, ip, port, appname, accessToken);
    }
    public void destroy() {
        // 停止执行器服务器
        stopEmbedServer();

        // 销毁任务线程和相关资源
        if (jobThreadRepository.size() > 0) {
            for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
                JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
                // 等待任务线程将结果推送到回调队列
                if (oldJobThread != null) {
                    try {
                        oldJobThread.join();
                    } catch (InterruptedException e) {
                        logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
                    }
                }
            }
            jobThreadRepository.clear();
        }
        jobHandlerRepository.clear();

        // 销毁任务日志清理线程
        JobLogFileCleanThread.getInstance().toStop();

        // 销毁触发器回调线程
        TriggerCallbackThread.getInstance().toStop();
    }
  1. Admin 客户端的初始化: initAdminBizList() 方法用于初始化 Admin 客户端列表,该列表用于向 XXL-Job Admin 发送任务执行情况的统计信息。执行器会通过 Admin 客户端将任务执行信息发送给 XXL-Job Admin 进行统计和监控。

  2. 嵌入式服务器的启动和停止: initEmbedServer() 方法用于初始化嵌入式服务器,该服务器用于接收 XXL-Job Admin 的任务调度请求,并执行相应的任务。执行器在启动过程中会启动嵌入式服务器,以便能够接收任务调度请求;在销毁过程中会停止嵌入式服务器。

   // 初始化执行器服务器
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
        // 填充 IP 和端口
        port = port > 0 ? port : NetUtil.findAvailablePort(9999);
        ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

        // 生成地址
        if (address == null || address.trim().length() == 0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // AccessToken
        if (accessToken == null || accessToken.trim().length() == 0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // 启动
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }
  1. 任务处理器的注册和管理: registJobHandler() 方法用于注册任务处理器,该方法会将任务处理器注册到执行器中,以便能够执行相应的任务。执行器在启动过程中会注册所有配置的任务处理器,并在接收到任务调度请求时执行相应的任务。

  2. 任务线程的管理: registJobThread() 方法用于注册任务线程,该方法会创建并启动一个任务线程,用于执行指定的任务。执行器在启动过程中会根据任务配置创建并启动相应的任务线程,以便执行任务;在销毁过程中会停止并销毁所有的任务线程。

相关内容