在工作中同步转异步处理的场景非常多,例如日志记录,图片异步上传等。sleuth中trace链路+ELK收集,可以让我们很直观看到整个完整的链路,我们知道sleuth中在同一线程的链路的传递是通过ThreadLocal来做到的,而http或rpc调用这种方式的链路一般通过添加header头来进行传递的(这个需要我们去自己实现对http请求的拦截处理-当然我们不需要重复造轮子,spring已经帮我们把这些都实现了,我们只需要进行调用正常调用即可),那么如果我们创建一个线程pool,并异步执行数据,那么原线程中的trace信息能否传递到新的线程之中。
我们创建一个线程ThreadPoolTaskExecutor来进行测试(在一个有spring cloud sleuth的环境中测试)。
@Bean
public ThreadPoolExecutor threadPoolExecutor(){
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
return threadPoolExecutor;
} @GetMapping(value = "test")
public JsonResult<Boolean> test(){
log.info("进入代码哈哈哈哈哈!!!!");
threadPoolExecutor.execute(()->{
log.info("这是threadPoolExecutor异步执行的哈哈哈哈哈哈");
});
return JsonResult.success();
}最终的结果我们发现:

trace在两个线程中进行传递了。为何出现这个情况的,我们在被执行的代码中可以debug下发现:

发现在执行的方法栈中多出了TraceRunnable类,但是我们在使用时候并没有创建该类,联想下最终的结果,我们先做出一个猜测,spring使用TraceRunnable 该类对我们正在执行的Runnable进行了代理,最终达到了链路跨线程传递的。
查看TraceRunnable源码发现,该类在以下instrunment下:


public class TraceRunnable implements Runnable {
//默认
private static final String DEFAULT_SPAN_NAME = "async";
//整个链路对象
private final Tracer tracer;
//被代理的对象
private final Runnable delegate;
//传递来的上一个span 上下文对象
private final TraceContext parent;
//新生成的span名
private final String spanName;
public TraceRunnable(Tracing tracing, SpanNamer spanNamer, Runnable delegate) {
this(tracing, spanNamer, delegate, null);
}
//初始化
public TraceRunnable(Tracing tracing, SpanNamer spanNamer, Runnable delegate,
String name) {
this.tracer = tracing.tracer();
this.delegate = delegate;
this.parent = tracing.currentTraceContext().get();
this.spanName = name != null ? name : spanNamer.name(delegate, DEFAULT_SPAN_NAME);
}
@Override
public void run() {
//感觉上一个span创建一个写的span
ScopedSpan span = this.tracer.startScopedSpanWithParent(this.spanName,
this.parent);
try {
//执行代理类方法
this.delegate.run();
}
catch (Exception | Error e) {
span.error(e);
throw e;
}
finally {
span.finish();
}
}
}那么该TraceRunnable在什么时候被spring进行替换的呢?在TraceRunnable的构造方法上进行加入断点debug发现了TraceableExecutorService的踪影

TraceableExecutorService该类实现了ExecutorService接口(而我们使用的ThreadPoolExecutor也是实现该接口)并重写了对应的核心方法,就拿excute() 方法来说,它的执行逻辑很简单,就是创建TraceRunnable代理类,传入span,trace对象并执行被代理的TraceableExecutorService对应方法

而TraceableExecutorService又在什么时候被创建,并且传入被代理对象的,继续debug追踪

从该类的命名可以发现 它是一个interceptor拦截类,该类实现了MethodInterceptor(这个接口是springaop中动态代理的核心执行接口类),如下所示:
class ExecutorMethodInterceptor<T extends Executor> implements MethodInterceptor {
//被代理类 实际上就是上文创建的pool实例
private final T delegate;
//spring的Bean工厂类
private final BeanFactory beanFactory;
ExecutorMethodInterceptor(T delegate, BeanFactory beanFactory) {
this.delegate = delegate;
this.beanFactory = beanFactory;
}
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
//构建executor
T executor = executor(this.beanFactory, this.delegate);
//被执行的方法
Method methodOnTracedBean = getMethod(invocation, executor);
if (methodOnTracedBean != null) {
try {
//反射调用method
return methodOnTracedBean.invoke(executor, invocation.getArguments());
}
catch (InvocationTargetException ex) {
// gh-1092: throw the target exception (if present)
Throwable cause = ex.getCause();
throw (cause != null) ? cause : ex;
}
}
return invocation.proceed();
}
private Method getMethod(MethodInvocation invocation, Object object) {
Method method = invocation.getMethod();
return ReflectionUtils.findMethod(object.getClass(), method.getName(),
method.getParameterTypes());
}
T executor(BeanFactory beanFactory, T executor) {
return (T) new LazyTraceExecutor(beanFactory, executor);
}
}ExecutorMethodInterceptor类与ExecutorBeanPostProcessor(实现BeanPostProcessor接口)类在一块,我们从该类的命名可以看出原对象的代理对象在应该在ExecutorBeanPostProcessorBean执行完成之后被创建的。
ExecutorBeanPostProcessorBean的创建在AsyncDefaultAutoConfiguration自动装配类中构建:

而ExecutorBeanPostProcessor中postProcessAfterInitialization()中就有对各种ThreadPool执行器的代理对象的创建
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
boolean alreadyTraced = alreadyTraced(bean);
//被代理对象为ThreadPoolTaskExecutor
if (bean instanceof ThreadPoolTaskExecutor && !alreadyTraced) {
if (isProxyNeeded(beanName)) {
return wrapThreadPoolTaskExecutor(bean);
}
else {
log.info("Not instrumenting bean " + beanName);
}
}
//被代理对象为ScheduledExecutorService子类
else if (bean instanceof ScheduledExecutorService && !alreadyTraced) {
if (isProxyNeeded(beanName)) {
return wrapScheduledExecutorService(bean);
}
else {
log.info("Not instrumenting bean " + beanName);
}
}
//被代理对象为ExecutorService 子类
else if (bean instanceof ExecutorService && !alreadyTraced) {
if (isProxyNeeded(beanName)) {
return wrapExecutorService(bean);
}
else {
log.info("Not instrumenting bean " + beanName);
}
}
//被代理对象为AsyncTaskExecutor 子类
else if (bean instanceof AsyncTaskExecutor && !alreadyTraced) {
if (isProxyNeeded(beanName)) {
return wrapAsyncTaskExecutor(bean);
}
else {
log.info("Not instrumenting bean " + beanName);
}
}
//被代理对象为Executor 子类
else if (bean instanceof Executor && !alreadyTraced) {
return wrapExecutor(bean);
}
return bean;
}
Object createThreadPoolTaskExecutorProxy(Object bean, boolean cglibProxy,
ThreadPoolTaskExecutor executor) {
if (!cglibProxy) {
//静态代理
return new LazyTraceThreadPoolTaskExecutor(this.beanFactory, executor);
}
//cglib动态代理
return getProxiedObject(bean, cglibProxy, executor,
() -> new LazyTraceThreadPoolTaskExecutor(this.beanFactory, executor));
}从上述代码中可以看出,spring把我们常用不同种类的线程池都做构建具体的代理类(感觉具体的实现决定是使用cglib动态代理还是通过已有的类(已实现的类如下所示)进行静态代理)

当然上文是对于Runnable的支持,同时也对Callable提供了对应的支持。关于此可自行研究源码