一、背景
对于java开发中,十万百万级别的list进行各种操作,然后入库等操作好多时候会遇到。普通单线程处理,处理时间长,还经常报gc问题。针对此问题,查阅了网上很多资料,好多都使用多线程来处理。跟着好多的博客进行处理,要么是线程安全问题,要么根本速度就提高不了。我针对我项目中的使用场景,结合资料进行了修改,特提交此文,为有共同需求的小伙伴提供一点建议。
本次使用场景为:从数据库中查阅数十万数据生成list,将list进行遍历转json,拼接属性,组转成新list批量入库es.
二、代码如下:
//从数据库中查询需要同步的航班
List<Map> fvaluesList = abss.queryListFvaluesOfDays(start, end);
long beginTime = System.currentTimeMillis();
// 开始时间
long start1 = System.currentTimeMillis();
// 每500条数据开启一条线程
int threadSize = 5000;
// 总数据条数
int dataSize = fvaluesList.size();
// 线程数
int threadNum = dataSize / threadSize + 1;
// 定义标记,过滤threadNum为整数
boolean special = dataSize % threadSize == 0;
// 创建一个线程池
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
// 定义一个任务集合
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
Callable<Integer> task = null;
CopyOnWriteArrayList<Map> cutList = null;
final int[] num = {0};
// 确定每条线程的数据
for (int n = 0; n < threadNum; n++) {
if (n == threadNum - 1) {
if (special) {
break;
}
cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, dataSize).toArray()) ;
} else {
cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, threadSize * (n + 1)).toArray());
}
final CopyOnWriteArrayList<Map> listStr = cutList;
task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(Thread.currentThread().getName());
CopyOnWriteArrayList<SqaEsFvalues> esfvalues = new CopyOnWriteArrayList<SqaEsFvalues>();
for (Map fvalues : listStr) {
if (fvalues.get("items") != null || !fvalues.get("items").equals("")) {
JSONObject items = JSONObject.parseObject(fvalues.get("items").toString());
//items转Map
Map<String, Object> map = (Map<String, Object>) items;
//当map1和map中有相同字段,则保留map中的,丢弃map1中的
for (Object key : fvalues.keySet()) {
if (map.containsKey(key)) {
map.remove(key);
}
}
fvalues.putAll(map);
fvalues.remove("items");
}
String jsonString = JSON.toJSONString(fvalues);
//把map系列化到对象中
SqaEsFvalues esfvalue = JSON.parseObject(jsonString, SqaEsFvalues.class);
esfvalues.add(esfvalue);
}
synchronized (fvaluesRepository){
if (!esfvalues.isEmpty()) {
num[0] +=esfvalues.size();
System.out.println(num[0]);
fvaluesRepository.saveAll(esfvalues);
}
}
return 1;
}
};
// 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
tasks.add(task);
}
System.out.println("tasksSize=" + tasks.size());
List<Future<Integer>> results = exec.invokeAll(tasks);
for (Future<Integer> future : results) {
System.out.println("results.size=" + results.size());
// System.out.println(future.get());
}
// 关闭线程池
exec.shutdown();
System.out.println("线程任务执行结束");
System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start1) + "毫秒");三、特别说明
1.使用CopyOnWriteArrayList替代Arraylist,因为ArrayList是线程不安全的。
2.在存入es时候,会出现线程安全问题,使用synchronized代码块处理。

四、效果
本机实验,二百万数据花半小时成功进入es.数据有待提高,其中从数据库中一次查询几百万数据用时较长,可以进一步优化。
版权声明:本文为qq_28025423原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。