阅读 107

springcloud3(六) 服务降级限流熔断组件Resilience4j

公司的网关(基于Spring Cloud Gateway)上线有一段时间了,目前只有一个简单的动态路由的功能,接下来的工作一部分会涉及到服务的保护和服务健壮性方面,也就是要加入限流,熔断和降级等特性。此处找了下业界成熟的开源框架如下表的对比

 Sentinel(Alibaba开源)Hystrix(不再维护)resilience4j(Spring官方推荐)
隔离策略信号量隔离(并发控制)线程池隔离/信号量隔离信号量隔离
熔断降级策略基于慢调用比例、异常比例、异常数基于异常比例基于异常比例、响应时间
实时统计实现滑动窗口(LeapArray)滑动窗口(基于 RxJava)Ring Bit Buffer
动态规则配置支持多种数据源支持多种数据源有限支持
扩展性多个扩展点插件的形式接口的形式
基于注解的支持支持支持支持
限流基于 QPS,支持基于调用关系的限流有限的支持Rate Limiter
流量整形支持预热模式与匀速排队控制效果不支持简单的 Rate Limiter 模式
系统自适应保护支持不支持不支持
多语言支持Java/Go/C++JavaJava
Service Mesh 支持支持 Envoy/Istio不支持不支持
控制台提供开箱即用的控制台,可配置规则、实时监控、机器发现等简单的监控查看不提供控制台,可对接其它监控系统

对比来自:https://github.com/alibaba/Sentinel/wiki/Guideline:-%E4%BB%8E-Hystrix-%E8%BF%81%E7%A7%BB%E5%88%B0-Sentinel

最终基于公司的需求,准备引入Resilience4j组件, 所以这篇博客是来梳理Resilience4j的组件的使用方式, 下一篇博客写结合Spring Cloud Gateway的实现自定义的服务限流保护策略

1. Resilience4j

Resilience4j官方guide: https://resilience4j.readme.io/docs

Resilience4j 常用的组件有5个 -> CircuitBreakerBulkheadRateLimiterRetry 和 TimeLimiter (Cache不推荐在生产环境使用,所以这篇博客不做介绍 ), 本篇博客基于1.7.0的版本介绍

1.1 CircuitBreaker

断路器是通过具有三个正常状态的有限状态机实现的:CLOSED、OPEN 和 HALF_OPEN 以及两个特殊状态 DISABLED 和 FORCED_OPEN。CircuitBreaker 使用滑动窗口来存储和聚合调用的结果。您可以在基于计数的滑动窗口和基于时间的滑动窗口之间进行选择。基于计数的滑动窗口聚合最后 N 次调用的结果。基于时间的滑动窗口聚合了最近 N 秒的调用结果。

1.1.1 CircuitBreakerConfig

CircuitBreakerConfig看名字大家也知道了它是做什么的(好的编码就是见文知意),CircuitBreaker的配置类,在实际项目中除了全局的配置,有些场景需要我们自定义一些CircuitBreaker的配置,这个时候就需要用到Circuitreakeronfig,Circuitreakeronfig全部属性如下表

配置属性默认值描述
failureRateThreshold50以百分比形式配置失败率阈值。
当故障率等于或大于阈值时,断路器转换为断开并开始短路调用。
slowCallRateThreshold100以百分比配置阈值。当呼叫持续时间大于 或等于阈值时,断路器将呼叫视为慢速呼叫。当慢速呼叫的百分比等于或大于阈值时,断路器转换为断开并开始短路呼叫。slowCallDurationThreshold
slowCallDurationThreshold60000 [毫秒]配置持续时间阈值,该数值的呼叫速度缓慢并增加呼叫的速度。
permittedNumberOfCalls
InHalfOpenState
10配置半开时允许的呼叫数量。
maxWaitDurationInHalfOpenState0 [毫秒]配置最大等待持续时间,控制断路器在切换到打开状态之前可以保持在半开状态的最长时间。
值 0 表示断路器将在 HalfOpen 状态无限等待,直到所有允许的调用都完成。
slidingWindowTypeCOUNT_BASED配置用于记录CircuitBreaker关闭时调用结果的滑动窗口的类型。
滑动窗口可以是基于计数的,也可以是基于时间的。
如果滑动窗口为 COUNT_BASED,则记录并汇总最后一次调用。 如果滑动窗口是 TIME_BASED,则记录和聚合最后几秒的调用。slidingWindowSize
slidingWindowSize
slidingWindowSize100配置用于记录关闭时调用窗口的窗口大小。
minimumNumberOfCalls100配置在断路器计算错误率或慢速调用率之前所需的最小调用数(每个滑动窗口周期)。
例如,如果minimumNumberOfCalls为10,则必须至少记录10个呼叫,然后才能计算失败率。
如果仅记录了9个呼叫,则即使有9个呼叫都失败,断路器也不会转换为打开状态。
waitDurationInOpenState60000 [毫秒]半从打开转换到打开之前应等待的时间。
automaticTransition
FromOpenToHalfOpenEnabled
FALSE如果设置为 true,则意味着 CircuitBreaker 将自动从打开状态转换为半打开状态,并且不需要调用来触发转换。创建一个线程来监视 CircuitBreakers 的所有实例,一旦 waitDurationInOpenState 通过,将它们转换为 HALF_OPEN。然而,如果设置为 false,则仅在进行调用时才会转换到 HALF_OPEN,即使在传递了 waitDurationInOpenState 之后也是如此。这里的优点是没有线程监视所有断路器的状态。
recordExceptionsempty记录为失败并因此增加失败率的异常列表。
任何匹配或从列表之一继承的异常都算作失败,除非通过。 如果您指定异常列表,则所有其他异常都算作成功,除非它们被明确忽略。ignoreExceptions
ignoreExceptionsempty被忽略且既不计为失败也不计为成功的异常列表。
即使异常是。recordExceptions
recordFailurePredicatethrowable -> true
默认情况下,所有异常都记录为失败。
一个自定义Predicate,用于评估是否应将异常记录为失败。
如果异常应算作失败,则谓词必须返回 true。如果异常
应算作成功,则谓词必须返回 false,除非异常被 显式忽略。ignoreExceptions
ignoreExceptionsthrowable -> false
默认情况下不会忽略任何异常。
一个自定义Predicate,用于评估是否应忽略异常并且既不视为失败也不成功。
如果应忽略异常,谓词必须返回 true。
如果异常应算作失败,则谓词必须返回 false。

