189 8069 5689

SpringCloudGateway如何实现限流操作-创新互联

这篇文章主要介绍Spring Cloud Gateway如何实现限流操作,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

成都创新互联公司长期为成百上千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为雁峰企业提供专业的做网站、成都网站设计雁峰网站改版等技术服务。拥有十余年丰富建站经验和众多成功案例,为您定制开发。

开发高并发系统时有三把利器用来保护系统:缓存、降级和限流。

API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。

常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等。

在Zuul中我们可以自己去实现限流的功能 (Zuul中如何限流在我的书 《Spring Cloud微服务-全栈技术与案例解析》  中有详细讲解) ,Spring Cloud Gateway的出现本身就是用来替代Zuul的。

要想替代那肯定得有强大的功能,除了性能上的优势之外,Spring Cloud Gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在Spring Cloud Gateway中进行限流操作。

目前限流提供了基于Redis的实现,我们需要增加对应的依赖:

 
 org.springframework.boot
 spring-boot-starter-data-redis-reactive

可以通过KeyResolver来指定限流的Key,比如我们需要根据用户来做限流,IP来做限流等等。

IP限流

@Bean
public KeyResolver ipKeyResolver() {
 return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}

通过exchange对象可以获取到请求信息,这边用了HostName,如果你想根据用户来做限流的话这边可以获取当前请求的用户ID或者用户名就可以了,比如:

用户限流

使用这种方式限流,请求路径中必须携带userId参数。

@Bean
KeyResolver userKeyResolver() {
 return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId"));
}

接口限流

获取请求地址的uri作为限流key。

@Bean
KeyResolver apiKeyResolver() {
 return exchange -> Mono.just(exchange.getRequest().getPath().value());
}

然后配置限流的过滤器信息:

server:
 port: 8084
spring:
 redis:
 host: 127.0.0.1
 port: 6379
 cloud:
 gateway:
  routes:
  - id: fsh-house
  uri: lb://fsh-house
  predicates:
  - Path=/house/**
  filters:
  - name: RequestRateLimiter
   args:
   redis-rate-limiter.replenishRate: 10
   redis-rate-limiter.burstCapacity: 20
   key-resolver: "#{@ipKeyResolver}"
  • filter名称必须是RequestRateLimiter

  • redis-rate-limiter.replenishRate:允许用户每秒处理多少个请求

  • redis-rate-limiter.burstCapacity:令牌桶的容量,允许在一秒钟内完成的大请求数

  • key-resolver:使用SpEL按名称引用bean

可以访问接口进行测试,这时候Redis中会有对应的数据:

127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"

大括号中就是我们的限流Key,这边是IP,本地的就是localhost

  • timestamp:存储的是当前时间的秒数,也就是System.currentTimeMillis() / 1000或者Instant.now().getEpochSecond()

  • tokens:存储的是当前这秒钟的对应的可用的令牌数量

Spring Cloud Gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:

  • 每个接口的限流数量不同,可以通过配置中心动态调整

  • 超过的流量被拒绝后可以返回固定的格式给调用方

  • 对某个服务进行整体限流(这个大家可以思考下用Spring Cloud Gateway如何实现,其实很简单)

  • ……

当然我们也可以通过重新RedisRateLimiter来实现自己的限流策略,这个我们后面再进行介绍。

限流源码

// routeId也就是我们的fsh-house,id就是限流的key,也就是localhost。
public Mono isAllowed(String routeId, String id) {
 // 会判断RedisRateLimiter是否初始化了
 if (!this.initialized.get()) {
  throw new IllegalStateException("RedisRateLimiter is not initialized");
 }
 // 获取routeId对应的限流配置
 Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);

 if (routeConfig == null) {
  throw new IllegalArgumentException("No Configuration found for route " + routeId);
 }

 // 允许用户每秒做多少次请求
 int replenishRate = routeConfig.getReplenishRate();

 // 令牌桶的容量,允许在一秒钟内完成的大请求数
 int burstCapacity = routeConfig.getBurstCapacity();

 try {
  // 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens)
  List keys = getKeys(id);


  // The arguments to the LUA script. time() returns unixtime in seconds.
  List scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
    Instant.now().getEpochSecond() + "", "1");
  // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
  // 执行LUA脚本
  Flux> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
    // .log("redisratelimiter", Level.FINER);
  return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
    .reduce(new ArrayList(), (longs, l) -> {
     longs.addAll(l);
     return longs;
    }) .map(results -> {
     boolean allowed = results.get(0) == 1L;
     Long tokensLeft = results.get(1);

     Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));

     if (log.isDebugEnabled()) {
      log.debug("response: " + response);
     }
     return response;
    });
 }
 catch (Exception e) {
  log.error("Error determining if user allowed from redis", e);
 }
 return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}

LUA脚本在:

Spring Cloud Gateway如何实现限流操作

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)

--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
 last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
 last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
 new_tokens = filled_tokens - requested
 allowed_num = 1
end

--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed_num, new_tokens }

以上是“Spring Cloud Gateway如何实现限流操作”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注创新互联行业资讯频道!


网页标题:SpringCloudGateway如何实现限流操作-创新互联
URL分享:http://cdxtjz.cn/article/dcohhh.html

其他资讯