分布式调度问题

资料参考来源拉钩Java高薪训练营


前言

分布式调度其实就是在分布式环境下的定时任务。

一、什么是分布式调度?

分布式调度两种场景:

  1. 运行在分布式集群环境下的定时任务,同一个定时任务部署了多份,应该只有一个定时任务在执行。
  2. 把一个大的定时任务拆分成多个小的定时任务,同时执行,这就是定时任务的分布式。

二、定时任务和消息队列的区别

相同应用场景:

  • 异步处理
  • 解耦
  • 流量削峰

不同点:

  • 定时任务是时间驱动,MQ是事件驱动
  • 事件驱动是不可替代的,很多场景只能使用定时任务批量处理数据
  • 定时任务倾向于批量处理,MQ倾向于单条处理

三、分布式调度框架Elastic-Job

1.介绍

Elastic-Job是当当网开源的一个分布式调度框架,是基于Quartz二次开发的,有两个独立的模块组成:Elastic-Job-Lite和Elastic-JobCloud。一般使用Lite就可以了,Lite定位是轻量级无中心化的解决方案。

主要功能:

  • 分布式调度协调
  • 丰富的调度策略,使用成熟的Quartz cron表达式配置时间触发器。
  • 弹性扩容和缩容
  • 错过执⾏作业重触发
  • ⽀持并⾏调度,也就是任务分片
  • 作业分片的一致性,保证同一分片在分布式环境下只能有一个运行

2.Elastic-Job-Lite使用

Elastic-Job主要依赖于zookeeper进行分布式协调,zookeeper版本需要在3.4.6以上。

现在开发一个简单的归档功能,把user_log的表数据归档到user_log_bak中。下面是部分实现代码:

首先加入pom依赖:

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>

Main启动类:

public class ElasticJobMain {

    public static void main(String[] args) {
        //注册到Zookeeper
        ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");
        CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
        coordinatorRegistryCenter.init();

        //定义任务  分片数为3个
        JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job","*/2 * * * * ?",3)
                .shardingItemParameters("0=0,1=1,2=2").build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,TestJob.class.getName());

        JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).build());
        jobScheduler.init();
    }
}

任务类:

public class TestJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        //分片总数
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        //当前分片
        int shardingItem = shardingContext.getShardingItem();
        System.out.println("---->>>当前分片:"+shardingItem);

        //查询一条数据 通过id和分片总数取余等于当前分片来区分该分片需要处理的数据
        String selectSql = "select * from user_log where is_del = 1 and id%?=? limit 1";
        List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql,shardingTotalCount,shardingItem);
        Map<String, Object> map = list.stream().findFirst().get();
        Integer id = (Integer) map.get("id");
        String name = (String) map.get("name");

        System.out.println("-------->>> id:"+id+" name:"+name);

        //备份这条数据
        String insertSql = "insert into user_log_bak select * from user_log where id=?";
        JdbcUtil.executeUpdate(insertSql,id);

        //删除这条数据
        String updateSql = "delete from user_log where id=?";
        JdbcUtil.executeUpdate(updateSql,id);
    }
}

再把一个jdbc工具类贴上:

public class JdbcUtil {
    //url
    private static String url = "jdbc:mysql://localhost:3306/my-test?characterEncoding=utf8&useSSL=false";
    //user
    private static String user = "root";
    //password
    private static String password = "123456";
    //驱动程序类
    private static String driver = "com.mysql.cj.jdbc.Driver";

    static {
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static Connection getConnection() {
        try {
            return DriverManager.getConnection(url, user,
                    password);
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void close(ResultSet rs, PreparedStatement ps,
                             Connection con) {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (ps != null) {
                    try {
                        ps.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    } finally {
                        if (con != null) {
                            try {
                                con.close();
                            } catch (SQLException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        }
    }

    public static void executeUpdate(String sql, Object... obj) {
        Connection con = getConnection();
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            for (int i = 0; i < obj.length; i++) {
                ps.setObject(i + 1, obj[i]);
            }
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            close(null, ps, con);
        }
    }

    public static List<Map<String, Object>> executeQuery(String sql, Object... obj) {
        Connection con = getConnection();
        ResultSet rs = null;
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            for (int i = 0; i < obj.length; i++) {
                ps.setObject(i + 1, obj[i]);
            }
            rs = ps.executeQuery();
            List<Map<String, Object>> list = new ArrayList<>();
            int count = rs.getMetaData().getColumnCount();
            while (rs.next()) {
                Map<String, Object> map = new HashMap<String, Object>();
                for (int i = 0; i < count; i++) {
                    Object ob = rs.getObject(i + 1);
                    String key = rs.getMetaData().getColumnName(i
                            + 1);
                    map.put(key, ob);
                }
                list.add(map);
            }
            return list;
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            close(rs, ps, con);
        }
        return null;
    }


    public static void executeUpdate(Connection con,String sql, Object... obj) {
        PreparedStatement ps = null;
        try {
            ps = con.prepareStatement(sql);
            for (int i = 0; i < obj.length; i++) {
                ps.setObject(i + 1, obj[i]);
            }
            ps.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            close(null, ps, null);
        }
    }
}

当idea启动多个该程序时,很明现看出来分布式调度的功能的:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
当把其中一个服务器停掉过后,这个服务器的分片任务降会分配到其他服务器执行,可能是服务器2同时执行2个分片任务。

总结

上面这个例子是Elastic-Job原始使用,现在的项目一般都是springboot项目,可以直接使用elastic-job-spring-boot-starter开发,非常的简单,只需要在任务类上使用@ElasticJobConf注解配置,然后在spring配置文件中配置zookeeper信息就可以了。


版权声明:本文为u012387539原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。