1.1.2 CircuitBreakerRegistry

 CircuitBreakerRegistry是CircuitBreaker的注册器,其有一个唯一的实现类InMemoryCircuitBreakerRegistry,核心方法如下

复制代码

// 根据name返回CircuitBreaker或返回默认的CircuitBreaker
// 下面的几个重载的方法,也是一样的逻辑,有就直接返回,没有就创建后返回
 CircuitBreaker circuitBreaker(String name, io.vavr.collection.Map<String, String> CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config, io.vavr.collection.Map<String, String> CircuitBreaker circuitBreaker(String name, String configName,  io.vavr.collection.Map<String, String> CircuitBreaker circuitBreaker(String name, Supplier<CircuitBreakerConfig> CircuitBreaker circuitBreaker(String name, Supplier<CircuitBreakerConfig> circuitBreakerConfigSupplier, io.vavr.collection.Map<String, String> tags)

复制代码

1.1.3 CircuitBreaker

现在到了我们的核心接口CircuitBreaker,下面的静态方法有20多个,在这我就列几个常用的方法,其它方法可以看源码注释的描述

复制代码

//  <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T>// 返回一个被// 返回一个被 <T> Callable<T> decorateCallable(CircuitBreaker circuitBreaker, Callable<T>// 返回一个被 <T> Supplier<T> decorateSupplier(CircuitBreaker circuitBreaker, Supplier<T> // 返回一个可以retry的 Supplier <T> Supplier<Try<T>> decorateTrySupplier(CircuitBreaker circuitBreaker,  Supplier<Try<T>>// 返回一个被 <T> Consumer<T> decorateConsumer(CircuitBreaker circuitBreaker, Consumer<T> <T> CheckedConsumer<T> decorateCheckedConsumer(CircuitBreaker circuitBreaker, CheckedConsumer<T>// 返回一个被 <T, R> Function<T, R> decorateFunction(CircuitBreaker circuitBreaker, Function<T, R>// 返回一个被 <T, R> CheckedFunction1<T, R> decorateCheckedFunction(CircuitBreaker circuitBreaker,  CheckedFunction1<T, R>// 返回一个被 <T> Supplier<Future<T>> decorateFuture(CircuitBreaker circuitBreaker,   Supplier<Future<T>> supplier)

复制代码

从上面列举的常用方法看到有很多好像有重复的方法,CircuitBreaker有返回封装Supplier, Consumer, Function, Runnable的方法,然后还有一个与之对应的返回封装CheckedSupplier, CheckedConsumer, CheckedFunction, CheckedRunnable的方法。 为什么有两套实现呢?resilience4j,这个项目是基于Java 8开发的,但是java8受限于 Java 标准库的通用性要求和二进制文件大小,Java 标准库对函数式编程的 API 支持相对比较有限。函数的声明只提供了 Function 和 BiFunction 两种,流上所支持的操作的数量也较少。基于这些原因,需要vavr 来更好得使用Java 8进行函数式开发。

简单看下方法decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T> supplier)

复制代码

     <T> CheckedFunction0<T><T> () ->  start == duration = circuitBreaker.getCurrentTimestamp() -                  duration = circuitBreaker.getCurrentTimestamp() -

复制代码

 大体流程如下图

关于vavr的详情可以查看官网文档:https://docs.vavr.io/

CircuitBreaker唯一的实现类CircuitBreakerStateMachine

CircuitBreakerStateMachine是一个有线状态的状态机。断路器管理后端系统的状态。断路器通过具有五种状态的有限状态机实现:CLOSED、OPEN、HALF_OPEN、DISABLED 和 FORCED_OPEN。 CircuitBreakerStateMachine可以做到这些状态的转换,比如下面的几个方法

复制代码

@Override  public void transitionToDisabledState() {      stateTransition(DISABLED, currentState -> new DisabledState());  }  @Override  public void transitionToMetricsOnlyState() {      stateTransition(METRICS_ONLY, currentState -> new MetricsOnlyState());  }  @Override  public void transitionToForcedOpenState() {      stateTransition(FORCED_OPEN,          currentState -> new ForcedOpenState(currentState.attempts() + 1));  }  @Override  public void transitionToClosedState() {      stateTransition(CLOSED, currentState -> new ClosedState());  }  @Override  public void transitionToOpenState() {      stateTransition(OPEN,          currentState -> new OpenState(currentState.attempts() + 1, currentState.getMetrics()));  }  @Override  public void transitionToHalfOpenState() {      stateTransition(HALF_OPEN, currentState -> new HalfOpenState(currentState.attempts()));  }

复制代码

这些状态的流转是通过发布事件来完成的,可以看下面都是CircuitBreakerStateMachine的事件

复制代码

private void publishResetEvent() {      final CircuitBreakerOnResetEvent event = new CircuitBreakerOnResetEvent(name);     publishEventIfPossible(event); } private void publishCallNotPermittedEvent() {     final CircuitBreakerOnCallNotPermittedEvent event = new CircuitBreakerOnCallNotPermittedEvent(         name);     publishEventIfPossible(event); } private void publishSuccessEvent(final long duration, TimeUnit durationUnit) {     final CircuitBreakerOnSuccessEvent event = new CircuitBreakerOnSuccessEvent(name,         Duration.ofNanos(durationUnit.toNanos(duration)));     publishEventIfPossible(event); } private void publishCircuitErrorEvent(final String name, final long duration,     TimeUnit durationUnit, final Throwable throwable) {     final CircuitBreakerOnErrorEvent event = new CircuitBreakerOnErrorEvent(name,         Duration.ofNanos(durationUnit.toNanos(duration)), throwable);     publishEventIfPossible(event); } private void publishCircuitIgnoredErrorEvent(String name, long duration, TimeUnit durationUnit,     Throwable throwable) {     final CircuitBreakerOnIgnoredErrorEvent event = new CircuitBreakerOnIgnoredErrorEvent(name,         Duration.ofNanos(durationUnit.toNanos(duration)), throwable);     publishEventIfPossible(event); } private void publishCircuitFailureRateExceededEvent(String name, float failureRate) {     final CircuitBreakerOnFailureRateExceededEvent event = new CircuitBreakerOnFailureRateExceededEvent(name,         failureRate);     publishEventIfPossible(event); } private void publishCircuitSlowCallRateExceededEvent(String name, float slowCallRate) {     final CircuitBreakerOnSlowCallRateExceededEvent event = new CircuitBreakerOnSlowCallRateExceededEvent(name,         slowCallRate);     publishEventIfPossible(event); } private void publishCircuitThresholdsExceededEvent(Result result, CircuitBreakerMetrics metrics) {     if (Result.hasFailureRateExceededThreshold(result)) {         publishCircuitFailureRateExceededEvent(getName(), metrics.getFailureRate());     }     if (Result.hasSlowCallRateExceededThreshold(result)) {         publishCircuitSlowCallRateExceededEvent(getName(), metrics.getSlowCallRate());     } }

复制代码

 1.1.4 CircuitBreaker Demo

引入测试组件spring-cloud-starter-contract-stub-runner

<dependency>     <groupId>org.springframework.cloud</groupId>     <artifactId>spring-cloud-starter-contract-stub-runner</artifactId>     <scope>test</scope> </dependency>

Resilience4jTestHelper测试辅助类

复制代码

    /**      * get the CircuitBreaker status and metrics      *      * @param prefixName      * @param circuitBreaker      * @return circuitBreaker state      */     public static String getCircuitBreakerStatus(String prefixName, CircuitBreaker circuitBreaker) {         CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();         float failureRate = metrics.getFailureRate();         int failedCalls = metrics.getNumberOfFailedCalls();         int successfulCalls = metrics.getNumberOfSuccessfulCalls();         long notPermittedCalls = metrics.getNumberOfNotPermittedCalls();         int bufferedCalls = metrics.getNumberOfBufferedCalls();         float slowCallRate = metrics.getSlowCallRate();         int slowCalls = metrics.getNumberOfSlowCalls();         int slowFailedCalls = metrics.getNumberOfSlowFailedCalls();         int slowSuccessfulCalls = metrics.getNumberOfSlowSuccessfulCalls();         log.info(prefixName + " state=" + circuitBreaker.getState() + " , metrics[ failureRate=" + failureRate +                 ", failedCalls=" + failedCalls +                 ", successCalls=" + successfulCalls +                 ", notPermittedCalls=" + notPermittedCalls +                 ", bufferedCalls=" + bufferedCalls +                 ", \n\tslowCallRate=" + slowCallRate +                 ", slowCalls=" + slowCalls +                 ", slowFailedCalls=" + slowFailedCalls +                 ", slowSuccessfulCalls=" + slowSuccessfulCalls +                 " ]"         );         log.info(prefixName + " circuitBreaker tags:{}", circuitBreaker.getTags());         return circuitBreaker.getState().name();     }     public static void circuitBreakerEventListener(CircuitBreaker circuitBreaker) {         circuitBreaker.getEventPublisher()                 .onSuccess(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onError(event -> {                     log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName());                     Throwable throwable = event.getThrowable();                     if (throwable instanceof TimeoutException) {                         // TODO record to slow call                     }                 })                 .onIgnoredError(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onReset(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onStateTransition(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onCallNotPermitted(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onFailureRateExceeded(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()))                 .onSlowCallRateExceeded(event -> log.info("---------- CircuitBreakerEvent:{}  CircuitBreakerName:{}", event.getEventType(), event.getCircuitBreakerName()));     }

复制代码

Resilience4jTest测试类

复制代码

    @Rule     public WireMockRule wireMockRule = new WireMockRule(wireMockConfig().port(8080));     private WebTestClient testClient;     private CircuitBreakerRegistry circuitBreakerRegistry;private CircuitBreaker circuitBreaker;     private CircuitBreaker circuitBreakerWithTags;     private CircuitBreakerConfig circuitBreakerConfig;private String PATH_200 = "/api/pancake/v1/yee/query";     private String PATH_400 = "/api/hk/card/v1/er/query";     private String PATH_408 = "/api/pancake/v1/coin/query";     private String PATH_500 = "/api/hk/card/v1/card/query";     @Before     public void setup() {         HttpClient httpClient = HttpClient.create().wiretap(true);         testClient = WebTestClient.bindToServer(new ReactorClientHttpConnector(httpClient))                 .baseUrl("http://localhost:8080")                 .responseTimeout(Duration.ofDays(1))                 .build();         circuitBreakerRegistry = new InMemoryCircuitBreakerRegistry();         circuitBreakerConfig = CircuitBreakerConfig                 .custom()                 .failureRateThreshold(70)                 .slowCallRateThreshold(90)                 .slowCallDurationThreshold(Duration.ofMillis(1000 * 1))                 .minimumNumberOfCalls(10)                 .slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)                 .slidingWindowSize(10)                 .build();         circuitBreaker = circuitBreakerRegistry.circuitBreaker("resilience4jTest", circuitBreakerConfig);         Resilience4jTestHelper.circuitBreakerEventListener(circuitBreaker);         stubFor(post(urlMatching(PATH_200))                 .willReturn(okJson("{}")));         stubFor(post(urlMatching(PATH_400))                 .willReturn(badRequest()));         stubFor(post(urlMatching(PATH_408))                 .willReturn(okJson("{\"message\":\"time out\"}").withFixedDelay(1000 * 2)));         stubFor(post(urlMatching(PATH_500))                 .willReturn(serverError()));     }     @Test     public void When_Test_CircuitBreaker_Expect_Close() {         AtomicInteger count = new AtomicInteger();         for (int i = 0; i < 10; i++) {             Resilience4jTestHelper.recordResponseToCircuitBreaker(circuitBreaker, testClient, PATH_200);             Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> end call " + count.incrementAndGet(), circuitBreaker);         }         assertEquals(CircuitBreaker.State.CLOSED.name(), circuitBreaker.getState().name());     }     @Test     public void When_CircuitBreaker_Expect_Open() {         circuitBreakerWithTags = circuitBreakerRegistry.circuitBreaker("circuitBreakerWithTags", circuitBreakerConfig, HashMap.of("resilience4jTest", "When_CircuitBreaker_Expect_Open"));         Resilience4jTestHelper.circuitBreakerEventListener(circuitBreakerWithTags);         AtomicInteger count = new AtomicInteger();         for (int i = 0; i < 10; i++) {             Resilience4jTestHelper.recordResponseToCircuitBreaker(circuitBreakerWithTags, testClient, PATH_400);             Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> end call " + count.incrementAndGet(), circuitBreakerWithTags);         }         assertEquals(CircuitBreaker.State.OPEN.name(), circuitBreakerWithTags.getState().name());     }     @Test     public void When_Test_CircuitBreaker_Expect_SlowCall() throws Throwable {         AtomicInteger count = new AtomicInteger();         for (int i = 0; i < 10; i++) {             circuitBreaker.executeCheckedSupplier(() -> {                 Resilience4jTestHelper.recordSlowCallResponseToCircuitBreaker(circuitBreaker, testClient, PATH_408);                 return null;             });             Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> end call " + count.incrementAndGet(), circuitBreaker);         }         assertEquals(CircuitBreaker.State.OPEN.name(), circuitBreaker.getState().name());     }     @Test     public void When_CircuitBreaker_Expect_Fallback() {         AtomicInteger count = new AtomicInteger();         for (int i = 0; i < 20; i++) {             String path = PATH_500;             CheckedFunction0<String> response =                     circuitBreaker.decorateCheckedSupplier(() -> Resilience4jTestHelper.responseToCircuitBreaker(circuitBreaker, testClient, path));             Try<String> result = Try.of(response).map(val -> {                 Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> call success " + count.incrementAndGet(), circuitBreaker);                 return val;             }).recover(CallNotPermittedException.class, throwable -> {                 Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> open CircuitBreaker " + count.incrementAndGet(), circuitBreaker);                 return "hit CallNotPermittedException";             }).recover(throwable -> {                 Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> call fallback " + count.incrementAndGet(), circuitBreaker);                 return "hit fallback";             });             log.info(">>>>>>>>>> result:{}", result.get());             if (count.get() > 10) {                 assertEquals("hit CallNotPermittedException", result.get());             }         }     }

