xxljob调度中心启动源码解析

xxljob调度中心启动源码解析

源码版本:

<dependency>
	<groupId>com.xuxueli</groupId>
	<artifactId>xxl-job</artifactId>
	<version>2.4.0-SNAPSHOT</version>
</dependency>

入口类为:com.xxl.job.admin.core.conf.XxlJobAdminConfig。该类实现了InitializingBean接口,因此Spring在启动并初始化该bean时,会回调它的afterPropertiesSet方法。

/**
 * Entry point of the scheduling center (abridged excerpt: field declarations are not shown).
 *
 * <p>The class implements {@code InitializingBean}, so the container invokes
 * {@link #afterPropertiesSet()} once the bean has been populated.
 */
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

	/**
	 * Publishes this instance and then boots the scheduler.
	 *
	 * @throws Exception whatever {@code XxlJobScheduler.init()} throws
	 */
	@Override
    public void afterPropertiesSet() throws Exception {
        // Store the instance first; presumably this is what getAdminConfig() hands out
        // to the scheduler components started below -- confirm against the full class.
        adminConfig = this;

        xxlJobScheduler = new XxlJobScheduler();
        // Initialise the xxl-job scheduling center.
        xxlJobScheduler.init();
    }
}

接着来到com.xxl.job.admin.core.scheduler.XxlJobScheduler的init方法:

/**
 * Starts every background component of the scheduling center, in a fixed order.
 *
 * <p>The helper implementations are outside this excerpt; the notes on each step
 * summarise the article's description and should be confirmed against the helper sources.
 *
 * @throws Exception if any component fails to start
 */
