博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
搞懂Dubbo服务发布与服务注册
阅读量:4587 次
发布时间:2019-06-09

本文共 19183 字,大约阅读时间需要 63 分钟。

一.前言

  本文讲服务发布与服务注册,服务提供者本地发布服务,然后向注册中心注册服务,将服务实现类以服务接口的形式提供出去,以便服务消费者从注册中心查阅并调用服务。

  本文源码分析基于org.apache.dubbo:dubbo:2.7.2,服务端代码例子是上文的例子

  如果没有Dubbo SPI的基础知识,建议先看Dubbo SPI,否则源码怎么跳转的将毫无头绪

  Dubbo SPI:

 

二.服务发布

调用顺序

  首先讲一下大致的服务发布的调用顺序图,蓝色方法不分析,主要是起一个netty服务

-org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent

  -org.apache.dubbo.config.ServiceConfig#export
    -org.apache.dubbo.config.ServiceConfig#doExport
      -org.apache.dubbo.config.ServiceConfig#doExportUrls
        -org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
          -org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
            -org.apache.dubbo.registry.integration.RegistryProtocol#export
              -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
                -org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
                  -org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
                    -org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
                      -org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
                        -org.apache.dubbo.remoting.transport.netty.NettyTransporter#bind

                          ...

源码分析

1.org.apache.dubbo.config.spring.ServiceBean#onApplicationEvent,这个方法就不多说了,注释写的很详细了

