Skip to content

Commit

Permalink
bucketed rate limiter implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
noblepaul committed Nov 20, 2023
1 parent acb887a commit cb38f31
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static org.apache.solr.servlet.RateLimitManager.DEFAULT_CONCURRENT_REQUESTS;
import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS;

import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;

public class RateLimiterConfig {
public static final String RL_CONFIG_KEY = "rate-limiters";
Expand All @@ -31,6 +33,7 @@ public class RateLimiterConfig {
public int allowedRequests;
public boolean isSlotBorrowingEnabled;
public int guaranteedSlotsThreshold;
public List<RateLimiterPayload.ReadBucketConfig> readBuckets;

public RateLimiterConfig(SolrRequest.SolrRequestType requestType) {
this.requestType = requestType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package org.apache.solr.servlet;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketedQueryRateLimiter extends RequestRateLimiter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final Map<String, Class<? extends Condition>> conditionImpls =
new LinkedHashMap<>();

List<Bucket> buckets = new ArrayList<>();

@Override
public SlotMetadata handleRequest(RequestWrapper request) throws InterruptedException {
for (Bucket bucket : buckets) {
if (bucket.test(request)) {
if (bucket.guaranteedSlotsPool.tryAcquire(
bucket.bucketCfg.slotAcquisitionTimeoutInMS, TimeUnit.MILLISECONDS)) {
return bucket.guaranteedSlotMetadata;
} else {
// could not acquire aslot
return RequestRateLimiter.nullSlotMetadata;
}
}
}
// no bucket matches
return null;
}

static class Bucket {
private final String cfg;
private RateLimiterPayload.ReadBucketConfig bucketCfg;
private final Semaphore guaranteedSlotsPool;
private final SlotMetadata guaranteedSlotMetadata;
private final List<Condition> conditions = new ArrayList<>();

private Bucket fallback;

public boolean test(RequestWrapper req) {
boolean isPass = true;
for (Condition condition : conditions) {
if (!condition.test(req)) isPass = false;
}
return isPass;
}

public Bucket(RateLimiterPayload.ReadBucketConfig bucketCfg) {
this.bucketCfg = bucketCfg;
cfg = bucketCfg.jsonStr();
this.guaranteedSlotsPool = new Semaphore(bucketCfg.allowedRequests);
this.guaranteedSlotMetadata = new SlotMetadata(guaranteedSlotsPool);
}

@Override
public String toString() {
return cfg;
}
}

static {
conditionImpls.put(ReqHeaderCondition.ID, ReqHeaderCondition.class);
conditionImpls.put(QueryParamCondition.ID, QueryParamCondition.class);
}

@SuppressWarnings("unchecked")
public BucketedQueryRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);

for (RateLimiterPayload.ReadBucketConfig bucketCfg : rateLimiterConfig.readBuckets) {
Bucket b = new Bucket(bucketCfg);
buckets.add(b);
if (bucketCfg.conditions == null || bucketCfg.conditions.isEmpty()) {
b.conditions.add(MatchAllCondition.INST);
continue;
}
for (Object c : bucketCfg.conditions) {
List<Object> conditionInfo = c instanceof List ? (List<Object>) c : List.of(c);
for (Object o : conditionInfo) {
Map<String, Object> info = (Map<String, Object>) o;
Condition condition = null;

for (Map.Entry<String, Class<? extends Condition>> e : conditionImpls.entrySet()) {
if (info.containsKey(e.getKey())) {
try {
condition = e.getValue().getDeclaredConstructor().newInstance().init(info);
break;
} catch (Exception ex) {
// unlikely
throw new RuntimeException(ex);
}
}
}
if (condition == null) {
throw new RuntimeException("Unknown condition : " + Utils.toJSONString(info));
}
b.conditions.add(condition);
}
}
}
}

public interface Condition {
boolean test(RequestWrapper req);

Condition init(Map<String, Object> config);

String identifier();
}

public abstract static class KeyValCondition implements Condition {
protected String name;
protected Pattern valuePattern;

@SuppressWarnings("unchecked")
@Override
public Condition init(Map<String, Object> config) {
Map<String, String> kv = (Map<String, String>) config.get(identifier());
name = kv.keySet().iterator().next();
valuePattern = Pattern.compile(kv.values().iterator().next());
return this;
}

@Override
public boolean test(RequestWrapper req) {
String val = readVal(req);
if (val == null) val = "";
return valuePattern.matcher(val).find();
}

protected abstract String readVal(RequestWrapper req);
}

public static class ReqHeaderCondition extends KeyValCondition {
public static final String ID = "headerPattern";

@Override
public String identifier() {
return ID;
}

@Override
protected String readVal(RequestWrapper req) {
return req.getHeader(name);
}
}

public static class QueryParamCondition extends KeyValCondition {
public static final String ID = "queryParamPattern";

@Override
public String identifier() {
return ID;
}

@Override
protected String readVal(RequestWrapper req) {
return req.getParameter(name);
}
}

public static class MatchAllCondition implements Condition {
public static final String ID = "";
public static final MatchAllCondition INST = new MatchAllCondition();

@Override
public boolean test(RequestWrapper req) {
return true;
}

@Override
public Condition init(Map<String, Object> config) {
return this;
}

@Override
public String identifier() {
return ID;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.MetricsConfig;
import org.apache.solr.core.NodeConfig;
Expand All @@ -77,6 +78,7 @@
import org.apache.solr.servlet.RateLimitManager.Builder;
import org.apache.solr.util.SolrVersion;
import org.apache.solr.util.StartupLoggingUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -254,7 +256,16 @@ public void init(ServletContext servletContext) {
zkClient = zkController.getZkClient();
}

Builder builder = new Builder(zkClient);
SolrZkClient zkClientCopy = zkClient;
Builder builder =
new Builder(
() -> {
try {
return zkClientCopy.getNode(ZkStateReader.CLUSTER_PROPS, null, true);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
});

this.rateLimitManager = builder.build();

Expand Down
88 changes: 10 additions & 78 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,26 @@

import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import java.util.function.Supplier;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.apache.solr.util.SolrJacksonAnnotationInspector;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
* Implementation of RequestRateLimiter specific to query request types. Most of the actual work is
* delegated to the parent class but specific configurations and parsing are handled by this class.
*/
public class QueryRateLimiter extends RequestRateLimiter {
private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();

public QueryRateLimiter(SolrZkClient solrZkClient) {
super(constructQueryRateLimiterConfig(solrZkClient));
public QueryRateLimiter(Supplier<SolrZkClient.NodeData> solrZkClient) {
super(RateLimitManager.constructQueryRateLimiterConfig(solrZkClient));
}

public QueryRateLimiter(RateLimiterConfig cfg) {
super(cfg);
}

public void processConfigChange(Map<String, Object> properties) throws IOException {
Expand All @@ -51,75 +49,9 @@ public void processConfigChange(Map<String, Object> properties) throws IOExcepti
return;
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig);
}

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
try {

if (zkClient == null) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

RateLimiterConfig rateLimiterConfig =
new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
Map<String, Object> clusterPropsJson =
(Map<String, Object>)
Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY));

if (configInput.length == 0) {
// No Rate Limiter configuration defined in clusterprops.json. Return default configuration
// values
return rateLimiterConfig;
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig);

return rateLimiterConfig;
} catch (KeeperException.NoNodeException e) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(
"Error reading cluster property", SolrZkClient.checkInterrupted(e));
} catch (IOException e) {
throw new RuntimeException("Encountered an IOException " + e.getMessage());
}
}

private static void constructQueryRateLimiterConfigInternal(
RateLimiterPayload rateLimiterMeta, RateLimiterConfig rateLimiterConfig) {

if (rateLimiterMeta == null) {
// No Rate limiter configuration defined in clusterprops.json
return;
}

if (rateLimiterMeta.allowedRequests != null) {
rateLimiterConfig.allowedRequests = rateLimiterMeta.allowedRequests.intValue();
}
RateLimiterPayload rateLimiterMeta =
RateLimitManager.mapper.readValue(configInput, RateLimiterPayload.class);

if (rateLimiterMeta.enabled != null) {
rateLimiterConfig.isEnabled = rateLimiterMeta.enabled;
}

if (rateLimiterMeta.guaranteedSlots != null) {
rateLimiterConfig.guaranteedSlotsThreshold = rateLimiterMeta.guaranteedSlots;
}

if (rateLimiterMeta.slotBorrowingEnabled != null) {
rateLimiterConfig.isSlotBorrowingEnabled = rateLimiterMeta.slotBorrowingEnabled;
}

if (rateLimiterMeta.slotAcquisitionTimeoutInMS != null) {
rateLimiterConfig.waitForSlotAcquisition =
rateLimiterMeta.slotAcquisitionTimeoutInMS.longValue();
}
RateLimitManager.constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig);
}
}
Loading

0 comments on commit cb38f31

Please sign in to comment.