Skip to content

Commit

Permalink
Rate limiter (apache#278)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrew4699 authored Sep 24, 2024
1 parent 711060d commit 81ec424
Show file tree
Hide file tree
Showing 18 changed files with 723 additions and 0 deletions.
4 changes: 4 additions & 0 deletions polaris-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ logging:

# Limits the size of request bodies sent to Polaris. -1 means no limit.
maxRequestBodyBytes: -1

# Optional, not specifying a "rateLimiter" section also means no rate limiter
rateLimiter:
type: no-op
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.persistence.InMemoryPolarisMetaStoreManagerFactory;
import org.apache.polaris.service.ratelimiter.RateLimiterFilter;
import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.ManifestFileCleanupTaskHandler;
import org.apache.polaris.service.task.TableCleanupTaskHandler;
Expand Down Expand Up @@ -252,6 +253,14 @@ public void run(PolarisApplicationConfig configuration, Environment environment)
.servlets()
.addFilter("tracing", new TracingFilter(openTelemetry))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");

if (configuration.getRateLimiter() != null) {
environment
.servlets()
.addFilter("ratelimiter", new RateLimiterFilter(configuration.getRateLimiter()))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");
}

DiscoverableAuthenticator<String, AuthenticatedPolarisPrincipal> authenticator =
configuration.getPolarisAuthenticator();
authenticator.setEntityManagerFactory(entityManagerFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
Expand All @@ -32,6 +33,7 @@
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.apache.polaris.service.ratelimiter.RateLimiter;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
Expand All @@ -55,6 +57,7 @@ public class PolarisApplicationConfig extends Configuration {
private String awsAccessKey;
private String awsSecretKey;
private FileIOFactory fileIOFactory;
private RateLimiter rateLimiter;

public static final long REQUEST_BODY_BYTES_NO_LIMIT = -1;
private long maxRequestBodyBytes = REQUEST_BODY_BYTES_NO_LIMIT;
Expand Down Expand Up @@ -137,6 +140,16 @@ public void setCorsConfiguration(CorsConfiguration corsConfiguration) {
this.corsConfiguration = corsConfiguration;
}

@JsonProperty("rateLimiter")
public RateLimiter getRateLimiter() {
return rateLimiter;
}

@JsonProperty("rateLimiter")
public void setRateLimiter(@Nullable RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

public void setTaskHandler(TaskHandlerConfiguration taskHandler) {
this.taskHandler = taskHandler;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonTypeName;

/** Rate limiter that always allows the request */
@JsonTypeName("no-op")
public class NoOpRateLimiter implements RateLimiter {
@Override
public boolean tryAcquire() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;

/** Interface for rate limiting requests */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
public interface RateLimiter extends Discoverable {
/**
* This signifies that a request is being made. That is, the rate limiter should count the request
* at this point.
*
* @return Whether the request is allowed to proceed by the rate limiter
*/
boolean tryAcquire();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import jakarta.annotation.Priority;
import jakarta.servlet.Filter;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletResponse;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.ws.rs.Priorities;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Request filter that returns a 429 Too Many Requests if the rate limiter says so */
@Priority(Priorities.AUTHORIZATION + 1)
public class RateLimiterFilter implements Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterFilter.class);

private final RateLimiter rateLimiter;

public RateLimiterFilter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

/** Returns a 429 if the rate limiter says so. Otherwise, forwards the request along. */
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
throws IOException, ServletException {
if (response instanceof HttpServletResponse servletResponse && !rateLimiter.tryAcquire()) {
servletResponse.setStatus(Response.Status.TOO_MANY_REQUESTS.getStatusCode());
LOGGER.atDebug().log("Rate limiting request");
return;
}
chain.doFilter(request, response);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.time.Clock;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.jetbrains.annotations.VisibleForTesting;

/**
* Rate limiter that maps the request's realm identifier to its own TokenBucketRateLimiter, with its
* own capacity.
*/
@JsonTypeName("realm-token-bucket")
public class RealmTokenBucketRateLimiter implements RateLimiter {
private final long requestsPerSecond;
private final long windowSeconds;
private final Map<String, RateLimiter> perRealmLimiters;

@VisibleForTesting
@JsonCreator
public RealmTokenBucketRateLimiter(
@JsonProperty("requestsPerSecond") final long requestsPerSecond,
@JsonProperty("windowSeconds") final long windowSeconds) {
this.requestsPerSecond = requestsPerSecond;
this.windowSeconds = windowSeconds;
this.perRealmLimiters = new ConcurrentHashMap<>();
}

/**
* This signifies that a request is being made. That is, the rate limiter should count the request
* at this point.
*
* @return Whether the request is allowed to proceed by the rate limiter
*/
@Override
public boolean tryAcquire() {
String key =
Optional.ofNullable(CallContext.getCurrentContext())
.map(CallContext::getRealmContext)
.map(RealmContext::getRealmIdentifier)
.orElse("");

return perRealmLimiters
.computeIfAbsent(
key,
(k) ->
new TokenBucketRateLimiter(
requestsPerSecond,
Math.multiplyExact(requestsPerSecond, windowSeconds),
getClock()))
.tryAcquire();
}

@VisibleForTesting
protected Clock getClock() {
return Clock.systemUTC();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.ratelimiter;

import java.time.InstantSource;

/**
* Token bucket implementation of a Polaris RateLimiter. Acquires tokens at a fixed rate and has a
* maximum amount of tokens. Each successful "tryAcquire" costs 1 token.
*/
public class TokenBucketRateLimiter implements RateLimiter {
private final double tokensPerMilli;
private final long maxTokens;
private final InstantSource instantSource;

private double tokens;
private long lastTokenGenerationMillis;

public TokenBucketRateLimiter(long tokensPerSecond, long maxTokens, InstantSource instantSource) {
this.tokensPerMilli = tokensPerSecond / 1000D;
this.maxTokens = maxTokens;
this.instantSource = instantSource;

tokens = maxTokens;
lastTokenGenerationMillis = instantSource.millis();
}

/**
* Tries to acquire and spend 1 token. Doesn't block if a token isn't available.
*
* @return whether a token was successfully acquired & spent
*/
@Override
public synchronized boolean tryAcquire() {
// Grant tokens for the time that has passed since our last tryAcquire()
long t = instantSource.millis();
long millisPassed = Math.subtractExact(t, lastTokenGenerationMillis);
lastTokenGenerationMillis = t;
tokens = Math.min(maxTokens, tokens + (millisPassed * tokensPerMilli));

// Take a token if they have one available
if (tokens >= 1) {
tokens--;
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ org.apache.polaris.service.context.RealmContextResolver
org.apache.polaris.service.context.CallContextResolver
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.catalog.FileIOFactory
org.apache.polaris.service.ratelimiter.RateLimiter
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.polaris.service.ratelimiter.RealmTokenBucketRateLimiter
org.apache.polaris.service.ratelimiter.NoOpRateLimiter
Loading

0 comments on commit 81ec424

Please sign in to comment.