TaskDecorator解决父子线程间传递上下文数据

一般同步编程模型中我们使用ThreadLocal即可,但是在异步编程模型中(可能有同学有疑问,为什么不用InheritThreadLocal?看这篇记一次线上踩坑实录)会导致上下文失效。

但是spring 4.3给出了好的方案,利用TaskDecorator。

看这个名称大概就能猜出是一个装饰器设计原理

我们分析下线程池的源码

@Override
	protected ExecutorService initializeExecutor(
			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

		ThreadPoolExecutor executor;
        //当线程池的装饰器不为空时,执行execute方法会进入这里,因为它重写了execute方法
		if (this.taskDecorator != null) {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler) {
                
                //这里是一个代理设计模式的实现,对execute做了一层代理
				@Override
				public void execute(Runnable command) {
                    //执行装饰器的逻辑,注意这段代码是在主线程中运行
					Runnable decorated = taskDecorator.decorate(command);
					if (decorated != command) {
						decoratedTaskMap.put(decorated, command);
					}
                    //子线程真正执行的方法(异步模块)...初始化核心线程数,核心线程满了入队列,队列满开启至最大线程数
					super.execute(decorated);
				}
			};
		}
		else {
			executor = new ThreadPoolExecutor(
					this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
					queue, threadFactory, rejectedExecutionHandler);

		}

		if (this.allowCoreThreadTimeOut) {
			executor.allowCoreThreadTimeOut(true);
		}

		this.threadPoolExecutor = executor;
		return executor;
	}

demo演示

主线程16个,子线程2个,执行10次,目的是尽可能让子线程复用。

@Test
    public void testThreadLocal() {
        ThreadLocal<Integer> threadLocal = new ThreadLocal<>();
        ExecutorService mainThreadPool = Executors.newFixedThreadPool(16);

        ThreadPoolTaskExecutor childThreadPool = new ThreadPoolTaskExecutor();
        childThreadPool.setCorePoolSize(2);
        childThreadPool.setMaxPoolSize(2);

        childThreadPool.setTaskDecorator(runnable -> {
            int v = threadLocal.get();
            System.out.println("装饰器中获取到主线程=" + Thread.currentThread().getName() + " 获取上下文=" + v);
            return () -> {
                try {
                    //重新copy传递给子线程
                    threadLocal.set(v);
                    runnable.run();
                } finally {
                    threadLocal.remove();
                }
            };
        });
        childThreadPool.initialize();

        for (int i = 0; i < 10; i++) {
            int finalI = i;
            mainThreadPool.execute(() -> {
                //模拟在主线程设置上下文变量
                threadLocal.set(finalI);
                childThreadPool.execute(() -> System.out.println("子线程" + Thread.currentThread().getName() + " 获取上下文变量=" + threadLocal.get()));
                threadLocal.remove();
            });
        }
        try {
            childThreadPool.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

 可以看到,我们传递的值都能正确获取到,没有像inheritThreadLocal出现紊乱。

2020-12-03 10:59:26.350 [main] INFO  o.s.s.c.ThreadPoolTaskExecutor - Initializing ExecutorService
装饰器中获取到主线程=pool-1-thread-1 获取上下文=0
装饰器中获取到主线程=pool-1-thread-2 获取上下文=1
装饰器中获取到主线程=pool-1-thread-3 获取上下文=2
装饰器中获取到主线程=pool-1-thread-4 获取上下文=3
装饰器中获取到主线程=pool-1-thread-5 获取上下文=4
装饰器中获取到主线程=pool-1-thread-6 获取上下文=5
装饰器中获取到主线程=pool-1-thread-8 获取上下文=7
装饰器中获取到主线程=pool-1-thread-10 获取上下文=9
装饰器中获取到主线程=pool-1-thread-9 获取上下文=8
装饰器中获取到主线程=pool-1-thread-7 获取上下文=6

Disconnected from the target VM, address: '127.0.0.1:4991', transport: 'socket'
子线程ThreadPoolTaskExecutor-1 获取上下文变量=2
子线程ThreadPoolTaskExecutor-1 获取上下文变量=3
子线程ThreadPoolTaskExecutor-1 获取上下文变量=1
子线程ThreadPoolTaskExecutor-1 获取上下文变量=0
子线程ThreadPoolTaskExecutor-1 获取上下文变量=5
子线程ThreadPoolTaskExecutor-1 获取上下文变量=7
子线程ThreadPoolTaskExecutor-1 获取上下文变量=9
子线程ThreadPoolTaskExecutor-1 获取上下文变量=8
子线程ThreadPoolTaskExecutor-1 获取上下文变量=6
子线程ThreadPoolTaskExecutor-2 获取上下文变量=4

Process finished with exit code 0


版权声明:本文为lmx1989219原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。