超详细Dubbo服务导出源码解读(二)

服务导出部分源码太多,因此分成两部分编写,前半部分请参考:超详细Dubbo服务导出源码解读,本文将接着上部分但协议多注册中心源码继续解读(版本2.7.7)。

1. 导出核心逻辑

前文已经分析到服务导出逻辑,所以本文我们首先分析导出服务的具体核心逻辑:

/**
	 * 导出服务到本地(及jvm导出)
	 */
	private void exportLocal(URL url) {
		//组装导出地址,协议为injvm,主机127.0.0.0 即injvm://127.0.0.0
		URL local = URLBuilder.from(url).setProtocol(LOCAL_PROTOCOL).setHost(LOCALHOST_VALUE).setPort(0).build();
		//使用SPI自适应拓展机制,获取ProxyFactory和Protocol的实现。
		//使用ProxyFactory生成导出服务代理的实现,默认实现JavassistProxy
		//使用Protocol调用服务导出逻辑,默认实现是DubboProtocol
		Exporter<?> exporter = PROTOCOL.export(PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
		//添加导出服务到缓存中
		exporters.add(exporter);
		logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
	}
		// 导出到远程的部分源码
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
	//如果存在,则为注册地址添加代理实现参数
	registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//为服务引用生成Invoker对象
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//生成提供者和配置包装Invoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker,this);
//通过SPI自适应拓展获取Protocol的拓展实现,调用导出方法
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
//添加到导出器缓存
exporters.add(exporter);

根据上面的两段源码,可以看出导出到本地或者元和远程逻辑基本一致,需要先通过SPI方式获取ProxyFactory 创建Invoker,然后同样通过SPI的方式获取Protocol实现导出。
Dubbo 官方文档中对 Invoker 进行了说明:Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面我们到 JavassistProxyFactory 代码中,探索 Invoker 的创建过程。

2. Invoker 的创建

org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
JavassistProxyFactory 继承自 AbstractProxyInvoker,并覆写了抽象方法 getInvoker。覆写后的 getInvoker逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    	//为目标类创建包装类,无法解决类名包含$的情况
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        //创建了一个继承自 AbstractProxyInvoker 类的匿名对象
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
            	// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

org.apache.dubbo.common.bytecode.Wrapper.getWrapper(Class<?>)
Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。不过getWrapper方法中仅仅包含一些判断及缓存操作,主要的创建Wrapper实例是在makeWrapper中完成的。

    /**
     * get wrapper.
     *
     * @param c Class instance.
     * @return Wrapper instance(not null).
     */
    public static Wrapper getWrapper(Class<?> c) {
    	//因为无法代理动态类,因此是动态类时会循环找到不是动态类的父类
        while (ClassGenerator.isDynamicClass(c)) 
        {	//获取父类
            c = c.getSuperclass();
        }
        //如果代理Object类,则直接返回固定实现OBJECT_WRAPPER
        if (c == Object.class) {
            return OBJECT_WRAPPER;
        }
        //代理其他类则调用makeWrapper创建一个实例,并存入缓存中(如果不存在),然后返回该包装类实例
        return WRAPPER_MAP.computeIfAbsent(c, key -> makeWrapper(key));
    }

org.apache.dubbo.common.bytecode.Wrapper.makeWrapper(Class<?>)
在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 makeWrapper方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。相关的代码如下:

	private static Wrapper makeWrapper(Class<?> c) {
		// 判断是否是基本类型的包装类,如果是则抛出异常
		if (c.isPrimitive()) {
			throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);
		}
		// 获取类的全限定名称(包名+类名)
		String name = c.getName();
		// 获取类加载器,优先级如下:线程上下文加载器 -> c的类加载器 -> 系统类加载器
		ClassLoader cl = ClassUtils.getClassLoader(c);
		// 存放setPropertyValue方法代码
		StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
		// 存放getPropertyValue方法代码
		StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
		// 存放invokeMethod方法代码
		StringBuilder c3 = new StringBuilder(
				"public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws "
						+ InvocationTargetException.class.getName() + "{ ");

		// 生成类型转换代码及异常抛出代码
		c1.append(name).append(" w; try{ w = ((").append(name)
				.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
		c2.append(name).append(" w; try{ w = ((").append(name)
				.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
		c3.append(name).append(" w; try{ w = ((").append(name)
				.append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");
		// 存放属性<属性名,属性类型>
		Map<String, Class<?>> pts = new HashMap<>();
		// 存放方法<描述信息(可理解为方法签名), Method 实例>
		Map<String, Method> ms = new LinkedHashMap<>();
		// 存放所有的方法名
		List<String> mns = new ArrayList<>();
		// 所有在当前类中声明的方法名
		List<String> dmns = new ArrayList<>();

		// 获取 public访问级别的字段
		for (Field f : c.getFields()) {
			String fn = f.getName();
			Class<?> ft = f.getType();
			// 如果是static 或 transient 修饰的变量,则忽略
			if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) {
				continue;
			}
			// 生成条件判断及赋值语句,比如:
			// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}
			c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3"))
					.append("; return; }");
			// 生成条件判断及返回语句,比如:
			// if( $2.equals("name") ) { return ($w)w.name; }
			c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");
			// 添加<属性名,属性类型>到集合中
			pts.put(fn, ft);
		}
		
		//获取所有public访问级别的方法
		Method[] methods = c.getMethods();
		// 检测是否包含在当前类中声明的方法
		boolean hasMethod = hasMethods(methods);
		if (hasMethod) {
			c3.append(" try{");
			for (Method m : methods) {
				// 忽略Object类声明的方法
				if (m.getDeclaringClass() == Object.class) {
					continue;
				}

				String mn = m.getName();
				// 生成方法名判断语句,比如:
		        // if ( "setName".equals( $2 )
				c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");
				//参数个数
				int len = m.getParameterTypes().length;
				// 生成“运行时传入的参数数量与方法参数列表长度”判断语句,比如:
		        // && $3.length == 1
				c3.append(" && ").append(" $3.length == ").append(len);
				 // 检测方法是否存在重载情况,条件为:方法对象不同 && 方法名相同
				boolean override = false;
				for (Method m2 : methods) {
					if (m != m2 && m.getName().equals(m2.getName())) {
						override = true;
						break;
					}
				}
				// 对重载方法进行处理。判断是统一个方法时除了方法名、参数列表长度相同外,还必须判断参数类型是否一致,如:
				// void setName(String name) 和  void setName(Integer name)
				if (override) {
					if (len > 0) {
						for (int l = 0; l < len; l++) {
							// 生成参数类型进行检测代码,比如:
		                    // && $3[0].getName().equals("java.lang.String") 
							c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"")
									.append(m.getParameterTypes()[l].getName()).append("\")");
						}
					}
				}
				 // 添加 ) {,完成方法判断语句
				c3.append(" ) { ");
				// 上面的方法校验完成后的代码类似这样:
		        // if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
				
				// 根据返回值类型生成目标方法调用语句
				if (m.getReturnType() == Void.TYPE) {
					//返回值类型为空,则 w.setName((java.lang.String)$4[0]); return null;
					c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");")
							.append(" return null;");
				} else {
					//返回值类型不为空,则  // return ($w)w.getName();
					c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4"))
							.append(");");
				}
				// 添加 },
				c3.append(" }");
				/* 上面的方法校验完成后的代码类似这样:
		        *if ("setName".equals($2) && $3.length == 1 && $3[0].getName().equals("java.lang.String")) {
		        *   w.setName((java.lang.String)$4[0]); return null;
		        *  }
		        */
				
				///添加方法名到集合中
				mns.add(mn);
				//如果是当前类声明的方法,则添加到声明方法集合中
				if (m.getDeclaringClass() == c) {
					dmns.add(mn);
				}
				//添加<方法描述,方法>到集合中
				ms.put(ReflectUtils.getDesc(m), m);
			}
			//添加异常捕获及抛出代码
			c3.append(" } catch(Throwable e) { ");
			c3.append("     throw new java.lang.reflect.InvocationTargetException(e); ");
			c3.append(" }");
		}
		//添加NoSuchMethodException异常代码
		c3.append(" throw new " + NoSuchMethodException.class.getName()
				+ "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");

		// 处理getter和setter方法
		Matcher matcher;
		for (Map.Entry<String, Method> entry : ms.entrySet()) {
			String md = entry.getKey();
			Method method = entry.getValue();
			//是否是get方法
			if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
				//获取属性名
				String pn = propertyName(matcher.group(1));
				// 生成属性判断以及返回语句,示例如下:
	            // if( $2.equals("name") ) { return ($w).w.getName(); }
				c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
						.append("(); }");
				//存放<属性名,属性类型>到集合中
				pts.put(pn, method.getReturnType());
			//判断是否是is|has|can开头方法,生成代码逻辑同get方法
			} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {
				String pn = propertyName(matcher.group(1));
				c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName())
						.append("(); }");
				pts.put(pn, method.getReturnType());
			//判断是set开头方法
			} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {
				//获取参数(属性)类型
				Class<?> pt = method.getParameterTypes()[0];
				//获取属性名
				String pn = propertyName(matcher.group(1));
				//生成属性判断及set方法语句,示例如下:
				// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }
				c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(")
						.append(arg(pt, "$3")).append("); return; }");
				//存放<属性名,属性类型>到集合中
				pts.put(pn, pt);
			}
		}
		// 添加 抛出 NoSuchPropertyException 异常代码
		c1.append(" throw new " + NoSuchPropertyException.class.getName()
				+ "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");
		c2.append(" throw new " + NoSuchPropertyException.class.getName()
				+ "(\"Not found property \\\"\"+$2+\"\\\" field or setter method in class " + c.getName() + ".\"); }");

		// 生成class
		//包装类数量加1并获取,原子操作
		long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
		//根据类加载器创建类生成器实例
		ClassGenerator cc = ClassGenerator.newInstance(cl);
		// 生成类名:有公共修饰符则org.apache.dubbo.common.bytecode.Wrapper,否则当前包装类名+$sw+当前包装类数量
		cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
		// 设置父类
		cc.setSuperClass(Wrapper.class);
		// 设置默认构造函数
		cc.addDefaultConstructor();
		// 添加属性名称数组字段
		cc.addField("public static String[] pns;"); 
		// 属性即类类型字段
		cc.addField("public static " + Map.class.getName() + " pts;"); 
		// 添加方法名称集合属性
		cc.addField("public static String[] mns;");
		// 添加本类声明的方法名称集合字段
		cc.addField("public static String[] dmns;");
		// 添加类属性
		for (int i = 0, len = ms.size(); i < len; i++) {
			cc.addField("public static Class[] mts" + i + ";");
		}
		//添加属性名获取方法
		cc.addMethod("public String[] getPropertyNames(){ return pns; }");
		// 添加是否存在某个属性方法
		cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");
		// 获取某个属性类型
		cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");
		// 获取方法名集合
		cc.addMethod("public String[] getMethodNames(){ return mns; }");
		//获取声明方法属性
		cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");
		// 添加setPropertyValue方法、getPropertyValue方法、invokeMethod方法代码
		cc.addMethod(c1.toString());
		cc.addMethod(c2.toString());
		cc.addMethod(c3.toString());

		try {
			Class<?> wc = cc.toClass();
			// 设置字段值
			wc.getField("pts").set(null, pts);
			wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));
			wc.getField("mns").set(null, mns.toArray(new String[0]));
			wc.getField("dmns").set(null, dmns.toArray(new String[0]));
			int ix = 0;
			for (Method m : ms.values()) {
				wc.getField("mts" + ix++).set(null, m.getParameterTypes());
			}
			//创建Wrapper实例
			return (Wrapper) wc.newInstance();
		} catch (RuntimeException e) {
			throw e;
		} catch (Throwable e) {
			throw new RuntimeException(e.getMessage(), e);
		} finally {
			//清理缓存
			cc.release();
			ms.clear();
			mns.clear();
			dmns.clear();
		}
	}

