一个任务分解成多个子任务每个子任务分配一个线程处理(多线程处理一批大数据)

1、首先需要理解 CountDownLatch:

CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓,似乎有一点“三二一,芝麻开门”的感觉。CountDownLatch的作用也是如此,在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,必要时可以对各个任务的执行结果进行汇总,然后主线程才继续往下执行。

        CountDownLatch主要有两个方法:countDown()和await()。countDown()方法用于使计数器减一,其一般是执行任务的线程调用,await()方法则使调用该方法的线程处于等待状态,其一般是主线程调用。这里需要注意的是,countDown()方法并没有规定一个线程只能调用一次,当同一个线程调用多次countDown()方法时,每次都会使计数器减一;另外,await()方法也并没有规定只能有一个线程执行该方法,如果多个线程同时执行await()方法,那么这几个线程都将处于等待状态,并且以共享模式享有同一个锁。

2、代码实列、我的项目是要处理一批数据然后插入到数据库表中之前单线程处理的时候因为涉及到数据的加工筛选然后在把符合的数据插入表中,现在我先把需要处理的数据一个list把它每500个分为一组,然后开启一个线程去处理插入数据库表中、最后所有分组的数据也就是多个线程把数据都处理完之后主线程返回处理结果。

   //每个线程处理的数据量
    private static final int count = 500;

    //定义线程池数量为8,每个线程处理500条数据
    private static ExecutorService execPool = Executors.newFixedThreadPool(8);

    private CountDownLatch threadsSignal;

 /**
     * 多线程批量执行插入,百万数据需要大约不到20秒   64位4核处理
     * 需要插入数据库的数据  limodel
     *
     * @return
     */
  public JsonData batchAddData(List<Entity> limodel) {
        JsonData jd = new JsonData();
        try {
            if (limodel.size() <= count) {
                threadsSignal = new CountDownLatch(1);
                //往线程池里面放创建的子线程
                execPool.submit(new InsertDate(limodel));
            } else {
                List<List<Entity>> li = createList(limodel, count);
                threadsSignal = new CountDownLatch(li.size());
                for (List<Entity> liop : li) {
                    execPool.submit(new InsertDate(liop));
                }
            }
            //主线程调用了await()方法,此时主线程将在此处等待创建的多个子线程执行完任务之后才继续往下执行
            threadsSignal.await();
        } catch (Exception e) {
            // TODO: handle exception
            log.error("异常:" + e.toString());
            //execPool.shutdownNow();
        }
        return jd;
    }

    /**
     * 数据拆分
     *
     * @param targe
     * @param size
     * @return
     */
    public static List<List<Entity>> createList(List<Entity> targe, int size) {
        List<List<Entity>> listArr = new ArrayList<List<Entity>>();
        //获取被拆分的数组个数
        int arrSize = targe.size() % size == 0 ? targe.size() / size : targe.size() / size + 1;
        for (int i = 0; i < arrSize; i++) {
            List<Entity> sub = new ArrayList<Entity>();
            //把指定索引数据放入到list中
            for (int j = i * size; j <= size * (i + 1) - 1; j++) {
                if (j <= targe.size() - 1) {
                    sub.add(targe.get(j));
                }
            }
            listArr.add(sub);
        }
        return listArr;
    }


    /**
     * 内部类,开启线程批量保存数据
     */
    class InsertDate extends Thread {

        List<Entity> lientity = new ArrayList<Entity>();

        public InsertDate(List<Entity> list1) {
           try{
              if (list1.size() > 0 && list1 != null) {
                 for (Entity pu : list1) {
                    //数据处理逻辑省略...........
                 }
               }
           }catch(Exception e){
             log.error("格式化数据异常:" + e); 
           }
            
        }


        public void run() {
            try {
                dao.save(lientity);
            } catch (Exception e) {
                log.error("多线程插入数据异常:" + e);
            } finally {
                //每开启一个线程计数器减一,注意保证线程执行是否异 
                //常都要进行减一,不然的话主线程会一直等threadsSignal等于0的时候才开始执行下面 
                //操作
               threadsSignal.countDown();
            }
        }
    }

 CountDownLatch详解


版权声明:本文为qq_28643437原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。