线程池使用——多层线程嵌套如何保证子线程全部执行完毕再做后续操作及java.util.concurrent.RejectedExecutionException报错解决

当使用线程池的时候,如果多层线程嵌套使用,在子线程提交下一级子线程没有执行完毕的情况下结束线程池,就会报错java.util.concurrent.RejectedExecutionException:也就是shutdown线程池之后又去提交线程就会报改错

如下:

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@425db396 rejected from java.util.concurrent.ThreadPoolExecutor@657c8ad9[Shutting down, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
	at cn.com.yusys.AsynchronyAThread.run(AsynchronyAThread.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

这个是因为一级子线程里去操作提交二级子线程,但最外面的主线程在一级子线程提交结束之后就去shutdown屏蔽了线程池,导致报错。如何解决呢?最简单的方式就是用一个AtomicInteger标记参数,一级线程提交前加一,一级子线程提交二级子线程之后减一,最后在主线程里CAS比较AtomicInteger标记参数做判断。

如下是一个实际示例:

最开始的构想如图,最后没有采用,因为IO瓶颈阻隔一级线程,共用一个线程池减少资源消耗。

主线程代码:

package cn.com.yusys;

import com.yc.client.ccts.core.syscfg.PropertiesConfig;
import com.yc.client.core.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author wxming.
 * @Description TODO.
 * @date 2020-8-20.
 */
public class AsynchronyThreadMain {
    private static final Logger log = LoggerFactory.getLogger(AsynchronyThreadMain.class);
    static AtomicInteger atomicInteger = new AtomicInteger();

    public static void main(String[] args) {
        int a = atomicInteger.get();
        long startTime = System.currentTimeMillis();
        long aThreadEndTime = 0;
        //1.1 加载自定义配置文件
        PropertiesConfig proNew = CommonUtil.getPropertiesConfig();
        //1.2 加载命令行参数
        //java -jar imsdatamove.jar 100 /home/ycms/imageindex.txt
        int cSize = proNew.getIntConfig("threadSize").intValue();
        String readListPath = proNew.getStringConfig("readListPath");
        if(args.length>1){
            cSize = Integer.parseInt(args[0]);
            readListPath = args[1];
        }
        CommonUtil.setcSize(cSize);
        log.info("线程数:{},流水号清单文件地址:{}", cSize,readListPath);
        //1.3初始化线程池
        //ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(cSize);
        ThreadPoolExecutor executor = CommonUtil.creatPool(cSize);
        //2、读取清单
        File file = new File(readListPath);
        FileInputStream inputStream = null;
        InputStreamReader reader = null;
        BufferedReader bufferedReader = null;
        try {
            inputStream = new FileInputStream(file);
            reader = new InputStreamReader(inputStream,"UTF-8");
            bufferedReader = new BufferedReader(reader);
            String strAPPL = "";
            String strBsiSH = proNew.getStringConfig("strBsiSH");
            while ((strAPPL = bufferedReader.readLine()) != null){
                if(StringUtil.isEmpty(strAPPL)){
                    break;
                }
                log.info("开始业务流水号:{}",strAPPL);
                //线程中传入proNew、strAPPL
                //Runnable runnable = new AllMoveThread(proNew,strAPPL,strBsiSH);
                Runnable runnable = new AsynchronyAThread(proNew,strAPPL,strBsiSH,atomicInteger);
                CommonUtil.forExecutor(executor,cSize);
                atomicInteger.incrementAndGet();//不能到里面加
                executor.submit(runnable);

            }

            for (;;) {
                log.debug("检测线程是否全部提交,此时原子值{}",atomicInteger.get());
                boolean isCasSuccess = atomicInteger.compareAndSet(a, 0);
                if(isCasSuccess){
                    aThreadEndTime = System.currentTimeMillis();
                    log.info("=======对比数据线程全部结束,全部提交到下载上传线程,用时{}毫秒=========",aThreadEndTime - startTime);
                    //结束线程池
                    executor.shutdown();
                    //等待子线程全部执行完毕(最大等待时间:1 Day)
                    executor.awaitTermination(1, TimeUnit.DAYS);
                    break;
                }else{
                    Thread.sleep(10);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            CommonUtil.toDoFinally(inputStream, reader, bufferedReader);
        }
        long endTime = System.currentTimeMillis();
        log.info("结束用时{}毫秒,其中最后只剩下载上传耗时{}毫秒" , endTime - startTime,endTime - aThreadEndTime);
    }
}

一级子线程代码:

package cn.com.yusys;

import com.yc.ccts.CCTSFileInfo;
import com.yc.client.ccts.CCTSClientAPI;
import com.yc.client.ccts.core.syscfg.PropertiesConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author wxming.
 * @Description TODO.
 * @date 2020-8-20.
 */
public class AsynchronyAThread implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(AsynchronyAThread.class);

    private PropertiesConfig proNew;
    private String strAPPL;
    private String strBsiSH;
    private AtomicInteger atomicInteger;

    public AsynchronyAThread(PropertiesConfig proNew, String strAPPL,String strBsiSH,AtomicInteger atomicInteger) {
        this.proNew = proNew;
        this.strAPPL = strAPPL;
        this.strBsiSH = strBsiSH;
        this.atomicInteger = atomicInteger;
    }

    @Override
    public void run() {
        try {
            //1.1 老系统配置初始化 开始
            CCTSClientAPI tcOld = CommonUtil.getOldClientAPI();
            //1.3 新系统配置初始化
            CCTSClientAPI tcNew = CommonUtil.getNewClientApi(proNew);
            //4. 业务参数设置
            Map<String, Object> mapParm = CommonUtil.getParmMap(proNew, strAPPL);

            //5、获取老系统的
            String xmlGet = tcOld.documentOpen(mapParm);
            Map<String, CCTSFileInfo> mapFileParmOld = new HashMap<String, CCTSFileInfo>();
            boolean bResult = tcOld.documentGetFiles(mapFileParmOld);
            //6、获取新系统该业务批次清单.移除重复数据。
            String xmlPut = tcNew.documentCreate(mapParm);
            Map<String, CCTSFileInfo> mapFileParmNew = new HashMap<String, CCTSFileInfo>();
            boolean bResultNew = tcNew.documentGetFiles(mapFileParmNew);
            if(mapFileParmNew.size()>0){
                for (Map.Entry<String, CCTSFileInfo> entry:mapFileParmNew.entrySet()) {
                    mapFileParmOld.remove(entry.getValue().getFileOrgName());
                }
            }
            if(mapFileParmOld.size()>0){
                Runnable runnable = new AsynchronyBThread(tcOld, tcNew, mapFileParmOld ,strBsiSH);
                CommonUtil.forExecutor(CommonUtil.getExecutor(),CommonUtil.getcSize());
                CommonUtil.getExecutor().submit(runnable);
            }else{
                log.info("业务流水号:{}文件全部存在",strAPPL);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        atomicInteger.decrementAndGet();
    }
}

二级子线程代码:

无关

 

 

 

 


版权声明:本文为itwxming原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。