3. 协议导出

3.1 本地协议导出

接着第一节导出源码逻辑,在获取Invoker之后,就会调用具体的Protocol实现类导出方法进行导出,首先是导出到本地,使用的是InjvmProtocol协议,导出逻辑很简单,源码如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    	//创建一个InjvmExporter实例,InjvmExporter只是将道歉Invoker等缓存到本地
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
3.2 远程协议导出

与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,比较复杂。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。首先分析RegistryProtocol 的 export 方法。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    	// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
    	URL registryUrl = getRegistryUrl(originInvoker);
        // 服务提供中地址(本地导出服务地址)
        URL providerUrl = getProviderUrl(originInvoker);

        // 获取订阅覆盖地址
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        //创建覆盖监听器,存入缓存
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //使用配置覆盖提供中者地址
        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        // 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 获取已注册的服务提供者 URL
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
       // 获取 register 参数,默认为true
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        
        if (register) {
        	//为true则表示需要注册服务,向注册中心注册
            register(registryUrl, registeredProviderUrl);
        }

        // 不建议使用!订阅2.6.x或之前版本中的重写规则。
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        // 设置注册和订阅地址
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //通知RegistryProtocolListener,服务导出
        notifyExport(exporter);
        //创建DestroyableExporter实例,确保每次导出时都返回一个新的导出器实例
        return new DestroyableExporter<>(exporter);
    }

