java Future模式想必大家都比较熟悉,大体实现起来也比较简单,因为模式单一,我先介绍一下一般步骤,再讲一下,目前项目中遇到具体问题的解决方式
一般来说,使用java Future模式实现多线程,具体步骤如下,
1.新建一个异步任务类,如 xxxTask 实现 Callable<xxxTask.Result>(或者Runnable<xxx>)
2.重写call方法,返回值为xxxTask.Result
3.调用时,使用线程池调用,如TaskExxcutor.exe()即可
具体场景可能比较复杂,我说一下目前我使用比较规范的写法,
准备工作:
1.写好线程池调用:
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Created on 2019/1/9
* 使用线程池异步处理任务
*
* @author lvyunxiao
*/
@Data
@Slf4j
@NoArgsConstructor
public class TaskExecutor {
/**
* 线程池核心池的大小
*/
private static final int CORE_POOL_SIZE = 50;
/**
* 线程池的最大线程数
*/
private static final int MAXIMUM_POOL_SIZE = 100;
/**
* 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
*/
private static final long KEEP_ALIVE_TIME = 60;
/**
* keepAliveTime 的时间单位
*/
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
/**
* 用来储存等待执行任务的队列
*/
private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingDeque<>(1024);
private static final String NAME_FORMATE = "alpha-thread-factory-%d";
/**
* 线程工厂
*/
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
.setNameFormat(NAME_FORMATE).build();
/**
* 拒绝策略
*/
private static final RejectedExecutionHandler HANDLER = new ThreadPoolExecutor.DiscardOldestPolicy();
/**
* 创建线程池
*/
private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAXIMUM_POOL_SIZE,
KEEP_ALIVE_TIME,
TIME_UNIT,
WORK_QUEUE,
THREAD_FACTORY,
HANDLER
);
/**
* 使用执行器
*/
private static final ExecutorService EXEC = THREAD_POOL_EXECUTOR;
/**
* 执行不返回参数的任务
*
* @param task 任务
*/
public static void submit(Runnable task) {
EXEC.submit(task);
}
/**
* 执行返回参数的任务
*
* @param task 任务
* @param <V> 参数
* @return 返回值
*/
public static <V> Future<V> submit(Callable<V> task) {
return EXEC.submit(task);
}
}
具体参数解释的很清楚了
2.具体场景的bean类
如,我遇到了一个展示历史任务状态展示的列表,数据需要从mesos/marathon中取,实时展示
样例如图:

为此,编写bean:
/**
* Created on 2019/1/27
*
* @author lvyunxiao
*/
@Data
@NoArgsConstructor
public class ResourceStatus {
private int id;
private String type;
private String role;
private String appId;
private String startTime;
private String endTime;
private String state;
private String image;
private String host;
private double cpu;
private double mem;
private String dockerName;
private String logPath;
private String basePath;
private double cpuUtilization;
private double memUtilization;
private String memUse;
private String cluster;
private String date;
@Override
public String toString() {
return FastJsonUtils.toJSONString(this);
}
}然后,编写任务Task类,实现Callable接口,顺便,需要什么方法和参数也加进去
/**
* Created on 2019/1/29
*
* @author lvyunxiao
*/
@Data
@Slf4j
public class HistoryResTask implements Callable<HistoryResTask.HistoryResTaskResult> {
private final String keyword;
private final String cluster;
private final String mesosSlavesHost;
public HistoryResTask(final String keyword, final String cluster, final String mesosSlavesHost) {
this.cluster = cluster;
this.keyword = keyword;
this.mesosSlavesHost = mesosSlavesHost;
}
/**
* get role by app's id
*
* @param appId app id
* @return role
*/
private String getRoleByAppId(String appId) {
if (appId.contains(SCHEDULER)) {
return SCHEDULER;
} else {
return SWORKER;
}
}
/**
* get state map from list of Statues
*
* @param statuses list of statues
* @return state map
* eg:
* {
* "state":"TASK_xxx",
* "start":"2018-01-01 xx:xx:xx",
* "end":"2019-12-31 xx:xx:xx",
* }
*/
private ResourceMagServiceImpl.StateMap extractStatues(List<Statuses> statuses) {
ResourceMagServiceImpl.StateMap stateMap = new ResourceMagServiceImpl.StateMap();
for (Statuses status : statuses) {
String state = status.getState();
final double timestamp = status.getTimestamp();
final String timestampFormat = DateUtils.getDateFromDoubleTimeStamp(timestamp);
if (TASK_RUNNING.equals(state)) {
stateMap.setStart(timestampFormat);
} else {
stateMap.setState(state);
stateMap.setEnd(timestampFormat);
}
}
if (StringUtils.isNotEmpty(stateMap.getStart()) && StringUtils.isEmpty(stateMap.getState())) {
stateMap.setState(TASK_RUNNING);
stateMap.setEnd(DEFAULT_END_TIME);
}
if (StringUtils.isEmpty(stateMap.getStart())) {
stateMap.setStart(DEFAULT_START_TIME);
}
return stateMap;
}
/**
* get log path from completedtask
*
* @param task completed task
* @return log path
*/
private String getLogPathFromTask(CompletedTasks task) {
String logPath = "";
Container container = task.getContainer();
if (ObjectUtils.notEqual(container, null)) {
List<Volumes> volumes = container.getVolumes();
if (CollectionUtils.isNotEmpty(volumes)) {
for (Volumes volume : volumes) {
logPath = volume.getHostPath();
}
}
}
return logPath;
}
@Override
public HistoryResTaskResult call() throws Exception {
List<ResourceStatus> resourceStatuses = Lists.newArrayList();
ResourceStatus resourceStatus;
// async
MesosSlaveState mesosSlaveState = ClusterInfoUtils.getMesosSlaveState(mesosSlavesHost);
List<Frameworks> frameworks = mesosSlaveState.getFrameworks();
for (Frameworks framework : frameworks) {
List<CompletedExecutors> completedExecutors = framework.getCompletedExecutors();
for (CompletedExecutors completedExecutor : completedExecutors) {
// base path
String directory = completedExecutor.getDirectory();
String id = completedExecutor.getId();
if (!id.contains(keyword)) {
continue;
}
List<CompletedTasks> completedTasks = completedExecutor.getCompletedTasks();
for (CompletedTasks completedTask : completedTasks) {
for (CompletedTasks task : completedTasks) {
resourceStatus = new ResourceStatus();
// role
resourceStatus.setRole(getRoleByAppId(task.getId()));
resourceStatus.setCluster(cluster);
// host basePath logPath
resourceStatus.setHost(mesosSlavesHost);
resourceStatus.setLogPath(getLogPathFromTask(task));
resourceStatus.setBasePath(directory);
// cpu memory
Resources resources = task.getResources();
resourceStatus.setCpu(resources.getCpus());
resourceStatus.setMem(resources.getMem());
// type
resourceStatus.setType(ResourceMagType.history.getType());
List<Statuses> statuses = task.getStatuses();
// start end state
ResourceMagServiceImpl.StateMap stateMap = extractStatues(statuses);
resourceStatus.setState(stateMap.getState());
resourceStatus.setStartTime(stateMap.getStart());
resourceStatus.setEndTime(stateMap.getEnd());
// add it
resourceStatuses.add(resourceStatus);
}
}
}
}
return new HistoryResTaskResult(resourceStatuses);
}
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class HistoryResTaskResult {
private List<ResourceStatus> resourceStatuses;
}
}
最后,service层写方法调用
/**
* get history status
*
* @param
* @return list of ResourceStatus
* @throws Exception
*/
private List<ResourceStatus> getHistoryStatusFromClusterAsync(String keyword, String cluster) throws Exception {
List<ResourceStatus> ret = Lists.newArrayList();
ResourceStatus resourceStatus;
String mesosUrl = MESOS_MAP.getOrDefault(cluster, WANGJING_MARATHONS);
Set<String> mesosSlavesHosts = ClusterInfoUtils.getMesosSlavesHosts(mesosUrl);
List<Future<HistoryResTask.HistoryResTaskResult>> futureList = Lists.newArrayListWithExpectedSize(mesosSlavesHosts.size());
for (final String mesosSlavesHost : mesosSlavesHosts) {
// new history resource mag task
HistoryResTask historyResTask = new HistoryResTask(keyword, cluster, mesosSlavesHost);
// submit task
Future<HistoryResTask.HistoryResTaskResult> future = TaskExecutor.submit(historyResTask);
futureList.add(future);
}
for (final Future<HistoryResTask.HistoryResTaskResult> future : futureList) {
HistoryResTask.HistoryResTaskResult resTaskResult = future.get();
ret.addAll(resTaskResult.getResourceStatuses());
}
return ret;
}这样异步多线程调用,由于异步多线程的使用,使得之前需要从多台机器节点上的取信息异步进行,节省了很多时间
编写测试执行时间的用例:
/**
* get resource management information by app id
*
* @param id appid
* @return resource management information
* @throws Exception
*/
@Override
public List<ResourceStatus> getResourceMagByAppId(String id) throws Exception {
AppInfo appInfo = appInfoService.findById(id);
String keyword = appInfo.getKeyword();
String cluster = appInfo.getCluster().toString();
//List<ResourceStatus> ret = getCurrentRunningStatus(keyword, cluster);
List<ResourceStatus> ret = getCurrentRunningStatusFromCluster(keyword, cluster);
//List<ResourceStatus> historyStatus = getHistoryStatus(id);
long start = System.currentTimeMillis();
//List<ResourceStatus> historyStatus = getHistoryStatusFromCluster(keyword, cluster);
List<ResourceStatus> historyStatus = getHistoryStatusFromClusterAsync(keyword, cluster);
long end = System.currentTimeMillis();
log.info("get history status use time {} ms", end - start);
ret.addAll(historyStatus);
return ret;
}
本次任务需要从200多台服务器组成的mesos集群取任务状态信息,比较异步调用前后的使用时间:
使用前:

使用后:

这个是本地IDE测试,上传服务器调用后,时间更短:

由此看出,在数据量比较大,或者需要分布式调用时,还是很有必要这样写的
版权声明:本文为l1212xiao原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。