elastic-job是一个无中心化的分布式定时调度框架,思路来源于Quartz的基于数据库的高可用方案。但是由于Quartz没有分布式扩容、高可用等能力,所以在系统中引进了Zookeeper,用Zookeeper来实现分布式管理的功能,在高可用方案的基础上增加了弹性扩容和数据分片的思路,以便于更大限度的利用分布式服务器的资源。
一、Quartz
Quartz说由OpenSymphony提供的强大的开源任务调度框架,用来执行定时任务。比如每天凌晨三点钟需要从数据库导出数据,这时候就需要一个任务调度框架,帮我们自动去执行这些程序。那Quartz时怎样实现的呢?
1)首先我们需要定义一个运行业务逻辑的接口,即Job,我们的类继承这个接口来实现业务逻辑,比如凌晨三点读取数据库并且导出数据。
2)再有了Job之后需要按时执行这个任务,这就需要一个触发器Trigger,触发器Trigger就是按照我们的要求在每天凌晨三点执行我们定义的Job。
3)有了任务Job和触发器Trigger后,就需要把它们结合起来,让触发器Trigger在规定的时间调用Job,这时需要一个Schedule来实现这个功能。
所以,Quartz主要有三个部分组成:
- 调度器:Scheduler
- 任务:JobDetail
- 触发器:Trigger,包括SimpleTrigger和CronTrigger
一个简单的Quartz的demo如下:
自定义Job:
package com.hcx.HelloQuartz;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Created by HCX on 2017/9/6.
* 自定义任务
*/
public class HelloJob implements Job {
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//打印当前的执行时间,格式为:2017-09-06 00:00:00
Date date = new Date();
SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("当前的时间为(HelloJob):" + sf.format(date));
//编写具体的业务逻辑
System.out.println("Hello Job!");
}
}任务调度类:
package com.hcx.HelloQuartz;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Created by HCX on 2017/9/6.
*/
public class HelloScheduler {
public static void main(String[] args) throws SchedulerException {
/**
* JobDetail:用来绑定Job,并且在job执行的时候携带一些执行的信息
*/
//创建一个JobDetail实例,将该实例与HelloJob Class绑定
JobDetail jobDetail = JobBuilder.newJob(HelloJob.class)
.withIdentity("myJob","group1").build();
/**
* Trigger:用来触发job去执行的,包括定义了什么时候去执行,
* 第一次执行,是否会一直重复地执行下去,执行几次等
*/
//创建一个Trigger实例,定义该job立即执行,并且每隔2秒钟重复执行一次,直到程序停止
/**
* trigger通过builder模式来创建,TriggerBuilder.newTrigger()
* withIdentity():定义一个标识符,定义了组
* startNow():定义现在开始执行,
* withSchedule(SimpleScheduleBuilder.simpleSchedule():withSchedule也是builder模式创建
*.withIntervalInSeconds(2).repeatForever()):定义了执行频度:每2秒钟执行一次,不间断地重复执行
* build():创建trigger
*/
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("myTrigger","group1").startNow()
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(2).repeatForever()).build();
//创建scheduler实例:
/**
* scheduler区别于trigger和jobDetail,是通过factory模式创建的
*/
//创建一个ScheduleFactory
SchedulerFactory sfact = new StdSchedulerFactory();
Scheduler scheduler = sfact.getScheduler();
//需要将jobDetail和trigger传进去,并将jobDetail和trigger绑定在一起。
scheduler.scheduleJob(jobDetail,trigger);
//启动调度任务
scheduler.start();
}
}这样运行程序后,每隔两秒钟就会打印出当前时间。
二、Elastic-Job的基本概念
1. 分片
Elastic-Job为了提高任务的并发能力,引入了分片的概念,即将一个任务划分成多个分片,然后由多个执行的机器分别领取这些分片来执行。
比如一个数据库中有1亿条数据,需要将这些数据读取出来并计算,然后再写入到数据库中。就可以将这1亿条数据划分成10个分片,每一个分片读取其中的1千万条数据,然后计算后写入数据库。这10个分片编号为0,1,2...9,如果有三台机器执行,A机器分到分片(0,1,2,9),B机器分到分片(3,4,5),C机器分到分片(6,7,8) 。
2、作业调度与执行
Elastic-Job是去中心化的任务调度框架,当多个节点运行时,会先选择一个主节点,当到达执行时间后,每个实例开始执行任务,主节点负责分片的划分,其它节点等待划分完成,主节点将划分后的结果存放到zookeeper中,然后每个节点再从zookeeper中获取划分好的分片项,将分片信息作为参数,传入到本地的任务函数中,从而执行任务。
3、作业的类型
elastic-job支持三种类型的作业任务处理!
- Simple 类型作业:Simple 类型用于一般任务的处理,只需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行,与Quartz原生接口相似。
- Dataflow 类型作业:Dataflow 类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
- Script类型作业:Script 类型作业意为脚本类型作业,支持 shell,python,perl等所有类型脚本。只需通过控制台或代码配置 scriptCommandLine 即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
4、作业执行的实例
public class MyElasticJob implements SimpleJob {
public void execute(ShardingContext context) {
System.out.println(context.toString());
switch (context.getShardingItem()) {
case 0:
System.out.println("------------->>>>0");
break;
case 1:
System.out.println("------------->>>>1");
break;
case 2:
System.out.println("------------->>>>2");
break;
default:
System.out.println("------------->>>>default");
break;
}
}
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
// 对zookeeper进行设置,作为分布式任务的注册中心
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
//设置任务的执行频率、执行的类
private static LiteJobConfiguration createJobConfiguration() {
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// 定义SIMPLE类型配置
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// 定义Lite作业根配置
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
}创建一个Elastic-Job的任务并执行,步骤如下:
1)需要先设置zookeeper的基本信息,Elastic-Job使用zookeeper来进行分布式管理,如选主、元数据存储与读取、分布式监听机制等。
2)创建一个执行任务的Job类,以Simple 类型作业为例,创建一个继承SimpleJob的类,在这个类中实现execute函数。
3)设置作业的基本信息,在JobCoreConfiguration 中设置作业的名称(jobName),作业执行的时间表达式(cron),总的分片数(shardingTotalCount);然后在SimpleJobConfiguration 中设置执行作业的Job类,最后定义Lite作业根配置
4)创建JobScheduler(作业调度器)实例,然后JobScheduler的init()方法中执行作业的初始化,这样作业就开始运行了。