接着上面的源码分析导出服务方法doLocalExport的源码。

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
       //根据Invoker参数获取缓存key
    	String key = getCacheKey(originInvoker);
    	//调用computeIfAbsent方法,将key和ExporterChangeableWrapper实例放入缓存
        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
        	// 创建InvokerDelegate委托类实例
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
           // 创建ExporterChangeableWrapper实例,调用 protocol 的 export 方法导出服务
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

上面源码分析可知,会创建实例是调用Protocol的export方法。下面使用默认协议dubbo作为示例,分析导出逻辑。此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。

	@Override
	public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
		// 获取URL
		URL url = invoker.getUrl();

		// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
		// demoGroup/com.qqxhb.DemoService:0.0.1:20880
		String key = serviceKey(url);
		// 创建DubboExporter实例并放入缓存
		DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
		exporterMap.put(key, exporter);

		// 导出本地存根服务以发送事件
		Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
		Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
		if (isStubSupportEvent && !isCallbackservice) {
			String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
			if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
				if (logger.isWarnEnabled()) {
					logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY)
							+ "], has set stubproxy support event ,but no stub methods founded."));
				}

			}
		}
		// 启动服务器
		openServer(url);
		// 优化序列化
		optimizeSerialization(url);

		return exporter;
	}

	private void openServer(URL url) {
		// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
		String key = url.getAddress();
		// 客户端可以导出只供服务器调用的服务,默认为true
		// 在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。
		boolean isServer = url.getParameter(IS_SERVER_KEY, true);
		if (isServer) {
			// 双重检测创建服务(保证只创建一次)
			ProtocolServer server = serverMap.get(key);
			if (server == null) {
				synchronized (this) {
					server = serverMap.get(key);
					if (server == null) {
						serverMap.put(key, createServer(url));
					}
				}
			} else {
				// 服务器已创建,则根据 url 中的配置重置服务器
				server.reset(url);
			}
		}
	}

	private ProtocolServer createServer(URL url) {
		url = URLBuilder.from(url)
				// 服务器关闭时发送只读事件,默认情况下启用
				.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
				// 添加默认心跳检测参数60 * 1000
				.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
				// 添加编码解码器参数
				.addParameter(CODEC_KEY, DubboCodec.NAME).build();
		// 获取 server 参数,默认为 netty
		String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
		// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
		if (str != null && str.length() > 0
				&& !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
			throw new RpcException("Unsupported server type: " + str + ", url: " + url);
		}

		ExchangeServer server;
		try {// 创建 ExchangeServer
			server = Exchangers.bind(url, requestHandler);
		} catch (RemotingException e) {
			throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
		}
		// 获取 client 参数,可指定 netty,mina
		str = url.getParameter(CLIENT_KEY);
		if (str != null && str.length() > 0) {
			// 通过SPI获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
			Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
			// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中, 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
			if (!supportedTypes.contains(str)) {
				throw new RpcException("Unsupported client type: " + str);
			}
		}
		// 创建DubboProtocolServer实例并返回
		return new DubboProtocolServer(server);
	}

