常用限流方案的设计和实现

JAVA实用技巧

2018-05-31

37

0

为了保证在业务高峰期,线上系统也能保证一定的弹性和稳定性,最有效的方案就是进行服务降级了,而限流就是降级系统最常采用的方案之一。

限流即流量限制,或者高大上一点,叫做流量整形,限流的目的是在遇到流量高峰期或者流量突增(流量尖刺)时,把流量速率限制在系统所能接受的合理范围之内,不至于让系统被高流量击垮。

其实,服务降级系统中的限流并没有我们想象的那么简单,第一,限流方案必须是可选择的,没有任何方案可以适用所有场景,每种限流方案都有自己适合的场景,我们得根据业务和资源的特性和要求来选择限流方案;第二,限流策略必须是可配的,对策略调优一定是个长期的过程,这里说的策略,可以理解成建立在某个限流方案上的一套相关参数。

目前有几种常见的限流方式:

  1. 通过限制单位时间段内调用量来限流
  2. 通过限制系统的并发调用程度来限流
  3. 使用漏桶(Leaky Bucket)算法来进行限流
  4. 使用令牌桶(Token Bucket)算法来进行限流

下面我们来一起讨论各种限流方式的设计和具体使用场景。

对于第1种,通过限制某个服务的单位时间内的调用量来进行限流。从字面上,确实很容易理解,我们需要做的就是通过一个计数器统计单位时间段某个服务的访问量,如果超过了我们设定的阈值,则该单位时间段内则不允许继续访问、或者把接下来的请求放入队列中等待到下一个单位时间段继续访问。这里,计数器在需要在进入下一个单位时间段时先清零。

我们来看看在Java语言中,这种方式具体应该如何做,第一步我们需要做的就是确定这个单位时间段有多长,肯定不能太长,太长将会导致限流的效果变得不够“敏感”,因为我们知道,进入限流阶段后,如果采用的手段是不允许继续访问,那么在该单位时间段内,该服务是不可用的,比如我们把单位时间设置成1小时,如果在第29分钟,该服务的访问量就达到了我们设定的阈值,那么在接下来的31分钟,该服务都将变得不可用,这无形SO BAD!!如果单位时间段设置得太短,越短的单位时间段将导致我们的阈值越难设置,比如1秒钟,因为高峰期的1秒钟和低峰期的1秒钟的流量有可能相差百倍甚至千倍,同时过短的单位时间段也对限流代码片段提出了更高要求,限流部分的代码必须相当稳定并且高效!最优的单位时间片段应该以阈值设置的难易程度为标准,比如我们的监控系统统计的是服务每分钟的调用量,所以很自然我们可以选择1分钟作为时间片段,因为我们很容易评估每个服务在高峰期和低峰期的分钟调用量,并可以通过服务在每分钟的平均耗时和异常量来评估服务在不同单位时间段的服务质量,这给阈值的设置提供了很好的参考依据。当单位时间段和阈值已经确定,接下来就该考虑计数器的实现了,最快能想到的就是AtomicLong了,对于每次服务调用,我们可以通过AtomicLong#incrementAndGet()方法来给计数器加1并返回最新值,我们可以通过这个最新值和阈值来进行比较来看该服务单位时间段内是否超过了阈值。这里,如何设计计数器是个关键,假设单位时间段为1分钟,我们可以做一个环状结构的计数器,如下:


当然我们可以直接用一个数组来实现它:new AtomicLong[]{new AtomicLong(0), new AtomicLong(0), newAtomicLong(0)},当前分钟AtomicLong保存的是当前单位时间段内该服务的调用次数,上一分钟显然保存的是上一单位时间段的统计数据,之所以有这个是为了统计需要,既然到了当前的单位时间段,说明上一分钟的访问统计已经结束,即可将上一分钟的该接口的访问量数据打印日志或发送到某个服务端进行统计,因为我们说过,阈值的设置是个不断调优的过程,所以有时候这些统计数据会很有用。在对当前时间段的访问量进行统计的时候,需要将下一分钟的AtomicLong清零,这一步是很关键的,有两种清零方案:第一种,直接(通过Executors.newSingleThreadScheduledExecutor)起个单独线程,比如每隔50秒(这个当然必须小于单位时间段)对下一分钟的AtomicLong进行清零。第二种,每次在给当前分钟AtomicLong加1时,对下一分钟的AtomicLong的值进行检测,如果不是0,则设置成0,如果采用这种方案,这里会有一个bug,如果某个服务在某个完整的单位时间段内一次也没有被调用,则下一分钟的AtomicLong在使用前显然没有被清0,所以采用第二种方案还得通过额外的一个字段保存上一次清0的时间,每次使用当前分钟AtomicLong时,需要先判断这个字段,如果超过一个单位时间段,这则需要先清0再使用。两种方案对比来看,第一种方案实现起来更简单。对于如何访问当前分钟、上一分钟和下一分钟的AtomicLong,可以直接通过当前分钟数来对数组的length取模即可(比如获取当前分钟的数据index:(System.currentTimeMillis() / 60000) % 3)。

对于限制单位时间段内调用量的这种限流方式,实现简单,适用于大多数场景,如果阈值可以通过服务端来动态配置,甚至可以当做业务开关来使用,但也有一定的局限性,因为我们的阈值是通过分析单位时间段内调用量来设置的,如果它在单位时间段的前几秒就被流量突刺消耗完了,将导致该时间段内剩余的时间内该服务“拒绝服务”,可以将这种现象称为“突刺消耗”,但庆幸的是,这种情况并不常见。

第2种说的是通过并发限制来限流,我们通过严格限制某服务的并发访问程度,其实也就限制了该服务单位时间段内的访问量,比如限制服务的并发访问数是100,而服务处理的平均耗时是10毫秒,那么1分钟内,该服务平均能提供( 1000 / 10 ) * 60 * 100 = 6,000 次,而且这个于第1种限流方案相比,它有着更严格的限制边界,因为如果采用第1种限流方案,如果大量服务调用在极短的时间内产生,仍可能压垮系统,甚至产生雪崩效应。但是并发的阈值设置成多少比较合适呢?大多数业务的监控系统不会去统计某个服务在单位时间段内的并发量,这是因为很多监控系统都是通过业务日志来做到“异步”调用量的统计,但如果要统计并发量,则需要嵌入到代码调用层面中去,比如通过AOP,如果要这样的话,监控系统最好能和RPC框架或者服务治理等框架来配合使用,这样对于开发人员来说,才足够透明。介于上面所说的,我们很难去评估某个服务在业务高峰期和低峰期的并发量,这给并发阈值的设置带来了困难,但我们可以通过线上业务的监控数据来逐步对并发阈值进行调优,只要肯花时间,我们总能找到一个即能保证一定服务质量又能保证一定服务吞吐量的合理并发阈值。用Java来实现并发也相当简单,我们第一个想到的可能就是信号量Semaphore,在服务的入口调用非阻塞方法Semaphore#tryAcquire()来尝试获取一个信号量,如果失败,则直接返回或将调用放入某个队列中,然后在finally语句块中调用Semaphore#release(),这里需要注意,如果某个调用请求没有获取信号量成功,那么它不应该调用release方法来释放信号量(具体原因分析可以参考:http://manzhizhen.iteye.com/blog/2307298)。我们常常会希望还有如下效果:

  1. 既想控制某服务的并发访问程度,又想知道实际的最高并发量会达到多少。
  2. 能动态调整并发量数据。

