聊聊spring|聊聊spring cloud的RequestRateLimiterGatewayFilter

序 本文主要研究一下spring cloud的RequestRateLimiterGatewayFilter
GatewayAutoConfiguration

@Configuration @ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true) @EnableConfigurationProperties @AutoConfigureBefore(HttpHandlerAutoConfiguration.class) @AutoConfigureAfter({GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class}) @ConditionalOnClass(DispatcherHandler.class) public class GatewayAutoConfiguration { //...... @Bean(name = PrincipalNameKeyResolver.BEAN_NAME) @ConditionalOnBean(RateLimiter.class) public PrincipalNameKeyResolver principalNameKeyResolver() { return new PrincipalNameKeyResolver(); }@Bean @ConditionalOnBean({RateLimiter.class, KeyResolver.class}) public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter, PrincipalNameKeyResolver resolver) { return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver); } //...... }

注意是要有RateLimiter及KeyResolver的bean才生效,默认KeyResolver为PrincipalNameKeyResolver,也是需要有RateLimiter才生效。
RequestRateLimiterGatewayFilterFactory spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/factory/RequestRateLimiterGatewayFilterFactory.java
/** * User Request Rate Limiter filter. See https://stripe.com/blog/rate-limiters and */ public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory {public static final String KEY_RESOLVER_KEY = "keyResolver"; private final RateLimiter defaultRateLimiter; private final KeyResolver defaultKeyResolver; public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter, KeyResolver defaultKeyResolver) { super(Config.class); this.defaultRateLimiter = defaultRateLimiter; this.defaultKeyResolver = defaultKeyResolver; }public KeyResolver getDefaultKeyResolver() { return defaultKeyResolver; }public RateLimiter getDefaultRateLimiter() { return defaultRateLimiter; }@SuppressWarnings("unchecked") @Override public GatewayFilter apply(Config config) { KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver; RateLimiter limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter; return (exchange, chain) -> { Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR); return resolver.resolve(exchange).flatMap(key -> // TODO: if key is empty? limiter.isAllowed(route.getId(), key).flatMap(response -> { // TODO: set some headers for rate, tokens leftif (response.isAllowed()) { return chain.filter(exchange); } exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); })); }; }public static class Config { private KeyResolver keyResolver; private RateLimiter rateLimiter; public KeyResolver getKeyResolver() { return keyResolver; }public Config setKeyResolver(KeyResolver keyResolver) { this.keyResolver = keyResolver; return this; } public RateLimiter getRateLimiter() { return rateLimiter; }public Config setRateLimiter(RateLimiter rateLimiter) { this.rateLimiter = rateLimiter; return this; } }}
  • 可以看到这个filterFactory依赖RateLimiter及KeyResolver
  • 其中KeyResolver用于从request中提取限流的key
  • 而RateLimiter则是相应的针对key的限流规则
实例 java配置
@Bean public RateLimiter inMemoryRateLimiter(){ return new InMemoryRateLimiter(); }@Bean(name = IpAddressKeyResolver.BEAN_NAME) public KeyResolver ipAddressKeyResolver() { return new IpAddressKeyResolver(); }

这里提供了InMemoryRateLimiter及IpAddressKeyResolver
文件配置
spring: cloud: gateway: routes: - id: rate-limit-demo uri: http://www.baidu.com predicates: - Path=/rate/** filters: - name: RequestRateLimiter args: keyResolver: '#{@ipAddressKeyResolver}' in-memory-rate-limiter: replenish-rate: 10 burst-capacity: 20

  • 这里首先指定了一个name为RequestRateLimiter的filter,然后其参数指定了keyResolver为name为ipAddressKeyResolver的bean
  • args中的in-memory-rate-limiter就是InMemoryRateLimiter指定的配置属性,然后下面的两个值就是InMemoryRateLimiter.Config的属性
IpAddressKeyResolver
public class IpAddressKeyResolver implements KeyResolver {public static final String BEAN_NAME = "ipAddressKeyResolver"; @Override public Mono resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress()); } }

InMemoryRateLimiter.Config
@Validated public static class Config { @Min(1) private int replenishRate; @Min(0) private int burstCapacity = 0; public int getReplenishRate() { return replenishRate; }public InMemoryRateLimiter.Config setReplenishRate(int replenishRate) { this.replenishRate = replenishRate; return this; }public int getBurstCapacity() { return burstCapacity; }public InMemoryRateLimiter.Config setBurstCapacity(int burstCapacity) { this.burstCapacity = burstCapacity; return this; }@Override public String toString() { return "Config{" + "replenishRate=" + replenishRate + ", burstCapacity=" + burstCapacity + '}'; } }

InMemoryRateLimiter
public class InMemoryRateLimiter extends AbstractRateLimiter {public static final String CONFIGURATION_PROPERTY_NAME = "in-memory-rate-limiter"; private InMemoryRateLimiter.Config defaultConfig; private final Map ipBucketMap = new ConcurrentHashMap<>(); public InMemoryRateLimiter() { super(InMemoryRateLimiter.Config.class, CONFIGURATION_PROPERTY_NAME, null); }public InMemoryRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) { super(Config.class, CONFIGURATION_PROPERTY_NAME, null); this.defaultConfig = new InMemoryRateLimiter.Config() .setReplenishRate(defaultReplenishRate) .setBurstCapacity(defaultBurstCapacity); }@Override public Mono isAllowed(String routeId, String id) { InMemoryRateLimiter.Config routeConfig = getConfig().get(routeId); if (routeConfig == null) { if (defaultConfig == null) { throw new IllegalArgumentException("No Configuration found for route " + routeId); } routeConfig = defaultConfig; }// How many requests per second do you want a user to be allowed to do? int replenishRate = routeConfig.getReplenishRate(); // How much bursting do you want to allow? int burstCapacity = routeConfig.getBurstCapacity(); Bucket bucket = ipBucketMap.computeIfAbsent(id, k -> { Refill refill = Refill.greedy(replenishRate, Duration.ofSeconds(1)); Bandwidth limit = Bandwidth.classic(burstCapacity, refill); return Bucket4j.builder().addLimit(limit).build(); }); // tryConsume returns false immediately if no tokens available with the bucket ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1); if (probe.isConsumed()) { // the limit is not exceeded return Mono.just(new Response(true, probe.getRemainingTokens())); } else { // limit is exceeded return Mono.just(new Response(false,-1)); } } }

继承了AbstractRateLimiter,重写isAllowed方法
限流返回实例
curl -i http://localhost:8080/rate/ HTTP/1.1 429 Too Many Requests X-Response-Default-Foo: Default-Bar content-length: 0

小结 【聊聊spring|聊聊spring cloud的RequestRateLimiterGatewayFilter】RequestRateLimiterGatewayFilter需要有RateLimiter的bean才能生效。另外还需要有个KeyResolver的bean,默认KeyResolver为PrincipalNameKeyResolver。
doc
  • 112.7 RequestRateLimiter GatewayFilter Factory
  • Scaling your API with rate limiters
  • bucket4j basic usage examples

    推荐阅读