Feign和Ribbon的重试机制

Feign和Ribbon的重试机制

Ribbon的重试机制

配置

ribbon:
  ReadTimeout: 2000
  ConnectTimeout: 2000
  MaxAutoRetries: 1 #同一台实例最大重试次数,不包括首次调用
  MaxAutoRetriesNextServer: 3 #重试负载均衡其他的实例最大重试次数,不包括首次调用
  OkToRetryOnAllOperations: true  #是否所有操作都重试

结果

Feign和Ribbon的重试机制

Feign和Ribbon的重试机制

可以看到2台机器分别被请求了四次,而且每个时间的间隔大约为2秒左右。一开始请求的是8083接口,请求的次数为8083,8083,8081,8081,8083,8083,8081,8083。

那它的计算公式为:(MaxAutoRetries+ 1)*(MaxAutoRetriesNextServer+1) =8次

源码分析

首先由于在配置文件中配置了超时时间这些信息,一开始注入bean的时候,会读取这些值,覆盖默认值,在HttpClientRibbonConfiguration中

    @Bean
    @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
    @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
    public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
            IClientConfig config, ServerIntrospector serverIntrospector,
            ILoadBalancer loadBalancer, RetryHandler retryHandler,
            CloseableHttpClient httpClient) {
        RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
                httpClient, config, serverIntrospector);
        client.setLoadBalancer(loadBalancer);
        client.setRetryHandler(retryHandler);
        Monitors.registerObject("Client_" + this.name, client);
        return client;
    }

    public RibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
            IClientConfig config, ServerIntrospector serverIntrospector) {
        super(delegate, config, serverIntrospector);
    }

    protected AbstractLoadBalancingClient(D delegate, IClientConfig config,
            ServerIntrospector serverIntrospector) {
        super(null);
        this.delegate = delegate;
        this.config = config;
        this.serverIntrospector = serverIntrospector;
        this.setRetryHandler(RetryHandler.DEFAULT);
        initWithNiwsConfig(config);
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        RibbonProperties ribbon = RibbonProperties.from(clientConfig);
        this.connectTimeout = ribbon.connectTimeout(DEFAULT_CONNECT_TIMEOUT);
        this.readTimeout = ribbon.readTimeout(DEFAULT_READ_TIMEOUT);
        this.secure = ribbon.isSecure();
        this.followRedirects = ribbon.isFollowRedirects();
        this.okToRetryOnAllOperations = ribbon.isOkToRetryOnAllOperations();
        this.gzipPayload = ribbon.isGZipPayload(DEFAULT_GZIP_PAYLOAD);
    }

当方法开始执行的时候,由于前面已经断点过此处就直接放行,一直来到LoadBalancerFeignClient

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        try {
            URI asUri = URI.create(request.url());
            String clientName = asUri.getHost();
            URI uriWithoutHost = cleanUrl(request.url(), clientName);
            FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                    this.delegate, request, uriWithoutHost);

            IClientConfig requestConfig = getClientConfig(options, clientName);
            return lbClient(clientName)
                    .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
        }
        catch (ClientException e) {
            IOException io = findIOException(e);
            if (io != null) {
                throw io;
            }
            throw new RuntimeException(e);
        }
    }

在进入到executeWithLoadBalancer的方法中。

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        LoadBalancerCommand command = buildLoadBalancerCommand(request, requestConfig);

        try {
            return command.submit(
                new ServerOperation() {
                    @Override
                    public Observable call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        }
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }

    }