上面源码分析过程可以看出,DubboExporter的export方法主要调用openServer方法,而openServer逻辑也很简单,获取参数isserver,选择执行createServer 还是server的重置方法。createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。接下来就主要看看Exchangers是如何创建出ExchangeServer服务的。首先调用org.apache.dubbo.remoting.exchange.Exchangers.bind(URL, ExchangeHandler)方法

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        //添加URL参数codec=exchange
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 获取 Exchanger,默认为 HeaderExchanger。
        // 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
        return getExchanger(url).bind(url, handler);
    }

然后是org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger.bind(URL, ExchangeHandler)方法

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    	// 创建 HeaderExchangeServer 实例:
    	//   1. new HeaderExchangeHandler(handler)
    	//	 2. new DecodeHandler(new HeaderExchangeHandler(handler))
    	//   3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

该方法中的主要逻辑也就是调用org.apache.dubbo.remoting.Transporters.bind(URL, ChannelHandler…)

	public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
		if (url == null) {
			throw new IllegalArgumentException("url == null");
		}
		if (handlers == null || handlers.length == 0) {
			throw new IllegalArgumentException("handlers == null");
		}
		ChannelHandler handler;
		if (handlers.length == 1) {
			handler = handlers[0];
		} else {
			// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
			handler = new ChannelHandlerDispatcher(handlers);
		}
		// 获取自适应 Transporter
		// 实例(ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();),并调用bind实例方法
		return getTransporter().bind(url, handler);
	}

