Invoker接口
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
* 当前执行器的服务接口是哪一个
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
* 执行请求操作
*/
Result invoke(Invocation invocation) throws RpcException;
}
org.apache.dubbo.rpc.protocol.AbstractInvoker
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
//判断系统是否关闭
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
//
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
//设置所有的RPCContext中的附加信息
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
* invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
* because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
* by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
* a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
*/
invocation.addObjectAttachments(contextAttachments);
}
//设置执行模式
invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
//设置请求ID,主要用于异步
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
AsyncRpcResult asyncResult;
try {
//调用子类 执行
asyncResult = (AsyncRpcResult) doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
}
} catch (RpcException e) {
//异常
if (e.isBiz()) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
} else {
throw e;
}
} catch (Throwable e) {
asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
}
//设置执行的结果信息
RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
return asyncResult;
}
org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker
protected Result doInvoke(final Invocation invocation) throws Throwable {
//获取执行的方法名
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
//设置请求的PATH
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
//客户端 网络
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
//判断是否需要返回值 异步请求
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
//超时时间
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
// 如果不需要返回值信息(异步)
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
//发送命令
currentClient.send(inv, isSent);
//返回告诉是异步请求
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
//同步于请求-获取真正执行的线程池(ThreadPool中的SPI)
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
//发送请求并且等待结果
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
//2.6.x中使用,设置完成的额结果信息
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
//创建新的结果信息并且返回
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
ExchangeClient
// 真实的发送请求信息
// request: RPCInvocation
// timeout: 超时
// executor: 业务线程池
CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException
HeaderExchangeClient
hannel 会交 HeaderExchangeChannel 来进行封装
public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
//创建一个新的request对象
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
//创建一个执行结果的回调信息处理
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
// 交给真正的业务渠道进行处理
// 这里的渠道是交给Transporter这个SPI进行创建的,其中我们的NettyChannel就是在这里产生的
channel.send(req);
} catch (RemotingException e) {
//请求出现异常则取消当前的请求封装
future.cancel();
throw e;
}
return future;
}
版权声明:本文为ko0491原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。