Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC Bucketed rate limit #168

Open
wants to merge 8 commits into
base: fs/branch_9_3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions solr/core/src/java/org/apache/solr/core/CoreContainer.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public class CoreContainer {

final SolrCores solrCores;

public SolrMetricsContext getSolrMetricsContext() {
return solrMetricsContext;
}

public static class CoreLoadFailure {

public final CoreDescriptor cd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static org.apache.solr.servlet.RateLimitManager.DEFAULT_CONCURRENT_REQUESTS;
import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS;

import java.util.List;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;

public class RateLimiterConfig {
public static final String RL_CONFIG_KEY = "rate-limiters";
Expand All @@ -31,6 +33,7 @@ public class RateLimiterConfig {
public int allowedRequests;
public boolean isSlotBorrowingEnabled;
public int guaranteedSlotsThreshold;
public List<RateLimiterPayload.ReadBucketConfig> readBuckets;
Copy link
Collaborator

@patsonluk patsonluk Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess of readBuckets nested inside RateLimiterConfig is to allow the json reader to handle both cases:

  1. no bucket, just configuration on the existing non bucket limiter:
"rate-limiters": {
     "allowedRequests": 2,
     "slotAcquisitionTimeoutInMS": 1000
}
  1. use bucket:
"rate-limiters": {
  "readBuckets": [
    {
      "name": "test-bucket",
        "conditions": [{
          "queryParamPattern": {
            "q-bucket": "test-bucket"
          }
        }],
        "allowedRequests": 2,
        "slotAcquisitionTimeoutInMS": 1000
     }]
}

However, this could introduce some confusion as properties such as allowedRequests could be defined either under each "readBuckets" and/or "rate-limiters".

The allowedRequests defined under "rate-limiters" will be ignored if there are buckets. It seems all fields in RateLimiterConfig will be ignored except readBuckets, which lead me to think whether we should just use 2 separate class/config to avoid confusion and unused fields.

Could we just use something like check existence of rate-limiters and rate-limiters-buckets key to identify whether it's bucket and non-bucket use case (I think they are mutually exclusive) For example:

  1. No bucket usage
"rate-limiters": {
     "allowedRequests": 2,
     "slotAcquisitionTimeoutInMS": 1000
}

or
2. Bucket usage

"rate-limiters-buckets" : [
  {
    "name": "test-bucket",
    "conditions": [{
      "queryParamPattern": {
        "q-bucket": "test-bucket"
       }
     }],
     "allowedRequests": 2,
     "slotAcquisitionTimeoutInMS": 1000
   }
]

With this proposal the first case will be deserialized as RateLimiterConfig, and the 2nd case will just be parsed as an array of ReadBucketConfig, which might affect the ctor signature of RequestRateLimiter (and child classes), more on this later.

Copy link
Collaborator Author

@noblepaul noblepaul Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it. This was done to simplify the code. Isn't it better to just warn the user if other properties are used?


public RateLimiterConfig(SolrRequest.SolrRequestType requestType) {
this.requestType = requestType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package org.apache.solr.servlet;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Pattern;

import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.MetricsMap;
import org.apache.solr.metrics.SolrMetricProducer;
import org.apache.solr.metrics.SolrMetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketedQueryRateLimiter extends QueryRateLimiter implements SolrMetricProducer {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private static final Map<String, Class<? extends Condition>> conditionImpls =
new LinkedHashMap<>();

List<Bucket> buckets = new ArrayList<>();

@Override
public SlotMetadata handleRequest(RequestWrapper request) throws InterruptedException {
for (Bucket bucket : buckets) {
if (bucket.test(request)) {
bucket.tries.increment();
if (bucket.guaranteedSlotsPool.tryAcquire(
bucket.bucketCfg.slotAcquisitionTimeoutInMS, TimeUnit.MILLISECONDS)) {
bucket.success.increment();
return bucket.guaranteedSlotMetadata;
} else {
bucket.fails.increment();
// could not acquire a slot
return RequestRateLimiter.nullSlotMetadata;
}
}
}
// no bucket matches
return null;
}

private SolrMetricsContext metrics;
@Override
public void initializeMetrics(SolrMetricsContext parentContext, String scope) {
metrics = parentContext.getChildContext(this);
MetricsMap metricsMap = new MetricsMap(ew -> buckets.forEach(bucket -> ew.putNoEx(bucket.bucketCfg.name, bucket.getMetrics())));
metrics.gauge(
metricsMap, true, scope, null, SolrInfoBean.Category.CONTAINER.toString());

}

@Override
public SolrMetricsContext getSolrMetricsContext() {
return metrics;
}

static class Bucket {
//for debugging purposes
private final String cfg;
private RateLimiterPayload.ReadBucketConfig bucketCfg;
private final Semaphore guaranteedSlotsPool;
private final SlotMetadata guaranteedSlotMetadata;
private final List<Condition> conditions = new ArrayList<>();

public LongAdder tries = new LongAdder();
public LongAdder success =new LongAdder();
public LongAdder fails=new LongAdder();
public com.codahale.metrics.Timer tryWait = new com.codahale.metrics.Timer();

private Bucket fallback;

public boolean test(RequestWrapper req) {
boolean isPass = true;
for (Condition condition : conditions) {
if (!condition.test(req)) return false;
}
return isPass;
}

public MapWriter getMetrics() {
return ew -> {
ew.put("queueLength", guaranteedSlotsPool.getQueueLength());
ew.put("available", guaranteedSlotsPool.availablePermits());
ew.put("tries", tries.longValue());
ew.put("success", success.longValue());
ew.put("fails", fails.longValue());
ew.put("tryWaitAverage", tryWait.getMeanRate());
};
}

public Bucket(RateLimiterPayload.ReadBucketConfig bucketCfg) {
this.bucketCfg = bucketCfg;
cfg = bucketCfg.jsonStr();
this.guaranteedSlotsPool = new Semaphore(bucketCfg.allowedRequests);
this.guaranteedSlotMetadata = new SlotMetadata(guaranteedSlotsPool);
}

@Override
public String toString() {
return cfg;
}
}

static {
conditionImpls.put(ReqHeaderCondition.ID, ReqHeaderCondition.class);
conditionImpls.put(QueryParamCondition.ID, QueryParamCondition.class);
}

@SuppressWarnings("unchecked")
public BucketedQueryRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);
Copy link
Collaborator

@patsonluk patsonluk Jan 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can consider enhancing the RequestRateLimiter logic in the manager at here, instead of introducing BucketQueryRateLimiter? It could avoid some code duplication, and unused Semaphore in parent class QueryRateLimiter.

It appears that we want the same ability as the existing QueryRateLimiter in terms of throttling (or even borrowing as @hiteshk25 mentioned in another comment), so it might be handy to just re-use QueryRateLimiter/RequestRateLimiter to handle bucketed limiter as well?

The idea is during the lookup, we could use a Matcher to return the correct Rate limiter. For example for non bucket configuration, it will just be the existing code (using RequestType for simple lookup from the map), but for bucket configuration, it will use iterate and check for match similar to here? :)

And I ❤️ the new metrics added, perhaps we can add that to QueryRateLimiter instead of just this class?


for (RateLimiterPayload.ReadBucketConfig bucketCfg : rateLimiterConfig.readBuckets) {
Bucket b = new Bucket(bucketCfg);
buckets.add(b);
if (bucketCfg.conditions == null || bucketCfg.conditions.isEmpty()) {
b.conditions.add(MatchAllCondition.INST);
continue;
}
for (Object c : bucketCfg.conditions) {
List<Object> conditionInfo = c instanceof List ? (List<Object>) c : List.of(c);
for (Object o : conditionInfo) {
Map<String, Object> info = (Map<String, Object>) o;
Condition condition = null;

for (Map.Entry<String, Class<? extends Condition>> e : conditionImpls.entrySet()) {
if (info.containsKey(e.getKey())) {
try {
condition = e.getValue().getDeclaredConstructor().newInstance().init(info);
break;
} catch (Exception ex) {
// unlikely
throw new RuntimeException(ex);
}
}
}
if (condition == null) {
throw new RuntimeException("Unknown condition : " + Utils.toJSONString(info));
}
b.conditions.add(condition);
}
}
}
}

public interface Condition {
boolean test(RequestWrapper req);

Condition init(Map<String, Object> config);

String identifier();
}

public abstract static class KeyValCondition implements Condition {
protected String name;
protected Pattern valuePattern;

@SuppressWarnings("unchecked")
@Override
public Condition init(Map<String, Object> config) {
Map<String, String> kv = (Map<String, String>) config.get(identifier());
name = kv.keySet().iterator().next();
valuePattern = Pattern.compile(kv.values().iterator().next());
return this;
}

@Override
public boolean test(RequestWrapper req) {
String val = readVal(req);
if (val == null) val = "";
return valuePattern.matcher(val).find();
}

protected abstract String readVal(RequestWrapper req);
}

public static class ReqHeaderCondition extends KeyValCondition {
public static final String ID = "headerPattern";

@Override
public String identifier() {
return ID;
}

@Override
protected String readVal(RequestWrapper req) {
return req.getHeader(name);
}
}

public static class QueryParamCondition extends KeyValCondition {
public static final String ID = "queryParamPattern";

@Override
public String identifier() {
return ID;
}

@Override
protected String readVal(RequestWrapper req) {
return req.getParameter(name);
}
}

public static class MatchAllCondition implements Condition {
public static final String ID = "";
public static final MatchAllCondition INST = new MatchAllCondition();

@Override
public boolean test(RequestWrapper req) {
return true;
}

@Override
public Condition init(Map<String, Object> config) {
return this;
}

@Override
public String identifier() {
return ID;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
import org.apache.http.client.HttpClient;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.util.VectorUtil;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.MetricsConfig;
import org.apache.solr.core.NodeConfig;
Expand All @@ -77,6 +79,7 @@
import org.apache.solr.servlet.RateLimitManager.Builder;
import org.apache.solr.util.SolrVersion;
import org.apache.solr.util.StartupLoggingUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -254,9 +257,22 @@ public void init(ServletContext servletContext) {
zkClient = zkController.getZkClient();
}

Builder builder = new Builder(zkClient);
SolrZkClient zkClientCopy = zkClient;
Builder builder =
new Builder(
() -> {
try {
return zkClientCopy.getNode(ZkStateReader.CLUSTER_PROPS, null, true);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
});

this.rateLimitManager = builder.build();
RequestRateLimiter queryRateLimiter = this.rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY);
if(queryRateLimiter instanceof BucketedQueryRateLimiter) {
((BucketedQueryRateLimiter)queryRateLimiter).initializeMetrics(coresInit.getSolrMetricsContext(), "bucketedQueryRateLimiter");
}

if (zkController != null) {
zkController.zkStateReader.registerClusterPropertiesListener(this.rateLimitManager);
Expand Down
Loading
Loading