1 /** 2  * ServiceBean实现了ApplicationListener接口 3  * 在IOC的容器的启动过程,当所有的bean都已经处理完成之后,spring ioc容器会发布ContextRefreshedEvent事件。 4  * 此处就是接收到事件处理的逻辑,开始服务发布之旅 5  * 6  * @param event 7  */ 8 @Override 9 public void onApplicationEvent(ContextRefreshedEvent event) {10     // 是否已发布  &&  是否已经被取消发布11     if (!isExported() && !isUnexported()) {12         if (logger.isInfoEnabled()) {13             logger.info("The service ready on spring started. service: " + getInterface());14         }15         // 发布16         export();17     }18 }

2.org.apache.dubbo.config.ServiceConfig#export,这里的checkAndUpdateSubConfigs主要做的就是检测标签合法,检测各种对象是否为空,为空则创建。之后判断是否可以发布和是否需要延迟发布,需要则延迟再doExport

1 public synchronized void export() { 2     // 检测
的interface是否合法 3 // 检查provider为空 4 // 检查各种对象是否为空,为空则创建 5 checkAndUpdateSubConfigs(); 6 if (!shouldExport()) { 7 return; 8 } 9 // 是否延迟?10 if (shouldDelay()) {11 // 延迟12 DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);13 } else {14 doExport();15 }16 }

3.org.apache.dubbo.config.ServiceConfig#doExport,注释写的很清楚,还是没有走到核心逻辑

1 protected synchronized void doExport() { 2     // 是否已经被取消发布 3     if (unexported) { 4         throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); 5     } 6     // 是否已经发布,注意这个变量volatile修饰 7     if (exported) { 8         return; 9     }10     exported = true;11     if (StringUtils.isEmpty(path)) {12         path = interfaceName;13     }14     doExportUrls();15 }

4.org.apache.dubbo.config.ServiceConfig#doExportUrls,先加载要注册的url,然后遍历所有协议,发布服务并注册

1 @SuppressWarnings({"unchecked", "rawtypes"}) 2 private void doExportUrls() { 3     // 加载要注册的url 4     // registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&pid=10516&registry=zookeeper&timestamp=1559889491339 5     List
registryURLs = loadRegistries(true); 6 // for循环每个协议,发布服务并注册到注册中心 7 for (ProtocolConfig protocolConfig : protocols) { 8 String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version); 9 ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);10 ApplicationModel.initProviderModel(pathKey, providerModel);11 doExportUrlsFor1Protocol(protocolConfig, registryURLs);12 }13 }

5.org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol,看方法名字也知道,单一协议多导出服务。这个方法做的事情比较多,我这边拆成4个部分,第一部分、第二部分和第三部分都是在做填充map的事情,第三部分最后生成导出url。第四部分开始发布服务,里面会判断到底是发布服务并且注册到注册中心呢还是仅发布服务。

1 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
registryURLs) { 2 /** 第一部分开始 **/ 3 String name = protocolConfig.getName(); 4 // 为空或者为空字符串,默认dubbo协议 5 if (StringUtils.isEmpty(name)) { 6 name = DUBBO; 7 } 8 9 Map
map = new HashMap
(); 10 map.put(SIDE_KEY, PROVIDER_SIDE); 11 12 appendRuntimeParameters(map); 13 appendParameters(map, metrics); 14 appendParameters(map, application); 15 appendParameters(map, module); 16 // remove 'default.' prefix for configs from ProviderConfig 17 // appendParameters(map, provider, Constants.DEFAULT_KEY); 18 appendParameters(map, provider); 19 appendParameters(map, protocolConfig); 20 appendParameters(map, this); 21 // 上面的代码就是将版本,方法,各种配置放到map里去 22 // 这里给出debug的时候的map对象 23 // map.toString() = {side=provider, application=dubbo-server, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368} 24 /** 第一部分结束 **/ 25 26 /** 第二部分开始 **/ 27 // 这段if里做的事情主要是检测
标签中的配置信息,填充map 28 if (CollectionUtils.isNotEmpty(methods)) { 29 for (MethodConfig method : methods) { 30 // 添加MethodConfig到map中,key=方法名.属性 value=属性值 31 // ex sayHello.retries:2 32 appendParameters(map, method, method.getName()); 33 String retryKey = method.getName() + ".retry"; 34 if (map.containsKey(retryKey)) { 35 String retryValue = map.remove(retryKey); 36 if ("false".equals(retryValue)) { 37 map.put(method.getName() + ".retries", "0"); 38 } 39 } 40 41 // 获取ArgumentConfig列表 42 List
arguments = method.getArguments(); 43 if (CollectionUtils.isNotEmpty(arguments)) { 44 for (ArgumentConfig argument : arguments) { 45 // convert argument type 46 if (argument.getType() != null && argument.getType().length() > 0) { 47 Method[] methods = interfaceClass.getMethods(); 48 // visit all methods 49 if (methods != null && methods.length > 0) { 50 for (int i = 0; i < methods.length; i++) { 51 String methodName = methods[i].getName(); 52 // target the method, and get its signature 53 if (methodName.equals(method.getName())) { 54 Class
[] argtypes = methods[i].getParameterTypes(); 55 // one callback in the method 56 if (argument.getIndex() != -1) { 57 if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { 58 // 添加ArgumentConfig信息到map中 59 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 60 } else { 61 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 62 } 63 } else { 64 // multiple callbacks in the method 65 for (int j = 0; j < argtypes.length; j++) { 66 Class
argclazz = argtypes[j]; 67 if (argclazz.getName().equals(argument.getType())) { 68 appendParameters(map, argument, method.getName() + "." + j); 69 if (argument.getIndex() != -1 && argument.getIndex() != j) { 70 throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType()); 71 } 72 } 73 } 74 } 75 } 76 } 77 } 78 } else if (argument.getIndex() != -1) { 79 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); 80 } else { 81 throw new IllegalArgumentException("Argument config must set index or type attribute.eg:
or
"); 82 } 83 84 } 85 } 86 } // end of methods for 87 } 88 /** 第二部分结束 **/ 89 90 /** 第三部分开始 **/ 91 if (ProtocolUtils.isGeneric(generic)) { 92 map.put(GENERIC_KEY, generic); 93 map.put(METHODS_KEY, ANY_VALUE); 94 } else { 95 String revision = Version.getVersion(interfaceClass, version); 96 if (revision != null && revision.length() > 0) { 97 map.put(REVISION_KEY, revision); 98 } 99 100 // 生成包装类101 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();102 // 添加方法到map中103 if (methods.length == 0) {104 logger.warn("No method found in service interface " + interfaceClass.getName());105 map.put(METHODS_KEY, ANY_VALUE);106 } else {107 map.put(METHODS_KEY, StringUtils.join(new HashSet
(Arrays.asList(methods)), ","));108 }109 }110 111 // 添加token112 if (!ConfigUtils.isEmpty(token)) {113 if (ConfigUtils.isDefault(token)) {114 map.put(TOKEN_KEY, UUID.randomUUID().toString());115 } else {116 map.put(TOKEN_KEY, token);117 }118 }119 120 // export service121 // 此处map.toString() = {side=provider, application=dubbo-server, methods=hello, dubbo=2.5.3, pid=10554, interface=com.grimmjx.edu.HelloService, timeout=100, anyhost=true, timestamp=1559890675368}122 String host = this.findConfigedHosts(protocolConfig, registryURLs, map);123 Integer port = this.findConfigedPorts(protocolConfig, name, map);124 // 此处url=dubbo://192.168.5.16:20880/com.grimmjx.edu.HelloService?anyhost=true&application=dubbo-server&dubbo=2.5.3&interface=com.grimmjx.edu.HelloService&methods=hello&pid=11917&side=provider&timeout=100&timestamp=1559973693109125 URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);126 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {127 url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)128 .getExtension(url.getProtocol()).getConfigurator(url).configure(url);129 }130 /** 第三部分结束 **/131 132 /** 第四部分开始 **/133 // 开始发布服务134 String scope = url.getParameter(SCOPE_KEY);135 // don't export when none is configured136 // //配置为none不暴露137 if (!SCOPE_NONE.equalsIgnoreCase(scope)) {138 139 // export to local if the config is not remote (export to remote only when config is remote)140 // 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)141 if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {142 exportLocal(url);143 }144 // export to remote if the config is not local (export to local only when config is local)145 // 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)146 if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {147 if (!isOnlyInJvm() && logger.isInfoEnabled()) {148 logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);149 }150 if (CollectionUtils.isNotEmpty(registryURLs)) {151 // 发布服务152 for (URL registryURL : registryURLs) {153 //if protocol is only injvm ,not register154 if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {155 continue;156 }157 url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));158 URL monitorUrl = loadMonitor(registryURL);159 if (monitorUrl != null) {160 url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());161 }162 if (logger.isInfoEnabled()) {163 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);164 }165 166 // For providers, this is used to enable custom proxy to generate invoker167 String proxy = url.getParameter(PROXY_KEY);168 if (StringUtils.isNotEmpty(proxy)) {169 registryURL = registryURL.addParameter(PROXY_KEY, proxy);170 }171 172 // 生成Invoker173 // Invoker是十分重要的对象,可向它发起invoke调用174 Invoker
invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));175 // 持有this和invoker176 // 此处的invoker.getUrl()=registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-server&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D10738%26side%3Dprovider%26timeout%3D100%26timestamp%3D1559895851366&pid=10738&registry=zookeeper&timestamp=1559895851283177 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);178 179 // 发布,并生成Exporter180 Exporter
exporter = protocol.export(wrapperInvoker);181 exporters.add(exporter);182 }183 184 // 没有注册中心,仅发布服务185 } else {186 Invoker
invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);187 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);188 189 Exporter
exporter = protocol.export(wrapperInvoker);190 exporters.add(exporter);191 }192 /**193 * @since 2.7.0194 * ServiceData Store195 */196 MetadataReportService metadataReportService = null;197 if ((metadataReportService = getMetadataReportService()) != null) {198 metadataReportService.publishProvider(url);199 }200 }201 }202 this.urls.add(url);203 /** 第四部分结束 **/204 }

