CompletableFuture多线程批量异步处理
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author: baifan
* @date: 2021/3/8
*/
public class DemoTest {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 5, 3000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5));
List<Integer> ids = new ArrayList<>();
ids.add(1);
ids.add(2);
ids.add(3);
CompletableFuture<Integer>[] completableFutures = ids.stream().map(id -> {
return CompletableFuture.supplyAsync(() -> getNewId(id), threadPoolExecutor);
}).toArray(CompletableFuture[]::new);
// 等待所有任务执行完
CompletableFuture.allOf(completableFutures).join();
for (CompletableFuture<Integer> completableFuture : completableFutures) {
try {
System.out.println(completableFuture.get());
} catch (Exception e) {
e.printStackTrace();
}
}
return;
}
public static int getNewId(int id) {
return id + 3;
}
}
定时任务批处理
public class OrderManualPushJob {
/**
* 手动推送订单查询服务
*/
@Autowired
private HotelOrderManualPushServie hotelOrderManualPushServie;
@Resource(name ="asyncOrderPushServiceExecutor")
private ExecutorService asyncExecutor;
@Override
public void veExecute(ShardingContext shardingContext) {
long t0 = System.currentTimeMillis();
//1.从表中查询出1000条数据处理
List<OrderManualPush> orderManualPushList = hotelOrderManualPushServie.selectUnhandleList();
if(CollectionUtils.isEmpty(orderManualPushList)){
return;
}
long t1 = System.currentTimeMillis();
//每条子线程处理数据的数量
int perCount = 100;
//拆分集合,每个100
List<List<OrderManualPush>> partitions = Lists.partition(orderManualPushList, perCount);
logger.info("批量处理多线程开始,本次处理的订单数量:{}",orderManualPushList.size());
logger.info("多线程数量:{}",partitions.size());
List<CompletableFuture> futures = Lists.newArrayList();
for(List<OrderManualPush> dataList : partitions){
List<OrderManualPush> finalDataList = dataList;
CompletableFuture<List<OrderManualPush>> future = CompletableFuture.supplyAsync(() -> pushOrder(finalDataList),asyncExecutor);
futures.add(future);
}
//等待所有线程执行完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
long t2 = System.currentTimeMillis();
logger.info("批量处理多线程结束,总耗时:{}ms",t2 - t1);
//获取线程返回结果,封装集合
List<OrderManualPush> updateList = new ArrayList<>();
futures.stream().forEach(future->{
try {
List<OrderManualPush> list = (List<OrderManualPush>) future.get();
if(CollectionUtils.isNotEmpty(list)){
updateList.addAll(list);
}
}catch (Exception e){
logger.error("获取多线程返回结果数据异常",e);
}
});
hotelOrderManualPushServie.updateBatchList(updateList);
}
/**
* 推送中间表
* @param dataList
* @return
*/
private List<OrderManualPush> pushOrder(List<OrderManualPush> dataList){
logger.info("当前线程:{},处理数据量:{},开始时间{}",Thread.currentThread().getName(),dataList.size(),System.currentTimeMillis());
List<OrderManualPush> updateList = Lists.newArrayList();
for (OrderManualPush order : dataList){
OrderManualPush updateBean = BeanMapper.map(order,OrderManualPush.class);
try {
//调用推送中间表服务
hotelZjbService.datafz(order.getDdbh(),order.getDdlx(),new FcmqBody());
updateBean.setState("1");
}catch (Exception e){
updateBean.setState("2");
}
updateBean.setUpdateTime(VeDate.getNow());
updateList.add(updateBean);
}
logger.info("当前线程:{},处理数据量:{},结束时间{}",Thread.currentThread().getName(),dataList.size(),System.currentTimeMillis());
return updateList;
}
}
版权声明:本文为t194978原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。