阅读 66

Dubbo之服务导入流程解析

接着上回《Dubbo之服务暴露流程浅析》,上一篇文章已经介绍完了Dubbo的服务提供者的服务暴露的整个流程,本文主要介绍Dubbo服务消费者的服务导入流程。

前言:

Dubbo服务消费者的服务相关代码下:

xml配置:配置了两个

<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"        xmlns="http://www.springframework.org/schema/beans"        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd        http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">      <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),     don't set it same as provider -->     <dubbo:application name="demo-consumer"/>      <!-- use multicast registry center to discover service -->     <dubbo:registry group="aaa" address="zookeeper://127.0.0.1:2181"/>      <!-- generate proxy for the remote service, then demoService can be used in the same way as the     local regular interface -->     <dubbo:reference id="helloService" check="false" interface="org.apache.dubbo.samples.api.client.HelloService"/>      <dubbo:reference id="greetingService" check="false" interface="org.apache.dubbo.samples.api.client.GreetingService"  init="true"/>  </beans> 复制代码

消费者启动类:

public class BasicConsumerBootstrap {      public static void main(String[] args) {         ClassPathXmlApplicationContext context =                        new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-demo-consumer.xml"});         context.start();         HelloService helloService = (HelloService) context.getBean("helloService"); /          while (true) {             try {                 Thread.sleep(1000);                 String hello = helloService.sayHello("world");                  System.out.println(hello);              } catch (Throwable throwable) {                 throwable.printStackTrace();             }         }     } } 复制代码

Dubbo的引入服务的方式

Dubbo引入服务的方法有如下两种方式:

1.懒汉式引入服务

Dubbo默认的引入服务的方式为懒汉式,当我们的程序从容器中获取对应的Bean的时候会进行Dubbo的Consumer Bean的装载。

具体配置如下:

PS:对应具体的代码则是在 (HelloService) context.getBean("helloService")的时候装载Bean。

image-20221030141328643.png

Spring懒汉式引入Dubbo的Bean的流程如下:

image-20221030143402254.png

2.饿汉式引入服务

饿汉式引入服务需要我们在Dubbo的Consumer 配置中额外配置reference 的init属性为true。然后Spring则会在容器启动时,装载Dubbo的Bean,即 new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-demo-consumer.xml"})的时候。

具体配置如下:

image-20221030143825083.png Spring饿汉式引入Dubbo服务的流程如下:

image-20221030145656109.png

Dubbo之服务引用流程

从Dubbo的引入服务方式的流程来看,无论是懒汉式引入服务的方式 还是说饿汉式引入服务的方式,最终都会调用到ReferenceBean#getObject()的方法。接下来我们会着重解析ReferenceBean#getObject()这个方法。

然而ReferenceBean的getObject()是实现于FactoryBean的getObject方法,那么什么时候FactoryBean呢?下面是网上较多的解释:

FactoryBean是一个工厂Bean,可以生成某一个类型Bean实例,它最大的一个作用是:可以让我们自定义Bean的创建过程。FactoryBean本质就是用来给我们实例化、或者动态的注入一些比较复杂的Bean,比如像一些接口的代理对象。 复制代码

可以总结以下几点:

  1. FactoryBean是个Bean,泛指一种类型。

  2. 可以自定义创建过程,比较灵活

  3. 通常配合代理模式一起使用

  4. 常用的使用场景:

    1. Mybatis的DAO的Bean的生成。

    2. Dubbo consumer Bean的生成。

    3. Feign Client Bean的生成。