复制代码

1.2 Bulkhead

Bulkhead提供了两种隔板模式的实现,可用于限制并发执行的数量

1. 使用信号量  SemaphoreBulkhead 2. 使用有界队列和固定线程池  FixedThreadPoolBulkhead

其中线程池的方式属于资源占用型,在这个不做讨论,如果感兴趣可以去看看官方的样例

1.2.1 BulkheadConfig 

BulkheadConfig是Bulkhead的配置类,使用BulkheadConfig配置类,自定义Blukhead配置。配置类BulkheadConfig有以下属性

 

配置属性默认值描述
maxConcurrentCalls25隔板允许的最大并行执行量
maxWaitDuration0尝试进入饱和的Bulkhead时应阻塞线程的最长时间

1.2.2 BulkheadRegistry

和CircuitBreaker模块一样,BulkheadRegistry提供了一个内存中的实现类InMemoryBulkheadRegistry,可以使用它来管理(创建和获取)Bulkhead实例。

1.2.3 Bulkhead

Bulkhead接口的静态方法和CircuitBreaker方法命名类似,如下下面的decorateCheckedSupplier方法

复制代码

    static <T> CheckedFunction0<T> decorateCheckedSupplier(Bulkhead bulkhead,         CheckedFunction0<T> supplier) {         return () -> {             bulkhead.acquirePermission();             try {                 return supplier.apply();             } finally {                 bulkhead.onComplete();             }         };     }