public void init() throws Exception {
        // init i18n
        // Per the article: prepares the localised titles of the block strategies shown on the
        // "add job" page (Chinese and English are offered, Chinese being the default).
        initI18n();

        // admin trigger pool start
        // Starts the trigger thread pools. addTrigger() routes a job to the "slow" pool once its
        // timeout counter for the current minute exceeds 10 (a trigger counts as a timeout when it
        // takes more than 500 ms), so slow jobs cannot exhaust the threads used by everything else.
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run
        // Per the article: a daemon thread that maintains executor registration and removal,
        // i.e. the address list jobs are dispatched to.
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run
        // Per the article: a daemon thread that handles failed triggers and retries a job when its
        // configured fail-retry count is greater than 0 (no retry by default), up to that count.
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        // Per the article: creates a callback thread pool and a monitor thread for lost results --
        // a log stuck in "running" for over 10 minutes whose executor is no longer registered
        // is marked as failed.
        JobCompleteHelper.getInstance().start();

        // admin log report start
        // Per the article: builds the daily execution report and purges expired logs; the report
        // backs the dashboard figures (job count, trigger count, executor count, charts).
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        // Per the article: starts two threads. One reads jobs due within the next 5 seconds from
        // xxl_job_info and places them into ringData (the time ring) by trigger second; the other
        // (ringThread) drains ringData and triggers the jobs.
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

接下来主要看com.xxl.job.admin.core.thread.JobScheduleHelper的start方法中ringThread线程的处理逻辑

// Time-ring consumer: once per second, drains the ring slots that are due and triggers their jobs.
ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // Sleep up to the next whole-second boundary so each pass lines up with one ring slot.
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        // An interrupt is expected while stopping; report it only otherwise.
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // Gather the job ids of the current second and of the previous one: if the
                        // last pass overran and skipped a tick, looking back one slot picks it up.
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                        for (int i = 0; i < 2; i++) {
                            // remove() both reads and empties the slot, so an id is taken at most once.
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // Parameterised logging: no string building when debug is off, and the id
                        // list is printed directly instead of being wrapped in a second list.
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : {} = {}", nowSecond, ringItemData);

                        // Hand every due job to the trigger pool; an empty list simply skips the loop.
                        for (int jobId : ringItemData) {
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            // The message fills the placeholder; the trailing throwable supplies the stack trace.
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();

然后进入到JobTriggerPoolHelper的trigger方法

/**
     * Submits a trigger request by delegating to the shared helper's {@code addTrigger}.
     *
     * @param jobId                 id of the job to trigger
     * @param triggerType           what caused the trigger (the time ring passes {@code TriggerTypeEnum.CRON})
     * @param failRetryCount        {@code >= 0}: use this value;
     *                              {@code < 0}: use the retry count from the job's configuration
     * @param executorShardingParam sharding override, passed through unchanged
     * @param executorParam         {@code null}: use the job's own parameter;
     *                              non-null: overrides the job's parameter
     * @param addressList           executor address override, passed through unchanged
     */
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
    }

再进入到addTrigger方法

/**
 * Queues one trigger request on the fast or the slow trigger pool.
 *
 * <p>A job is sent to the slow pool once its timeout counter for the current minute is
 * greater than 10. A trigger counts as a timeout when it takes more than 500 ms, and all
 * counters are discarded whenever the wall-clock minute changes.
 *
 * @param jobId                 id of the job to trigger
 * @param triggerType           what caused the trigger
 * @param failRetryCount        retry override, forwarded to {@code XxlJobTrigger.trigger}
 * @param executorShardingParam sharding override, forwarded unchanged
 * @param executorParam         job parameter override, forwarded unchanged
 * @param addressList           executor address override, forwarded unchanged
 */
public void addTrigger(final int jobId,
                      final TriggerTypeEnum triggerType,
                      final int failRetryCount,
                      final String executorShardingParam,
                      final String executorParam,
                      final String addressList) {

   // Pick the pool: jobs with more than 10 recorded timeouts in this minute are degraded.
   final AtomicInteger recentTimeouts = jobTimeoutCountMap.get(jobId);
   final boolean degraded = recentTimeouts != null && recentTimeouts.get() > 10;
   final ThreadPoolExecutor pool = degraded ? slowTriggerPool : fastTriggerPool;

   pool.execute(new Runnable() {
       @Override
       public void run() {

           final long startedAt = System.currentTimeMillis();

           try {
               // Perform the actual trigger.
               XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
           } catch (Exception e) {
               logger.error(e.getMessage(), e);
           } finally {

               // Entering a new minute starts a new counting window: drop all counters.
               final long currentMinute = System.currentTimeMillis() / 60000;
               if (currentMinute != minTim) {
                   minTim = currentMinute;
                   jobTimeoutCountMap.clear();
               }

               // Record a timeout when this trigger took longer than 500 ms.
               final long elapsed = System.currentTimeMillis() - startedAt;
               if (elapsed > 500) {
                   // The first timeout installs a counter at 1; later ones bump the existing counter.
                   final AtomicInteger existing = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                   if (existing != null) {
                       existing.incrementAndGet();
                   }
               }

           }

       }
   });
}

然后进入到XxlJobTrigger的trigger方法

/**
 * Loads the job and its executor group, applies the per-call overrides and dispatches the trigger.
 *
 * <p>For the sharding-broadcast route strategy, when the group has registered addresses and no
 * explicit sharding parameter is given, one trigger is issued per registered address. Otherwise a
 * single trigger is issued with the given sharding parameter, or {@code 0/1} when none is given.
 *
 * <p>The call returns without triggering, after logging a warning, when either the job or its
 * executor group cannot be loaded.
 *
 * @param jobId                 id of the job to trigger
 * @param triggerType           what caused the trigger
 * @param failRetryCount        {@code >= 0}: use this value;
 *                              {@code < 0}: use the retry count from the job's configuration
 * @param executorShardingParam optional sharding override in the form {@code index/total};
 *                              ignored unless both parts are numeric
 * @param executorParam         {@code null}: use the job's own parameter;
 *                              non-null: overrides the job's parameter
 * @param addressList           {@code null} or blank: use the group's own addresses;
 *                              otherwise replaces the group's address list for this call
 */
public static void trigger(int jobId,
                          TriggerTypeEnum triggerType,
                          int failRetryCount,
                          String executorShardingParam,
                          String executorParam,
                          String addressList) {

   // load data
   XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
   if (jobInfo == null) {
       logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
       return;
   }
   if (executorParam != null) {
       jobInfo.setExecutorParam(executorParam);
   }
   int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();

   XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
   if (group == null) {
       // Guarded like the job lookup above: without this the code below fails with a bare NPE.
       logger.warn(">>>>>>>>>>>> trigger fail, jobGroup invalid,jobId={}, jobGroup={}", jobId, jobInfo.getJobGroup());
       return;
   }

   // cover addressList
   if (addressList!=null && addressList.trim().length()>0) {
       group.setAddressType(1);
       group.setAddressList(addressList.trim());
   }

   // sharding param: accepted only as two numeric parts separated by '/'
   int[] shardingParam = null;
   if (executorShardingParam!=null){
       String[] shardingArr = executorShardingParam.split("/");
       if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
           // parseInt yields the primitive directly; valueOf would box and immediately unbox.
           shardingParam = new int[]{Integer.parseInt(shardingArr[0]), Integer.parseInt(shardingArr[1])};
       }
   }

   if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
           && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
           && shardingParam==null) {
       // Broadcast: one trigger per registered address. The total is read once so that every
       // shard of this round is told the same total.
       int total = group.getRegistryList().size();
       for (int i = 0; i < total; i++) {
           processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, total);
       }
   } else {
       if (shardingParam == null) {
           shardingParam = new int[]{0, 1};
       }
       processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
   }

}

