springclou中Sleuth异步中trace传递

在工作中同步转异步处理的场景非常多,例如日志记录,图片异步上传等。sleuth中trace链路+ELK收集,可以让我们很直观看到整个完整的链路,我们知道sleuth中在同一线程的链路的传递是通过ThreadLocal来做到的,而http或rpc调用这种方式的链路一般通过添加header头来进行传递的(这个需要我们去自己实现对http请求的拦截处理-当然我们不需要重复造轮子,spring已经帮我们把这些都实现了,我们只需要进行调用正常调用即可),那么如果我们创建一个线程pool,并异步执行数据,那么原线程中的trace信息能否传递到新的线程之中。

我们创建一个线程ThreadPoolTaskExecutor来进行测试(在一个有spring cloud sleuth的环境中测试)。

@Bean
    public ThreadPoolExecutor threadPoolExecutor(){

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

        return threadPoolExecutor;

    }
 @GetMapping(value = "test")
    public JsonResult<Boolean> test(){

        log.info("进入代码哈哈哈哈哈!!!!");
        
        threadPoolExecutor.execute(()->{
            log.info("这是threadPoolExecutor异步执行的哈哈哈哈哈哈");
        });

        return JsonResult.success();
    }

最终的结果我们发现:

 trace在两个线程中进行传递了。为何出现这个情况的,我们在被执行的代码中可以debug下发现:

发现在执行的方法栈中多出了TraceRunnable类,但是我们在使用时候并没有创建该类,联想下最终的结果,我们先做出一个猜测,spring使用TraceRunnable 该类对我们正在执行的Runnable进行了代理,最终达到了链路跨线程传递的。

查看TraceRunnable源码发现,该类在以下instrunment下:

 

public class TraceRunnable implements Runnable {

	//默认
	private static final String DEFAULT_SPAN_NAME = "async";
    //整个链路对象
	private final Tracer tracer;
    //被代理的对象
	private final Runnable delegate;
    //传递来的上一个span 上下文对象
	private final TraceContext parent;
    //新生成的span名
	private final String spanName;

	public TraceRunnable(Tracing tracing, SpanNamer spanNamer, Runnable delegate) {
		this(tracing, spanNamer, delegate, null);
	}
    //初始化
	public TraceRunnable(Tracing tracing, SpanNamer spanNamer, Runnable delegate,
			String name) {
		this.tracer = tracing.tracer();
		this.delegate = delegate;
		this.parent = tracing.currentTraceContext().get();
		this.spanName = name != null ? name : spanNamer.name(delegate, DEFAULT_SPAN_NAME);
	}

	@Override
	public void run() {
        //感觉上一个span创建一个写的span
		ScopedSpan span = this.tracer.startScopedSpanWithParent(this.spanName,
				this.parent);
		try {
            //执行代理类方法
			this.delegate.run();
		}
		catch (Exception | Error e) {
			span.error(e);
			throw e;
		}    
		finally {
			span.finish();
		}
	}

}

 那么该TraceRunnable在什么时候被spring进行替换的呢?在TraceRunnable的构造方法上进行加入断点debug发现了TraceableExecutorService的踪影

 TraceableExecutorService该类实现了ExecutorService接口(而我们使用的ThreadPoolExecutor也是实现该接口)并重写了对应的核心方法,就拿excute() 方法来说,它的执行逻辑很简单,就是创建TraceRunnable代理类,传入span,trace对象并执行被代理的TraceableExecutorService对应方法

而TraceableExecutorService又在什么时候被创建,并且传入被代理对象的,继续debug追踪

 从该类的命名可以发现 它是一个interceptor拦截类,该类实现了MethodInterceptor(这个接口是springaop中动态代理的核心执行接口类),如下所示:

class ExecutorMethodInterceptor<T extends Executor> implements MethodInterceptor {

	//被代理类 实际上就是上文创建的pool实例
    private final T delegate;
    
    //spring的Bean工厂类
	private final BeanFactory beanFactory;