默认的Transporter 是NettyTransporter,不过是注意是netty4.x,dubbo也提供了3.x的实现。

/**
 * Default extension of {@link Transporter} using netty4.x.
 */
public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

}

上面的bind方法就一句创建 NettyServer实例代码

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    	//调用父类构造函数
        // 添加线程名称参数,可以在CommonConstants中按thread_name_KEY和THREADPOOL_KEY自定义客户机线程池的名称和类型,默认名称DubboServerHandler
        // 对handler进行包装MultiMessageHandler->HeartbeatHandler->handler
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }

紧接跟踪父类构造函数:

	public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
		// 调用父类构造方法(简单判断及解码器、超时时间等属性的获取赋值)
		super(url, handler);
		// 获取套接字地址
		localAddress = getUrl().toInetSocketAddress();
		 获取 ip 和端口
		String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
		int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
		if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
			// anyhost=true或者是无效的本地主机则设置ip为0.0.0.0
			bindIp = ANYHOST_VALUE;
		}
		// 创建绑定套接字地址
		bindAddress = new InetSocketAddress(bindIp, bindPort);
		// 获取最大可接受连接数,默认0
		this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
		// 获取超时时间 默认600000毫秒
		this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
		try {
			// 打开服务
			doOpen();
			if (logger.isInfoEnabled()) {
				logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export "
						+ getLocalAddress());
			}
		} catch (Throwable t) {
			throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
					+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
		}
		// 创建执行器
		executor = executorRepository.createExecutorIfAbsent(url);
	}

有上面代码分析可知,需要在回到子类查看,doOpen方法的具体试下:

	protected void doOpen() throws Throwable {
		// 创建服务启动器
		bootstrap = new ServerBootstrap();
		// 创建 boss 和 worker 事件组
		bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
		workerGroup = NettyEventLoopFactory.eventLoopGroup(
				getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), "NettyServerWorker");
		// 创建NettyServer处理类实例
		final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
		channels = nettyServerHandler.getChannels();
		// 设置boss和worker事件组
		bootstrap.group(bossGroup, workerGroup)
				// 设置SocketChannel
				.channel(NettyEventLoopFactory.serverSocketChannelClass())
				// SO_REUSEADDR是让端口释放后立即就可以被再次使用。
				.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
				// TCP_NODELAY选项是用来控制是否开启Nagle算法,该算法是为了提高较慢的广域网传输效率
				.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
				// 创建一个 池化或非池化的缓存区分配器
				.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
				// 添加孩子渠道初始化处理实例
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						// 初始化通道时进行管道pipeline编码解码器等设置
						int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
						NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
						if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
							ch.pipeline().addLast("negotiation",
									SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
						}
						ch.pipeline().addLast("decoder", adapter.getDecoder()).addLast("encoder", adapter.getEncoder())
								.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
								.addLast("handler", nettyServerHandler);
					}
				});
		// 绑定到指定的 ip 和端口上
		ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
		// 不间断同步
		channelFuture.syncUninterruptibly();
		channel = channelFuture.channel();

	}