    4. ......

关于Dubbo引入服务流程,由于直接撸源码会比较干,我们先简单过一下Dubbo整体流程:

整体流程:

image-20221029192236041.png

1.校验Dubbo的配置及初始化

关于Dubbo 的配置类的介绍 可以查看上一篇文章《Dubbo地址服务暴露流程浅析》,相关的类继承关系上一篇文章也有介绍。有兴趣的可以翻一下上一篇文章。

2.导入服务,构造Invoker

通过Protocol的refer方法进行Invoker对象的构造,具体流程如下:

1.将服务注册到注册中心

Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。

image-20221030222423632.png

在把服务注册到注册中心的过程中,不会涉及到Filter、Listener 链式组装(在调用DubboProtocol.refer的过程中会涉及到),但是Dubbo默认会开启QOS的服务,用于服务的监控。

最后调用RegistryProtocol的refer方法,将Dubbo的服务注册到注册中心中。

2.开启Netty 客户端

image-20221030222423632.png

在RegistryProtocol的refer方法中, 第一步会将Dubbo服务注册到注册中心中,然后会调用directory的subscribe方法,间接调用Protocol.refer方法从而触发DubboProtocol的refer调用生成DubboInvoker,并将DubboInvoker 缓存到RegistryDirectory中,以便后续调用时使用。

同理,Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。

1.ProtocolFilterWrapper的refer方法,组装一个Filter的链 Dubbo默认包含:ConsumerContextFilter、FutureFilter、MonitorFilter。

2.ProtocolListenerWrapper的refer方法,会组装一个Listener的列表(Dubbo默认不组装)

3.QosProtocolWrapper:默认啥都不做。

4.DubboProtocol的refer方法,构造一个DubboInvoker,DubboInvoker中包含了NettyClient。

3.构造Invoker,缓存到注册表

仍然是通过Dubbo 的SPI机制加载的Cluster的服务提供者,具体的调用链路如下:

image-20221031124453404.png

通过这个流程会构造一个MockClusterInbvoker,具体的结构如下

image-20221031130455589.png

3.生成代理Bean

1.将步骤2生成的Invoker 通过ProxyFactory.getProxy(invoker) 生成代理类。

image-20221030230350222.png

相关代理类的结构:

image-20221030173714741.png

源码的主要流程

1.会通过调用ReferenceBean的getObject() 执行Dubbo 服务导入的流程。

(1)通过refProtocol.refer的方法获取invoker(会通过Dubbo的SPI 生成调用链: Adaptive、 ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper、RegistryProtocol)

(2)最终会调用RegistryProtocol的refer方法: 1> 注册服务

2> 通过DubboProtocol的refer生成DubboInvoker(DubboInvoker中持有一个NettyClient用于网络请求),然后由各层Wapper类装饰(其中FilterWrapper会封装Filter链)。

3> new InvokerDelegate(invoker)

4> 在通过cluster.join(directory) 生成MockClusterInvoker 如上图。