所以,在具体实现时,你得格外小心,比如Semaphore没有提供重设信号量的方法(原因参看:http://manzhizhen.iteye.com/blog/2307298),所以在需要修改信号量数量时,只能重新new一个Semaphore,但这是在Semaphore使用中过程中进行修改的,需要确保tryAcquire和release操作是在同一个Semaphore之上。

总所周知,并发量限流一般用于对于服务资源有严格限制的场景,比如连接数、线程数等,但也未尝不能用于通用的服务场景。现在业务的复杂性,给系统的设计带来了一定挑战,而且随着业务的发展,系统的架构会不断的拆分并服务化,下面的服务依赖场景会很常见:



  

从上图可以看出上游的A、B服务直接依赖了下游的基础服务C、D和E,对于A,B服务都依赖的基础服务D这种场景,服务A和B其实处于某种竞争关系,当我们考量服务A的并发阈值时,不可忽略的是服务B对A的影响,所以,大多情况并发阈值的设置需要保守一点,如果服务A的并发阈值设置过大,当流量高峰期来临,有可能直接拖垮基础服务D并影响服务B,即雪崩效应来了。从表面上看并发量限流似乎很有用,但也不可否认,它仍然可以造成流量尖刺,即每台服务器上该服务的并发量从0上升到阈值是没有任何“阻力”的,这是因为并发量考虑的只是服务能力边界的问题。

 

第3种是通过漏桶算法来进行限流,漏桶算法是网络中流量整形的常用算法之一,它有点像我们生活中用到的漏斗,液体倒进去以后,总是从下端的小口中以固定速率流出,漏桶算法也类似,不管突然流量有多大,漏桶都保证了流量的常速率输出,也可以类比于调用量,比如,不管服务调用多么不稳定,我们只固定进行服务输出,比如每10毫秒接受一次服务调用。既然是一个桶,那就肯定有容量,由于调用的消费速率已经固定,那么当桶的容量堆满了,则只能丢弃了,漏桶算法如下图:



 

 

漏桶算法其实是悲观的,因为它严格限制了系统的吞吐量,从某种角度上来说,它的效果和并发量限流很类似。漏桶算法也可以用于大多数场景,但由于它对服务吞吐量有着严格固定的限制,如果在某个大的服务网络中只对某些服务进行漏桶算法限流,这些服务可能会成为瓶颈。其实对于可扩展的大型服务网络,上游的服务压力可以经过多重下游服务进行扩散,过多的漏桶限流似乎意义不大。

实现方面,可以先准备一个队列,当做桶的容量,另外通过一个计划线程池(ScheduledExecutorService)来定期从队列中获取并执行请求调用,当然,我们没有限定一次性只能从队里中拿取一个请求,比如可以一次性拿100个请求,然后并发执行。

 

 

 

第4种是令牌桶算法限流,令牌桶算法从某种程度上来说是漏桶算法的一种改进,漏桶算法能够强行限制请求调用的速率,而令牌桶算法能够在限制调用的平均速率的同时还允许某种程度的突发调用。在令牌桶算法中,桶中会有一定数量的令牌,每次请求调用需要去桶中拿取一个令牌,拿到令牌后才有资格执行请求调用,否则只能等待能拿到足够的令牌数,读者看到这里,可能就认为是不是可以把令牌比喻成信号量,那和前面说的并发量限流不是没什么区别嘛?其实不然,令牌桶算法的精髓就在于“拿令牌”和“放令牌”的方式,这和单纯的并发量限流有明显区别,采用并发量限流时,当一个调用到来时,会先获取一个信号量,当调用结束时,会释放一个信号量,但令牌桶算法不同,因为每次请求获取的令牌数不是固定的,比如当桶中的令牌数还比较多时,每次调用只需要获取一个令牌,随着桶中的令牌数逐渐减少,当到令牌的使用率(即使用中的令牌数/令牌总数)达某个比例,可能一次请求需要获取两个令牌,当令牌使用率到了一个更高的比例,可能一次请求调用需要获取更多的令牌数。同时,当调用使用完令牌后,有两种令牌生成方法,第一种就是直接往桶中放回使用的令牌数,第二种就是不做任何操作,有另一个额外的令牌生成步骤来将令牌匀速放回桶中。如下图:



 

 

在并发量限制时,在达到并发阈值之前,并发量和前来调用的线程数可以说是成严格正比关系的,但在令牌桶中可能就并不是这样,下面给出在某种特定场景和特定参数下四种限流方式对服务并发能力影响的折线图,其中X轴表示当前并发调用数,而Y轴表示某服务在不同并发调用程度下采取限流后的实际并发调用数:

 

 

 

其实,不同场景不同参数下,服务采用所述四种限流方式都会有不同的效果,甚至较大差异,上图也并不能说明实际并发度越高就吞吐量越高,因为还必须把稳定性等因素考虑进去,这就好比插入排序、堆排序、归并排序和快速排序的对比一样,没有任何限流算法可以说自己在任何场景下都是最优限流算法,这需要从服务资源特性、限流策略(参数)配置难度、开发难度和效果检测难度等多方面因素来考虑。

咱们再回到令牌桶算法,和其他三种降级方式来说,令牌桶算法限流无疑是最灵活的,因为它有众多可配置的参数来直接影响限流的效果。幸运的是,谷歌的Guava包中RateLimiter提供了令牌桶算法的实现.

我们先看看如何创建一个RateLimiter实例:

RateLimiter create(double permitsPerSecond);  // 创建一个每秒包含permitsPerSecond个令牌的令牌桶,可以理解为QPS最多为permitsPerSecond

RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)// 创建一个每秒包含permitsPerSecond个令牌的令牌桶,可以理解为QPS最多为permitsPerSecond,并包含某个时间段的预热期

 

