Dubbo之服务导入流程解析
接着上回《Dubbo之服务暴露流程浅析》,上一篇文章已经介绍完了Dubbo的服务提供者的服务暴露的整个流程,本文主要介绍Dubbo服务消费者的服务导入流程。
前言:
Dubbo服务消费者的服务相关代码下:
xml配置:配置了两个
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion), don't set it same as provider --> <dubbo:application name="demo-consumer"/> <!-- use multicast registry center to discover service --> <dubbo:registry group="aaa" address="zookeeper://127.0.0.1:2181"/> <!-- generate proxy for the remote service, then demoService can be used in the same way as the local regular interface --> <dubbo:reference id="helloService" check="false" interface="org.apache.dubbo.samples.api.client.HelloService"/> <dubbo:reference id="greetingService" check="false" interface="org.apache.dubbo.samples.api.client.GreetingService" init="true"/> </beans> 复制代码
消费者启动类:
public class BasicConsumerBootstrap { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-demo-consumer.xml"}); context.start(); HelloService helloService = (HelloService) context.getBean("helloService"); / while (true) { try { Thread.sleep(1000); String hello = helloService.sayHello("world"); System.out.println(hello); } catch (Throwable throwable) { throwable.printStackTrace(); } } } } 复制代码
Dubbo的引入服务的方式
Dubbo引入服务的方法有如下两种方式:
1.懒汉式引入服务
Dubbo默认的引入服务的方式为懒汉式,当我们的程序从容器中获取对应的Bean的时候会进行Dubbo的Consumer Bean的装载。
具体配置如下:
PS:对应具体的代码则是在 (HelloService) context.getBean("helloService")的时候装载Bean。
Spring懒汉式引入Dubbo的Bean的流程如下:
2.饿汉式引入服务
饿汉式引入服务需要我们在Dubbo的Consumer 配置中额外配置reference 的init属性为true。然后Spring则会在容器启动时,装载Dubbo的Bean,即 new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-demo-consumer.xml"})的时候。
具体配置如下:
Spring饿汉式引入Dubbo服务的流程如下:
Dubbo之服务引用流程
从Dubbo的引入服务方式的流程来看,无论是懒汉式引入服务的方式 还是说饿汉式引入服务的方式,最终都会调用到ReferenceBean#getObject()的方法。接下来我们会着重解析ReferenceBean#getObject()这个方法。
然而ReferenceBean的getObject()是实现于FactoryBean的getObject方法,那么什么时候FactoryBean呢?下面是网上较多的解释:
FactoryBean是一个工厂Bean,可以生成某一个类型Bean实例,它最大的一个作用是:可以让我们自定义Bean的创建过程。FactoryBean本质就是用来给我们实例化、或者动态的注入一些比较复杂的Bean,比如像一些接口的代理对象。 复制代码
可以总结以下几点:
FactoryBean是个Bean,泛指一种类型。
可以自定义创建过程,比较灵活
通常配合代理模式一起使用
常用的使用场景:
Mybatis的DAO的Bean的生成。
Dubbo consumer Bean的生成。
Feign Client Bean的生成。
......
关于Dubbo引入服务流程,由于直接撸源码会比较干,我们先简单过一下Dubbo整体流程:
整体流程:
1.校验Dubbo的配置及初始化
关于Dubbo 的配置类的介绍 可以查看上一篇文章《Dubbo地址服务暴露流程浅析》,相关的类继承关系上一篇文章也有介绍。有兴趣的可以翻一下上一篇文章。
2.导入服务,构造Invoker
通过Protocol的refer方法进行Invoker对象的构造,具体流程如下:
1.将服务注册到注册中心
Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。
在把服务注册到注册中心的过程中,不会涉及到Filter、Listener 链式组装(在调用DubboProtocol.refer的过程中会涉及到),但是Dubbo默认会开启QOS的服务,用于服务的监控。
最后调用RegistryProtocol的refer方法,将Dubbo的服务注册到注册中心中。
2.开启Netty 客户端
在RegistryProtocol的refer方法中, 第一步会将Dubbo服务注册到注册中心中,然后会调用directory的subscribe方法,间接调用Protocol.refer方法从而触发DubboProtocol的refer调用生成DubboInvoker,并将DubboInvoker 缓存到RegistryDirectory中,以便后续调用时使用。
同理,Dubbo会通过ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 构造一个Protocol的Adaptive类,然后根据适配类走整个调用链路。
1.ProtocolFilterWrapper的refer方法,组装一个Filter的链 Dubbo默认包含:ConsumerContextFilter、FutureFilter、MonitorFilter。
2.ProtocolListenerWrapper的refer方法,会组装一个Listener的列表(Dubbo默认不组装)
3.QosProtocolWrapper:默认啥都不做。
4.DubboProtocol的refer方法,构造一个DubboInvoker,DubboInvoker中包含了NettyClient。
3.构造Invoker,缓存到注册表
仍然是通过Dubbo 的SPI机制加载的Cluster的服务提供者,具体的调用链路如下:
通过这个流程会构造一个MockClusterInbvoker,具体的结构如下
3.生成代理Bean
1.将步骤2生成的Invoker 通过ProxyFactory.getProxy(invoker) 生成代理类。
相关代理类的结构:
源码的主要流程
1.会通过调用ReferenceBean的getObject() 执行Dubbo 服务导入的流程。
(1)通过refProtocol.refer的方法获取invoker(会通过Dubbo的SPI 生成调用链: Adaptive、 ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper、RegistryProtocol)
(2)最终会调用RegistryProtocol的refer方法: 1> 注册服务
2> 通过DubboProtocol的refer生成DubboInvoker(DubboInvoker中持有一个NettyClient用于网络请求),然后由各层Wapper类装饰(其中FilterWrapper会封装Filter链)。
3> new InvokerDelegate(invoker)
4> 在通过cluster.join(directory) 生成MockClusterInvoker 如上图。
通过proxyFactory.getBean(invoker)创建一个Bean。
源码解析
由于ReferenceBean实现了InitializingBean接口,所以会现在ReferenceBean的afterPropertiesSet()的方法
具体源码如下:
@Override @SuppressWarnings({"unchecked"}) public void afterPropertiesSet() throws Exception { // 判断是否有consumer 配置,有则设置(一般情况下不用) if (getConsumer() == null) { Map<String, ConsumerConfig> consumerConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ConsumerConfig.class, false, false); if (consumerConfigMap != null && consumerConfigMap.size() > 0) { ConsumerConfig consumerConfig = null; for (ConsumerConfig config : consumerConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { if (consumerConfig != null) { throw new IllegalStateException("Duplicate consumer configs: " + consumerConfig + " and " + config); } consumerConfig = config; } } if (consumerConfig != null) { setConsumer(consumerConfig); } } } // 判断是否有 Application配置, 有则设置application 属性 (xml配置的application配置,默认有值,因为主要配置application) if (getApplication() == null && (getConsumer() == null || getConsumer().getApplication() == null)) { Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false); if (applicationConfigMap != null && applicationConfigMap.size() > 0) { ApplicationConfig applicationConfig = null; for (ApplicationConfig config : applicationConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { if (applicationConfig != null) { throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config); } applicationConfig = config; } } if (applicationConfig != null) { setApplication(applicationConfig); } } } // 判断是否有Module配置,有则设置moudle属性(一般情况下 没有值) if (getModule() == null && (getConsumer() == null || getConsumer().getModule() == null)) { Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false); if (moduleConfigMap != null && moduleConfigMap.size() > 0) { ModuleConfig moduleConfig = null; for (ModuleConfig config : moduleConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { if (moduleConfig != null) { throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config); } moduleConfig = config; } } if (moduleConfig != null) { setModule(moduleConfig); } } } // 判断是否有Registry配置,有则设置Registry属性(一般情况下 有值) if ((getRegistries() == null || getRegistries().isEmpty()) && (getConsumer() == null || getConsumer().getRegistries() == null || getConsumer().getRegistries().isEmpty()) && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) { Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false); if (registryConfigMap != null && registryConfigMap.size() > 0) { List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>(); for (RegistryConfig config : registryConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { registryConfigs.add(config); } } if (registryConfigs != null && !registryConfigs.isEmpty()) { super.setRegistries(registryConfigs); } } } // 判断是否有Monitor配置,有则设置 if (getMonitor() == null && (getConsumer() == null || getConsumer().getMonitor() == null) && (getApplication() == null || getApplication().getMonitor() == null)) { Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false); if (monitorConfigMap != null && monitorConfigMap.size() > 0) { MonitorConfig monitorConfig = null; for (MonitorConfig config : monitorConfigMap.values()) { if (config.isDefault() == null || config.isDefault().booleanValue()) { if (monitorConfig != null) { throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config); } monitorConfig = config; } } if (monitorConfig != null) { setMonitor(monitorConfig); } } } // 判断xml配置中 是否配置了init属性为true, 如果没有设置,则为懒加载,等这个Consumer 真正需要使用到的时候再去加载。 Boolean b = isInit(); if (b == null && getConsumer() != null) { b = getConsumer().isInit(); } // 判断init属性 是不是不为空,且为true ? 不满足条件则懒加载, 满足条件则直接加载 if (b != null && b.booleanValue()) { // 直接预加载(**重点**) getObject(); } } 复制代码
核心逻辑: 1. 判断是否有Consumer配置,有则设值。 2. 判断是否有Application配置,有则设值。 3. 判断是否有Module配置,有则设值。 4. 判断是否有Registry配置,有则设置。 5. 判断是否有Monitor配置,有则设置。 6. 判断对应的配置是否为懒加载, 不是懒加载则直接加载。 复制代码
接下来 我们看一下getObject()的逻辑:
注:ReferenceBean为FactoryBean,不了解FactoryBean可以先自行百度一下。
@Override public Object getObject() throws Exception { // 调用get()方法进行加载 return get(); } /** * get()方法 */ public synchronized T get() { // 如果destroyed了 则直接抛错 if (destroyed) { throw new IllegalStateException("Already destroyed!"); } // 判断ref 是否为空(默认为空),为空则加载init()方法 if (ref == null) { init(); } return ref; } /** * init()方法 */ private void init() { // 如果已经初始化了,则返回 if (initialized) { return; } initialized = true; // 如果interfaceName 为空,则报错 if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("<dubbo:reference interface="" /> interface not allow null!"); } // get consumer's global configuration // 获取consumer的全局配置,有则加载,没有则初始化一个 checkDefault(); // 获取当前配置的全局配置,有则加载 appendProperties(this); // 设置泛化调用 if (getGeneric() == null && getConsumer() != null) { setGeneric(getConsumer().getGeneric()); } // 如果是泛化调用,则设值interfaceClass 为GenericService.class if (ProtocolUtils.isGeneric(getGeneric())) { interfaceClass = GenericService.class; } else { // 获取接口类型 try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 校验接口和方法(不是接口报错,没有方法报错) checkInterfaceAndMethods(interfaceClass, methods); } String resolve = System.getProperty(interfaceName); String resolveFile = null; if (resolve == null || resolve.length() == 0) { resolveFile = System.getProperty("dubbo.resolve.file"); if (resolveFile == null || resolveFile.length() == 0) { File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties"); if (userResolveFile.exists()) { resolveFile = userResolveFile.getAbsolutePath(); } } if (resolveFile != null && resolveFile.length() > 0) { Properties properties = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(new File(resolveFile)); properties.load(fis); } catch (IOException e) { throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e); } finally { try { if (null != fis) fis.close(); } catch (IOException e) { logger.warn(e.getMessage(), e); } } resolve = properties.getProperty(interfaceName); } } if (resolve != null && resolve.length() > 0) { url = resolve; if (logger.isWarnEnabled()) { if (resolveFile != null) { logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service."); } else { logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service."); } } } if (consumer != null) { if (application == null) { application = consumer.getApplication(); } if (module == null) { module = consumer.getModule(); } if (registries == null) { registries = consumer.getRegistries(); } if (monitor == null) { monitor = consumer.getMonitor(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 检验ApplicationConfig不为空,并append系统参数 checkApplication(); // stub、mock的合理性校验 checkStub(interfaceClass); checkMock(interfaceClass); // 构造URL的param Map<String, String> map = new HashMap<String, String>(); Map<Object, Object> attributes = new HashMap<Object, Object>(); map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!isGeneric()) { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("NO method found in service interface " + interfaceClass.getName()); map.put("methods", Constants.ANY_VALUE); } else { map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } map.put(Constants.INTERFACE_KEY, interfaceName); appendParameters(map, application); appendParameters(map, module); appendParameters(map, consumer, Constants.DEFAULT_KEY); appendParameters(map, this); String prefix = StringUtils.getServiceKey(map); if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } appendAttributes(attributes, method, prefix + "." + method.getName()); checkAndConvertImplicitConfig(method, map, attributes); } } String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY); if (hostToRegistry == null || hostToRegistry.length() == 0) { hostToRegistry = NetUtils.getLocalHost(); } else if (isInvalidLocalHost(hostToRegistry)) { throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry); } map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //attributes are stored by system context. // 将属性放到容器中 StaticContext.getSystemContext().putAll(attributes); // 通过createProxy(map) 构造InvokerInvocationHandler(***重要**) ref = createProxy(map); // 构造ConsumerModel并放入consumedServices中 ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods()); ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel); } 复制代码
最终map信息如下:
主要流程如下: 1.校验参数,并初始化配置信息 2.组装InvokerInvocationHandler参数map如上图. 复制代码
通过createProxy构造InvokerHandler
private T createProxy(Map<String, String> map) { // temp://localhost?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248®ister.ip=10.166.66.100&side=consumer×tamp=1669037035244 URL tmpUrl = new URL("temp", "localhost", 0, map); final boolean isJvmRefer; // 判断isJvmRefer是否为true if (isInjvm() == null) { if (url != null && url.length() > 0) { // if a url is specified, don't do local reference isJvmRefer = false; } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) { // by default, reference local service if there is isJvmRefer = true; } else { isJvmRefer = false; } } else { isJvmRefer = isInjvm().booleanValue(); } // 如果isJvmRefer 为true if (isJvmRefer) { // injvm://127.0.0.1/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248®ister.ip=10.166.66.100&side=consumer×tamp=1669037035244 URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map); // 1.本地导入invoker(**重要**) invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } else { // 默认情况下 url 为空 if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address. String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url); if (us != null && us.length > 0) { for (String u : us) { URL url = URL.valueOf(u); if (url.getPath() == null || url.getPath().length() == 0) { url = url.setPath(interfaceName); } if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } else { urls.add(ClusterUtils.mergeUrl(url, map)); } } } } else { // assemble URL from register center's configuration // 获取注册中心的URL列表 List<URL> us = loadRegistries(false); if (us != null && !us.isEmpty()) { // 遍历注册中心的URL列表,并加载监控(没配置的话monitor 为空) for (URL u : us) { URL monitorUrl = loadMonitor(u); if (monitorUrl != null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } // 缓存注册中心URL到urls中 // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=12248&refer=application=demo-consumer&check=false&dubbo=2.0.2&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=12248®ister.ip=10.166.66.100&side=consumer×tamp=1669037035244®istry=zookeeper×tamp=1669037368525 urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); } } if (urls.isEmpty()) { throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address="..." /> to your spring config."); } } if (urls.size() == 1) { // 2.远程导入invoker(**重要**) invoker = refprotocol.refer(interfaceClass, urls.get(0)); } else { List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // use last registry url } } if (registryURL != null) { // registry url is available // use AvailableCluster only when register's cluster is available URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // not a registry url invoker = cluster.join(new StaticDirectory(invokers)); } } } Boolean c = check; if (c == null && consumer != null) { c = consumer.isCheck(); } if (c == null) { c = true; // default true } if (c && !invoker.isAvailable()) { // make it possible for consumer to retry later if provider is temporarily unavailable initialized = false; throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion()); } if (logger.isInfoEnabled()) { logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl()); } // create service proxy // 3.通过proxyFactory.getProxy(invoker) 生成代理对象 (**重要**) return (T) proxyFactory.getProxy(invoker); } 复制代码
上述标注了几个重要的方法
本地暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))
远程暴露的生成invoker方法: refprotocol.refer(interfaceClass, urls.get(0))
生成代理对象的方法: (T) proxyFactory.getProxy(invoker)
重要方法解析
1.本地暴露生成invoker方法的逻辑如下
具体的逻辑和《Dubbo之服务暴露流程浅析》该文中的本地暴露流程类似,这边就不详细介绍了。我们重点看远程暴露生成invoker的方法。
2.远程暴露生成invoker的方法解析
1.在执行的时候对应的URL的协议为registry,所以最终走的是RegistryProtocol的refer方法,入参的URL如下:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887®ister.ip=10.166.66.100&side=consumer×tamp=1669080319427®istry=zookeeper×tamp=1669080594410 复制代码
具体的调用链如下:由于其他的逻辑和《Dubbo之服务暴露流程浅析》该文中的远程暴露流程类似,我们主要看一下RegistryProtocol的refer方法
RegistryProtocol的refer方法:
@Override @SuppressWarnings("unchecked") public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 设置协议,我们配置的为ZK,URL 如下: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&group=aaa&pid=25887&refer=application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887®ister.ip=10.166.66.100&side=consumer×tamp=1669080319427×tamp=1669080594410 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); // 根据URL获取注册中心ZK Registry registry = registryFactory.getRegistry(url); // 如果入参type的类型为RegistryService.class, 则直接通过proxyFactory获取Invoker(一般情况下不走该逻辑) if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" // 获取 URL 的参数group,如果配置中有配置group相关的信息,则走第一个doRefer,不然则走第二个doRefer // 两个 doRefer的 区别为Cluster不一样 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); String group = qs.get(Constants.GROUP_KEY); if (group != null && group.length() > 0) { if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { // getMergeableCluster() 是通过Dubbo的SPI 获取name为'mergeable'的Cluster提供者. return doRefer(getMergeableCluster(), registry, type, url); } } // 最后调用doRefer获取Invoker return doRefer(cluster, registry, type, url); } 复制代码
RegistryProtocol的doRefer方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { // 构造注册目录,并设置注册中心及协议 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY // 获取所有的属性 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 构造真正的服务消费者的URL // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer×tamp=1669080319427 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); // 如果URL中的interface不是* if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 补充url的属性信息(category) // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=consumers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer×tamp=1669080319427 URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url); // 将url 注册到注册中心, 并将url设置缓存到注册目录中 registry.register(registeredConsumerUrl); directory.setRegisteredConsumerUrl(registeredConsumerUrl); } // 注册目录订阅URL,内部逻辑会调用Protocol的refer,然后将invoker封装到RegistryDirectory的methodInvokerMap中(**重要**) // 具体的RegistryDirectory的methodInvokerMap的属性可以看下图。 // consumer://10.166.66.100/org.apache.dubbo.samples.api.client.HelloService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=true&interface=org.apache.dubbo.samples.api.client.HelloService&methods=sayHello&pid=25887&side=consumer×tamp=1669080319427 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 通过Dubbo的SPI构造一个Invoker对象(**重要**) Invoker invoker = cluster.join(directory); // 缓存到consumerInvokers中 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; } 复制代码
2.1 RegistryDirectory#subscribe的流程图如下:
最后会在RegistryDirectory的toInvokers方法中调用protocol#refer方法,通过protocol最终生成ProtocolFilterWrapper,然后由InvokerDelegate封装一下ProtocolFilterWrapper对象,放入到urlInvokerMap和methodInvokerMap中。所以在RegistryDirectory中的urlInvokerMap和methodInvokerMap已经缓存了通过Protocol链的refer方法一层层生成的Invoker。 (ProtocolFilterWrapper -> ListenerInvokerWrapper -> DubboInvoker)
Protocol的refer的调用链和《Dubbo之服务暴露流程浅析》的暴露流程类似,具体流程代码不做详细介绍。我们主要看DubboProtocol的refer方法
DubboProtocol的refer方法流程如下:
@Override public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // 如果url中有配置optimizer,则将类放入到优化器中 optimizeSerialization(url); // create rpc invoker. // 创建一个DubboInvoker(**重要**)会创建一个Client用于网络请求 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); // 将invoker 缓存到invokers中 invokers.add(invoker); return invoker; } /** * 构造一个Client */ private ExchangeClient[] getClients(URL url) { // whether to share connection boolean service_share_connect = false; // url中的connections的值是否为0,没有connections值 默认为0 int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); // if not configured, connection is shared, otherwise, one connection for one service if (connections == 0) { service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { // 默认为true if (service_share_connect) { // Client 为共享模式 clients[i] = getSharedClient(url); } else { // 新创建一个Client clients[i] = initClient(url); } } return clients; } /** * Get shared connection */ private ExchangeClient getSharedClient(URL url) { // 访问地址(ip:port) String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); // 第一次调用 client 为空 if (client != null) { if (!client.isClosed()) { client.incrementAndGetCount(); return client; } else { referenceClientMap.remove(key); } } // 加锁 locks.putIfAbsent(key, new Object()); // 同步锁 synchronized (locks.get(key)) { // duble check,有值则返回 if (referenceClientMap.containsKey(key)) { return referenceClientMap.get(key); } // 构造一个ExchangeClient ExchangeClient exchangeClient = initClient(url); // 构造一个ReferenceCountExchangeClient client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap); // 设置到缓存中 referenceClientMap.put(key, client); ghostClientMap.remove(key); //解锁 locks.remove(key); return client; } } /** * Create new connection */ private ExchangeClient initClient(URL url) { // client type setting. // 获取url中的client属性(默认为netty) String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); // 添加codec 属性为dubbo url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default // 添加心跳属性 heartbeat 为 60s url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. // 校验name为str(client属性,默认为netty) 的Transporter提供者是否存在,不存在则抛错 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client; try { // connection should be lazy // 判断URL的lazy属性是否为true, 默认为fasle if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) { // 构建一个Lazy的Client,等要网路请求时,会调用私有的initClient方法构造一个HeaderExchangeClient,和下面的Exchangers.connect(url, requestHandler) 逻辑类似 client = new LazyConnectExchangeClient(url, requestHandler); } else { // 调用ExchangeClient的connect方法创建client(**重点接口**) client = Exchangers.connect(url, requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; } 复制代码
上述代码主要逻辑为: 1.构造一个DubboInvoker 2.DubboInvoker会持有一个ExchangeClient 3.ExchangeClient为根据是 是否为共享模式,去初始化 3.1. 如果是共享模式,则根据url的address(ip:port) 查询本地缓存中是否有对应的client,没有则会根据是否要懒加载创建一个征程的或者懒加载的Client 3.2. 如果不是共享模式,则直接创建一个Client 复制代码
2.2 通过Exchangers.connect生成Client的流程
1.获取Exchanger真正的服务提供者
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 获取Exchanger服务提供者(默认为HeaderExchanger对象),然后调用connect方法 return getExchanger(url).connect(url, handler); } public static Exchanger getExchanger(URL url) { // 获取URL的exchanger属性,默认为header,获取HeaderExchanger对象 String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { // 通过Dubbo的SPI获取服务提供者,为HeaderExchanger return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); } 复制代码
2.通过HeaderExchanger.connect生成client
@Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { // 1.通过Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))) 创建client // 2.将Client封装为HeaderExchangeClient return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } 复制代码
3.通过Transporters.connect生成Client
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // getTransporter() 最终获取NettyTransporter, // 通过NettyTransporter.connect方法获取NettyClient. return getTransporter().connect(url, handler); } public static Transporter getTransporter() { // 通过Dubbo SPI获取Transporter的适配类对象(Transporter$Adaptive -> NettyTransporter) return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); } 复制代码
NettyTransporter代码
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; @Override public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } @Override public Client connect(URL url, ChannelHandler listener) throws RemotingException { // 创建NettyClient return new NettyClient(url, listener); } } 复制代码
关于NettyClient后续的代码就不跟了,有兴趣的小伙伴可以自行去了解。
2.3 cluster.join(directory)的流程如下:
最终生成MockClusterInvoker,具体结构如下:
3.构造Invoker,缓存到注册表
主要流程如下:
StubProxyFactoryWrapper的getProxy方法
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 继续通过Dubbo SPI的方式获取proxy T proxy = proxyFactory.getProxy(invoker); // 如果不是泛化类 if (GenericService.class != invoker.getInterface()) { // 如果有stub配置(默认没有),不走下面的逻辑直接返回proxy String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY)); if (ConfigUtils.isNotEmpty(stub)) { Class<?> serviceType = invoker.getInterface(); if (ConfigUtils.isDefault(stub)) { if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) { stub = serviceType.getName() + "Stub"; } else { stub = serviceType.getName() + "Local"; } } try { Class<?> stubClass = ReflectUtils.forName(stub); if (!serviceType.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName()); } try { Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[]{proxy}); //export stub service URL url = invoker.getUrl(); if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) { url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ",")); url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); try { export(proxy, (Class) invoker.getInterface(), url); } catch (Exception e) { LOGGER.error("export a stub service error.", e); } } } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor "public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")" in stub implementation class " + stubClass.getName(), e); } } catch (Throwable t) { LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t); // ignore } } } return proxy; } 复制代码
AbstractProxyFactory的getProxy方法
@Override public <T> T getProxy(Invoker<T> invoker) throws RpcException { // 调用重载的方法 return getProxy(invoker, false); } @Override public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException { Class<?>[] interfaces = null; String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i++) { interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class}; } if (!invoker.getInterface().equals(GenericService.class) && generic) { int len = interfaces.length; Class<?>[] temp = interfaces; interfaces = new Class<?>[len + 1]; System.arraycopy(temp, 0, interfaces, 0, len); interfaces[len] = GenericService.class; } // 调用子类的getProxy方法 return getProxy(invoker, interfaces); } 复制代码
JavassistProxyFactory的getProxy方法
@Override @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { // 构造一个InvokerInvocationHandler对象,其中的入参为MockClusterInvoker对象 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } 复制代码
自此Dubbo 的服务导入流程就结束啦。
作者:爱学习的某人
链接:https://juejin.cn/post/7168753442872098829