6.org.apache.dubbo.registry.integration.RegistryProtocol#export,为什么是RegistryProtocol?注释里写的很清楚了,这里的url是registry://开头的。

1 public 
Exporter
export(final Invoker
originInvoker) throws RpcException { 2 // 获取注册地址 3 URL registryUrl = getRegistryUrl(originInvoker); 4 // url to export locally 5 // 获取provider url 6 URL providerUrl = getProviderUrl(originInvoker); 7 // Subscribe the override data 8 // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call 9 // the same service. Because the subscribed is cached key with the name of the service, it causes the10 // subscription information to cover.11 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);12 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);13 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);14 // 要注册的url15 providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);16 //export invoker17 // 导出服务18 final ExporterChangeableWrapper
exporter = doLocalExport(originInvoker, providerUrl);19 // url to registry     ....38 }

  之后上面调用顺序图的蓝色部分的代码不做分析,主要是创建一个NettyServer(默认)。可自行研究

 

三.服务注册

调用顺序

  首先讲一下大致的服务注册的调用顺序图,我们只分析红色部分。

-org.apache.dubbo.registry.integration.RegistryProtocol#register

  -org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry

    -org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry

      -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry

  -org.apache.dubbo.registry.support.FailbackRegistry#register

    -org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister

源码分析

1.org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry,具体还要看下一步

1 @Override 2 public Registry getRegistry(URL url) { 3     url = URLBuilder.from(url) 4             .setPath(RegistryService.class.getName()) 5             .addParameter(INTERFACE_KEY, RegistryService.class.getName()) 6             .removeParameters(EXPORT_KEY, REFER_KEY) 7             .build(); 8     String key = url.toServiceStringWithoutResolving(); 9     // Lock the registry access process to ensure a single instance of the registry10     LOCK.lock();11     try {12         // 缓存中获取13         Registry registry = REGISTRIES.get(key);14         if (registry != null) {15             return registry;16         }17         18         //create registry by spi/ioc19         // 用Dubbo SPI创建Registry20         registry = createRegistry(url);21         if (registry == null) {22             throw new IllegalStateException("Can not create registry " + url);23         }24         // 写入缓存25         REGISTRIES.put(key, registry);26         return registry;27     } finally {28         // Release the lock29         LOCK.unlock();30     }31 }

2.org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory#createRegistry,创建一个ZooKeeperRegistry实例并返回

1 @Override2 public Registry createRegistry(URL url) {3     return new ZookeeperRegistry(url, zookeeperTransporter);4 }

3.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#ZookeeperRegistry,主要做的就是利用zk创建Zk客户端

1 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 2     super(url); 3     if (url.isAnyHost()) { 4         throw new IllegalStateException("registry address == null"); 5     } 6     String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); 7     if (!group.startsWith(PATH_SEPARATOR)) { 8         group = PATH_SEPARATOR + group; 9     }10     this.root = group;11     zkClient = zookeeperTransporter.connect(url);12     zkClient.addStateListener(state -> {13         if (state == StateListener.RECONNECTED) {14             try {15                 recover();16             } catch (Exception e) {17                 logger.error(e.getMessage(), e);18             }19         }20     });21 }

