使用java Future模式异步调用详细实例展示

    java Future模式想必大家都比较熟悉,大体实现起来也比较简单,因为模式单一,我先介绍一下一般步骤,再讲一下,目前项目中遇到具体问题的解决方式

    一般来说,使用java Future模式实现多线程,具体步骤如下,

    1.新建一个异步任务类,如 xxxTask 实现 Callable<xxxTask.Result>(或者Runnable<xxx>)

    2.重写call方法,返回值为xxxTask.Result

    3.调用时,使用线程池调用,如TaskExxcutor.exe()即可

具体场景可能比较复杂,我说一下目前我使用比较规范的写法,

准备工作:

    1.写好线程池调用:

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2019/1/9
 * 使用线程池异步处理任务
 *
 * @author lvyunxiao
 */
@Data
@Slf4j
@NoArgsConstructor
public class TaskExecutor {

    /**
     * 线程池核心池的大小
     */
    private static final int CORE_POOL_SIZE = 50;

    /**
     * 线程池的最大线程数
     */
    private static final int MAXIMUM_POOL_SIZE = 100;

    /**
     * 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
     */
    private static final long KEEP_ALIVE_TIME = 60;

    /**
     * keepAliveTime 的时间单位
     */
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    /**
     * 用来储存等待执行任务的队列
     */
    private static final BlockingQueue<Runnable> WORK_QUEUE = new LinkedBlockingDeque<>(1024);

    private static final String NAME_FORMATE = "alpha-thread-factory-%d";

    /**
     * 线程工厂
     */
    private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
            .setNameFormat(NAME_FORMATE).build();

    /**
     * 拒绝策略
     */
    private static final RejectedExecutionHandler HANDLER = new ThreadPoolExecutor.DiscardOldestPolicy();

    /**
     * 创建线程池
     */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAXIMUM_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TIME_UNIT,
            WORK_QUEUE,
            THREAD_FACTORY,
            HANDLER
    );

    /**
     * 使用执行器
     */
    private static final ExecutorService EXEC = THREAD_POOL_EXECUTOR;

    /**
     * 执行不返回参数的任务
     *
     * @param task 任务
     */
    public static void submit(Runnable task) {
        EXEC.submit(task);
    }

    /**
     * 执行返回参数的任务
     *
     * @param task 任务
     * @param <V>  参数
     * @return 返回值
     */
    public static <V> Future<V> submit(Callable<V> task) {
        return EXEC.submit(task);
    }

}

具体参数解释的很清楚了

2.具体场景的bean类

如,我遇到了一个展示历史任务状态展示的列表,数据需要从mesos/marathon中取,实时展示

样例如图:

为此,编写bean:

/**
 * Created on 2019/1/27
 *
 * @author lvyunxiao
 */
@Data
@NoArgsConstructor
public class ResourceStatus {

    private int id;
    private String type;
    private String role;
    private String appId;
    private String startTime;
    private String endTime;
    private String state;
    private String image;
    private String host;
    private double cpu;
    private double mem;
    private String dockerName;
    private String logPath;
    private String basePath;
    private double cpuUtilization;
    private double memUtilization;
    private String memUse;
    private String cluster;
    private String date;

    @Override
    public String toString() {
        return FastJsonUtils.toJSONString(this);
    }

}

然后,编写任务Task类,实现Callable接口,顺便,需要什么方法和参数也加进去

/**
 * Created on 2019/1/29
 *
 * @author lvyunxiao
 */
@Data
@Slf4j
public class HistoryResTask implements Callable<HistoryResTask.HistoryResTaskResult> {

    private final String keyword;
    private final String cluster;
    private final String mesosSlavesHost;

    public HistoryResTask(final String keyword, final String cluster, final String mesosSlavesHost) {
        this.cluster = cluster;
        this.keyword = keyword;
        this.mesosSlavesHost = mesosSlavesHost;
    }

    /**
     * get role by app's id
     *
     * @param appId app id
     * @return role
     */
    private String getRoleByAppId(String appId) {
        if (appId.contains(SCHEDULER)) {
            return SCHEDULER;
        } else {
            return SWORKER;
        }
    }

    /**
     * get state map from list of Statues
     *
     * @param statuses list of statues
     * @return state map
     * eg:
     * {
     * "state":"TASK_xxx",
     * "start":"2018-01-01 xx:xx:xx",
     * "end":"2019-12-31 xx:xx:xx",
     * }
     */
    private ResourceMagServiceImpl.StateMap extractStatues(List<Statuses> statuses) {
        ResourceMagServiceImpl.StateMap stateMap = new ResourceMagServiceImpl.StateMap();
        for (Statuses status : statuses) {
            String state = status.getState();
            final double timestamp = status.getTimestamp();
            final String timestampFormat = DateUtils.getDateFromDoubleTimeStamp(timestamp);
            if (TASK_RUNNING.equals(state)) {
                stateMap.setStart(timestampFormat);
            } else {
                stateMap.setState(state);
                stateMap.setEnd(timestampFormat);
            }
        }
        if (StringUtils.isNotEmpty(stateMap.getStart()) && StringUtils.isEmpty(stateMap.getState())) {
            stateMap.setState(TASK_RUNNING);
            stateMap.setEnd(DEFAULT_END_TIME);
        }
        if (StringUtils.isEmpty(stateMap.getStart())) {
            stateMap.setStart(DEFAULT_START_TIME);
        }
        return stateMap;
    }