我们再看看获取令牌的相关方法:

double acquire(); // 阻塞直到获取一个许可,返回被限制的睡眠等待时间,单位秒

double acquire(int permits); // 阻塞直到获取permits个许可,返回被限制的睡眠等待时间,单位秒

boolean tryAcquire();  // 尝试获取一个许可

boolean tryAcquire(int permits);  // 尝试获取permits个许可

boolean tryAcquire(long timeout, TimeUnit unit);  // 尝试获取一个许可,最多等待timeout时间

boolean tryAcquire(int permits, long timeout, TimeUnit unit);  // 尝试获取permits个许可,最多等待timeout时间

 

我们来看个最简单的例子:

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS");

RateLimiter rateLimiter = RateLimiter.create(2);

while(true) {

    rateLimiter.acquire();

    System.out.println(simpleDateFormat.format(new Date()));

}

运行该例子会得到类似如下结果:

20160716 17:04:03.352

20160716 17:04:03.851

20160716 17:04:04.350

20160716 17:04:04.849

20160716 17:04:05.350

20160716 17:04:05.850

20160716 17:04:06.350

20160716 17:04:06.850

 

其实,单从RateLimiter实例的创建方法来说,它更像是漏桶算法的实现(前面说的第3种方式),或者像一个单位时间段为1秒的调用量限流方式(前面说的第1种方式),唯一看起来像令牌桶算法的是其获取信号量的时候,可以一次性尝试获取多个令牌。就像上面的例子,我们通过RateLimiter.create(2)创建了一个每秒会产生两个令牌的令牌桶,也就是说每秒你最多能获取两个令牌,如果每次调用都需要获取一个令牌,那么就限制了QPS最多为2。RateLimiter对外没有提供释放令牌的release方法,而是默认会每秒往桶中放入两个令牌。所以,在接下来的while循环中,我们获取一个令牌后直接打印当前的时间,可以看出,每秒将打印两次。从例子可以看出,我们故意打出了毫秒数,可以看出,令牌的获取也是有固定间隔的,因为我们是每秒两个令牌,所以每个令牌的获取间隔大约是500毫秒。

 

从RateLimiter提供的操作来看,要实现一个实用的漏桶算法限流工具还有些路要走,比如

  1. 它似乎不允许并发,虽然当我们每秒设置的令牌数足够多时或者服务处理时间超过1秒时,效果和并发类似。
  2. 它没有提供一些通用的函数,来表式令牌使用率和获取令牌数之间的关系,需要外部实现。
  3. 除了默认的自动添加令牌的方式,如果能提供手动释放令牌的方式,适用的的场景可能会更多。

 

撇开这些不说,我们还是来看看其中的具体实现吧。抽象类RateLimiter的create方法返回的是一个“平滑突发”类型SmoothBursty的实例:

public static RateLimiter create(double permitsPerSecond) {

  return create(SleepingStopwatch.createFromSystemTimer(), permitsPerSecond);

}

@VisibleForTesting

static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {

  // 这里就默认设置了1秒钟,表示如果SmoothBursty没被使用,许可数能被保存1秒钟

  RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);

  rateLimiter.setRate(permitsPerSecond);

  return rateLimiter;

}

 

