【OpenFeign】 重试机制
【OpenFeign】 重试机制
Metadata
title: 【OpenFeign】 重试机制
date: 2023-01-02 19:23
tags:
- 行动阶段/完成
- 主题场景/组件
- 笔记空间/KnowladgeSpace/ProgramSpace/ModuleSpace
- 细化主题/Module/OpenFeign/功能
categories:
- OpenFeign
keywords:
- OpenFeign
description: 【OpenFeign】 重试机制
【OpenFeign】 重试机制
重试机制: 在发生异常时,重新尝试请求,多次还是失败时,才会抛出异常。
应用场景: 可能由于网络抖动出现第一次调用失败,尝试几次就可以恢复正常。
比如 Spring 提供的声明式的重试类库Spring-Retry
。
Feign Core
也提供了自己的重试机制,基于Retryer
接口。
Retryer 接口
Retryer 接口位于 feign-core
包中,声明了一个连续重试方法,及一个从不重试的实例对象。
public interface Retryer extends Cloneable {
// 不重试Retryer 示例对象,直接抛出异常
Retryer NEVER_RETRY = new Retryer() {
public void continueOrPropagate(RetryableException e) {
throw e;
}
public Retryer clone() {
return this;
}
};
// 连续重试
void continueOrPropagate(RetryableException var1);
}
Default 类
Default
是 Retryer 接口的默认实现类,也就是 Feign 的默认重试策略。
public static class Default implements Retryer {
// 最大访问次数(包含了第一次和重试的次数)
private final int maxAttempts;
// 重试的间隔时间
private final long period;
// 最大重试间隔
private final long maxPeriod;
// 当前访问次数
int attempt;
// 总的重试间隔
long sleptForMillis;
// 默认配置(重试间隔100毫秒、最大重试间隔1S、最多访问5次)
public Default() {
this(100L, TimeUnit.SECONDS.toMillis(1L), 5);
}
public Default(long period, long maxPeriod, int maxAttempts) {
this.period = period;
this.maxPeriod = maxPeriod;
this.maxAttempts = maxAttempts;
this.attempt = 1;
}
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
// 默认重试算法
public void continueOrPropagate(RetryableException e) {
// 如果重试的次数attempt大于最大重试次数,直接抛出异常
// attempt的初始值为1 ,i++是先进行运算,所以就表示当前重试次数,比较结束了之后再+1,
// 当第attempt 为5时,也就是第五次进入这里,条件为true,则会直接抛出异常,不再进行请求发送。
// 所以最终是,重试了四次,总共请求了5次。
if (this.attempt++ >= this.maxAttempts) {
throw e;
} else {
long interval;
// 响应数据是否包含了 Retry-After 头,这个头用来告诉用户代理需要等待多长时间之后才能继续发送请求
if (e.retryAfter() != null) {
interval = e.retryAfter().getTime() - this.currentTimeMillis();
if (interval > this.maxPeriod) {
interval = this.maxPeriod;
}
if (interval < 0L) {
return;
}
} else {
// 计算重试时间间隔
interval = this.nextMaxInterval();
}
try {
// 线程睡眠时间间隔 第一次为 150ms
Thread.sleep(interval);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
throw e;
}
// 记录时间间隔总数
this.sleptForMillis += interval;
}
}
// 计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)
// 接着判断 当前时间时间,是否超出了最大限制,超过了则返回最大时间间隔。
long nextMaxInterval() {
long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
return interval > this.maxPeriod ? this.maxPeriod : interval;
}
public Retryer clone() {
return new Retryer.Default(this.period, this.maxPeriod, this.maxAttempts);
}
}
Feign 重试机制源码分析
默认是不进行重试,所以需要配置。
@Bean
Retryer feignRetryer() {
return new Retryer.Default();
}
在订单服务配置一个超长的线程睡眠,我们手动触发一个读取超时,然后由这个超时异常,由最里层往上进行分析,因为之前分析过很多次执行流程源码了,这里只着重看下异常。
这个异常是有底层 HTTP 框架抛出的,会被负载均衡客户端捕获到,这里 catch 了Exception
,所以异常都会被捕获,捕获了之后,会转化为ClientException
。
ClientException
又会被负载均衡客户端的execute
方法捕获到。因为SocketTimeoutException
也是IOException
,直接会直接抛出IOException
。
IOException
又会被方法处理器捕获到,然后转为RetryableException
(可以重启的异常)。
在上述代码的最后,调用FeignException
的errorExecuting
,可以看到,创建了一个RetryableException
,
static FeignException errorExecuting(Request request, IOException cause) {
return new RetryableException(-1, String.format("%s executing %s %s", cause.getMessage(), request.httpMethod(), request.url()), request.httpMethod(), cause, (Date)null, request);
}
最终,RetryableException
还是会被方法处理器的invoke
方法所捕获到,进行重试,直到重试到最大次数时,还是失败,就会抛出异常,结束循环,执行失败。
可以看到方法执行器,每次执行的时候,都会创建一个重试器,而且方法执行的时候,是一个死循环,只有抛出异常时或者正常返回时,才会结束。
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = this.buildTemplateFromArgs.create(argv);
Options options = this.findOptions(argv);
// 每次请求都会创建一个重试处理器
Retryer retryer = this.retryer.clone();
while(true) {
try {
return this.executeAndDecode(template, options);
} catch (RetryableException var9) {
RetryableException e = var9;
// 捕获到RetryableException ,则会调用重试处理器,主要是进行执行时间计算,没有抛出异常,则继续循环
try {
retryer.continueOrPropagate(e);
} catch (RetryableException var8) {
Throwable cause = var8.getCause();
if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
throw cause;
}
throw var8;
}
if (this.logLevel != Level.NONE) {
this.logger.logRetry(this.metadata.configKey(), this.logLevel);
}
}
}
}
某些疑问
多次重试请求,发出请求的策略?
默认的重试算法,会有一个间隔时间,线程会休眠,然后再重新执行请求(在 Default 源码中已经分析过了)。
计算公式 =》 配置的间隔时间(默认 100ms)* (1.5 的(当前重试次数)次方)。
long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
return interval > this.maxPeriod ? this.maxPeriod : interval;
这里可以举个例子, 默认最大重试次数为 5 次,最大间隔时间为 1 秒,第一次休眠时间为 100ms,由计算公式我们可以推倒出来如下结果:
1. 第一次重试,100*(1.5的1次方)间隔150 毫秒后再次请求
2. 第二次重试,100*(1.5的2次方)间隔225 毫秒后再次请求
3. 第三次重试,100*(1.5的3次方)间隔337 毫秒后再次请求
4. 第四次重试,100*(1.5的4次方)间隔506 毫秒后再次请求,
5. 第五次,这个时候,访问次数attempt 达到了最大值5次,不会再重试了,而是直接抛出异常,结束请求
其他异常,会被重试吗?
通过以上源码分析,可知,只有 IO 异常时,才会解析为可重试异常,进行重试操作。
和 Feign 一样,Ribbon 也是有重试机制,接下来按照上面的套路分析下 Ribbon 。
RxJava
RxJava - JVM
响应式扩展 Reactive Extensions 用于使用 Java VM 的可观察序列编写异步和基于事件的程序的库。
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables
与 Observe
r。Observables
作为被观察者,是一个值或事件的流集合;而Observer
则作为观察者,根据 Observables
进行处理。
Observables 与 Observer 之间的订阅发布关系 (观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。
RetryHandler 接口
RetryHandler
接口是 Ribbon 的重试处理器,用来处理重试逻辑。
public interface RetryHandler {
RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
// 判断当前异常是否应该重试
boolean isRetriableException(Throwable var1, boolean var2);
// 是否是熔断类型异常;
boolean isCircuitTrippingException(Throwable var1);
// 获取当前节点最大重试次数
int getMaxRetriesOnSameServer();
// 获取调用不同节点的最大重试次数
int getMaxRetriesOnNextServer();
}
DefaultLoadBalancerRetryHandler
DefaultLoadBalancerRetryHandler
是RetryHandler
接口的默认实现类,重点是看它的属性和构造函数:
// 定义了可以重试的异常
private List<Class<? extends Throwable>> retriable = Lists.newArrayList(new Class[]{ConnectException.class, SocketTimeoutException.class});
// 定义了可以重试的异常
private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(new Class[]{SocketException.class, SocketTimeoutException.class});
// 对应 MaxAutoRetries 配置
protected final int retrySameServer;
// 对应 MaxAutoRetriesNextServer 配置
protected final int retryNextServer;
// 是否开启重试
protected final boolean retryEnabled;
// 没有参数时,表示不重试
public DefaultLoadBalancerRetryHandler() {
this.retrySameServer = 0;
this.retryNextServer = 0;
this.retryEnabled = false;
}
RequestSpecificRetryHandler
RequestSpecificRetryHandler
也实现了RetryHandler
接口,从名字上看,是具有请求特征的重试处理器,每次请求时,Ribbon 都会创建单独的一个RequestSpecificRetryHandler
(这是实际使用的处理器) ,也就是会和当前客户端的配置(IClientConfig 对象)绑定,实现不同请求,不同配置策略。
可以看到,RequestSpecificRetryHandler
代理了一个RetryHandler
,默认是DefaultLoadBalancerRetryHandler
,看他们的参数,可以知道,默认 Ribbon 重试机制也是关闭的。
LoadBalancerCommand
FeignLoadBalancer
的executeWithLoadBalancer
方法中调用buildLoadBalancerCommand
方法构造LoadBalancerCommand
对象。这个类是将Ribbon
将请求转为 RxJava API 调用的实现。
该类的selectServer
方法,会在注册中心中,根据负载均衡算法获取到一个健康的可用服务,然后返回一个Observable
对象,这是一个观察者对象,创建的时候传入了一个Subscriber
订阅者对象,当Observable
对象被订阅时,Subscriber
中的 call 方法会执行。
// 定义一个事件源
// Create操作符是用来创建一个Observable
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
// Observable 包含了一个OnSubscribe 对象
// 当Observable被订阅(subscribe)时,OnSubscribe接口的call方法会被执行
@Override
public void call(Subscriber<? super Server> next) {
try {
// 事件源被订阅时,执行call
// 负载均衡查询可用服务
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
//
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
在submit
方法中,会创建一个Observable
对象,用来观察请求执行状态,如果失败,则会重试,达到最大次数,抛出异常,结束重试。
public Observable<T> submit(final ServerOperation<T> operation) {
// 省略.....
// 获取重试器中 最大重试次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// (server == null ? selectServer() : Observable.just(server))
// 获取可用服务的Observable 对象
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
// concatMap发射的数据集是有序的
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// 为选中的每台服务器执行调用
public Observable<T> call(Server server) {
context.setServer(server);
// 获取当前服务的监控记录
final ServerStats stats = loadBalancerContext.getServerStats(server);
// 为每次尝试和重试调用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
// 重试次数计数
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
// 针对同一实例的重试回调
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
// 重试下一个实例的回调
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
// 重试超过次数则终止调用并设置对应异常的回调
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
// 封装异常信息并返回
@Override
public Observable<T> call(Throwable e) {
// 省略......
return Observable.error(e);
}
});
}
使用案例
以下为全局配置,也可以添加名称前缀,指定客户端。
# Ribbon 配置
ribbon:
MaxAutoRetries: 2
MaxAutoRetriesNextServer: 3
OkToRetryOnAllOperations: false
各参数说明如下:
- MaxAutoRetries:单节点最大重试次数,达到最大值时,切换到下一个示例
- MaxAutoRetriesNextServer:更换下一个重试节点的最大次数,可以设置为服务提供者副本数,也是就每个副本都查询一次。
- OkToRetryOnAllOperations: 是否对所有请求进行重试,默认 fasle,则只会对 GET 请求进行重试,建议配置为 false,不然添加数据接口,会造成多条重复,也就是幂等性问题。
重试总次数计算公式:
MaxAutoRetries+(MaxAutoRetries+1)*(MaxAutoRetriesNextServer)
第一次请求时异常,重试 2 次,查询下一个节点,发送一次请求失败,进入重试,再重试 2 次失败,继续查询三次节点重试,所以以上配置会重试(2+3*3=11)次
- Feign 自带重试机制,默认不开启,原理是捕获异常,发现超时异常,会进行重试,直到达到最大重试次数,退出循环请求
- Ribbon 也实现了自己的重试机制,基于 RxJava,异步处理超时异常,默认也是不开启,需要添加重试次数设置
- 推荐使用 Ribbon 重试机制,需要注意关闭
OkToRetryOnAllOperations
,不然很容易出现接口幂等性问题,而且下游服务的 GET 请求,是要求只做查询功能