    /**
     * get log path from completedtask
     *
     * @param task completed task
     * @return log path
     */
    private String getLogPathFromTask(CompletedTasks task) {
        String logPath = "";
        Container container = task.getContainer();
        if (ObjectUtils.notEqual(container, null)) {
            List<Volumes> volumes = container.getVolumes();
            if (CollectionUtils.isNotEmpty(volumes)) {
                for (Volumes volume : volumes) {
                    logPath = volume.getHostPath();
                }
            }
        }
        return logPath;
    }

    @Override
    public HistoryResTaskResult call() throws Exception {
        List<ResourceStatus> resourceStatuses = Lists.newArrayList();
        ResourceStatus resourceStatus;
        // async
        MesosSlaveState mesosSlaveState = ClusterInfoUtils.getMesosSlaveState(mesosSlavesHost);
        List<Frameworks> frameworks = mesosSlaveState.getFrameworks();
        for (Frameworks framework : frameworks) {
            List<CompletedExecutors> completedExecutors = framework.getCompletedExecutors();
            for (CompletedExecutors completedExecutor : completedExecutors) {
                // base path
                String directory = completedExecutor.getDirectory();
                String id = completedExecutor.getId();
                if (!id.contains(keyword)) {
                    continue;
                }
                List<CompletedTasks> completedTasks = completedExecutor.getCompletedTasks();
                for (CompletedTasks completedTask : completedTasks) {
                    for (CompletedTasks task : completedTasks) {
                        resourceStatus = new ResourceStatus();
                        // role
                        resourceStatus.setRole(getRoleByAppId(task.getId()));
                        resourceStatus.setCluster(cluster);
                        // host basePath logPath
                        resourceStatus.setHost(mesosSlavesHost);
                        resourceStatus.setLogPath(getLogPathFromTask(task));
                        resourceStatus.setBasePath(directory);
                        // cpu memory
                        Resources resources = task.getResources();
                        resourceStatus.setCpu(resources.getCpus());
                        resourceStatus.setMem(resources.getMem());
                        // type
                        resourceStatus.setType(ResourceMagType.history.getType());
                        List<Statuses> statuses = task.getStatuses();
                        // start end state
                        ResourceMagServiceImpl.StateMap stateMap = extractStatues(statuses);
                        resourceStatus.setState(stateMap.getState());
                        resourceStatus.setStartTime(stateMap.getStart());
                        resourceStatus.setEndTime(stateMap.getEnd());
                        // add it
                        resourceStatuses.add(resourceStatus);
                    }
                }
            }
        }
        return new HistoryResTaskResult(resourceStatuses);
    }

    @NoArgsConstructor
    @AllArgsConstructor
    @Data
    public static class HistoryResTaskResult {
        private List<ResourceStatus> resourceStatuses;
    }

}

最后,service层写方法调用

    /**
     * get history status
     *
     * @param
     * @return list of ResourceStatus
     * @throws Exception
     */
    private List<ResourceStatus> getHistoryStatusFromClusterAsync(String keyword, String cluster) throws Exception {
        List<ResourceStatus> ret = Lists.newArrayList();
        ResourceStatus resourceStatus;
        String mesosUrl = MESOS_MAP.getOrDefault(cluster, WANGJING_MARATHONS);
        Set<String> mesosSlavesHosts = ClusterInfoUtils.getMesosSlavesHosts(mesosUrl);
        List<Future<HistoryResTask.HistoryResTaskResult>> futureList = Lists.newArrayListWithExpectedSize(mesosSlavesHosts.size());
        for (final String mesosSlavesHost : mesosSlavesHosts) {
            // new history resource mag task
            HistoryResTask historyResTask = new HistoryResTask(keyword, cluster, mesosSlavesHost);
            // submit task
            Future<HistoryResTask.HistoryResTaskResult> future = TaskExecutor.submit(historyResTask);
            futureList.add(future);
        }
        for (final Future<HistoryResTask.HistoryResTaskResult> future : futureList) {
            HistoryResTask.HistoryResTaskResult resTaskResult = future.get();
            ret.addAll(resTaskResult.getResourceStatuses());
        }
        return ret;
    }

这样异步多线程调用,由于异步多线程的使用,使得之前需要从多台机器节点上的取信息异步进行,节省了很多时间

编写测试执行时间的用例:

    /**
     * get resource management information by app id
     *
     * @param id appid
     * @return resource management information
     * @throws Exception
     */
    @Override
    public List<ResourceStatus> getResourceMagByAppId(String id) throws Exception {
        AppInfo appInfo = appInfoService.findById(id);
        String keyword = appInfo.getKeyword();
        String cluster = appInfo.getCluster().toString();
        //List<ResourceStatus> ret = getCurrentRunningStatus(keyword, cluster);
        List<ResourceStatus> ret = getCurrentRunningStatusFromCluster(keyword, cluster);
        //List<ResourceStatus> historyStatus = getHistoryStatus(id);
        long start = System.currentTimeMillis();
        //List<ResourceStatus> historyStatus = getHistoryStatusFromCluster(keyword, cluster);
        List<ResourceStatus> historyStatus = getHistoryStatusFromClusterAsync(keyword, cluster);
        long end = System.currentTimeMillis();
        log.info("get history status use time {} ms", end - start);
        ret.addAll(historyStatus);
        return ret;
    }

本次任务需要从200多台服务器组成的mesos集群取任务状态信息,比较异步调用前后的使用时间:

使用前:

使用后:

这个是本地IDE测试,上传服务器调用后,时间更短:

由此看出,在数据量比较大,或者需要分布式调用时,还是很有必要这样写的


版权声明:本文为l1212xiao原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。