资料参考来源拉钩Java高薪训练营
前言
分布式调度其实就是在分布式环境下的定时任务。
一、什么是分布式调度?
分布式调度两种场景:
- 运行在分布式集群环境下的定时任务,同一个定时任务部署了多份,应该只有一个定时任务在执行。
- 把一个大的定时任务拆分成多个小的定时任务,同时执行,这就是定时任务的分布式。
二、定时任务和消息队列的区别
相同应用场景:
- 异步处理
- 解耦
- 流量削峰
不同点:
- 定时任务是时间驱动,MQ是事件驱动
- 事件驱动是不可替代的,很多场景只能使用定时任务批量处理数据
- 定时任务倾向于批量处理,MQ倾向于单条处理
三、分布式调度框架Elastic-Job
1.介绍
Elastic-Job是当当网开源的一个分布式调度框架,是基于Quartz二次开发的,有两个独立的模块组成:Elastic-Job-Lite和Elastic-JobCloud。一般使用Lite就可以了,Lite定位是轻量级无中心化的解决方案。
主要功能:
- 分布式调度协调
- 丰富的调度策略,使用成熟的Quartz cron表达式配置时间触发器。
- 弹性扩容和缩容
- 错过执⾏作业重触发
- ⽀持并⾏调度,也就是任务分片
- 作业分片的一致性,保证同一分片在分布式环境下只能有一个运行
2.Elastic-Job-Lite使用
Elastic-Job主要依赖于zookeeper进行分布式协调,zookeeper版本需要在3.4.6以上。
现在开发一个简单的归档功能,把user_log的表数据归档到user_log_bak中。下面是部分实现代码:
首先加入pom依赖:
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
Main启动类:
public class ElasticJobMain {
public static void main(String[] args) {
//注册到Zookeeper
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");
CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
coordinatorRegistryCenter.init();
//定义任务 分片数为3个
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job","*/2 * * * * ?",3)
.shardingItemParameters("0=0,1=1,2=2").build();
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,TestJob.class.getName());
JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());
jobScheduler.init();
}
}
任务类:
public class TestJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
//分片总数
int shardingTotalCount = shardingContext.getShardingTotalCount();
//当前分片
int shardingItem = shardingContext.getShardingItem();
System.out.println("---->>>当前分片:"+shardingItem);
//查询一条数据 通过id和分片总数取余等于当前分片来区分该分片需要处理的数据
String selectSql = "select * from user_log where is_del = 1 and id%?=? limit 1";
List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql,shardingTotalCount,shardingItem);
Map<String, Object> map = list.stream().findFirst().get();
Integer id = (Integer) map.get("id");
String name = (String) map.get("name");
System.out.println("-------->>> id:"+id+" name:"+name);
//备份这条数据
String insertSql = "insert into user_log_bak select * from user_log where id=?";
JdbcUtil.executeUpdate(insertSql,id);
//删除这条数据
String updateSql = "delete from user_log where id=?";
JdbcUtil.executeUpdate(updateSql,id);
}
}
再把一个jdbc工具类贴上:
public class JdbcUtil {
//url
private static String url = "jdbc:mysql://localhost:3306/my-test?characterEncoding=utf8&useSSL=false";
//user
private static String user = "root";
//password
private static String password = "123456";
//驱动程序类
private static String driver = "com.mysql.cj.jdbc.Driver";
static {
try {
Class.forName(driver);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static Connection getConnection() {
try {
return DriverManager.getConnection(url, user,
password);
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
public static void close(ResultSet rs, PreparedStatement ps,
Connection con) {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException e) {
e.printStackTrace();
} finally {
if (con != null) {
try {
con.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
}
}
public static void executeUpdate(String sql, Object... obj) {
Connection con = getConnection();
PreparedStatement ps = null;
try {
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++) {
ps.setObject(i + 1, obj[i]);
}
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(null, ps, con);
}
}
public static List<Map<String, Object>> executeQuery(String sql, Object... obj) {
Connection con = getConnection();
ResultSet rs = null;
PreparedStatement ps = null;
try {
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++) {
ps.setObject(i + 1, obj[i]);
}
rs = ps.executeQuery();
List<Map<String, Object>> list = new ArrayList<>();
int count = rs.getMetaData().getColumnCount();
while (rs.next()) {
Map<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < count; i++) {
Object ob = rs.getObject(i + 1);
String key = rs.getMetaData().getColumnName(i
+ 1);
map.put(key, ob);
}
list.add(map);
}
return list;
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(rs, ps, con);
}
return null;
}
public static void executeUpdate(Connection con,String sql, Object... obj) {
PreparedStatement ps = null;
try {
ps = con.prepareStatement(sql);
for (int i = 0; i < obj.length; i++) {
ps.setObject(i + 1, obj[i]);
}
ps.executeUpdate();
} catch (SQLException e) {
e.printStackTrace();
} finally {
close(null, ps, null);
}
}
}
当idea启动多个该程序时,很明现看出来分布式调度的功能的:


当把其中一个服务器停掉过后,这个服务器的分片任务降会分配到其他服务器执行,可能是服务器2同时执行2个分片任务。
总结
上面这个例子是Elastic-Job原始使用,现在的项目一般都是springboot项目,可以直接使用elastic-job-spring-boot-starter开发,非常的简单,只需要在任务类上使用@ElasticJobConf注解配置,然后在spring配置文件中配置zookeeper信息就可以了。
版权声明:本文为u012387539原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。