阅读 325

spring cloud gateway 一个请求到转发走过的路途-源码解析

简介

Spring Cloud Gateway是基于Spring Boot 2.x,Spring WebFlux构建的,是新一代的网关解决方案。目前在打算用gateway网关替换掉原有的zuul网关,利用gateway提供的特性来提升原有网关性能。所以借此机会分析了下网关源码。

工作原理

这里贴一张官网的图 image.png 客户端向Gateway网关发出请求。如果网关处理映射请求与路由匹配,则将其发送到网关处理请求。请求经过网关多个过滤器链(这是涉及一个设计模式:责任链模式)。过滤器由虚线分隔的原因是,过滤器可以在发送请求之前和之后运行逻辑。所有“前置”过滤器逻辑均被执行。然后发出代理请求。发出代理请求后,将运行“后置”过滤器逻辑。

源码解析

首先从GatewayAutoConfiguration看起

负载均衡

GatewayAutoConfiguration注解中的GatewayLoadBalancerClientAutoConfiguration注解中有个LoadBalancerClientFilter是处理负载均衡的关键代码。

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } // preserve the original url addOriginalRequestUrl(exchange, url); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url before: " + url); } final ServiceInstance instance = choose(exchange); if (instance == null) { throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } URI uri = exchange.getRequest().getURI(); // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default, // if the loadbalancer doesn't provide one. String overrideScheme = instance.isSecure() ? "https" : "http"; if (schemePrefix != null) { overrideScheme = url.getScheme(); } URI requestUrl = loadBalancer.reconstructURI( new DelegatingServiceInstance(instance, overrideScheme), uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); return chain.filter(exchange); } 复制代码

我们在配置路由转发的时候在配置服务时会写lb://xxx,源码中看到了熟悉的lb。