SmoothBursty的构造函数有两个入参,一个是SleepingStopwatch 实例,它是一个阻止睡眠观察器,可以理解成一个闹钟,如果你想睡十秒,它会在10秒钟内一直阻塞你(让你睡觉),到了十秒钟,它将对你放行(唤醒你),另一个参数是许可被保存的秒数,因为RateLimiter的行为是每过一秒会自动补充令牌,所以前一秒的令牌需要“被过期”。我们直接看获取令牌的阻塞实现acquire:

@CanIgnoreReturnValue

public double acquire(int permits) {

  // 预定permits个许可供未来使用,并返回等待这些许可可用的微秒数

  long microsToWait = reserve(permits);

  // 等待指定的时间

  stopwatch.sleepMicrosUninterruptibly(microsToWait);

  // 返回消耗的时间

  return 1.0 * microsToWait / SECONDS.toMicros(1L);

}

 

可以看出步骤很少,就是先预定,然后等待,最后返回,我们先来看预定步骤的实现:

final long reserve(int permits) {

  // permits的值必须大于0

  checkPermits(permits);

  // 这里通过一个对象锁进行同步

  synchronized (mutex()) {

    // 预定permits数目的许可,并返回调用方需要等待的时间

    return reserveAndGetWaitLength(permits, stopwatch.readMicros());

  }

}

// Can't be initialized in the constructor because mocks don't call the constructor.

private volatile Object mutexDoNotUseDirectly;

private Object mutex() {

  Object mutex = mutexDoNotUseDirectly;

  if (mutex == null) {

    synchronized (this) {

      mutex = mutexDoNotUseDirectly;

      if (mutex == null) {

        mutexDoNotUseDirectly = mutex = new Object();

      }

    }

  }

  return mutex;

}

 

当拿到mutexDoNotUseDirectly锁后,我们看看reserveAndGetWaitLength的实现:

final long reserveAndGetWaitLength(int permits, long nowMicros) {

  // reserveEarliestAvailable由RateLimiter的子类SmoothRateLimiter来实现

  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

  return max(momentAvailable - nowMicros, 0);

}

 

可以看出,获取许可需要等待的时间是由reserveEarliestAvailable方法来实现的,它是RateLimiter的抽象方法,由其抽象子类SmoothRateLimiter来实现,在看reserveEarliestAvailable方法的实现之前,我们先来看看SmoothRateLimiter类中的主要属性:

// 最大许可数,比如RateLimiter.create(2);表明最大的许可数就是2

double maxPermits;

// 当前剩余的许可数

double storedPermits;

// 固定的微秒周期

double stableIntervalMicros;

// 下一个空闲票据的时间点

private long nextFreeTicketMicros = 0L;

因为RateLimiter需要保证许可被稳定连续的输出,比如每秒有5个许可,那么你获取许可的间隔时间是200毫秒,而stableIntervalMicros就是用来保存这个固定的间隔时间的,方便后面计算使用。

SmoothRateLimiter有两个子类:SmoothBursty和SmoothWarmingUp,从名字可以看出,SmoothWarmingUp类的实现带有预热功能,而SmoothBursty类是没有的,什么是预热功能呢?就好比缓存一样,当使用SmoothWarmingUp的实现时,不会在前几秒就给足全量的许可,就是说许可数会慢慢的增长,知道达到我们预定义的值,所以,SmoothRateLimiter会留下如下抽象方法交给其子类来实现:

// 重设流量相关参数,需要子类来实现,不同子类参数不尽相同,比如SmoothWarmingUp肯定有增长比率相关参数

void doSetRate(double permitsPerSecond, double stableIntervalMicros);

// 计算生成这些许可数需要等待的时间

long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

// 返回许可冷却(间隔)时间

double coolDownIntervalMicros();

 

 

我们来看看SmoothRateLimiter中该方法的实现:

@Override

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {

  // 基于当前时间nowMicros来更新storedPermits和nextFreeTicketMicros

  resync(nowMicros);

  long returnValue = nextFreeTicketMicros;

  // 从需要申请的许可数和当前可用的许可数中找到最小值

  double storedPermitsToSpend = min(requiredPermits, this.storedPermits);

  // 还差的许可数

  double freshPermits = requiredPermits - storedPermitsToSpend;

  long waitMicros =

      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)

          + (long) (freshPermits * stableIntervalMicros);

 

     // 更新下一个票据的等待时间

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);

  // 更新剩余可用许可数

  this.storedPermits -= storedPermitsToSpend;

  return returnValue;

}

从上面的代码片段可以看出,该方法先会去调用resync方法来更新当前可用许可数(nowMicros)和下一个空闲票据的时间(nextFreeTicketMicros )。我们看下resync的实现:

void resync(long nowMicros) {

  // 如果下一空闲票据时间已经小于当前时间,说明需要更新当前数据

  if (nowMicros > nextFreeTicketMicros) {

    // 根据当前时间和上一次获取许可时间的间隔时间,来计算应该有多少许可被生成

    double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();

    // 更新当前可用许可

    storedPermits = min(maxPermits, storedPermits + newPermits);

    nextFreeTicketMicros = nowMicros;

  }

}

从这里我们可以看出,补充当前可用许可是在每次获取许可数步骤中的第一步完成的。

在reserveEarliestAvailable方法中,storedPermitsToWaitTime方法的调用是其关键步骤,前面说过,它由子类实现,子类SmoothBursty类的storedPermitsToWaitTime实现相当简单:

long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {

    return 0L;

}

所以,在这里,waitMicros的值是:0 + (long) (freshPermits * stableIntervalMicros),即还差的许可数乘以许可间隔时间,很自然在这里下一次能获取许可的时间(nextFreeTicketMicros )是nextFreeTicketMicros + freshPermits * stableIntervalMicros。从这里可以得出,假设我们配置的是每秒5个许可,当我们在第0毫秒时获取第一个许可可以马上拿到,当我们在第100毫秒想要再次获取1个许可,还需要等100毫秒,当时间到了210毫秒时,如果我们这是想要获取两个许可,则需要等待200 * 2 + 200 - 210 = 390毫秒(这里加的200毫秒是上一次获取许可的时间).

 

现在我们回过头来看acquire方法,当拿到下一个许可的等待时间后,将调用SleepingStopwatch#sleepMicrosUninterruptibly来阻塞等待,先说说Stopwatch类,Stopwatch用来测量时间,可以理解成guava包中使用System#nanoTime的替代品,SleepingStopwatch也使用委派方式来使用Stopwatch的计算时间的功能,但sleepMicrosUninterruptibly是SleepingStopwatch的内部实现:

@Override

protected void sleepMicrosUninterruptibly(long micros) {

  if (micros > 0) {

    Uninterruptibles.sleepUninterruptibly(micros, MICROSECONDS);

  }

}

可见,阻塞任务是由Uninterruptibles来完成的,Uninterruptibles是guava中的阻塞工具类,我们直接看它的sleepUninterruptibly方法的实现:

@GwtIncompatible // concurrency

public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {

  boolean interrupted = false;

  try {

    long remainingNanos = unit.toNanos(sleepFor);

    long end = System.nanoTime() + remainingNanos;

    while (true) {

      try {

        // TimeUnit.sleep() treats negative timeouts just like zero.

        NANOSECONDS.sleep(remainingNanos);

        return;

      } catch (InterruptedException e) {

        interrupted = true;

        remainingNanos = end - System.nanoTime();

      }

    }

  } finally {

    if (interrupted) {

      Thread.currentThread().interrupt();

    }

  }

}

可见,它采用的是TimeUnit#sleep方法。

 

到这里,RateLimiter#acquire获取许可的操作就介绍完毕了,可见,其中并没有复杂的算法或技巧,RateLimiter还提供了重设许可数的方法:

void doSetRate(double permitsPerSecond, long nowMicros);

 

就像本文提到的,使用令牌桶时,桶中的令牌数量和每次获取的令牌数是个可调的参数,如果按照RateLimiter这种设计的话,多个令牌数的获取将要等待多个间隔时间,这也许不是我们想要的。

 

发表评论

全部评论:0条

houzhe11

JAVA从业者