前言
接着上一篇博客—Dubbo服务引用的内容,我们分析一次服务调用的过程。我们从消费者和提供者两个部分的视角来看下一次调用的过程原理。
服务消费者

在服务引用之后,我们把 spring.xml 中的配置通过具体的协议转换成了 invoker 对象,并且通过 JavassistProxyFactory 生成了代理类,还会启动 Netty 客户端去连接了服务提供者(默认是在服务真实调用的时候才会去连接服务提供者)。所以服务调用的时候,我们调用的接口其实是个代理类,也就是如下图,demoService 这里真正的是代理类 proxy0,接着像上篇博客的分析,最后会调用到 InvokerInvocationHandler 类执行具体 invoker 的 invoke() 方法,发起RPC调用。本文的内容就是从这开始分析。
我们再看下分享的服务引用和服务调用的实体类关系图。接下来我们就一层一层的剥下来看看调用的流程。
发送Request消息
MockClusterInvoker—服务降级包装
MockClusterInvoker 是通过包装类 MockClusterWrapper 来创建的,MockClusterWrapper 类(实现 Cluster 接口)是通过SPI-AOP 的方式注入的, 会在服务引用的时候执行 cluster.join 这行代码的时候创建 MockClusterInvoker 实例。所以我们会调用到 MockClusterInvoker.invoke() 方法。
public class MockClusterWrapper implements Cluster {
private Cluster cluster;
public MockClusterWrapper(Cluster cluster) {
this.cluster = cluster;
}
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// new了一个MockClusterInvoker
return new MockClusterInvoker<T>(directory,this.cluster.join(directory));
}
}
MockClusterInvoker 是当配置了 <dubbo:reference mock=“XXX.service”> 的时候用来做服务降级的,当我们没有配置 mock 属性的时候就会直接执行其内部持有的 invoker (默认是 FailoverClusterInvoker)的方法。
// MockClusterInvoker.invoke
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//没有mock属性就调用到这里的invoker
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
...
FailoverClusterInvoker—服务容错
FailoverClusterInvoker 并没有 invoker 方法,invoker 方法在其父类 AbstractClusterInvoker 中,这里也是典型的模版设计的方式。
// AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance = null;
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);//Directory.list获取可用的服务列表
if (invokers != null && !invokers.isEmpty()) {
//负载均衡实现选择
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);//这里调用子类的doInvoke实现
}
- Directory.list 这里获取所有可用的服务列表,Directory 持有 DubboInvoker 的 Map 这里会根据接口信息去 Map 中查找。需要注意的是,在这一步就会调用 Router.route 根据路由规则过滤一部分。
- 然后根据负载均衡 LoadBalance 实现选择负载均衡算法,最后调用子类的 doInvoke 方法,默认也就是 FailoverClusterInvoker.doInvoke。
介绍完了模版类 AbstractClusterInvoker 我们就看下具体的子类实现,就以 FailoverClusterInvoker 为例。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
// FailoverClusterInvoker重试的次数,默认是2次
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
...
for (int i = 0; i < len; i++) {
// 负载均衡
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
Result result = invoker.invoke(invocation);// 发起RPC调用
...
}
}
先进行负载均衡选择,然后选出一个 invoker 发起调用。我们看下最后发起调用的 invoker 实例到底是什么。从下面的图中,我们可以获取到几个重要的点:
真实调用的 DubboInvoker 外面还有两个包装类 ListenerInvokerWrapper 和 ProtocolFilterWrapper ,并且 DubboInvoker 位于过滤链的最尾部;