if (url == null  || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {     return chain.filter(exchange); } 复制代码

url为null或不是lb则跳过此过滤器,否则进行负载均衡处理。 负载关键代码是choose方法,返回ServiceInstance对象(serviceId,host,port等信息)作为负载均衡后的结果。

protected ServiceInstance choose(ServerWebExchange exchange) { return loadBalancer.choose( ((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost()); } 复制代码

choose方法内部通过serviceId,通过ribbon去nacos里找到服务名对应的实例,并负载均衡选出一台实例服务ip和端口返回。将lb://xxx那部分替换成具体ip+端口后接请求路径,放入到上下文中key为gatewayRequestUrl。后通过NettyRoutingFilter过滤器(可以看到这个过滤器的order顺序是Integer.MAX_VALUE,目的就是为了处在最后的位置发送请求)使用httpclient发送请求到下游服务。

负载均衡流程图

image.png

请求转发

核心代码:DispatcherHandler.handle(ServcerWebExchange exchange),它是org.springframework.web.reactive包下的,所有的请求都会经过这里。webflux暂时没有研究,不过大体能看出关键代码和逻辑。

if (this.handlerMappings == null) { return createNotFoundError(); } return Flux.fromIterable(this.handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result)); 复制代码

我们可以看到mapping -> mapping.getHandler(exchange) debug进去发现getHandlerInternal()方法里面。

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { // don't handle requests on management port if set and different than server port if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); return lookupRoute(exchange) // .log("route-predicate-handler-mapping", Level.FINER) //name this .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug( "Mapping [" + getExchangeDesc(exchange) + "] to " + r); } exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))); } 复制代码

它的实现方法在RoutePredicateHandlerMapping类中。开头if判断不用看,核心方法是lookupRoute(exchange)

protected Mono<Route> lookupRoute(ServerWebExchange exchange) { return this.routeLocator.getRoutes() // individually filter routes so that filterWhen error delaying is not a // problem .concatMap(route -> Mono.just(route).filterWhen(r -> { // add the current route we are testing exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId()); return r.getPredicate().apply(exchange); }) // instead of immediately stopping main flux due to error, log and // swallow it .doOnError(e -> logger.error( "Error applying predicate for route: " + route.getId(), e)) .onErrorResume(e -> Mono.empty())) // .defaultIfEmpty() put a static Route not found // or .switchIfEmpty() // .switchIfEmpty(Mono.<Route>empty().log("noroute")) .next() // TODO: error handling .map(route -> { if (logger.isDebugEnabled()) { logger.debug("Route matched: " + route.getId()); } validateRoute(route, exchange); return route; }); /*  * TODO: trace logging if (logger.isTraceEnabled()) {  * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }  */ } 复制代码

首先outeLocator.getRoutes()的实现方法RouteDefinitionRouteLocator.getRoutes

Flux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions() .map(this::convertToRoute); if (!gatewayProperties.isFailOnRouteDefinitionError()) { // instead of letting error bubble up, continue routes = routes.onErrorContinue((error, obj) -> { if (logger.isWarnEnabled()) { logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId() + " will be ignored. Definition has invalid configs, " + error.getMessage()); } }); } return routes.map(route -> { if (logger.isDebugEnabled()) { logger.debug("RouteDefinition matched: " + route.getId()); } return route; }); 复制代码

routeDefinitionLocator.getRouteDefinitions()又有很多实现方法。。。分别都是从缓存中获取路由信息,从注册中心获取路由信息,从配置文件中获取路由信息等,总而言之就是获取到路由RouteDefinition对象,通过convertToRoute方法将RouteDefinition对象转换成Route对象(咱们网关配置的谓词和过滤器都放入了Route对象,构建Route对象的时候又涉及一个设计模式:建造者模式)。 再往上看,获取到路由信息后Mono.just(route).filterWhen()大概就是我们请求过来的url对某个路由信息做匹配过滤。将我们在路由里配置的id放入上下文中,key为GATEWAY_PREDICATE_ROUTE_ATTR(id如不指定,则为UUID) image.png 我这里配置的路由有两个,从图中断点可以看出,第一个路由信息和当前访问的url不匹配,返回为false,第二个路由信息匹配上了,返回为true。 这样我们再一路返回到lookupRoute方法,经过上面的一顿操作,又将route路由放入上下文中key为GATEWAY_ROUTE_ATTR。 再一路往上返回,回到最初的handle image.png 经过对mapping和路由的一系列前置处理,我们是不是就应该执行真正的过滤等处理逻辑了,下面就是执行处理的关键代码。 关键代码:invokeHandler(exchange, handler)

private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) { if (this.handlerAdapters != null) { for (HandlerAdapter handlerAdapter : this.handlerAdapters) { if (handlerAdapter.supports(handler)) { return handlerAdapter.handle(exchange, handler); } } } return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler)); } 复制代码

我们进入handlerAdapter.handle(exchange, handler)方法,再经过多个实现 SimpleHandlerAdapter -> FilteringWebHandler

public Mono<Void> handle(ServerWebExchange exchange) { Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters); // TODO: needed or cached? AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } return new DefaultGatewayFilterChain(combined).filter(exchange); } 复制代码

我们看到之前放入上下文的key为GATEWAY_ROUTE_ATTR的路由现在可以用到了!我们取出Route路由对象,记得之前RouteDefinition对象转Route的时候做了什么吗?是不是放入了过滤器?这里取出之前set进的过滤器集合,然后new DefaultGatewayFilterChain(combined).filter(exchange),执行过滤!(又是个设计模式:责任链模式,设计模式写法不固定,主要是思想哈,它这里的写法是通过index标记改执行哪一个过滤器,然后通过index游标移动来经过过滤器链条) 还记得上面的负载均衡过滤器吗,他的order为int最大值,所以肯定最后要走到LoadBalancerClientFilter.filter,然后执行choose方法通过负载均衡算法选举出服务器实例,再通过httpClient调用下游服务。是不是又和前面负载均衡源码解析的步骤连起来了!

ps

里面有很多细节其实还没有写进去,比如路由信息会放入本地缓存中,路由信息获取也可自定义获取方式,比如从数据库中,从redis中等。大体上的流程就是这样了,有不对的地方欢迎指正!


作者:MZ_
链接:https://juejin.cn/post/6992541090754592781


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