复制代码

Bulkhead的静态方法,中主要靠bulkhead.acquirePermission()和bulkhead.tryAcquirePermission()申请执行权限,靠bulkhead.onComplete()是释放执行权限,当然还有一个方法bulkhead.releasePermission() 也可以释放执行权限,两者区别就是bulkhead.onComplete()多了一个触发执行完成的事件publishBulkheadEvent(() -> new BulkheadOnCallFinishedEvent(name))。

如果我们不想用Bulkhead自带的静态方法也是可以的,比如我下面的demo, 仅仅使用bulkhead.tryAcquirePermission()和bulkhead.onComplete(),就可以模拟一个服务过载的场景

1.2.4 Bulkhead Demo

Resilience4jTestHelper测试辅助类

复制代码

    public static String responseToBulkhead(Bulkhead bulkhead, WebTestClient testClient, String path) {         WebTestClient.ResponseSpec responseSpec = testClient.post().uri(path).exchange();         if (bulkhead.getMetrics().getAvailableConcurrentCalls() < 1) {             throw BulkheadFullException.createBulkheadFullException(bulkhead);         }         try {             responseSpec.expectStatus().is4xxClientError();             throw new RuntimeException("<<<<< hit 4XX >>>>>");         } catch (Throwable error) {         }         try {             responseSpec.expectStatus().is5xxServerError();             throw new RuntimeException("<<<<< hit 5XX >>>>>");         } catch (Throwable error) {         }         responseSpec.expectStatus().is2xxSuccessful();         return "hit 200";     }     /**      * get the Bulkhead status and metrics      * * @param prefixName      *      * @param bulkhead      */     public static void getBulkheadStatus(String prefixName, Bulkhead bulkhead) {         Bulkhead.Metrics metrics = bulkhead.getMetrics();         int availableCalls = metrics.getAvailableConcurrentCalls();         int maxCalls = metrics.getMaxAllowedConcurrentCalls();         log.info(prefixName + "bulkhead metrics[ availableCalls=" + availableCalls +                 ", maxCalls=" + maxCalls + " ],tags=" + bulkhead.getTags());     }     public static void bulkheadEventListener(Bulkhead bulkhead) {         bulkhead.getEventPublisher()                 .onCallRejected(event -> log.info("---------- BulkheadEvent:{}  BulkheadName:{}", event.getEventType(), event.getBulkheadName()))                 .onCallFinished(event -> log.info("---------- BulkheadEvent:{}  BulkheadName:{}", event.getEventType(), event.getBulkheadName()));     }     static int[] container = new int[100];     // 模拟一定概率的不释放资源     public static boolean releasePermission() {         if (container[0] != 1) {             for (int i = 0; i < 70; i++) {                 container[i] = 1;             }             for (int i = 70; i < 100; i++) {                 container[i] = 0;             }         }         int index = (int) (Math.random() * 100);         return container[index] == 1;     }