	ExecutorMethodInterceptor(T delegate, BeanFactory beanFactory) {
		this.delegate = delegate;
		this.beanFactory = beanFactory;
	}

	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
        //构建executor
		T executor = executor(this.beanFactory, this.delegate);
        //被执行的方法
		Method methodOnTracedBean = getMethod(invocation, executor);
		if (methodOnTracedBean != null) {
			try {
                //反射调用method
				return methodOnTracedBean.invoke(executor, invocation.getArguments());
			}
			catch (InvocationTargetException ex) {
				// gh-1092: throw the target exception (if present)
				Throwable cause = ex.getCause();
				throw (cause != null) ? cause : ex;
			}
		}
		return invocation.proceed();
	}

	private Method getMethod(MethodInvocation invocation, Object object) {
		Method method = invocation.getMethod();
		return ReflectionUtils.findMethod(object.getClass(), method.getName(),
				method.getParameterTypes());
	}

	T executor(BeanFactory beanFactory, T executor) {
		return (T) new LazyTraceExecutor(beanFactory, executor);
	}

}

ExecutorMethodInterceptor类与ExecutorBeanPostProcessor(实现BeanPostProcessor接口)类在一块,我们从该类的命名可以看出原对象的代理对象在应该在ExecutorBeanPostProcessorBean执行完成之后被创建的。

ExecutorBeanPostProcessorBean的创建在AsyncDefaultAutoConfiguration自动装配类中构建:

 而ExecutorBeanPostProcessor中postProcessAfterInitialization()中就有对各种ThreadPool执行器的代理对象的创建

public Object postProcessAfterInitialization(Object bean, String beanName)
			throws BeansException {
		boolean alreadyTraced = alreadyTraced(bean);
                //被代理对象为ThreadPoolTaskExecutor 
		if (bean instanceof ThreadPoolTaskExecutor && !alreadyTraced) {
			if (isProxyNeeded(beanName)) {
				return wrapThreadPoolTaskExecutor(bean);
			}
			else {
				log.info("Not instrumenting bean " + beanName);
			}
		}
                 //被代理对象为ScheduledExecutorService子类 
		else if (bean instanceof ScheduledExecutorService && !alreadyTraced) {
			if (isProxyNeeded(beanName)) {
				return wrapScheduledExecutorService(bean);
			}
			else {
				log.info("Not instrumenting bean " + beanName);
			}
		}
                //被代理对象为ExecutorService 子类  
		else if (bean instanceof ExecutorService && !alreadyTraced) {
			if (isProxyNeeded(beanName)) {
				return wrapExecutorService(bean);
			}
			else {
				log.info("Not instrumenting bean " + beanName);
			}
		}
                  //被代理对象为AsyncTaskExecutor 子类  

		else if (bean instanceof AsyncTaskExecutor && !alreadyTraced) {
			if (isProxyNeeded(beanName)) {
				return wrapAsyncTaskExecutor(bean);
			}
			else {
				log.info("Not instrumenting bean " + beanName);
			}
		}
                //被代理对象为Executor 子类  
		else if (bean instanceof Executor && !alreadyTraced) {
			return wrapExecutor(bean);
		}
		return bean;
	}

        Object createThreadPoolTaskExecutorProxy(Object bean, boolean cglibProxy,
			ThreadPoolTaskExecutor executor) {
		if (!cglibProxy) {
                        //静态代理
			return new LazyTraceThreadPoolTaskExecutor(this.beanFactory, executor);
		}   
                        //cglib动态代理
		return getProxiedObject(bean, cglibProxy, executor,
				() -> new LazyTraceThreadPoolTaskExecutor(this.beanFactory, executor));
	}

从上述代码中可以看出,spring把我们常用不同种类的线程池都做构建具体的代理类(感觉具体的实现决定是使用cglib动态代理还是通过已有的类(已实现的类如下所示)进行静态代理)

当然上文是对于Runnable的支持,同时也对Callable提供了对应的支持。关于此可自行研究源码


版权声明:本文为F_Hello_World原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。