最后在 DubboInvoker 中含有 NettyClient ,从上图可以清除的看到 consumer 连接的端口和 provider 暴露的 20880 端口。
DubboInvoker—发起调用
最后发起服务调用的是在 DubbokInvoker.doInvoke 方法中,无论是单工通信、异步调用还是同步调用,最终都是通过 currentClient.request(inv, timeout) 发起请求的。
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;// 封装的请求对象
...
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {//单工通信
...
} else if (isAsync) {// 异步调用
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));// 放入线程上下文
return new RpcResult();
} else {// 同步调用
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();// 调用线程直接获取调用结果
}
...
}
异步调用和同步调用的唯一区别就是获取返回值的时机不同(Future.get()直接的地方不同):
- 同步调用,在调用线程中直接取获取返回结果
currentClient.request(inv, timeout).get(),这个时候如果没有结果还未返回,就会让调用线程进行等待状态; - 异步调用是将结果放入 RpcContext ,然后先返回一个没有结果的 RpcResult 给调用线程,后面用户可以自己从 RpcContext 中取异步调用的结果。
// DefaultFuture.get 获取调用结果
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
while (!isDone()) {
// 这里线程进入等待状态
done.await(timeout, TimeUnit.MILLISECONDS);
...
}
}
}
// DefaultFuture.doReceived 这里是当接收到返回结果的时候会唤醒调用线程
private void doReceived(Response res) {
lock.lock();
response = res;
if (done != null) {
done.signal(); // 唤醒调用线程
}
...
}
如果在调用线程中先发起了两次异步调用,再获取异步调用结果的时候,我们只能取到第二次的调用结果,这是因为在 DubboInvoker.doInvoke 的时候会把第一次的调用 Future 给覆盖了。
Netty客户端发送请求
在执行currentClient.request就发起了RPC调用,最终会调用到 NettyChannel 中,这中间包括 Exchange 层封装成 Request 对象、线程派发等处理。
在 NettyChannel.send 之后,就是 Netty 的处理了,主要就是经过一系列的 Handler,然后把数据发出去。这里需要注意几点:
- Handler中最重要的就是编解码器,本文不分析编解码和 dubbo 协议;
- 为了防止各个功能形成很长的 Handler 链,Dubbo 用装饰者模式进行了优化,最终在 Handler 链上的只会有3个(如下图),在 NettyClient 和 NettyServer 上可以看到;
- 服务消费者也是有线程派发策略的,具体分析见下面。

在编解码的时候就涉及 Dubbo 的几种序列化方式,序列化的作用就是发送请求/响应的时候,把Java对象转换成二进制流才能传输,反序列化就是接收请求/响应的时候把二进制流转为Java对象然后再进行业务处理。Dubbo 默认的序列化方式是 hessian2,此外还有 fastjson 和 JDK 的序列化方式。
接收Response消息
服务消费者的 handler 和服务提供者的封装一样。但是在 HeaderExchangeHandler 这里就返回了,我们看下HeaderExchangeHandler 这里的代码。
从下面这段代码我们也能看出消费者和提供者虽然 Handler 都是一样的,但是处理逻辑的不同,在 HeaderExchangeHandler 中如果收到的是 Response响应消息,就不会再去往下调用 handler 了。那么最后我们看下 DefaultFuture 中是如何处理接收到的数据,并通知用户线程的。
public void received(Channel channel, Object message) throws RemotingException {
...
if (message instanceof Request) {
...//
// 服务提供者接收Request请求然后去执行,会接着去请求DubboProtocol里的处理器
handler.received(exchangeChannel, request.getData());
} else if (message instanceof Response) {
handleResponse(channel, (Response) message); // 这里就是消费者接收Response
}
...
}
static void handleResponse(Channel channel, Response response) throws {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
通知用户线程
Dubbo 一个消费者会发起多个不同的 RPC 请求,但当接收返回的时候,如何把不同的响应和之前发送的请求对应起来呢?在 Dubbo 中是通过请求的时候附加 id ,并响应的时候带着这个 id 这种方式实现的。然后从 FUTURES 中获取对应的 future 就可以了。
public static void received(Channel channel, Response response) {
try {
// 根据请求/响应的id从缓存的FUTURES中获取这个请求的future,即对应到哪个请求
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} ...
} finally {
CHANNELS.remove(response.getId());
}
}
剩下最后一步,业务线程中获取 RPC 调用结果,是通过 DefaultFuture.get 的方式(无论是异步/同步,本质都是调用了 get 方法),会让线程进入等待状态,而 future.doReceived(response); 就可以把业务线程唤醒,这里等待唤醒用的是条件锁,ReentrantLock.newCondition。
// DefaultFuture.get 获取调用结果
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
while (!isDone()) {
// 这里线程进入等待状态
done.await(timeout, TimeUnit.MILLISECONDS);
...
}
}
}
// DefaultFuture.doReceived 这里是当接收到返回结果的时候会唤醒调用线程
private void doReceived(Response res) {
lock.lock();
response = res;
if (done != null) {
done.signal(); // 唤醒调用线程
}
...
}
服务提供者