复制代码

Resilience4jTest测试类

复制代码

private BulkheadRegistry bulkheadRegistry;private String PATH_200 = "/api/pancake/v1/yee/query";     private String PATH_400 = "/api/hk/card/v1/er/query";     private String PATH_408 = "/api/pancake/v1/coin/query";     private String PATH_500 = "/api/hk/card/v1/card/query";     @Before     public void setup() {         HttpClient httpClient = HttpClient.create().wiretap(true);         testClient = WebTestClient.bindToServer(new ReactorClientHttpConnector(httpClient))                 .baseUrl("http://localhost:8080")                 .responseTimeout(Duration.ofDays(1))                 .build();          bulkheadRegistry = new InMemoryBulkheadRegistry();          stubFor(post(urlMatching(PATH_200))                 .willReturn(okJson("{}")));         stubFor(post(urlMatching(PATH_400))                 .willReturn(badRequest()));         stubFor(post(urlMatching(PATH_408))                 .willReturn(okJson("{\"message\":\"time out\"}").withFixedDelay(1000 * 2)));         stubFor(post(urlMatching(PATH_500))                 .willReturn(serverError()));     }      @Test     public void When_Test_CircuitBreaker_With_Bulkhead_Expect_Hit_BulkheadFullException() {         AtomicInteger count = new AtomicInteger();         ExecutorService executorService = Executors.newFixedThreadPool(50);         Bulkhead bulkhead1 = bulkheadRegistry.bulkhead("bulkhead1",                 BulkheadConfig                         .custom()                         .maxConcurrentCalls(20)                         .maxWaitDuration(Duration.ofMillis(100))                         .build());         Resilience4jTestHelper.bulkheadEventListener(bulkhead1);         for (int i = 0; i < 100; i++) {             if (bulkhead1.tryAcquirePermission()) {                 log.info(">>>>>>>>>> acquire permission {}", count.incrementAndGet());                 Future<String> futureStr = executorService.submit(() -> Resilience4jTestHelper.responseToBulkhead(bulkhead1, testClient, PATH_200));                 Try.of(futureStr::get).andThen(val -> log.info(">>>>>>>>>> success {}: {}", count.get(), val)).recover(throwable -> {                     if (throwable instanceof ExecutionException) {                         Throwable cause = (ExecutionException) throwable.getCause();                         if (cause instanceof BulkheadFullException) {                             log.info(">>>>>>>>>> BulkheadFullException {}: {}", count.get(), throwable.getMessage());                         } else {                             log.info(">>>>>>>>>> ExecutionException {}: {}", count.get(), throwable.getMessage());                         }                     }                     return "hit ExecutionException";                 });                 if (releasePermission()) {                     bulkhead1.onComplete();                     log.info("---------- release permission");                 }                 Resilience4jTestHelper.getBulkheadStatus(")))))))))) ", bulkhead1);             } else {                 log.info(">>>>>>>>>> tryAcquirePermission false {}", count.incrementAndGet());                 continue;             }         }         executorService.shutdown();     }

