diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 99f503258b8..5946a924efd 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -177,6 +177,10 @@ public class CoreContainer { final SolrCores solrCores; + public SolrMetricsContext getSolrMetricsContext() { + return solrMetricsContext; + } + public static class CoreLoadFailure { public final CoreDescriptor cd; diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index aa0e038e008..f01c6775b71 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -20,7 +20,9 @@ import static org.apache.solr.servlet.RateLimitManager.DEFAULT_CONCURRENT_REQUESTS; import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS; +import java.util.List; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; public class RateLimiterConfig { public static final String RL_CONFIG_KEY = "rate-limiters"; @@ -31,6 +33,7 @@ public class RateLimiterConfig { public int allowedRequests; public boolean isSlotBorrowingEnabled; public int guaranteedSlotsThreshold; + public List readBuckets; public RateLimiterConfig(SolrRequest.SolrRequestType requestType) { this.requestType = requestType; diff --git a/solr/core/src/java/org/apache/solr/servlet/BucketedQueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/BucketedQueryRateLimiter.java new file mode 100644 index 00000000000..5d2edc90dcd --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/BucketedQueryRateLimiter.java @@ -0,0 +1,234 @@ +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; +import java.util.regex.Pattern; + +import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; +import org.apache.solr.common.MapWriter; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.RateLimiterConfig; +import org.apache.solr.core.SolrInfoBean; +import org.apache.solr.metrics.MetricsMap; +import org.apache.solr.metrics.SolrMetricProducer; +import org.apache.solr.metrics.SolrMetricsContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BucketedQueryRateLimiter extends QueryRateLimiter implements SolrMetricProducer { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static final Map> conditionImpls = + new LinkedHashMap<>(); + + List buckets = new ArrayList<>(); + + @Override + public SlotMetadata handleRequest(RequestWrapper request) throws InterruptedException { + for (Bucket bucket : buckets) { + if (bucket.test(request)) { + bucket.tries.increment(); + if (bucket.guaranteedSlotsPool.tryAcquire( + bucket.bucketCfg.slotAcquisitionTimeoutInMS, TimeUnit.MILLISECONDS)) { + bucket.success.increment(); + return bucket.guaranteedSlotMetadata; + } else { + bucket.fails.increment(); + // could not acquire a slot + return RequestRateLimiter.nullSlotMetadata; + } + } + } + // no bucket matches + return null; + } + + private SolrMetricsContext metrics; + @Override + public void initializeMetrics(SolrMetricsContext parentContext, String scope) { + metrics = parentContext.getChildContext(this); + MetricsMap metricsMap = new MetricsMap(ew -> buckets.forEach(bucket -> ew.putNoEx(bucket.bucketCfg.name, bucket.getMetrics()))); + metrics.gauge( + metricsMap, true, scope, null, SolrInfoBean.Category.CONTAINER.toString()); + + } + + @Override + public SolrMetricsContext getSolrMetricsContext() { + return metrics; + } + + static class Bucket { + //for debugging purposes + private final String cfg; + private RateLimiterPayload.ReadBucketConfig bucketCfg; + private final Semaphore guaranteedSlotsPool; + private final SlotMetadata guaranteedSlotMetadata; + private final List conditions = new ArrayList<>(); + + public LongAdder tries = new LongAdder(); + public LongAdder success =new LongAdder(); + public LongAdder fails=new LongAdder(); + public com.codahale.metrics.Timer tryWait = new com.codahale.metrics.Timer(); + + private Bucket fallback; + + public boolean test(RequestWrapper req) { + boolean isPass = true; + for (Condition condition : conditions) { + if (!condition.test(req)) return false; + } + return isPass; + } + + public MapWriter getMetrics() { + return ew -> { + ew.put("queueLength", guaranteedSlotsPool.getQueueLength()); + ew.put("available", guaranteedSlotsPool.availablePermits()); + ew.put("tries", tries.longValue()); + ew.put("success", success.longValue()); + ew.put("fails", fails.longValue()); + ew.put("tryWaitAverage", tryWait.getMeanRate()); + }; + } + + public Bucket(RateLimiterPayload.ReadBucketConfig bucketCfg) { + this.bucketCfg = bucketCfg; + cfg = bucketCfg.jsonStr(); + this.guaranteedSlotsPool = new Semaphore(bucketCfg.allowedRequests); + this.guaranteedSlotMetadata = new SlotMetadata(guaranteedSlotsPool); + } + + @Override + public String toString() { + return cfg; + } + } + + static { + conditionImpls.put(ReqHeaderCondition.ID, ReqHeaderCondition.class); + conditionImpls.put(QueryParamCondition.ID, QueryParamCondition.class); + } + + @SuppressWarnings("unchecked") + public BucketedQueryRateLimiter(RateLimiterConfig rateLimiterConfig) { + super(rateLimiterConfig); + + for (RateLimiterPayload.ReadBucketConfig bucketCfg : rateLimiterConfig.readBuckets) { + Bucket b = new Bucket(bucketCfg); + buckets.add(b); + if (bucketCfg.conditions == null || bucketCfg.conditions.isEmpty()) { + b.conditions.add(MatchAllCondition.INST); + continue; + } + for (Object c : bucketCfg.conditions) { + List conditionInfo = c instanceof List ? (List) c : List.of(c); + for (Object o : conditionInfo) { + Map info = (Map) o; + Condition condition = null; + + for (Map.Entry> e : conditionImpls.entrySet()) { + if (info.containsKey(e.getKey())) { + try { + condition = e.getValue().getDeclaredConstructor().newInstance().init(info); + break; + } catch (Exception ex) { + // unlikely + throw new RuntimeException(ex); + } + } + } + if (condition == null) { + throw new RuntimeException("Unknown condition : " + Utils.toJSONString(info)); + } + b.conditions.add(condition); + } + } + } + } + + public interface Condition { + boolean test(RequestWrapper req); + + Condition init(Map config); + + String identifier(); + } + + public abstract static class KeyValCondition implements Condition { + protected String name; + protected Pattern valuePattern; + + @SuppressWarnings("unchecked") + @Override + public Condition init(Map config) { + Map kv = (Map) config.get(identifier()); + name = kv.keySet().iterator().next(); + valuePattern = Pattern.compile(kv.values().iterator().next()); + return this; + } + + @Override + public boolean test(RequestWrapper req) { + String val = readVal(req); + if (val == null) val = ""; + return valuePattern.matcher(val).find(); + } + + protected abstract String readVal(RequestWrapper req); + } + + public static class ReqHeaderCondition extends KeyValCondition { + public static final String ID = "headerPattern"; + + @Override + public String identifier() { + return ID; + } + + @Override + protected String readVal(RequestWrapper req) { + return req.getHeader(name); + } + } + + public static class QueryParamCondition extends KeyValCondition { + public static final String ID = "queryParamPattern"; + + @Override + public String identifier() { + return ID; + } + + @Override + protected String readVal(RequestWrapper req) { + return req.getParameter(name); + } + } + + public static class MatchAllCondition implements Condition { + public static final String ID = ""; + public static final MatchAllCondition INST = new MatchAllCondition(); + + @Override + public boolean test(RequestWrapper req) { + return true; + } + + @Override + public Condition init(Map config) { + return this; + } + + @Override + public String identifier() { + return ID; + } + } +} diff --git a/solr/core/src/java/org/apache/solr/servlet/CoreContainerProvider.java b/solr/core/src/java/org/apache/solr/servlet/CoreContainerProvider.java index 8859c9a3a58..17c464b7c98 100644 --- a/solr/core/src/java/org/apache/solr/servlet/CoreContainerProvider.java +++ b/solr/core/src/java/org/apache/solr/servlet/CoreContainerProvider.java @@ -58,10 +58,12 @@ import org.apache.http.client.HttpClient; import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.util.VectorUtil; +import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.MetricsConfig; import org.apache.solr.core.NodeConfig; @@ -77,6 +79,7 @@ import org.apache.solr.servlet.RateLimitManager.Builder; import org.apache.solr.util.SolrVersion; import org.apache.solr.util.StartupLoggingUtils; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,9 +257,22 @@ public void init(ServletContext servletContext) { zkClient = zkController.getZkClient(); } - Builder builder = new Builder(zkClient); + SolrZkClient zkClientCopy = zkClient; + Builder builder = + new Builder( + () -> { + try { + return zkClientCopy.getNode(ZkStateReader.CLUSTER_PROPS, null, true); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException(e); + } + }); this.rateLimitManager = builder.build(); + RequestRateLimiter queryRateLimiter = this.rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY); + if(queryRateLimiter instanceof BucketedQueryRateLimiter) { + ((BucketedQueryRateLimiter)queryRateLimiter).initializeMetrics(coresInit.getSolrMetricsContext(), "bucketedQueryRateLimiter"); + } if (zkController != null) { zkController.zkStateReader.registerClusterPropertiesListener(this.rateLimitManager); diff --git a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java index 6b54ce450cd..e5c527e0a5b 100644 --- a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java @@ -19,28 +19,26 @@ import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; -import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Map; -import org.apache.solr.client.solrj.SolrRequest; +import java.util.function.Supplier; import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; -import org.apache.solr.util.SolrJacksonAnnotationInspector; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; /** * Implementation of RequestRateLimiter specific to query request types. Most of the actual work is * delegated to the parent class but specific configurations and parsing are handled by this class. */ public class QueryRateLimiter extends RequestRateLimiter { - private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); - public QueryRateLimiter(SolrZkClient solrZkClient) { - super(constructQueryRateLimiterConfig(solrZkClient)); + public QueryRateLimiter(Supplier solrZkClient) { + super(RateLimitManager.constructQueryRateLimiterConfig(solrZkClient)); + } + + public QueryRateLimiter(RateLimiterConfig cfg) { + super(cfg); } public void processConfigChange(Map properties) throws IOException { @@ -51,75 +49,9 @@ public void processConfigChange(Map properties) throws IOExcepti return; } - RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); - - constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig); - } - - // To be used in initialization - @SuppressWarnings({"unchecked"}) - private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) { - try { - - if (zkClient == null) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } - - RateLimiterConfig rateLimiterConfig = - new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - Map clusterPropsJson = - (Map) - Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)); - byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); - - if (configInput.length == 0) { - // No Rate Limiter configuration defined in clusterprops.json. Return default configuration - // values - return rateLimiterConfig; - } - - RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); - - constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig); - - return rateLimiterConfig; - } catch (KeeperException.NoNodeException e) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } catch (KeeperException | InterruptedException e) { - throw new RuntimeException( - "Error reading cluster property", SolrZkClient.checkInterrupted(e)); - } catch (IOException e) { - throw new RuntimeException("Encountered an IOException " + e.getMessage()); - } - } - - private static void constructQueryRateLimiterConfigInternal( - RateLimiterPayload rateLimiterMeta, RateLimiterConfig rateLimiterConfig) { - - if (rateLimiterMeta == null) { - // No Rate limiter configuration defined in clusterprops.json - return; - } - - if (rateLimiterMeta.allowedRequests != null) { - rateLimiterConfig.allowedRequests = rateLimiterMeta.allowedRequests.intValue(); - } + RateLimiterPayload rateLimiterMeta = + RateLimitManager.mapper.readValue(configInput, RateLimiterPayload.class); - if (rateLimiterMeta.enabled != null) { - rateLimiterConfig.isEnabled = rateLimiterMeta.enabled; - } - - if (rateLimiterMeta.guaranteedSlots != null) { - rateLimiterConfig.guaranteedSlotsThreshold = rateLimiterMeta.guaranteedSlots; - } - - if (rateLimiterMeta.slotBorrowingEnabled != null) { - rateLimiterConfig.isSlotBorrowingEnabled = rateLimiterMeta.slotBorrowingEnabled; - } - - if (rateLimiterMeta.slotAcquisitionTimeoutInMS != null) { - rateLimiterConfig.waitForSlotAcquisition = - rateLimiterMeta.slotAcquisitionTimeoutInMS.longValue(); - } + RateLimitManager.constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig); } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 8531ec3318d..927cc2a00d2 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -19,18 +19,25 @@ import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_CONTEXT_PARAM; import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_TYPE_PARAM; +import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import javax.servlet.http.HttpServletRequest; import net.jcip.annotations.ThreadSafe; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; import org.apache.solr.common.cloud.ClusterPropertiesListener; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.RateLimiterConfig; +import org.apache.solr.util.SolrJacksonAnnotationInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +52,7 @@ */ @ThreadSafe public class RateLimitManager implements ClusterPropertiesListener { + static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static final int DEFAULT_CONCURRENT_REQUESTS = @@ -59,6 +67,71 @@ public RateLimitManager() { this.activeRequestsMap = new ConcurrentHashMap<>(); } + // To be used in initialization + @SuppressWarnings({"unchecked"}) + public static RateLimiterConfig constructQueryRateLimiterConfig( + Supplier nodeDataSupplier) { + try { + + if (nodeDataSupplier == null) { + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } + + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + Map clusterPropsJson = + (Map) Utils.fromJSON(nodeDataSupplier.get().data); + byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); + + if (configInput.length == 0) { + // No Rate Limiter configuration defined in clusterprops.json. Return default configuration + // values + return rateLimiterConfig; + } + + RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); + + constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig); + + return rateLimiterConfig; + } catch (IOException e) { + throw new RuntimeException("Encountered an IOException " + e.getMessage()); + } + } + + public static void constructQueryRateLimiterConfigInternal( + RateLimiterPayload rateLimiterMeta, RateLimiterConfig rateLimiterConfig) { + + if (rateLimiterMeta == null) { + // No Rate limiter configuration defined in clusterprops.json + return; + } + + if (rateLimiterMeta.allowedRequests != null) { + rateLimiterConfig.allowedRequests = rateLimiterMeta.allowedRequests.intValue(); + } + + if (rateLimiterMeta.enabled != null) { + rateLimiterConfig.isEnabled = rateLimiterMeta.enabled; + } + + if (rateLimiterMeta.guaranteedSlots != null) { + rateLimiterConfig.guaranteedSlotsThreshold = rateLimiterMeta.guaranteedSlots; + } + + if (rateLimiterMeta.slotBorrowingEnabled != null) { + rateLimiterConfig.isSlotBorrowingEnabled = rateLimiterMeta.slotBorrowingEnabled; + } + + if (rateLimiterMeta.slotAcquisitionTimeoutInMS != null) { + rateLimiterConfig.waitForSlotAcquisition = + rateLimiterMeta.slotAcquisitionTimeoutInMS.longValue(); + } + if (rateLimiterMeta.readBuckets != null && !rateLimiterMeta.readBuckets.isEmpty()) { + rateLimiterConfig.readBuckets = rateLimiterMeta.readBuckets; + } + } + @Override public boolean onChange(Map properties) { @@ -103,7 +176,19 @@ public boolean handleRequest(HttpServletRequest request) throws InterruptedExcep return true; } - RequestRateLimiter.SlotMetadata result = requestRateLimiter.handleRequest(); + RequestRateLimiter.SlotMetadata result = + requestRateLimiter.handleRequest( + new RequestRateLimiter.RequestWrapper() { + @Override + public String getParameter(String name) { + return request.getParameter(name); + } + + @Override + public String getHeader(String name) { + return request.getHeader(name); + } + }); if (result != null) { // Can be the case if request rate limiter is disabled @@ -112,6 +197,10 @@ public boolean handleRequest(HttpServletRequest request) throws InterruptedExcep } return true; } + if (requestRateLimiter instanceof BucketedQueryRateLimiter) { + // there is no slot borrowing for bucketed rate limiter + return true; + } RequestRateLimiter.SlotMetadata slotMetadata = trySlotBorrowing(typeOfRequest); @@ -188,17 +277,22 @@ public RequestRateLimiter getRequestRateLimiter(SolrRequest.SolrRequestType requ } public static class Builder { - protected SolrZkClient solrZkClient; + protected Supplier nodeDataSupplier; - public Builder(SolrZkClient solrZkClient) { - this.solrZkClient = solrZkClient; + public Builder(Supplier nodeDataSupplier) { + this.nodeDataSupplier = nodeDataSupplier; } public RateLimitManager build() { RateLimitManager rateLimitManager = new RateLimitManager(); + RateLimiterConfig cfg = constructQueryRateLimiterConfig(nodeDataSupplier); + + RequestRateLimiter queryRateLimiter = + cfg.readBuckets == null ? new QueryRateLimiter(cfg) : new BucketedQueryRateLimiter(cfg); + rateLimitManager.registerRequestRateLimiter( - new QueryRateLimiter(solrZkClient), SolrRequest.SolrRequestType.QUERY); + queryRateLimiter, SolrRequest.SolrRequestType.QUERY); return rateLimitManager; } diff --git a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java index cd33d3a717f..4e4e11c7fa2 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java @@ -42,7 +42,7 @@ public class RequestRateLimiter { private final RateLimiterConfig rateLimiterConfig; private final SlotMetadata guaranteedSlotMetadata; private final SlotMetadata borrowedSlotMetadata; - private static final SlotMetadata nullSlotMetadata = new SlotMetadata(null); + protected static final SlotMetadata nullSlotMetadata = new SlotMetadata(null); public RequestRateLimiter(RateLimiterConfig rateLimiterConfig) { this.rateLimiterConfig = rateLimiterConfig; @@ -58,7 +58,7 @@ public RequestRateLimiter(RateLimiterConfig rateLimiterConfig) { * Handles an incoming request. returns a metadata object representing the metadata for the * acquired slot, if acquired. If a slot is not acquired, returns a null metadata object. */ - public SlotMetadata handleRequest() throws InterruptedException { + public SlotMetadata handleRequest(RequestWrapper request) throws InterruptedException { if (!rateLimiterConfig.isEnabled) { return nullSlotMetadata; @@ -102,7 +102,7 @@ public RateLimiterConfig getRateLimiterConfig() { // Represents the metadata for a slot static class SlotMetadata { - private final Semaphore usedPool; + final Semaphore usedPool; public SlotMetadata(Semaphore usedPool) { this.usedPool = usedPool; @@ -118,4 +118,10 @@ public boolean isReleasable() { return usedPool != null; } } + + public interface RequestWrapper { + String getParameter(String name); + + String getHeader(String name); + } } diff --git a/solr/core/src/java/org/apache/solr/servlet/ServletUtils.java b/solr/core/src/java/org/apache/solr/servlet/ServletUtils.java index d10d97451ec..0340260edc7 100644 --- a/solr/core/src/java/org/apache/solr/servlet/ServletUtils.java +++ b/solr/core/src/java/org/apache/solr/servlet/ServletUtils.java @@ -209,7 +209,11 @@ static void rateLimitRequest( RateLimitManager rateLimitManager = getRateLimitManager(request); try { try { - accepted = rateLimitManager.handleRequest(request); + if (rateLimitManager != null) { + accepted = rateLimitManager.handleRequest(request); + } else { + accepted = true; + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage()); @@ -227,7 +231,7 @@ static void rateLimitRequest( traceHttpRequestExecution2(request, response, limitedExecution, trace); } finally { if (accepted) { - rateLimitManager.decrementActiveRequests(request); + if (rateLimitManager != null) rateLimitManager.decrementActiveRequests(request); } } } diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 44c02cce109..1af509d8834 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -43,6 +43,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.http.client.HttpClient; import org.apache.solr.api.V2HttpCall; +import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.util.ExecutorUtil; diff --git a/solr/core/src/test/org/apache/solr/servlet/TestBucketedRateLimit.java b/solr/core/src/test/org/apache/solr/servlet/TestBucketedRateLimit.java new file mode 100644 index 00000000000..3c080052680 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/servlet/TestBucketedRateLimit.java @@ -0,0 +1,144 @@ +package org.apache.solr.servlet; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.GenericSolrRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.NavigableObject; +import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.MapSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.embedded.JettySolrRunner; +import org.apache.zookeeper.data.Stat; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +public class TestBucketedRateLimit extends SolrCloudTestCase { + private static final String FIRST_COLLECTION = "c1"; + + @BeforeClass + public static void setupCluster() throws Exception {System.setProperty("metricsEnabled", "true"); + configureCluster(1).addConfig("conf", configset("cloud-minimal")). + configure(); + } + + @Test + @SuppressWarnings("unchecked") + public void testPerf() throws Exception { + String config = "{\n" + + " \"rate-limiters\": {\n" + + " \"readBuckets\": [\n" + + " {\n" + + " \"name\": \"test-bucket\",\n" + + " \"conditions\": [{\n" + + " \"queryParamPattern\": {\n" + + " \"q-bucket\": \"test-bucket\"\n" + + " }\n" + + " }],\n" + + " \"allowedRequests\": 2,\n" + + " \"slotAcquisitionTimeoutInMS\": 100\n" + + " }\n" + + "\n" + + " ]\n" + + " }\n" + + "}"; + + SolrZkClient zkClient = cluster.getZkClient(); + zkClient.atomicUpdate(ZkStateReader.CLUSTER_PROPS, + bytes -> config.getBytes() + ); + JettySolrRunner jetty = cluster.getJettySolrRunners().get(0); + jetty.stop(); + jetty.start(true); + String COLLECTION_NAME = "rateLimitTest"; + CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf", 2, 2) + .process(cluster.getSolrClient()); + cluster.getSolrClient().query(COLLECTION_NAME, new SolrQuery("*:*"). add("q-bucket","test-bucket" )); + NavigableObject rsp = cluster.getSolrClient().request(new GenericSolrRequest(SolrRequest.METHOD.GET, "/admin/metrics", + new MapSolrParams(Map.of("key","solr.node:CONTAINER.bucketedQueryRateLimiter")))); + + assertEquals("0", rsp._getStr(List.of("metrics","solr.node:CONTAINER.bucketedQueryRateLimiter", "test-bucket","queueLength"), null)); + assertEquals("2", rsp._getStr(List.of("metrics","solr.node:CONTAINER.bucketedQueryRateLimiter", "test-bucket","available"), null)); + assertEquals("1", rsp._getStr(List.of("metrics","solr.node:CONTAINER.bucketedQueryRateLimiter", "test-bucket","tries"), null)); + assertEquals("1", rsp._getStr(List.of("metrics","solr.node:CONTAINER.bucketedQueryRateLimiter", "test-bucket","success"), null)); + assertEquals("0", rsp._getStr(List.of("metrics","solr.node:CONTAINER.bucketedQueryRateLimiter", "test-bucket","fails"), null)); + + + + } + + + public void testConfig() throws Exception { + String config = + "{\n" + + " \"rate-limiters\": {\n" + + " \"readBuckets\": [\n" + + " {\n" + + " \"name\": \"expensive\",\n" + + " \"conditions\": [{\n" + + " \"queryParamPattern\": {\n" + + " \"q\": \".*multijoin.*\"\n" + + " }\n" + + " }],\n" + + " \"allowedRequests\": 5,\n" + + " \"slotAcquisitionTimeoutInMS\": 100\n" + + " },\n" + + " {\n" + + " \"name\": \"low\",\n" + + " \"conditions\": [{\n" + + " \"headerPattern\": {\n" + + " \"solr_req_priority\": \"20\"\n" + + " }\n" + + " }],\n" + + " \"allowedRequests\": 20,\n" + + " \"slotAcquisitionTimeoutInMS\": 100\n" + + " },\n" + + " {\n" + + " \"name\": \"global\",\n" + + " \"conditions\": [],\n" + + " \"allowedRequests\": 50,\n" + + " \"slotAcquisitionTimeoutInMS\": 100\n" + + " }\n" + + " ]\n" + + " }\n" + + "}\n" + + "\n"; + RateLimitManager mgr = + new RateLimitManager.Builder( + () -> + new SolrZkClient.NodeData(new Stat(), config.getBytes(StandardCharsets.UTF_8))) + .build(); + RequestRateLimiter rl = mgr.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY); + assertTrue(rl instanceof BucketedQueryRateLimiter); + BucketedQueryRateLimiter brl = (BucketedQueryRateLimiter) rl; + assertEquals(3, brl.buckets.size()); + + RequestRateLimiter.SlotMetadata smd = + rl.handleRequest( + new RequestRateLimiter.RequestWrapper() { + @Override + public String getParameter(String name) { + return null; + } + + @Override + public String getHeader(String name) { + if (name.equals("solr_req_priority")) return "20"; + else return null; + } + }); + + // star + assertEquals(19, smd.usedPool.availablePermits()); + smd.decrementRequest(); + assertEquals(20, smd.usedPool.availablePermits()); + } +} diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index c9b62b2b3c5..4b01f75f924 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -26,6 +26,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrRequest; @@ -214,10 +215,10 @@ public MockRequestRateLimiter(RateLimiterConfig config) { } @Override - public SlotMetadata handleRequest() throws InterruptedException { + public SlotMetadata handleRequest(RequestWrapper request) throws InterruptedException { incomingRequestCount.getAndIncrement(); - SlotMetadata response = super.handleRequest(); + SlotMetadata response = super.handleRequest(request); if (response != null) { acceptedNewRequestCount.getAndIncrement(); @@ -244,7 +245,8 @@ private static class MockBuilder extends RateLimitManager.Builder { private final RequestRateLimiter queryRequestRateLimiter; private final RequestRateLimiter indexRequestRateLimiter; - public MockBuilder(SolrZkClient zkClient, RequestRateLimiter queryRequestRateLimiter) { + public MockBuilder( + Supplier zkClient, RequestRateLimiter queryRequestRateLimiter) { super(zkClient); this.queryRequestRateLimiter = queryRequestRateLimiter; @@ -252,7 +254,7 @@ public MockBuilder(SolrZkClient zkClient, RequestRateLimiter queryRequestRateLim } public MockBuilder( - SolrZkClient zkClient, + Supplier zkClient, RequestRateLimiter queryRequestRateLimiter, RequestRateLimiter indexRequestRateLimiter) { super(zkClient); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java index 07bae33de6b..449809acb96 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java @@ -17,6 +17,7 @@ package org.apache.solr.client.solrj.request.beans; +import java.util.List; import java.util.Objects; import org.apache.solr.common.annotation.JsonProperty; import org.apache.solr.common.util.ReflectMapWriter; @@ -32,6 +33,7 @@ public class RateLimiterPayload implements ReflectMapWriter { @JsonProperty public Boolean slotBorrowingEnabled; @JsonProperty public Integer slotAcquisitionTimeoutInMS; + @JsonProperty public List readBuckets; public RateLimiterPayload copy() { RateLimiterPayload result = new RateLimiterPayload(); @@ -41,7 +43,7 @@ public RateLimiterPayload copy() { result.allowedRequests = allowedRequests; result.slotBorrowingEnabled = slotBorrowingEnabled; result.slotAcquisitionTimeoutInMS = slotAcquisitionTimeoutInMS; - + result.readBuckets = readBuckets; return result; } @@ -67,4 +69,15 @@ public int hashCode() { slotBorrowingEnabled, slotAcquisitionTimeoutInMS); } + + public static class ReadBucketConfig implements ReflectMapWriter { + + @JsonProperty public String name; + + @JsonProperty public Integer allowedRequests; + + @JsonProperty public Integer slotAcquisitionTimeoutInMS; + + @JsonProperty public List conditions; + } }