多线程处理十万百万级List(大list处理)

一、背景

       对于java开发中,十万百万级别的list进行各种操作,然后入库等操作好多时候会遇到。普通单线程处理,处理时间长,还经常报gc问题。针对此问题,查阅了网上很多资料,好多都使用多线程来处理。跟着好多的博客进行处理,要么是线程安全问题,要么根本速度就提高不了。我针对我项目中的使用场景,结合资料进行了修改,特提交此文,为有共同需求的小伙伴提供一点建议。

    本次使用场景为:从数据库中查阅数十万数据生成list,将list进行遍历转json,拼接属性,组转成新list批量入库es.

二、代码如下:

//从数据库中查询需要同步的航班
                List<Map> fvaluesList = abss.queryListFvaluesOfDays(start, end);
                long beginTime = System.currentTimeMillis();
                // 开始时间
                long start1 = System.currentTimeMillis();
                // 每500条数据开启一条线程
                int threadSize = 5000;
                // 总数据条数
                int dataSize = fvaluesList.size();
                // 线程数
                int threadNum = dataSize / threadSize + 1;
                // 定义标记,过滤threadNum为整数
                boolean special = dataSize % threadSize == 0;
                // 创建一个线程池
                ExecutorService exec = Executors.newFixedThreadPool(threadNum);
                // 定义一个任务集合
                List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
                Callable<Integer> task = null;
                CopyOnWriteArrayList<Map> cutList = null;
                final int[] num = {0};
                // 确定每条线程的数据
                for (int n = 0; n < threadNum; n++) {
                        if (n == threadNum - 1) {
                            if (special) {
                                break;
                            }
                            cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, dataSize).toArray()) ;
                        } else {
                            cutList = new CopyOnWriteArrayList(fvaluesList.subList(threadSize * n, threadSize * (n + 1)).toArray());
                        }
                        final CopyOnWriteArrayList<Map> listStr = cutList;
                        task = new Callable<Integer>() {
                            @Override
                            public Integer call() throws Exception {
                                System.out.println(Thread.currentThread().getName());
                                CopyOnWriteArrayList<SqaEsFvalues> esfvalues = new CopyOnWriteArrayList<SqaEsFvalues>();
                                for (Map fvalues : listStr) {
                                    if (fvalues.get("items") != null || !fvalues.get("items").equals("")) {
                                        JSONObject items = JSONObject.parseObject(fvalues.get("items").toString());
                                        //items转Map
                                        Map<String, Object> map = (Map<String, Object>) items;
                                        //当map1和map中有相同字段,则保留map中的,丢弃map1中的
                                        for (Object key : fvalues.keySet()) {
                                            if (map.containsKey(key)) {
                                                map.remove(key);
                                            }
                                        }
                                        fvalues.putAll(map);
                                        fvalues.remove("items");
                                    }
                                    String jsonString = JSON.toJSONString(fvalues);
                                    //把map系列化到对象中
                                    SqaEsFvalues esfvalue = JSON.parseObject(jsonString, SqaEsFvalues.class);
                                    esfvalues.add(esfvalue);
                                }
                                synchronized (fvaluesRepository){
                                    if (!esfvalues.isEmpty()) {
                                        num[0] +=esfvalues.size();
                                        System.out.println(num[0]);
                                        fvaluesRepository.saveAll(esfvalues);
                                    }
                                }

                                return 1;
                            }
                        };
                        // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系
                        tasks.add(task);
                    }
                System.out.println("tasksSize=" + tasks.size());
                List<Future<Integer>> results = exec.invokeAll(tasks);
                for (Future<Integer> future : results) {
                    System.out.println("results.size=" + results.size());
                    //       System.out.println(future.get());
                }
                // 关闭线程池
                exec.shutdown();
                System.out.println("线程任务执行结束");
                System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start1) + "毫秒");

三、特别说明

1.使用CopyOnWriteArrayList替代Arraylist,因为ArrayList是线程不安全的。

2.在存入es时候,会出现线程安全问题,使用synchronized代码块处理。

四、效果

本机实验,二百万数据花半小时成功进入es.数据有待提高,其中从数据库中一次查询几百万数据用时较长,可以进一步优化。


版权声明:本文为qq_28025423原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。