3.3 远程服务注册

在远程服务的源码分析开始,我看见导出到远程的服务除了执行导出逻辑,需要执行服务的注册即org.apache.dubbo.registry.integration.RegistryProtocol.register(URL, URL)方法

    public void register(URL registryUrl, URL registeredProviderUrl) {
    	//获取注册Registry实例
        Registry registry = registryFactory.getRegistry(registryUrl);
        //执行注册
        registry.register(registeredProviderUrl);
        // 获取提供者模型
        ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
       //添加RegisterStatedURL到缓存
        model.addStatedUrl(new ProviderModel.RegisterStatedURL(
                registeredProviderUrl,
                registryUrl,
                true
        ));
    }

getRegistry方法由org.apache.dubbo.registry.support.AbstractRegistryFactory实现,具体逻辑如下:

 public Registry getRegistry(URL url) {
    	//AtomicBoolean destroyed,已经执行福哦哦销毁则抛出异常
        if (destroyed.get()) {
            LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
            return DEFAULT_NOP_REGISTRY;
        }
        // 组装URL(路径、接口等参数)
        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        //创建注册缓存key
        String key = createRegistryCacheKey(url);
        // 锁定注册表访问进程以确保注册表的单个实例
        LOCK.lock();
        try {
        	//从缓存中获取,获取到则直接返回
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //缓存中不存在则通过SPI反方式创建,创建失败抛出异常
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            //放入缓存并返回
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // 释放锁
            LOCK.unlock();
        }
    }

根据上面源码分析可知,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 创建 Registry,然后写入缓存。这里的 createRegistry 由具体的子类实现,这里以ZookeeperRegistryFactory 为例解析。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

	private ZookeeperTransporter zookeeperTransporter;

	/**
	 * zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
	 */
	public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
		this.zookeeperTransporter = zookeeperTransporter;
	}

	@Override
	public Registry createRegistry(URL url) {
		// 创建 ZookeeperRegistry
		return new ZookeeperRegistry(url, zookeeperTransporter);
	}

}

ZookeeperRegistryFactory的实现很简单就一句代码,直接创建ZookeeperRegistry实例。
因此下面将分析ZookeeperRegistry的构造方法源码。

 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    	//调用父类构造(获取一些例如重试的时间)
        super(url);
        //如果anyhost=true或者主机地址是0.0.0.0,则抛出异常
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        // 获取组名,默认为 dubbo
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        //不是/开头,则加上
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        //设置路径为group
        this.root = group;
        // 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
        zkClient = zookeeperTransporter.connect(url);
        // 添加状态监听器
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {//重新连接状态
                logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
                //当zookeeper连接从连接丢失中恢复时,它需要获取最新的提供程序列表。重新注册观察者只是一个副作用,不是强制性的。
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {//新会话创建
                logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
                try {
                	//尝试重新注册URL并将此实例的侦听器重新订阅到注册表
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }

上面的源码分析可知,zkClient是通过zookeeperTransporter的connect方法创建的,这里的 zookeeperTransporter 类型为自适应拓展类,默认为 CuratorZookeeperTransporter。因此下面我们分析下 CuratorZookeeperTransporter的源码。

public class CuratorZookeeperTransporter extends AbstractZookeeperTransporter {
    @Override
    public ZookeeperClient createZookeeperClient(URL url) {
    	//创建CuratorZookeeperClient实例
        return new CuratorZookeeperClient(url);
    }
}

可以看到CuratorZookeeperTransporter逻辑很简单,直接创建了CuratorZookeeperClient实例。

	public CuratorZookeeperClient(URL url) {
		// 调用父类构造,对url属性赋值
		super(url);
		try {
			// 获取超时时间,默认5秒
			int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_CONNECTION_TIMEOUT_MS);
			// 获取会话过期时间,默认1分钟
			int sessionExpireMs = url.getParameter(ZK_SESSION_EXPIRE_KEY, DEFAULT_SESSION_TIMEOUT_MS);
			// 创建 CuratorFramework 构造器
			CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
					.connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000))
					.connectionTimeoutMs(timeout).sessionTimeoutMs(sessionExpireMs);
			// 获取权限信息
			String authority = url.getAuthority();
			if (authority != null && authority.length() > 0) {
				builder = builder.authorization("digest", authority.getBytes());
			}
			// 构建 CuratorFramework 实例
			client = builder.build();
			// 添加CuratorConnectionStateListener监听器
			client.getConnectionStateListenable().addListener(new CuratorConnectionStateListener(url));
			// 启动客户端
			client.start();
			// 阻塞连接,知道连接成功或者超时
			boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
			if (!connected) {
				// 连接失败抛出异常
				throw new IllegalStateException("zookeeper not connected");
			}
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

上面已经分析了获取Registry实例的源码。接下来任然以zookeeper为例分析注册方法register的源码,这个方法定义在 FailbackRegistry 抽象类中。

 public void register(URL url) {
    	//不接受此协议类型的服务,直接返回
        if (!acceptable(url)) {
            logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
            return;
        }
        //调用父类方法,添加URL到registered缓存中
        super.register(url);
        //删除url失败注册任务缓存
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // 执行注册,子类实现
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

           // 获取 check 参数,若 check = true 将会直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 将失败的注册请求记录到失败列表,定期重试
            addFailedRegistered(url);
        }
    }