复制代码

1.3 RateLimiter

Resilience4j提供了一个RateLimiter作为限速器,Ratelimiter限制了服务被调用的次数,每隔一段时间重置该次数,服务在超出等待时间之后返回异常或者fallback方法。跟CircuitBreaker的代码结构一样,核心类有RateLimiterRegistry和其实现类InMemoryRateLimiterRegistry,RateLimiterConfig 还有RateLimiter

其中RateLimiterConfig的属性如下表

配置属性默认值描述
timeoutDuration5 [s]线程等待权限的默认等待时间
limitRefreshPeriod500 [ns]限制刷新的周期。在每个周期之后,速率限制器将其权限计数设置回 limitForPeriod 值
limitForPeriod50一个限制刷新期间可用的权限数

所以如果你想限制某个方法的调用率不高于1000 req/s,可以做如下配置

RateLimiterConfig.custom()      .timeoutDuration(Duration.ofMillis(1000*5))      .limitRefreshPeriod(Duration.ofSeconds(1))      .limitForPeriod(1000)      .build());

1.3.1 RateLimiter Demo

Resilience4jTestHelper测试辅助类

复制代码

    /**      * get the RateLimiter status and metrics      * * @param prefixName      *      * @param rateLimiter      */     public static void getRateLimiterStatus(String prefixName, RateLimiter rateLimiter) {         RateLimiter.Metrics metrics = rateLimiter.getMetrics();         int availablePermissions = metrics.getAvailablePermissions();         int waitingThreads = metrics.getNumberOfWaitingThreads();         log.info(prefixName + "rateLimiter metrics[ availablePermissions=" + availablePermissions +                 ", waitingThreads=" + waitingThreads + " ]"         );     }     public static void rateLimiterEventListener(RateLimiter rateLimiter) {         rateLimiter.getEventPublisher()                 .onSuccess(event -> log.info("---------- rateLimiter success:{}", event))                 .onFailure(event -> log.info("---------- rateLimiter failure:{}", event));     }     public static String responseToRateLimiter(RateLimiter rateLimiter,WebTestClient testClient, String path) {         WebTestClient.ResponseSpec responseSpec = testClient.post().uri(path).exchange();         try {             responseSpec.expectStatus().is4xxClientError();             rateLimiter.onError(new RuntimeException("<<<<< hit 4XX >>>>>"));             throw new RuntimeException("<<<<< hit 4XX >>>>>");         } catch (Throwable error) {         }         try {             responseSpec.expectStatus().is5xxServerError();             rateLimiter.onError(new RuntimeException("<<<<< hit 5XX >>>>>"));             throw new RuntimeException("<<<<< hit 5XX >>>>>");         } catch (Throwable error) {         }         responseSpec.expectStatus().is2xxSuccessful();         rateLimiter.onSuccess();         return "hit 200";     }

复制代码

Resilience4jTest测试类