executeWithLoadBalancer方法中会创建一个 LoadBalancerCommand,然后调用 LoadBalancerCommandsubmit方法提交请求Operation, submit方法源码如下(有删减):


   public Observable submit(final ServerOperation operation) {
        // .......

        //  获取重试次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
        // Use the load balancer
        Observable o = (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1>() {
                    @Override
                    public Observable call(Server server) {
                        //.......

                        // 相同节点的重试
                        if (maxRetrysSame > 0)
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
        // 不同节点的重试
        if (maxRetrysNext > 0 && server == null)
            o = o.retry(retryPolicy(maxRetrysNext, false));
        return o.onErrorResumeNext(...);

submit方法中调用retryHandler的getMaxRetriesOnSameServer方法和getMaxRetriesOnNextServer方法分别获取配置maxRetrysSame、maxRetrysNext。maxRetrysSame表示调用相同节点的重试次数,默认为0;maxRetrysNext表示调用不同节点的重试次数,默认为1。

retryPolicy方法返回的是一个包装RetryHandler重试决策者的RxJava API的对象,最终由该RetryHandler决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在retryPolicy返回的Func2中完成,这是RxJava的API,retryPolicy方法的源码如下。


private Func2 retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }
            // 大于最大重试次数
            if (tryCount > maxRetrys) {
                return false;
            }
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            // 调用RetryHandler判断是否重试
            return retryHandler.isRetriableException(e, same);
        }
    };

那么这个retryHandler是怎么来的呢?

FeignLoadBalancer的executeWithLoadBalancer方法中调用buildLoadBalancerCommand方法构造LoadBalancerCommand对象时创建的,buildLoadBalancerCommand方法源码如下。

    protected LoadBalancerCommand buildLoadBalancerCommand(final S request, final IClientConfig config) {
        // 获取RetryHandler
        RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
        // 使用Builder构造者模式构造LoadBalancerCommand
        LoadBalancerCommand.Builder builder = LoadBalancerCommand.builder()
                .withLoadBalancerContext(this)
                // 传入RetryHandler
                .withRetryHandler(handler)
                .withLoadBalancerURI(request.getUri());
        return builder.build();
    }

从源码中可以看出, Ribbon使用的 RetryHandlerRequestSpecificRetryHandler。这里还用到了 Builder构造者模式。 FeignLoadBalancergetRequestSpecificRetryHandler方法源码如下:


@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
    RibbonRequest request, IClientConfig requestConfig) {
    //.....
    if (!request.toRequest().httpMethod().name().equals("GET")) {
        // 调用this.getRetryHandler()方法获取一次RetryHandler
        return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
                requestConfig);
    }
    else {
        // 调用this.getRetryHandler()方法获取一次RetryHandler
        return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                requestConfig);
    }

Feign的重试机制

因为ribbon的重试机制和Feign的重试机制有冲突,所以源码中默认关闭Feign的重试机制,源码如下

Feign和Ribbon的重试机制

配置

feign:
  client:
    config:
      ServiceA:
        connect-timeout: 3000
        read-timeout: 3000

      @Bean
    public Retryer feignRetryer() {
        return new Retryer.Default(100, SECONDS.toMillis(1), 7);
    }

结果

Feign和Ribbon的重试机制Feign和Ribbon的重试机制

会发现2台机器是轮休的访问,时间超过3秒多一点,是因为期间有一个休眠,而且feign的超时和ribbon的超时是不一样的,而且如果同时都配置的话,生效的只是Feign,那下面就分析一下看看。

源码分析

SynchronousMethodHandler的反射方法

  @Override
  public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    Retryer retryer = this.retryer.clone();
    while (true) {
      try {
        return executeAndDecode(template, options);
      } catch (RetryableException e) {
        try {
          retryer.continueOrPropagate(e);
        } catch (RetryableException th) {
          Throwable cause = th.getCause();
          if (propagationPolicy == UNWRAP && cause != null) {
            throw cause;
          } else {
            throw th;
          }
        }
        if (logLevel != Logger.Level.NONE) {
          logger.logRetry(metadata.configKey(), logLevel);
        }
        continue;
      }
    }
  }

在它的方法里面下面就是核心的尝试机制,前面嵌套一层循环。此处的retryer已经拿到了配置的重试参数。

Feign和Ribbon的重试机制

后面会进入到下面的这个方法中,这里就是核心,会创建一个handler,由于创建的类型不是默认的,会创建FeignOptionsClientConfig,导致之前配置的ribbon的会失效,这样它重试的次数和最大的次数都是-1,int值的时候会变成0,就会走feign的重试机制

Feign和Ribbon的重试机制

Feign和Ribbon的重试机制

Feign和Ribbon的重试机制

如果超时的会就会来到一开始的方法,被catch中,核心代码如下:

Feign和Ribbon的重试机制
    public void continueOrPropagate(RetryableException e) {
       //如果重试机制大于最大的尝试次数,则抛出异常
      if (attempt++ >= maxAttempts) {
        throw e;
      }

      long interval;
      if (e.retryAfter() != null) {
          //根据上次重试的时间减去当前时间来决定重试的间隔
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
          interval = maxPeriod;
        }
        if (interval < 0) {
          return;
        }
      } else {
        interval = nextMaxInterval();
      }
      try {
        Thread.sleep(interval);
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
        throw e;
      }
      sleptForMillis += interval;
    }

Feign和Ribbon的重试机制

Original: https://www.cnblogs.com/dalianpai/p/15439861.html
Author: 天宇轩-王
Title: Feign和Ribbon的重试机制

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/551519/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球