然后进入到processTrigger方法

/**
 * Performs one trigger against one executor and records the outcome in the job log.
 *
 * <p>Steps: insert a log row, build the trigger parameters, choose the executor address,
 * call the executor, assemble a human-readable trigger message, and update the log row.
 *
 * @param group               executor group supplying the registry (address) list
 * @param jobInfo             job definition being triggered
 * @param finalFailRetryCount retry count already resolved by the caller
 * @param triggerType         what caused the trigger
 * @param index               shard index; also selects the address under sharding broadcast
 * @param total               shard total
 */
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param
        // Resolve the block strategy, falling back to SERIAL_EXECUTION.
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        // Resolve the route strategy; the fallback passed here is null.
        // NOTE(review): if match() can return that null fallback, the getRouter()/getTitle()
        // calls further down would throw a NullPointerException -- confirm.
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        // "index/total" for sharding broadcast, otherwise null.
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1. save log-id: the row is saved first because its id is read right afterwards
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        // NOTE(review): the placeholder is labelled "jobId" but the value logged is the log row's id.
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2. init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        // NOTE(review): throws NullPointerException if getGlueUpdatetime() is null -- confirm it is always set.
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3. init address
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                // Broadcast: shard i goes to the i-th address; an out-of-range index falls back to the first.
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                // Any other strategy: let the strategy's router pick the address.
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            // No address available: keep a failure result whose message is appended to the trigger log.
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4. trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            // Without an address the trigger fails locally, with no message of its own.
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5. collect trigger info: an HTML fragment stored as the log's trigger message
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("("+shardingParam+")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        // Append the routing message (if any) followed by the executor's reply (if any).
        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6. save log trigger-info
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        // NOTE(review): as above, the value logged under "jobId" is the log row's id.
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

再进入到runExecutor方法

/**
 * Sends the trigger to one executor and wraps the reply in a readable report.
 *
 * <p>Any exception raised while obtaining the client or calling the executor is logged and
 * converted into a {@code FAIL_CODE} result carrying the stack trace text. In both cases the
 * result's message is then replaced by a report listing the address, code and original message.
 *
 * @param triggerParam parameters describing the job run
 * @param address      executor address to call
 * @return the executor's result, or a failure result, with its message rewritten as described
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> outcome;
        try {
            outcome = XxlJobScheduler.getExecutorBiz(address).run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            outcome = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        // Report layout: "<title>:" then one <br>-separated line each for address, code and msg.
        String report = new StringBuilder()
                .append(I18nUtil.getString("jobconf_trigger_run")).append(":")
                .append("<br>address:").append(address)
                .append("<br>code:").append(outcome.getCode())
                .append("<br>msg:").append(outcome.getMsg())
                .toString();

        outcome.setMsg(report);
        return outcome;
    }

然后调用ExecutorBizClient的run方法

/**
 * Admin-side client that reaches an executor over its address URL
 * (abridged excerpt: only {@code run} is shown).
 */
public class ExecutorBizClient implements ExecutorBiz {

    /** Executor base URL; the two-argument constructor guarantees a trailing "/". */
    private String addressUrl;
    /** Token passed along with every request. */
    private String accessToken;
    /** Request timeout handed to the remoting utility; presumably seconds -- confirm. */
    private int timeout = 3;

    public ExecutorBizClient() {
    }

    /**
     * @param addressUrl  executor base URL; a "/" is appended when it does not already end with one
     * @param accessToken token passed along with every request
     */
    public ExecutorBizClient(String addressUrl, String accessToken) {
        this.accessToken = accessToken;
        this.addressUrl = addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
    }

    /**
     * Posts the trigger parameters to the executor's {@code run} endpoint.
     *
     * @param triggerParam parameters describing the job run
     * @return the result produced by {@code XxlJobRemotingUtil.postBody}
     */
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        String endpoint = addressUrl + "run";
        return XxlJobRemotingUtil.postBody(endpoint, accessToken, timeout, triggerParam, String.class);
    }

}

最终执行的是
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);


版权声明:本文为qq_34396672原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。