复制代码

    private RateLimiterRegistry rateLimiterRegistry;     private RateLimiter rateLimiter;              rateLimiterRegistry = new InMemoryRateLimiterRegistry();         rateLimiter = rateLimiterRegistry.rateLimiter("resilience4jTest",                 RateLimiterConfig                         .custom()                         .timeoutDuration(Duration.ofMillis(100))                         .limitRefreshPeriod(Duration.ofSeconds(1))                         .limitForPeriod(20)                         .build());         Resilience4jTestHelper.rateLimiterEventListener(rateLimiter);              @Test     public void When_Test_CircuitBreaker_Expect_Hit_RateLimiter() throws Exception {         AtomicInteger count = new AtomicInteger();         ExecutorService executorService = Executors.newFixedThreadPool(50);         String path = expectError() ? PATH_500 : PATH_200;         for (int i = 0; i < 100; i++) {             Future<String> futureStr = executorService.submit(() -> Resilience4jTestHelper.responseToRateLimiter(rateLimiter, testClient, path));             try {                 Future<String> stringFuture = rateLimiter.executeCallable(() -> futureStr);                 Try.of(stringFuture::get).andThen(val -> {                     log.info(">>>>>>>>>> success {}: {}", count.incrementAndGet(), val);                 }).recover(throwable -> {                     log.info(">>>>>>>>>> exception {}: {}", count.incrementAndGet(), throwable.getMessage());                     return "hit fallback";                 });                 Resilience4jTestHelper.getRateLimiterStatus(")))))))))) ", rateLimiter);             } catch (RequestNotPermitted exception){                 assertEquals("RateLimiter 'resilience4jTest' does not permit further calls" , exception.getMessage());             }         }         executorService.shutdown();     }

复制代码

1.4 Retry

Retry在服务调用返回失败时提供了额外尝试调用的功能,其中RetryConfig的属性如下表

配置属性默认值描述
maxAttempts3最大尝试次数(包括首次调用作为第一次尝试)
waitDuration500 [ms]两次重试的时间间隔
intervalFunctionnumOfAttempts -> waitDuration自定义的IntervalFunction,可以根据当前尝试的次数动态的修改重试的时间间隔
intervalBiFunction(numOfAttempts, Either<throwable, result>) -> waitDuration根据尝试次数和结果或异常修改失败后等待间隔的函数。与 intervalFunction 一起使用时会抛出 IllegalStateException。
retryOnResultPredicateresult -> false自定义的Predicate,根据服务返回的结果判断是否应该重试。如果需要重试Predicate应返回true,否则返回false
retryExceptionPredicatethrowable -> true自定义的Predicate,根据服务返回的异常判断是否应该重试。如果需要重试Predicate应返回true,否则返回false
retryExceptionsempty异常列表,遇到列表中的异常或其子类则重试
注意:如果您使用 Checked Exceptions,则必须使用 CheckedSupplier
ignoreExceptionsempty异常列表,遇到列表中的异常或其子类则不重试。此参数支持子类型。
failAfterMaxRetriesfalse当重试达到配置的 maxAttempts 并且结果仍未通过 retryOnResultPredicate 时启用或禁用抛出 MaxRetriesExceededException 的布尔值

1.4.1 Retry Demo

Resilience4jTestHelper测试辅助类

