Dubbo-Invoker执行逻辑

Invoker接口


public interface Invoker<T> extends Node {

    /**
     * get service interface.
     *
     * @return service interface.
     *  当前执行器的服务接口是哪一个
     */
    Class<T> getInterface();

    /**
     * invoke.
     *
     * @param invocation
     * @return result
     * @throws RpcException
     * 执行请求操作
     */
    Result invoke(Invocation invocation) throws RpcException;

}

org.apache.dubbo.rpc.protocol.AbstractInvoker


   public Result invoke(Invocation inv) throws RpcException {
        // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
        //判断系统是否关闭
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        //
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        //设置所有的RPCContext中的附加信息
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }

        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            /**
             * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here,
             * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered
             * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is
             * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information).
             */
            invocation.addObjectAttachments(contextAttachments);
        }
        //设置执行模式
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        //设置请求ID,主要用于异步
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
            //调用子类 执行
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            Throwable te = e.getTargetException();
            if (te == null) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                if (te instanceof RpcException) {
                    ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
                }
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, te, invocation);
            }
        } catch (RpcException e) {
            //异常
            if (e.isBiz()) {
                asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
            } else {
                throw e;
            }
        } catch (Throwable e) {
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        //设置执行的结果信息
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker


    protected Result doInvoke(final Invocation invocation) throws Throwable {
        //获取执行的方法名
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //设置请求的PATH
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        //客户端 网络
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            //判断是否需要返回值 异步请求
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            //超时时间
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                // 如果不需要返回值信息(异步)
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                //发送命令
                currentClient.send(inv, isSent);
                //返回告诉是异步请求
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                //同步于请求-获取真正执行的线程池(ThreadPool中的SPI)
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                //发送请求并且等待结果
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                //2.6.x中使用,设置完成的额结果信息
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                //创建新的结果信息并且返回
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

ExchangeClient

// 真实的发送请求信息
// request: RPCInvocation
// timeout: 超时
// executor: 业务线程池
  CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException

HeaderExchangeClient

hannel 会交 HeaderExchangeChannel 来进行封装

 public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
        return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor);
    }

    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        //创建一个新的request对象
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        //创建一个执行结果的回调信息处理
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
         // 交给真正的业务渠道进行处理
            // 这里的渠道是交给Transporter这个SPI进行创建的,其中我们的NettyChannel就是在这里产生的
            channel.send(req);
        } catch (RemotingException e) {
        //请求出现异常则取消当前的请求封装
            future.cancel();
            throw e;
        }
        return future;
    }

版权声明:本文为ko0491原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。