4.org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister,连接好zk后,是不是就要创建服务提供者的节点了?所以这一步就是注册服务

1 @Override2 public void doRegister(URL url) {3     try {4         // toUrlPath(url) = /dubbo/com.grimmjx.edu.HelloService/providers/dubbo%3A%2F%2F192.168.5.16%3A20880%2Fcom.grimmjx.edu.HelloService%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.grimmjx.edu.HelloService%26methods%3Dhello%26pid%3D11917%26side%3Dprovider%26timeout%3D100%26timestamp%3D15599736931095         zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));6     } catch (Throwable e) {7         throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);8     }9 }

这一步之后,我们用zkCli来连接zk看一下节点数据

 

转载于:https://www.cnblogs.com/GrimMjx/p/10990970.html

你可能感兴趣的文章
.net core 根据数据库生成实体类
查看>>
STM32F10x_模拟I2C读写_硬件I2C读写
查看>>
MDK下调试时提示AXF文件无法导入的解决方法(转)
查看>>
只要单片机具有真正唯一ID,就可以让加密坚不可摧(转)
查看>>
CSS box-sizing 属性
查看>>
Android(java)学习笔记143:Android中View动画之 XML实现 和 代码实现
查看>>
Java(Android)编程思想笔记01:多态性的理解
查看>>
Django REST framework —— 认证组件源码分析
查看>>
asp中通过Connection链接数据库
查看>>
1109
查看>>
(20)模型层 -ORM之msql 基于双下划线的跨表查询(一对一,一对多,多对多)...
查看>>
nginx日志格式定义和nginx.conf配置模板说明
查看>>
深入DLR语言——IronJS
查看>>
Apache Solr 3.6.2 发布
查看>>
ES5新增
查看>>
Js获取当前浏览器的高和宽度
查看>>
MAC常用的快捷键
查看>>
注册码
查看>>
记录一下中间过程2
查看>>
Retrofit
查看>>