  1. 通过proxyFactory.getBean(invoker)创建一个Bean。

源码解析

由于ReferenceBean实现了InitializingBean接口,所以会现在ReferenceBean的afterPropertiesSet()的方法

具体源码如下:

@Override @SuppressWarnings({"unchecked"}) public void afterPropertiesSet() throws Exception {     // 判断是否有consumer 配置,有则设置(一般情况下不用)     if (getConsumer() == null) {         Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false);         if (consumerConfigMap != null && consumerConfigMap.size() > 0) {             ConsumerConfig consumerConfig = null;             for (ConsumerConfig config : consumerConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (consumerConfig != null) {                         throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config);                     }                     consumerConfig = config;                 }             }             if (consumerConfig != null) {                 setConsumer(consumerConfig);             }         }     }     // 判断是否有 Application配置, 有则设置application 属性 (xml配置的application配置,默认有值,因为主要配置application)     if (getApplication() == null             && (getConsumer() == null || getConsumer().getApplication() == null)) {         Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);         if (applicationConfigMap != null && applicationConfigMap.size() > 0) {             ApplicationConfig applicationConfig = null;             for (ApplicationConfig config : applicationConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (applicationConfig != null) {                         throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);                     }                     applicationConfig = config;                 }             }             if (applicationConfig != null) {                 setApplication(applicationConfig);             }         }     }     // 判断是否有Module配置,有则设置moudle属性(一般情况下 没有值)     if (getModule() == null             && (getConsumer() == null || getConsumer().getModule() == null)) {         Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);         if (moduleConfigMap != null && moduleConfigMap.size() > 0) {             ModuleConfig moduleConfig = null;             for (ModuleConfig config : moduleConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (moduleConfig != null) {                         throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);                     }                     moduleConfig = config;                 }             }             if (moduleConfig != null) {                 setModule(moduleConfig);             }         }     }     // 判断是否有Registry配置,有则设置Registry属性(一般情况下 有值)     if ((getRegistries() == null || getRegistries().isEmpty())             && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty())             && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {         Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);         if (registryConfigMap != null && registryConfigMap.size() > 0) {             List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();             for (RegistryConfig config : registryConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     registryConfigs.add(config);                 }             }             if (registryConfigs != null && !registryConfigs.isEmpty()) {                 super.setRegistries(registryConfigs);             }         }     }     // 判断是否有Monitor配置,有则设置     if (getMonitor() == null             && (getConsumer() == null || getConsumer().getMonitor() == null)             && (getApplication() == null || getApplication().getMonitor() == null)) {         Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);         if (monitorConfigMap != null && monitorConfigMap.size() > 0) {             MonitorConfig monitorConfig = null;             for (MonitorConfig config : monitorConfigMap.values()) {                 if (config.isDefault() == null || config.isDefault().booleanValue()) {                     if (monitorConfig != null) {                         throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);                     }                     monitorConfig = config;                 }             }             if (monitorConfig != null) {                 setMonitor(monitorConfig);             }         }     }     // 判断xml配置中 是否配置了init属性为true, 如果没有设置,则为懒加载,等这个Consumer 真正需要使用到的时候再去加载。     Boolean b = isInit();     if (b == null && getConsumer() != null) {         b = getConsumer().isInit();     }     // 判断init属性 是不是不为空,且为true ? 不满足条件则懒加载, 满足条件则直接加载     if (b != null && b.booleanValue()) {         // 直接预加载(**重点**)         getObject();     } } 复制代码

核心逻辑: 1. 判断是否有Consumer配置,有则设值。 2. 判断是否有Application配置,有则设值。 3. 判断是否有Module配置,有则设值。 4. 判断是否有Registry配置,有则设置。 5. 判断是否有Monitor配置,有则设置。 6. 判断对应的配置是否为懒加载, 不是懒加载则直接加载。 复制代码

接下来 我们看一下getObject()的逻辑:

注:ReferenceBean为FactoryBean,不了解FactoryBean可以先自行百度一下。

@Override public Object getObject() throws Exception {     // 调用get()方法进行加载     return get(); }  /**  * get()方法  */ public synchronized T get() {     // 如果destroyed了 则直接抛错     if (destroyed) {         throw new IllegalStateException("Already destroyed!");     }     // 判断ref 是否为空(默认为空),为空则加载init()方法     if (ref == null) {         init();     }     return ref; }  /**  * init()方法  */ private void init() {     // 如果已经初始化了,则返回     if (initialized) {         return;     }     initialized = true;     // 如果interfaceName 为空,则报错     if (interfaceName == null || interfaceName.length() == 0) {         throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!");     }     // get consumer's global configuration     // 获取consumer的全局配置,有则加载,没有则初始化一个     checkDefault();     // 获取当前配置的全局配置,有则加载     appendProperties(this);     // 设置泛化调用     if (getGeneric() == null && getConsumer() != null) {         setGeneric(getConsumer().getGeneric());     }     // 如果是泛化调用,则设值interfaceClass 为GenericService.class     if (ProtocolUtils.isGeneric(getGeneric())) {         interfaceClass = GenericService.class;     } else {         // 获取接口类型         try {             interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()                     .getContextClassLoader());         } catch (ClassNotFoundException e) {             throw new IllegalStateException(e.getMessage(), e);         }         // 校验接口和方法(不是接口报错,没有方法报错)         checkInterfaceAndMethods(interfaceClass, methods);     }     String resolve = System.getProperty(interfaceName);     String resolveFile = null;     if (resolve == null || resolve.length() == 0) {         resolveFile = System.getProperty("dubbo.resolve.file");         if (resolveFile == null || resolveFile.length() == 0) {             File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");             if (userResolveFile.exists()) {                 resolveFile = userResolveFile.getAbsolutePath();             }         }         if (resolveFile != null && resolveFile.length() > 0) {             Properties properties = new Properties();             FileInputStream fis = null;             try {                 fis = new FileInputStream(new File(resolveFile));                 properties.load(fis);             } catch (IOException e) {                 throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);             } finally {                 try {                     if (null != fis) fis.close();                 } catch (IOException e) {                     logger.warn(e.getMessage(), e);                 }             }             resolve = properties.getProperty(interfaceName);         }     }     if (resolve != null && resolve.length() > 0) {         url = resolve;         if (logger.isWarnEnabled()) {             if (resolveFile != null) {                 logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");             } else {                 logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");             }         }     }     if (consumer != null) {         if (application == null) {             application = consumer.getApplication();         }         if (module == null) {             module = consumer.getModule();         }         if (registries == null) {             registries = consumer.getRegistries();         }         if (monitor == null) {             monitor = consumer.getMonitor();         }     }     if (module != null) {         if (registries == null) {             registries = module.getRegistries();         }         if (monitor == null) {             monitor = module.getMonitor();         }     }     if (application != null) {         if (registries == null) {             registries = application.getRegistries();         }         if (monitor == null) {             monitor = application.getMonitor();         }     }     // 检验ApplicationConfig不为空,并append系统参数     checkApplication();     // stub、mock的合理性校验     checkStub(interfaceClass);     checkMock(interfaceClass);     // 构造URL的param     Map<String, String> map = new HashMap<String, String>();     Map<Object, Object> attributes = new HashMap<Object, Object>();     map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);     map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());     map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));     if (ConfigUtils.getPid() > 0) {         map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));     }     if (!isGeneric()) {         String revision = Version.getVersion(interfaceClass, version);         if (revision != null && revision.length() > 0) {             map.put("revision", revision);         }          String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();         if (methods.length == 0) {             logger.warn("NO method found in service interface " + interfaceClass.getName());             map.put("methods", Constants.ANY_VALUE);         } else {             map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));         }     }     map.put(Constants.INTERFACE_KEY, interfaceName);     appendParameters(map, application);     appendParameters(map, module);     appendParameters(map, consumer, Constants.DEFAULT_KEY);     appendParameters(map, this);     String prefix = StringUtils.getServiceKey(map);     if (methods != null && !methods.isEmpty()) {         for (MethodConfig method : methods) {             appendParameters(map, method, method.getName());             String retryKey = method.getName() + ".retry";             if (map.containsKey(retryKey)) {                 String retryValue = map.remove(retryKey);                 if ("false".equals(retryValue)) {                     map.put(method.getName() + ".retries", "0");                 }             }             appendAttributes(attributes, method, prefix + "." + method.getName());             checkAndConvertImplicitConfig(method, map, attributes);         }     }      String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);     if (hostToRegistry == null || hostToRegistry.length() == 0) {         hostToRegistry = NetUtils.getLocalHost();     } else if (isInvalidLocalHost(hostToRegistry)) {         throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);     }     map.put(Constants.REGISTER_IP_KEY, hostToRegistry);      //attributes are stored by system context.     // 将属性放到容器中     StaticContext.getSystemContext().putAll(attributes);     // 通过createProxy(map) 构造InvokerInvocationHandler(***重要**)     ref = createProxy(map);     // 构造ConsumerModel并放入consumedServices中     ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());     ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); } 复制代码

最终map信息如下:

image-20221121190733271.png

主要流程如下: 1.校验参数,并初始化配置信息 2.组装InvokerInvocationHandler参数map如上图. 复制代码

通过createProxy构造InvokerHandler

private T createProxy(Map<String, String> map) {     // temp://localhost?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244     URL tmpUrl = new URL("temp", "localhost", 0, map);     final boolean isJvmRefer;        // 判断isJvmRefer是否为true     if (isInjvm() == null) {         if (url != null && url.length() > 0) { // if a url is specified, don't do local reference             isJvmRefer = false;         } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {             // by default, reference local service if there is             isJvmRefer = true;         } else {             isJvmRefer = false;         }     } else {         isJvmRefer = isInjvm().booleanValue();     }          // 如果isJvmRefer 为true     if (isJvmRefer) {         // injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244         URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);         // 1.本地导入invoker(**重要**)         invoker = refprotocol.refer(interfaceClass, url);         if (logger.isInfoEnabled()) {             logger.info("Using injvm service " + interfaceClass.getName());         }     } else {         // 默认情况下 url 为空         if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.             String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);             if (us != null && us.length > 0) {                 for (String u : us) {                     URL url = URL.valueOf(u);                     if (url.getPath() == null || url.getPath().length() == 0) {                         url = url.setPath(interfaceName);                     }                     if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {                         urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));                     } else {                         urls.add(ClusterUtils.mergeUrl(url, map));                     }                 }             }         } else { // assemble URL from register center's configuration             // 获取注册中心的URL列表             List<URL> us = loadRegistries(false);             if (us != null && !us.isEmpty()) {                 // 遍历注册中心的URL列表,并加载监控(没配置的话monitor 为空)                 for (URL u : us) {                     URL monitorUrl = loadMonitor(u);                     if (monitorUrl != null) {                         map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));                     }                     // 缓存注册中心URL到urls中                     // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=12248&refer=application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248&register.ip=10.166.66.100&side=consumer&timestamp=1669037035244&registry=zookeeper&timestamp=1669037368525                     urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));                 }             }             if (urls.isEmpty()) {                 throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config.");             }         }          if (urls.size() == 1) {             // 2.远程导入invoker(**重要**)             invoker = refprotocol.refer(interfaceClass, urls.get(0));         } else {             List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();             URL registryURL = null;             for (URL url : urls) {                 invokers.add(refprotocol.refer(interfaceClass, url));                 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {                     registryURL = url; // use last registry url                 }             }             if (registryURL != null) { // registry url is available                 // use AvailableCluster only when register's cluster is available                 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);                 invoker = cluster.join(new StaticDirectory(u, invokers));             } else { // not a registry url                 invoker = cluster.join(new StaticDirectory(invokers));             }         }     }      Boolean c = check;     if (c == null && consumer != null) {         c = consumer.isCheck();     }     if (c == null) {         c = true; // default true     }     if (c && !invoker.isAvailable()) {         // make it possible for consumer to retry later if provider is temporarily unavailable         initialized = false;         throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());     }     if (logger.isInfoEnabled()) {         logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());     }     // create service proxy     // 3.通过proxyFactory.getProxy(invoker) 生成代理对象 (**重要**)     return (T) proxyFactory.getProxy(invoker); } 复制代码

上述标注了几个重要的方法

  1. 本地暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))

  2. 远程暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))