接收Request消息
Nettyhandler
众所周知,Dubbo RPC 的调用走的是 Netty(默认情况),所以服务提供者接收到消息也就是 Netty 服务端接收到消息,所以我们直接看下 NettyServer 的 handler 构造。
可以看到 NettyServer 的 handler 链也是3个,除了编解码器 handler,我们重点看下业务处理相关的 nettyServerHandler。nettyServerHandler 通过装饰者模式进行了封装,我们 debug 一探究竟。
AllChannelHandler
RPC 调用过程中,一方(无论是服务消费者还是提供者都有线程派发策略)接收请求并进行业务处理,如果没有将接收请求的 IO 线程和业务处理线程分开的话,也就是直接用接收 IO 操作的线程去处理业务逻辑,如果业务耗时长,那么就会浪费 IO 线程,造成别的请求不了的问题。
为了应对这种情况,Dubbo 设计了多种线程派发模式,可以将 IO 线程和业务线程分开,让业务代码在别的线程上执行,而不占用 IO 线程资源。支持 5 种线程派发模式:
| 策略 | 用途 |
|---|---|
| all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等,默认 |
| direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
| message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
| execution | 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行 |
| connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
IO线程和业务线程的分离方式
Netty 把接收到消息、建立连接、断开连接等事件都封装成了一个单独的方式,而分离线程或者说,分离不同事件的方式就是基于这个事件机制。
如果要分离 IO 线程和业务线程,就把传入的数据封装成一个 Runnable 然后放到业务线程处理池就可以了,如果不需要分离,就让业务处理在原来的 IO 线程上即可。
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();// 业务处理线程池
try {// 把业务操作提交给线程池就好了
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
...
}
Dubbo 提供了4种业务线程池,默认的是 fixThreadPool ,线程数是200,其他还有 CachedThreadPool、EagerThreadPool、LimitedThreadPool。
HeaderExchangeHandler
在 AllChannelHandler 中分离 IO线程和业务线程,把 Request 消息封装成 ChannelEventRunnable 传给了线程池处理,ChannelEventRunnable 又把任务交给了 HeaderExchangeHandler.received() 执行。
// HeaderExchangeHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
try {
if (message instanceof Request) {
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {// 一般都是双工的,一般的请求走这里
Response response = handleRequest(exchangeChannel, request);//处理消息
channel.send(response);//发送response消息
}
...
}
所以 Request 消息会在 handleRequest() 里处理,并且处理之后会把 Response 消息再发送给消费者,这里注意返回的 Response 会用 Request 的id,为的就是消费者接收 Response 消息的时候,能够对应到当时请求的 Future。
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
// Response中用了request的id
Response res = new Response(req.getId(), req.getVersion());
try {
Object result = handler.reply(channel, msg);//交给DubboProtocol中的handler处理
res.setStatus(Response.OK);
res.setResult(result);
}
return res;
}
DubboProtocol$1
接下来我们在看一下 DubboProtocol$1 是如何去调用实际的接口实现类的,答案在 DubboProtocol 内部的 requestHandler.reply 方法中。如下面的注释,我们需要关注两个地方:如何获取 invoker 对象和使用 invoker 对象去调用真实的服务接口实现类。
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);// 获取 invoker
...
return invoker.invoke(inv); //这里就是调用真实的接口实现
}
}
getInvoker 获取 invoker对象很简单,就是根据端口、接口名、版本、分组从 exporterMap中找到对应的 exporter,(之前服务暴露的时候把 key=端口、接口名、版本、分组,value=exporter 放入了 exporterMap 中),DubboExporter 中持有 AbstractProxyInvoker。看下上面的实体关系图就明了了。
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
String serviceKey = serviceKey(port, path,inv.getAttachments().get(Constants.VERSION_KEY),inv.getAttachments().get(Constants.GROUP_KEY));
// 根据端口、接口名、版本、分组 从 exporterMap中找到对应的 exporter,然后在获取 invoker
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
接着 AbstractProxyInvoker 中持有 wrapper0 ,wrapper0 中又持有真实的服务实现 demoServiceImpl,这样就可以实现服务调用了。
发送Response消息
在完成了服务调用之后,会把结果封装成 Response 对象,然后通过 channel.send 的方式把结果返回给服务消费者,这里的内容和消费者发送差不多,不做过多分析了。
//HeaderExchangeHandler.received()
channel.send(response);