阅读 63

Ribbon源码2-Ribbon工作流程

0. 环境

  • nacos版本:1.4.1

  • Spring Cloud : Hoxton.SR9

  • Spring Boot :2.4.4

  • Spring Cloud alibaba: 2.2.5.RELEASE

  • Spring Cloud openFeign 2.2.2.RELEASE

测试代码:github.com/hsfxuebao/s…

1. Ribbon的工作流程

我们知道,微服务在启动成功之后,默认30s/次会从注册中心拉取服务注册表到本地缓存起来,而我们使用Ribbon时是通过RestTemplate发起请求,URL以:http://user-server/user/... 服务名方式去调用服务,其实Ribbon干的事情就是根据URL中的服务名去本地的服务注册表中查找服务名对应的服务实例(一个或多个),然后通过负载均衡算法选择其中一个服务后,发起Http请求,那接下来我们就来看一下Ribbon的底层到底是怎么工作的

2. Ribbon配合RestTemplate使用

首选需要定义RestTemplate

@SpringBootApplication @EnableEurekaClient public class OrderServerApplication1030 {     //配置一个RestTemplate ,http客户端,支持Rest风格     //@LoadBalanced :负载均衡注册,让RestTmplate可以实现负载均衡请求     //这个标签标记RestTemplate可以使用LoadBalancerClient进行负载均衡     @Bean     @LoadBalanced     public RestTemplate restTemplate(){         return new RestTemplate();     }     //省略... } 复制代码

调用服务的是否使用服务名调用:

@RestController public class OrderController {     //需要配置成Bean     @Autowired     private RestTemplate  restTemplate ;     //浏览器调用该方法     @RequestMapping(value = "/order/{id}",method = RequestMethod.GET)     public User getById(@PathVariable("id")Long id){         //发送http请求调用 user的服务,获取user对象 : RestTemplate         //user的ip,user的端口,user的Controller路径         //String url = "http://localhost:1020/user/"+id;         String url = "http://user-server/user/"+id;         //发送http请求         return restTemplate.getForObject(url, User.class);     } } 复制代码

@LoadBalanced标记的RestTemplate 可以使用LoadBalancerClient负载均衡客户端实现负载均衡,我们在使用RestTemplate 发起请求的时候需要跟上服务名的方式http://user-server/user/

3. LoadBalancerClient负载均衡客户端

我们从 @LoadBalanced入手,先看一下LoadBalancerClient它的源码。

/** 注解标记RestTemplate 可以使用LoadBalancerClient 负载均衡客户端  * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient  * @author Spencer Gibb  */ @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Qualifier public @interface LoadBalanced { } 复制代码

这个注解@LoadBalanced 的作用在注释上说的非常清楚,就是标记RestTemplate可以使用使用LoadBalancerClient来实现负载均衡,LoadBalancerClient就是Ribbon实现负载均衡的一个客户端,它在spring-cloud-commons包下,我们可以直接看LoadBalancerClient的源码:

/** 客户端负载均衡  * Represents a client side load balancer  * @author Spencer Gibb  */ public interface LoadBalancerClient extends ServiceInstanceChooser { //执行请求,会根据serviceId使用负载均衡查找服务 /** 使用负载均衡器执行指定服务  * execute request using a ServiceInstance from the LoadBalancer for the specified service  * 服务Id - 服务ID来查找负载平衡器  * @param serviceId the service id to look up the LoadBalancer  *   * @param request allows implementations to execute pre and post actions such as  * incrementing metrics  * 返回选择的服务  * @return the result of the LoadBalancerRequest callback on the selected  * ServiceInstance  */ <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; //执行请求,这个方法多了一个参数ServiceInstance ,即请求指定的服务 /**  * execute request using a ServiceInstance from the LoadBalancer for the specified  * service  * @param serviceId the service id to look up the LoadBalancer  * @param serviceInstance the service to execute the request to  * @param request allows implementations to execute pre and post actions such as  * incrementing metrics  * @return the result of the LoadBalancerRequest callback on the selected  * ServiceInstance  */ <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; //重构URL,把http://myservice/path/to/service重构成http://ip:端口/path/to/service /**  * Create a proper URI with a real host and port for systems to utilize.  * Some systems use a URI with the logical serivce name as the host,  * such as http://myservice/path/to/service.  This will replace the  * service name with the host:port from the ServiceInstance.  * @param instance  * @param original a URI with the host as a logical service name  * @return a reconstructed URI  */ URI reconstructURI(ServiceInstance instance, URI original); } 复制代码

LoadBalancerClient接口三个方法,excute()为执行请求,reconstructURI()用来重构url,它实现了ServiceInstanceChooser 接口,这个接口的作用是用来选择服务的,看下源码

/** 通过使用负载平衡器,选择一个服务器发送请求。  * Implemented by classes which use a load balancer to choose a server to  * send a request to.  *  * @author Ryan Baxter  */ public interface ServiceInstanceChooser {     /**       从LoadBalancer中为指定服务选择一个ServiceInstance      * Choose a ServiceInstance from the LoadBalancer for the specified service      *       * //根据服务id去LoadBalancer查找服务      * @param serviceId the service id to look up the LoadBalancer      *       * 返回查找到的服务实例ServiceInstance       * @return a ServiceInstance that matches the serviceId      */     ServiceInstance choose(String serviceId); } 复制代码

提供了一个choose方法,根据服务ID serviceId 查找一个ServiceInstance 服务实例,这里的serviceId其实就是http://user-server/… url中带的服务名。

LoadBalancerClient 还有一个默认实现类RibbonLoadBalancerClient,这个实现是针对Ribbon的客户端负载均衡,继承关系如下:

image.png

RibbonLoadBalancerClient是一个非常核心的类,最终的负载均衡的请求处理由它来执行,源码如下:

public class RibbonLoadBalancerClient implements LoadBalancerClient { private SpringClientFactory clientFactory; public RibbonLoadBalancerClient(SpringClientFactory clientFactory) { this.clientFactory = clientFactory; } //重构URL,找到服务之后,把http://服务名/  格式 重构成 http://ip:port/ 格式 @Override public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); URI uri; Server server; if (instance instanceof RibbonServer) { RibbonServer ribbonServer = (RibbonServer) instance; server = ribbonServer.getServer(); uri = updateToSecureConnectionIfNeeded(original, ribbonServer); } else { server = new Server(instance.getScheme(), instance.getHost(), instance.getPort()); IClientConfig clientConfig = clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server); } return context.reconstructURIWithServer(server, uri); } //根据服务名,查找服务实例,选择一个返回ServiceInstance  @Override public ServiceInstance choose(String serviceId) { //查找服务 Server server = getServer(serviceId); if (server == null) { return null; } return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } //执行请求 @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { //获取负载均衡器[重要] ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //选择服务,使用负载均衡器,根据服务的ID,选择一个服务 Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } //选择的服务封装成一个RibbonServer:RibbonServer implements ServiceInstance RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); //执行请求调用服务 return execute(serviceId, ribbonServer, request); } //执行请求调用服务 @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if(serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { //使用 LoadBalancerRequest 向服务发请求 T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; } private ServerIntrospector serverIntrospector(String serviceId) { ServerIntrospector serverIntrospector = this.clientFactory.getInstance(serviceId, ServerIntrospector.class); if (serverIntrospector == null) { serverIntrospector = new DefaultServerIntrospector(); } return serverIntrospector; } //是否是https请求 private boolean isSecure(Server server, String serviceId) { IClientConfig config = this.clientFactory.getClientConfig(serviceId); ServerIntrospector serverIntrospector = serverIntrospector(serviceId); return RibbonUtils.isSecure(config, serverIntrospector, server); } //根据服务ID选择服务 protected Server getServer(String serviceId) { return getServer(getLoadBalancer(serviceId)); } //负载均衡器选择服务 protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } return loadBalancer.chooseServer("default"); // TODO: better handling of key } //根据服务id得到负载均衡器 protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } ...省略... 复制代码

解释一下:

这里的ServiceInstance choose(String serviceId)方法的作用是根据ServideId选择一个服务,底层实现是通过LoadBalancer.chooseServer 负载均衡器LoadBalancer来完成的服务的选择的

选择到服务之后调用execute向选择到的服务发起请求,通过LoadBalancerRequest来完成其请求。

4. RestTemplate的执行流程

RestTmplate发请求时地址 "http://user-server/user/"+iduser-server是当前服务需要调用的目标服务的服务名,那么Ribbon到底是如何实现负载均衡调用的呢?我们可以从这里跟踪一下RestTemplate的执行流程:

public class RestTemplate extends InterceptingHttpAccessor implements RestOperations { ...省略...  @Nullable     protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {         Assert.notNull(url, "URI is required");         Assert.notNull(method, "HttpMethod is required");         ClientHttpResponse response = null;         Object var14;         try {          //创建请求对象,使用SimpleClientHttpRequestFactory创建ClientHttpRequest              ClientHttpRequest request = this.createRequest(url, method);             if (requestCallback != null) {              //设置header和body                 requestCallback.doWithRequest(request);             }             response = request.execute();             this.handleResponse(url, method, response);             var14 = responseExtractor != null ? responseExtractor.extractData(response) : null;         } catch (IOException var12) {             String resource = url.toString();             String query = url.getRawQuery();             resource = query != null ? resource.substring(0, resource.indexOf(63)) : resource;             throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + var12.getMessage(), var12);         } finally {             if (response != null) {                 response.close();             }         }         return var14;     } 复制代码

请求来到RestTemplate#doExecute方法,首选是通过使用SimpleClientHttpRequestFactory根据url和method创建ClientHttpRequest 请求对象,使用的实现是InterceptingClientHttpRequestFactory,然后使用response = request.execute();去执行请求,一路跟踪,请求来到InterceptingClientHttpRequest#executeInternal:

class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest { //headers请求头 , bufferedOutput输出内容     protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {      //创建拦截器执行器         InterceptingClientHttpRequest.InterceptingRequestExecution requestExecution = new InterceptingClientHttpRequest.InterceptingRequestExecution();         return requestExecution.execute(this, bufferedOutput);     } 复制代码

这里通过InterceptingClientHttpRequest.InterceptingRequestExecution() 拦截器执行器去执行请求,请求来到InterceptingClientHttpRequest.InterceptingRequestExecution#execute:

 private class InterceptingRequestExecution implements ClientHttpRequestExecution {         private final Iterator<ClientHttpRequestInterceptor> iterator;         public InterceptingRequestExecution() {             this.iterator = InterceptingClientHttpRequest.this.interceptors.iterator();         }         public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {             if (this.iterator.hasNext()) {              //[重要]这里取到的正是  LoadBalancerInterceptor                 ClientHttpRequestInterceptor nextInterceptor = (ClientHttpRequestInterceptor)this.iterator.next();                 return nextInterceptor.intercept(request, body, this);             } else {                 HttpMethod method = request.getMethod();                 Assert.state(method != null, "No standard HTTP method");                 //如果iterator中没有拦截器了,就创建一个ClientHttpRequest去执行请求                 ClientHttpRequest delegate = InterceptingClientHttpRequest.this.requestFactory.createRequest(request.getURI(), method);                 request.getHeaders().forEach((key, value) -> {                     delegate.getHeaders().addAll(key, value);                 });                 if (body.length > 0) {                     if (delegate instanceof StreamingHttpOutputMessage) {                         StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage)delegate;                         streamingOutputMessage.setBody((outputStream) -> {                             StreamUtils.copy(body, outputStream);                         });                     } else {                         StreamUtils.copy(body, delegate.getBody());                     }                 } //执行请求                 return delegate.execute();             }         }     } 复制代码

InterceptingRequestExecution 中维护了一个Iterator<ClientHttpRequestInterceptor> iterator;其中LoadBalancerInterceptor 就在该集合中,所以请求来到LoadBalancerInterceptor #intercept(request, body, this); 方法

//负载均衡拦截器 public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { //负载均衡客户端[重要] private LoadBalancerClient loadBalancer; //负载均衡请求创建工厂 private LoadBalancerRequestFactory requestFactory; //初始化 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) { this.loadBalancer = loadBalancer; this.requestFactory = requestFactory; } //初始化 public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { // for backwards compatibility this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer)); } //拦截器核心方法【重要】 //request请求对象 //body 内容 @Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { //请求的URL,格式如:http://user-server/user/1 ,user-server是服务名 final URI originalUri = request.getURI(); //URL中的服务名 String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); //通过requestFactory.createRequest(request, body, execution)创建LoadBalancerRequest //然后调用负载均衡器执行请求,参数:服务名,LoadBalancerRequest return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution)); } } 复制代码

这里蛮重要的,请求调用了LoadBalancerInterceptor #intercept负载均衡拦截器的拦截方法,获取到URL,从中获取到主机名即调用的服务名(Ribbon客户端服务名),然后使用LoadBalancerRequestFactory 创建了LoadBalancerRequest请求对象,调用loadBalancer#execute 负载均衡器执行请求

5. ILoadBalancer 选择服务(负载均衡)

请求来到RibbonLoadBalancerClient#execute:

 @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { //获取负载均衡器 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //loadBalancer选择服务 Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } //选择的服务封装成RibbonServer  RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); //LoadBalancerRequest对服务执行请求 return execute(serviceId, ribbonServer, request); } 复制代码

这里就蛮关键了:

  • 首选是通过服务名调用getLoadBalancer方法得到负载均衡器

  • 然后getServer(loadBalancer)是通过负载均衡器选择一个服务,底层会使用IRule的算法

  • 然后将服务封装成RibbonServer 对象,交给LoadBalancerRequest去执行请求

这里的负载均衡器默认会走ZoneAwareLoadBalancer,它是通过SpringClientFactory 从Ribbon上下文对象中获取到的负载均衡器对象,关于这个我们在上一章讨论过

public class RibbonLoadBalancerClient implements LoadBalancerClient { ...省略... private SpringClientFactory clientFactory; protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } 复制代码

而得到ILoadBalancer之后,调用getServer(loadBalancer)方法选择服务,我们跟踪一下

public class RibbonLoadBalancerClient implements LoadBalancerClient { ...省略... protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } //ZoneAwareLoadBalancer#chooseServer return loadBalancer.chooseServer("default"); // TODO: better handling of key } 复制代码

这里loadBalancer.chooseServer("default");请求来到ZoneAwareLoadBalancer#chooseServer,源码如下:

public class ZoneAwareLoadBalancer<T extends Server> extends DynamicServerListLoadBalancer<T> { ...省略... @Override     public Server chooseServer(Object key) {         if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {          //如果禁用了zone,或者自由一个zone会走这里             logger.debug("Zone aware logic disabled or there is only one zone");             return super.chooseServer(key);         } //下面就是根据zone选择服务了,默认情况下不会走下面         Server server = null;         try {             LoadBalancerStats lbStats = getLoadBalancerStats();             //得到zone快照             Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);             logger.debug("Zone snapshots: {}", zoneSnapshot);             if (triggeringLoad == null) {                 triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(                         "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);             }             if (triggeringBlackoutPercentage == null) {                 triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(                         "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);             }             //得到可用的zone             Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());             logger.debug("Available zones: {}", availableZones);             if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {              //随机选择区域                 String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);                 logger.debug("Zone chosen: {}", zone);                 if (zone != null) {                     BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);                     //选择服务                     server = zoneLoadBalancer.chooseServer(key);                 }             }         } catch (Exception e) {             logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);         }         if (server != null) {             return server;         } else {             logger.debug("Zone avoidance logic is not invoked.");             return super.chooseServer(key);         }     } 复制代码

这里做了一个判断,如果没有设置zone或者只有一个zone(默认),这里会调用 return super.chooseServer(key);通过父类的BaseLoadBalancer#chooseServer方法选择服务,这也是默认的执行流程,代码走到了BaseLoadBalancer#chooseServer方法中,源码如下

public class BaseLoadBalancer extends AbstractLoadBalancer implements         PrimeConnections.PrimeConnectionListener, IClientConfigAware {  public Server chooseServer(Object key) {         if (counter == null) {          //创建一个计数器             counter = createCounter();         }         //计数器增加         counter.increment();         //如果负载均衡规则为空,返回空         if (rule == null) {             return null;         } else {             try {              //[重要]调用了负载均衡器算法类的choose方法                 return rule.choose(key);             } catch (Exception e) {                 logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);                 return null;             }         }     } 复制代码

BaseLoadBalancer #chooseServer方法中调用了IRule#choose方法进行服务的选择服务,IRule有很多是算法策略实现类,默认会走轮询算法,如果有定义负载均衡算法,这里rule.choose调用的就是定义的算法类。

这里我打了个端点,跟踪了一下源码发现默认情况下会从BaseLoadBalancer#chooseServer方法中调用PredicateBasedRule#choose ,PredicateBasedRule本身是继承ClientConfigEnabledRoundRobinRule,也就是说PredicateBasedRule是使用的是轮询算法,同时它扩展了Predicate功能,即:提供了服务器过滤逻辑

  /**   一个规则,提供了服务器过滤逻辑,具体使用的是AbstractServerPredicate实现过滤功能。 过滤后,服务器从过滤列表中的循环方式返回。  * A rule which delegates the server filtering logic to an instance of {@link AbstractServerPredicate}.  * After filtering, a server is returned from filtered list in a round robin fashion.  *   *   * @author awang  *  */ public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {         /**     抽象函数,返回AbstractServerPredicate,用来对服务做过滤的      * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.      *       */     public abstract AbstractServerPredicate getPredicate();              /**      * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.      * The performance for this method is O(n) where n is number of servers to be filtered.      */     @Override     public Server choose(Object key) {      //得到负载均衡器         ILoadBalancer lb = getLoadBalancer();          //通过AbstractServerPredicate的chooseRoundRobinAfterFiltering选出具体的服务实例         //AbstractServerPredicate的子类实现的Predicate逻辑来过滤一部分服务实例         //然后在以线性轮询的方式从过滤后的实例中选出一个         Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);         if (server.isPresent()) {             return server.get();         } else {             return null;         }            } } 复制代码

这里使用了AbstractServerPredicate#chooseRoundRobinAfterFiltering来选择服务从lb.getAllServers()得到所有的服务作为参数,继续跟踪下去

    /**      * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.       */     public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {      //得到合格的服务列表,主要根据zone做一个过滤         List<Server> eligible = getEligibleServers(servers, loadBalancerKey);         if (eligible.size() == 0) {           //没找到合格的服务             return Optional.absent();         }         //以线性轮询的方式合格的服务列表获取一个实例         //incrementAndGetModulo方法会以轮询的方式计算一个下标值         return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));     } ...省略...  /**   引用于 RoundRobinRule 算法策略 , 轮询      * Referenced from RoundRobinRule      * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.      *      * @param modulo The modulo to bound the value of the counter.      * @return The next value.      */      //增量和取模实现轮询     private int incrementAndGetModulo(int modulo) {         for (;;) {             int current = nextIndex.get();             int next = (current + 1) % modulo;             if (nextIndex.compareAndSet(current, next) && current < modulo)                 return current;         }     } 复制代码

这里首先会通过zone过滤出可用的服务列表,然后使用轮询算法选择一个服务返回,到这里选择服务的流程调用

6. LoadBalancerRequest 执行服务

代码继续回到 RibbonLoadBalancerClient#execute,选择完服务之后,服务被封装成RibbonServer:

 @Override public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { //得到负载均衡器 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); //选择服务 Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } //把server 封装成RibbonServer  RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); //执行服务调用 return execute(serviceId, ribbonServer, request); } 复制代码

找到服务后,调用了execute方法执行后续请求

@Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if(serviceInstance instanceof RibbonServer) { server = ((RibbonServer)serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } //加载Ribbon负载均衡器上下文对象 RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { //LoadBalancerRequest.apply执行请求 T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; } 复制代码

这里调用LoadBalancerRequest.apply执行请求,后面还会调用LoadBalancerRequestFactory#createRequest方法创建请求,调用ClientHttpRequestExecution#execute执行,然后又会将请求委派给ClientHttpRequest#execute去执行,再往后面走就是创建HttpURLConnection链接对象发送请求了,我们就不继续跟下去了。

7. 总结

纵观Ribbon的工作流程大致如下:

  • 初始化的时候创建好Ribbon的上下文,以及相关的组件,如ILoadBalancer,IConfig,IRule等等

  • 初始化过程中会通过ServerList从EurekaClient加载负载均衡候选的服务列表,并定时更新服务列表,使用ServerListFilter过滤之后,使用IPing检查是否更新服务列表

  • 被注解了@LoadBalance标签的RestTemplate可以使用LoadBalancerClient作负载均衡,并添加好拦截器LoadBalancerInterceptor

  • 当请求发起RestTemplate会把请求交给LoadBalancerInterceptor 拦截器,LoadBalancerInterceptor 拦截器调用

  • LoadBalancerClient接收到请求使用ILoadBalancer负载均衡器选择服务,底层用到IRule算法 选择好服务之后,LoadBalancerClient把请求交给LoadBalancerRequest去执行

image.png


作者:hsfxuebao
链接:https://juejin.cn/post/7169399707800698911


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