上面源码分析可知,执行服务注册方法需要子类实现,这里分析下zookeeper的实现org.apache.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister(URL)

    @Override
    public void doRegister(URL url) {
        try {
        	 // 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
            //   /${group}/${serviceInterface}/providers/${url}
        	// 是否是临时节点由参数dynamic决定,默认为true
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。节点路径由 toUrlPath 方法生成,org.apache.dubbo.remoting.zookeeper.support.AbstractZookeeperClient.create(String, boolean)方法源码如下

	@Override
	public void create(String path, boolean ephemeral) {
		// 不是临时节点
		if (!ephemeral) {
			// 检测持久节点缓存是否存在该节点,若存在则返回
			if (persistentExistNodePath.contains(path)) {
				return;
			}
			// zookeeper是否存在该节点,若存在则放入缓存并返回
			if (checkExists(path)) {
				persistentExistNodePath.add(path);
				return;
			}
		}
		int i = path.lastIndexOf('/');
		if (i > 0) {
			// 递归创建
			create(path.substring(0, i), false);
		}
		if (ephemeral) {
			// 创建临时节点
			createEphemeral(path);
		} else {
			// 创建永久节点并放入缓存
			createPersistent(path);
			persistentExistNodePath.add(path);
		}
	}

上面方法会根据节点类型调用创建临时节点和永久节点方法,由子类实现,主要看org.apache.dubbo.remoting.zookeeper.curator.CuratorZookeeperClient的实现。

	@Override
	public void createPersistent(String path) {
		try {
			client.create().forPath(path);
		} catch (NodeExistsException e) {
			logger.warn("ZNode " + path + " already exists.", e);
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

	@Override
	public void createEphemeral(String path) {
		try {
			client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
		} catch (NodeExistsException e) {
			logger.warn("ZNode " + path
					+ " already exists, since we will only try to recreate a node on a session expiration"
					+ ", this duplication might be caused by a delete delay from the zk server, which means the old expired session"
					+ " may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, "
					+ "we can just try to delete and create again.", e);
			deletePath(path);
			createEphemeral(path);
		} catch (Exception e) {
			throw new IllegalStateException(e.getMessage(), e);
		}
	}

用了接近五天的休息时间,到此关于服务注册的过程就分析终于完了,希望对你有所帮助。关于Zookeeper客户端Curator的使用请参考之前博客:Zookeeper客户端Curator的基本操作、分布式锁及领导者选举示例
源码分析地址(common、compatible、config、registry、rpc、remoting模块):https://github.com/qqxhb/dubbo
服务导出源码分析请参考下篇博客:Dubbo 服务引入源码分析不能再再详细了


版权声明:本文为qq_43792385原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。