复制代码

    /**      * get the Retry status and metrics      * * @param prefixName      *      * @param retry      */     public static void getRetryStatus(String prefixName, Retry retry) {         Retry.Metrics metrics = retry.getMetrics();         long successfulCallsWithRetryAttempt = metrics.getNumberOfSuccessfulCallsWithRetryAttempt();         long successfulCallsWithoutRetryAttempt = metrics.getNumberOfSuccessfulCallsWithoutRetryAttempt();         long failedCallsWithRetryAttempt = metrics.getNumberOfFailedCallsWithRetryAttempt();         long failedCallsWithoutRetryAttempt = metrics.getNumberOfFailedCallsWithoutRetryAttempt();         log.info(prefixName + " -> retry metrics[ successfulCallsWithRetry=" + successfulCallsWithRetryAttempt +                 ", successfulCallsWithoutRetry=" + successfulCallsWithoutRetryAttempt +                 ", failedCallsWithRetry=" + failedCallsWithRetryAttempt +                 ", failedCallsWithoutRetry=" + failedCallsWithoutRetryAttempt +                 " ]"         );     }     public static void retryEventListener(Retry retry) {         retry.getEventPublisher()                 .onSuccess(event -> log.info("))))))))))) retry service success:{}", event))                 .onError(event -> {                     log.info("))))))))))) retry service failed:{}", event);                     Throwable exception = event.getLastThrowable();                     if (exception instanceof TimeoutException) {                         // TODO                     }                 })                 .onIgnoredError(event -> log.info("))))))))))) retry service failed and ignore:{}", event))                 .onRetry(event -> log.info("))))))))))) retry call service: {}", event.getNumberOfRetryAttempts()));     }          public static String responseToRetry(Retry retry, WebTestClient testClient, String path) {         WebTestClient.ResponseSpec responseSpec = testClient.post().uri(path).exchange();         try {             responseSpec.expectStatus().is4xxClientError();             return "HIT_ERROR_4XX";         } catch (Throwable error) {         }         try {             responseSpec.expectStatus().is5xxServerError();             return "HIT_ERROR_5XX";         } catch (Throwable error) {         }         responseSpec.expectStatus().is2xxSuccessful();         return "HIT_200";     }

复制代码

Resilience4jTest测试类

复制代码

    private RetryRegistry retryRegistry;     private Retry retry;     retryRegistry = new InMemoryRetryRegistry();     retry = retryRegistry.retry("resilience4jTest",             RetryConfig                     .custom()                     .maxAttempts(5)                     .waitDuration(Duration.ofMillis(500))                     .retryOnResult(val -> val.toString().contains("HIT_ERROR_"))  //                     .retryExceptions(RuntimeException.class)                     .build());     Resilience4jTestHelper.retryEventListener(retry);    @Test     public void When_Test_CircuitBreaker_Expect_Retry() {         AtomicInteger count = new AtomicInteger();         for (int i = 0; i < 30; i++) {             String path = expectError() ? PATH_200 : PATH_400;             Callable<String> response = Retry.decorateCallable(retry, () -> Resilience4jTestHelper.responseToRetry(retry, testClient, path));             Try.of(response::call).andThen(val -> log.info(">>>>>>>>>> result {}: {}", count.incrementAndGet(), val));             Resilience4jTestHelper.getRetryStatus("))))))))))", retry);         }     }

复制代码

1.5 TimeLimiter

TimeLImiter超时控制,和CircuitBreaker的slowCall相似,只是CircuitBreaker的slowCall触发了超时只是将超时记录在Metrics中不会抛出异常,而TimeLimiter触发了超时会直接抛出异常。

而且TimeLimiter配置类很简单

配置属性默认值描述
timeoutDuration5 [s]超时时间,默认1s
cancelRunningFutureTRUE当触发超时时是否取消运行中的Future

1.5.1 TimeLimiter Demo

Resilience4jTestHelper测试辅助类

复制代码

    public static void timeLimiterEventListener(TimeLimiter timeLimiter) {         timeLimiter.getEventPublisher()                 .onSuccess(event -> log.info("---------- timeLimiter success:{}", event))                 .onError(event -> log.info("---------- timeLimiter error:{}", event))                 .onTimeout(event -> log.info("---------- rateLimiter timeout:{}", event));     }     public static String responseToTimeLimiter(TimeLimiter timeLimiter, CircuitBreaker circuitBreaker, WebTestClient testClient, String path) {         WebTestClient.ResponseSpec responseSpec = testClient.post().uri(path).exchange();         try {             responseSpec.expectStatus().is4xxClientError();             circuitBreaker.onError(0, TimeUnit.MILLISECONDS, new RuntimeException("<<<<< hit 4XX >>>>>"));             timeLimiter.onError(new RuntimeException("<<<<< hit 4XX >>>>>"));             throw new RuntimeException("<<<<< hit 4XX >>>>>");         } catch (Throwable error) {         }         try {             responseSpec.expectStatus().is5xxServerError();             circuitBreaker.onError(0, TimeUnit.MILLISECONDS, new RuntimeException("<<<<< hit 5XX >>>>>"));             timeLimiter.onError(new RuntimeException("<<<<< hit 5XX >>>>>"));             throw new RuntimeException("<<<<< hit 5XX >>>>>");         } catch (Throwable error) {         }         responseSpec.expectStatus().is2xxSuccessful();         timeLimiter.onSuccess();         return "hit 200";     }

复制代码

Resilience4jTest测试类

复制代码

    private TimeLimiterRegistry timeLimiterRegistry;     private TimeLimiter timeLimiter;          timeLimiterRegistry = new InMemoryTimeLimiterRegistry();     timeLimiter = timeLimiterRegistry.timeLimiter("resilience4jTest",             TimeLimiterConfig                     .custom()                     .timeoutDuration(Duration.ofMillis(1000 * 1))                     .cancelRunningFuture(true)                     .build());                          Resilience4jTestHelper.timeLimiterEventListener(timeLimiter);          @Test     public void When_Test_CircuitBreaker_Expect_Timeout() {         AtomicInteger count = new AtomicInteger();         ExecutorService executorService = Executors.newFixedThreadPool(10);         for (int i = 0; i < 30; i++) {             String path = expectError() ? PATH_408 : PATH_200;             Future<String> futureStr =                     executorService.submit(() -> Resilience4jTestHelper.responseToTimeLimiter(timeLimiter, circuitBreaker, testClient, path));             Callable<String> stringCallable = timeLimiter.decorateFutureSupplier(() -> futureStr);             Callable<String> response = circuitBreaker.decorateCallable(stringCallable);             Try.of(response::call).andThen(val -> log.info(">>>>>>>>>> success {} {}", count.incrementAndGet(), val))                 .recover(CallNotPermittedException.class, throwable -> {                 Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> open CircuitBreaker " + count.incrementAndGet(), circuitBreaker);                 return "hit CircuitBreaker";             }).recover(throwable -> {                 Resilience4jTestHelper.getCircuitBreakerStatus(">>>>>>>>>> call fallback " + count.incrementAndGet(), circuitBreaker);                 log.error(">>>>>>>>>> fallback:{}", throwable.getMessage());                 return "hit Fallback";             });         }     }

复制代码

到此Resilience4j组件的基本用法介绍完毕,上面的测试代码我没有截图测试的结果,附上代码地址各位看官可以在本地跑跑测试代码

来源https://www.cnblogs.com/hlkawa/p/15358686.html

文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