java分布式调度框架_基于Redis的分布式Java任务执行和调度框架

Redisson是使用Redis实现分布式任务执行和调度的Java开源项目,它是通过标准JDK的ExecutorService和ScheduledExecutorService API实现的,被提交的任务在Redisson节点服务器上运行,它们共同连接同样的Redis数据库。

3115d281f8541628b6a2bfc0063906c6.png

Redisson节点

Redisson节点是标准的Java应用,唯一目标就是执行被提交的任务,每个Redisson节点可以看成是分布式环境中远程worker。

它也可以通过一个Redisson实例在主要应用中跨多个进程。

所有任务都是动态加载,这样你不必将任务放在classpath或者每次任务改变时重启。

任务

一个任务应该实现接口java.util.concurrent.Callable or java.lang.Runnable interface.

下面是Callable 接口实现案例:

public class CallableTask implements Callable {

@RInject

private RedissonClient redissonClient;

private long anyParam;

public CallableTask() { }

public CallableTask(long anyParam) {

this.anyParam = anyParam;

}

@Override

public Long call() throws Exception { // ... } }

下面是Runnable 接口实现:

public class RunnableTask implements Runnable {

@RInject

private RedissonClient redissonClient;

private long anyParam;

public RunnableTask() { }

public RunnableTask(long anyParam) {

this.anyParam = anyParam;

}

@Override

public void run() { // ... } }

任务能够通过构造器赋予参数,一个被提交的任务能够通过@RInject访问Redisson实例。

任务提交执行是通过提交给ExecutorService API,RExecutorService 已经实现了 java.util.concurrent.ExecutorService:

RExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.submit(new RunnableTask());// or with parameterexecutorService.submit(new RunnableTask(41)); executorService.submit(new CallableTask());// or with parameterexecutorService.submit(new CallableTask(53));

使用 java.util.concurrent.ScheduledExecutorService 递交给调度执行:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);// orexecutorService.schedule(new RunnableTask(), 5, TimeUnit.SECONDS);

executorService.scheduleAtFixedRate(new RunnableTask(), 10, 25, TimeUnit.HOURS);// orexecutorService.scheduleWithFixedDelay(new RunnableTask(), 5, 10, TimeUnit.HOURS);

递交给CRON定时执行,兼容于 Quartz cron 格式:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));// orexecutorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));// orexecutorService.schedule(new RunnableTask(),

CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

任务取消:

Future> f = executorService.schedule(...);

// orFuture> f = executorService.submit(...);

f.cancel(true);

任务取消类似执行Java的线程中断。

下面是将所有值聚合在大的Redis map中,这个过程会花费很长时间:

public class CallableTask implements Callable {

@RInject

private RedissonClient redissonClient;

@Override

public Long call() throws Exception {

RMap map = redissonClient.getMap("myMap");

Long result = 0;

for (Integer value : map.values()) {// check if task has been canceledif (Thread.currentThread().isInterrupted()) {// task has been canceledreturn null;

}

result += value;

}

return result;

} }

提交任务给ExecutorService是同步的,处理结果接受可以使用异步的,这是通过标准java.util.concurrent.Future实现。Redisson也提供一系列方法异步提交任务:RExecutorServiceAsync.*Async。

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");

RFuture future = executorService.submitAsync(new CallableTask());// orRScheduledFuture future = executorService.scheduleAsync(new RunnableTask(), 5, TimeUnit.SECONDS);// orRScheduledFuture future = executorService.scheduleAtFixedRateAsync(new RunnableTask(), 10, 25, TimeUnit.HOURS);

future.addListener(new FutureListener() {

public void operationComplete(Future f) {// ... }});// cancel task by idString taskId = future.getId();// ...executorService.cancelScheduledTask(taskId);

Redisson构成了对另外一个分布式内存数据网格产品Hazelcast的挑战。

项目地址:


版权声明:本文为weixin_30227775原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。