  3. 生成代理对象的方法: (T) proxyFactory.getProxy(invoker)

重要方法解析

1.本地暴露生成invoker方法的逻辑如下

image-20221122091535249.png

具体的逻辑和《Dubbo之服务暴露流程浅析》该文中的本地暴露流程类似,这边就不详细介绍了。我们重点看远程暴露生成invoker的方法。

2.远程暴露生成invoker的方法解析

1.在执行的时候对应的URL的协议为registry,所以最终走的是RegistryProtocol的refer方法,入参的URL如下:

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&register.ip=10.166.66.100&side=consumer&timestamp=1669080319427&registry=zookeeper&timestamp=1669080594410 复制代码

具体的调用链如下:由于其他的逻辑和《Dubbo之服务暴露流程浅析》该文中的远程暴露流程类似,我们主要看一下RegistryProtocol的refer方法

image-20221122104245752.png

RegistryProtocol的refer方法:

@Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {     // 设置协议,我们配置的为ZK,URL 如下:     // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&register.ip=10.166.66.100&side=consumer&timestamp=1669080319427&timestamp=1669080594410     url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);     // 根据URL获取注册中心ZK     Registry registry = registryFactory.getRegistry(url);     // 如果入参type的类型为RegistryService.class, 则直接通过proxyFactory获取Invoker(一般情况下不走该逻辑)     if (RegistryService.class.equals(type)) {         return proxyFactory.getInvoker((T) registry, type, url);     }      // group="a,b" or group="*"     // 获取 URL 的参数group,如果配置中有配置group相关的信息,则走第一个doRefer,不然则走第二个doRefer     // 两个 doRefer的 区别为Cluster不一样     Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));     String group = qs.get(Constants.GROUP_KEY);     if (group != null && group.length() > 0) {         if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1                 || "*".equals(group)) {             // getMergeableCluster() 是通过Dubbo的SPI 获取name为'mergeable'的Cluster提供者.             return doRefer(getMergeableCluster(), registry, type, url);         }     }     // 最后调用doRefer获取Invoker     return doRefer(cluster, registry, type, url); } 复制代码

RegistryProtocol的doRefer方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {     // 构造注册目录,并设置注册中心及协议     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);     directory.setRegistry(registry);     directory.setProtocol(protocol);     // all attributes of REFER_KEY     // 获取所有的属性     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());          // 构造真正的服务消费者的URL     // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);     // 如果URL中的interface不是*     if (!Constants.ANY_VALUE.equals(url.getServiceInterface())             && url.getParameter(Constants.REGISTER_KEY, true)) {         // 补充url的属性信息(category)         // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427         URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);         // 将url 注册到注册中心, 并将url设置缓存到注册目录中         registry.register(registeredConsumerUrl);         directory.setRegisteredConsumerUrl(registeredConsumerUrl);     }     // 注册目录订阅URL,内部逻辑会调用Protocol的refer,然后将invoker封装到RegistryDirectory的methodInvokerMap中(**重要**)     // 具体的RegistryDirectory的methodInvokerMap的属性可以看下图。     // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer&timestamp=1669080319427     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,             Constants.PROVIDERS_CATEGORY                     + "," + Constants.CONFIGURATORS_CATEGORY                     + "," + Constants.ROUTERS_CATEGORY));     // 通过Dubbo的SPI构造一个Invoker对象(**重要**)     Invoker invoker = cluster.join(directory);     // 缓存到consumerInvokers中     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);     return invoker; } 复制代码

2.1 RegistryDirectory#subscribe的流程图如下:

image-20221122134744431.png

最后会在RegistryDirectory的toInvokers方法中调用protocol#refer方法,通过protocol最终生成ProtocolFilterWrapper,然后由InvokerDelegate封装一下ProtocolFilterWrapper对象,放入到urlInvokerMap和methodInvokerMap中。所以在RegistryDirectory中的urlInvokerMap和methodInvokerMap已经缓存了通过Protocol链的refer方法一层层生成的Invoker。 (ProtocolFilterWrapper -> ListenerInvokerWrapper -> DubboInvoker)

Protocol的refer的调用链和《Dubbo之服务暴露流程浅析》的暴露流程类似,具体流程代码不做详细介绍。我们主要看DubboProtocol的refer方法

DubboProtocol的refer方法流程如下:

@Override public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {     // 如果url中有配置optimizer,则将类放入到优化器中     optimizeSerialization(url);     // create rpc invoker.     // 创建一个DubboInvoker(**重要**)会创建一个Client用于网络请求     DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);     // 将invoker 缓存到invokers中     invokers.add(invoker);     return invoker; }  /**  * 构造一个Client  */ private ExchangeClient[] getClients(URL url) {     // whether to share connection     boolean service_share_connect = false;     // url中的connections的值是否为0,没有connections值 默认为0     int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);     // if not configured, connection is shared, otherwise, one connection for one service     if (connections == 0) {         service_share_connect = true;         connections = 1;     }      ExchangeClient[] clients = new ExchangeClient[connections];     for (int i = 0; i < clients.length; i++) {         // 默认为true         if (service_share_connect) {             // Client 为共享模式             clients[i] = getSharedClient(url);         } else {             // 新创建一个Client             clients[i] = initClient(url);         }     }     return clients; }  /**  * Get shared connection  */ private ExchangeClient getSharedClient(URL url) {     // 访问地址(ip:port)     String key = url.getAddress();     ReferenceCountExchangeClient client = referenceClientMap.get(key);     // 第一次调用 client 为空     if (client != null) {         if (!client.isClosed()) {             client.incrementAndGetCount();             return client;         } else {             referenceClientMap.remove(key);         }     }      // 加锁     locks.putIfAbsent(key, new Object());     // 同步锁     synchronized (locks.get(key)) {         // duble check,有值则返回         if (referenceClientMap.containsKey(key)) {             return referenceClientMap.get(key);         }                  // 构造一个ExchangeClient         ExchangeClient exchangeClient = initClient(url);         // 构造一个ReferenceCountExchangeClient         client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);         // 设置到缓存中         referenceClientMap.put(key, client);         ghostClientMap.remove(key);         //解锁         locks.remove(key);         return client;     } }  /**  * Create new connection  */ private ExchangeClient initClient(URL url) {      // client type setting.     // 获取url中的client属性(默认为netty)     String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));     // 添加codec 属性为dubbo     url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);     // enable heartbeat by default     // 添加心跳属性 heartbeat 为 60s     url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));      // BIO is not allowed since it has severe performance issue.     // 校验name为str(client属性,默认为netty) 的Transporter提供者是否存在,不存在则抛错     if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {         throw new RpcException("Unsupported client type: " + str + "," +                 " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));     }      ExchangeClient client;     try {         // connection should be lazy         // 判断URL的lazy属性是否为true, 默认为fasle         if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {             // 构建一个Lazy的Client,等要网路请求时,会调用私有的initClient方法构造一个HeaderExchangeClient,和下面的Exchangers.connect(url, requestHandler) 逻辑类似             client = new LazyConnectExchangeClient(url, requestHandler);         } else {             // 调用ExchangeClient的connect方法创建client(**重点接口**)             client = Exchangers.connect(url, requestHandler);         }     } catch (RemotingException e) {         throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);     }     return client; } 复制代码

上述代码主要逻辑为: 1.构造一个DubboInvoker 2.DubboInvoker会持有一个ExchangeClient 3.ExchangeClient为根据是 是否为共享模式,去初始化 3.1. 如果是共享模式,则根据url的address(ip:port) 查询本地缓存中是否有对应的client,没有则会根据是否要懒加载创建一个征程的或者懒加载的Client 3.2. 如果不是共享模式,则直接创建一个Client 复制代码

2.2 通过Exchangers.connect生成Client的流程

1.获取Exchanger真正的服务提供者

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {     if (url == null) {         throw new IllegalArgumentException("url == null");     }     if (handler == null) {         throw new IllegalArgumentException("handler == null");     }     url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");     // 获取Exchanger服务提供者(默认为HeaderExchanger对象),然后调用connect方法     return getExchanger(url).connect(url, handler); }  public static Exchanger getExchanger(URL url) {     // 获取URL的exchanger属性,默认为header,获取HeaderExchanger对象     String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);     return getExchanger(type); }  public static Exchanger getExchanger(String type) {     // 通过Dubbo的SPI获取服务提供者,为HeaderExchanger     return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); } 复制代码

2.通过HeaderExchanger.connect生成client

@Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {     // 1.通过Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 创建client     // 2.将Client封装为HeaderExchangeClient     return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } 复制代码

3.通过Transporters.connect生成Client

public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {     if (url == null) {         throw new IllegalArgumentException("url == null");     }     ChannelHandler handler;     if (handlers == null || handlers.length == 0) {         handler = new ChannelHandlerAdapter();     } else if (handlers.length == 1) {         handler = handlers[0];     } else {         handler = new ChannelHandlerDispatcher(handlers);     }     // getTransporter() 最终获取NettyTransporter,     // 通过NettyTransporter.connect方法获取NettyClient.     return getTransporter().connect(url, handler); }  public static Transporter getTransporter() {     // 通过Dubbo SPI获取Transporter的适配类对象(Transporter$Adaptive -> NettyTransporter)     return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } 复制代码

NettyTransporter代码

public class NettyTransporter implements Transporter {     public static final String NAME = "netty";     @Override     public Server bind(URL url, ChannelHandler listener) throws RemotingException {         return new NettyServer(url, listener);     }     @Override     public Client connect(URL url, ChannelHandler listener) throws RemotingException {        // 创建NettyClient         return new NettyClient(url, listener);     } } 复制代码

关于NettyClient后续的代码就不跟了,有兴趣的小伙伴可以自行去了解。

2.3 cluster.join(directory)的流程如下:

image-20221122140237254.png

最终生成MockClusterInvoker,具体结构如下:

image-20221031130455589.png

3.构造Invoker,缓存到注册表

主要流程如下:

image-20221030230350222.png

StubProxyFactoryWrapper的getProxy方法

@Override @SuppressWarnings({"unchecked", "rawtypes"}) public <T> T getProxy(Invoker<T> invoker) throws RpcException {     // 继续通过Dubbo SPI的方式获取proxy     T proxy = proxyFactory.getProxy(invoker);     // 如果不是泛化类     if (GenericService.class != invoker.getInterface()) {         // 如果有stub配置(默认没有),不走下面的逻辑直接返回proxy         String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));         if (ConfigUtils.isNotEmpty(stub)) {             Class<?> serviceType = invoker.getInterface();             if (ConfigUtils.isDefault(stub)) {                 if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {                     stub = serviceType.getName() + "Stub";                 } else {                     stub = serviceType.getName() + "Local";                 }             }             try {                 Class<?> stubClass = ReflectUtils.forName(stub);                 if (!serviceType.isAssignableFrom(stubClass)) {                     throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());                 }                 try {                     Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);                     proxy = (T) constructor.newInstance(new Object[]{proxy});                     //export stub service                     URL url = invoker.getUrl();                     if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {                         url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));                         url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());                         try {                             export(proxy, (Class) invoker.getInterface(), url);                         } catch (Exception e) {                             LOGGER.error("export a stub service error.", e);                         }                     }                 } catch (NoSuchMethodException e) {                     throw new IllegalStateException("No such constructor "public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")" in stub implementation class " + stubClass.getName(), e);                 }             } catch (Throwable t) {                 LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);                 // ignore             }         }     }     return proxy; } 复制代码

AbstractProxyFactory的getProxy方法

@Override public <T> T getProxy(Invoker<T> invoker) throws RpcException {     // 调用重载的方法     return getProxy(invoker, false); }  @Override public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {     Class<?>[] interfaces = null;     String config = invoker.getUrl().getParameter("interfaces");     if (config != null && config.length() > 0) {         String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);         if (types != null && types.length > 0) {             interfaces = new Class<?>[types.length + 2];             interfaces[0] = invoker.getInterface();             interfaces[1] = EchoService.class;             for (int i = 0; i < types.length; i++) {                 interfaces[i + 1] = ReflectUtils.forName(types[i]);             }         }     }     if (interfaces == null) {         interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};     }      if (!invoker.getInterface().equals(GenericService.class) && generic) {         int len = interfaces.length;         Class<?>[] temp = interfaces;         interfaces = new Class<?>[len + 1];         System.arraycopy(temp, 0, interfaces, 0, len);         interfaces[len] = GenericService.class;     }     // 调用子类的getProxy方法     return getProxy(invoker, interfaces); } 复制代码

JavassistProxyFactory的getProxy方法

@Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {     // 构造一个InvokerInvocationHandler对象,其中的入参为MockClusterInvoker对象     return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } 复制代码

自此Dubbo 的服务导入流程就结束啦。


作者:爱学习的某人
链接:https://juejin.cn/post/7168753442872098829


文章分类
代码人生
文章标签
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