From d115c544e37f485ba46daddc6a0ae3e3827446d0 Mon Sep 17 00:00:00 2001 From: hitesh Date: Wed, 23 Oct 2024 17:04:53 -0700 Subject: [PATCH 01/13] Added initial implementation of priority based rate limiter --- .../apache/solr/core/RateLimiterConfig.java | 24 +++- .../servlet/PriorityBasedRateLimiter.java | 92 ++++++++++++ .../apache/solr/servlet/QueryRateLimiter.java | 11 +- .../apache/solr/servlet/RateLimitManager.java | 14 +- .../solr/servlet/RequestRateLimiter.java | 2 +- .../solr/servlet/TestRequestRateLimiter.java | 131 ++++++++++++++++-- .../apache/solr/client/solrj/SolrRequest.java | 8 +- .../request/beans/RateLimiterPayload.java | 15 +- 8 files changed, 271 insertions(+), 26 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index d08958a0348..0e851a2b345 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -32,6 +32,9 @@ public class RateLimiterConfig { public final int allowedRequests; public final boolean isSlotBorrowingEnabled; public final int guaranteedSlotsThreshold; + public final Boolean priorityBasedEnabled; + + public final int priorityMaxRequests; /** * We store the config definition in order to determine whether anything has changed that would @@ -49,7 +52,9 @@ public RateLimiterConfig( int guaranteedSlotsThreshold, long waitForSlotAcquisition, int allowedRequests, - boolean isSlotBorrowingEnabled) { + boolean isSlotBorrowingEnabled, + boolean priorityBasedEnabled, + int priorityMaxRequests) { this( requestType, makePayload( @@ -57,7 +62,9 @@ public RateLimiterConfig( guaranteedSlotsThreshold, waitForSlotAcquisition, allowedRequests, - isSlotBorrowingEnabled)); + isSlotBorrowingEnabled, + priorityBasedEnabled, + priorityMaxRequests)); } private static RateLimiterPayload makePayload( @@ -65,13 +72,17 @@ private static RateLimiterPayload makePayload( int guaranteedSlotsThreshold, long waitForSlotAcquisition, int allowedRequests, - boolean isSlotBorrowingEnabled) { + boolean isSlotBorrowingEnabled, + boolean priorityBasedEnabled, + int priorityMaxRequests) { RateLimiterPayload ret = new RateLimiterPayload(); ret.enabled = isEnabled; ret.allowedRequests = allowedRequests; ret.guaranteedSlots = guaranteedSlotsThreshold; ret.slotBorrowingEnabled = isSlotBorrowingEnabled; ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition); + ret.priorityBasedEnabled = priorityBasedEnabled; + ret.priorityMaxRequests = priorityMaxRequests; return ret; } @@ -98,6 +109,11 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay ? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS : definition.slotAcquisitionTimeoutInMS.longValue(); + priorityBasedEnabled = + definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled; + priorityMaxRequests = + definition.priorityMaxRequests == null ? 0 : definition.priorityMaxRequests; + this.definition = definition; } @@ -125,6 +141,8 @@ public String toString() { sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold); sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled); sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition); + sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled); + sb.append(", priorityMaxRequests=").append(priorityMaxRequests); return sb.append('}').toString(); } } diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java new file mode 100644 index 00000000000..8c77ff706c8 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -0,0 +1,92 @@ +package org.apache.solr.servlet; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.core.RateLimiterConfig; + +public class PriorityBasedRateLimiter extends RequestRateLimiter { + private final AtomicInteger priorityOneRequests = new AtomicInteger(); + private final String[] priorities; + private final Semaphore numRequestsAllowed; + + private final int totalAllowedRequests; + + private final LinkedBlockingQueue waitingList = new LinkedBlockingQueue<>(); + + public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { + super(rateLimiterConfig); + this.priorities = + new String[] { + SolrRequest.RequestPriorities.FOREGROUND.toString(), + SolrRequest.RequestPriorities.BACKGROUND.toString() + }; + this.numRequestsAllowed = new Semaphore(rateLimiterConfig.priorityMaxRequests, true); + this.totalAllowedRequests = rateLimiterConfig.priorityMaxRequests; + } + + /* public PriorityBasedRequestLimiter(String[] priorities, int numRequestsAllowed) { + this.priorities = priorities; + this.numRequestsAllowed = new Semaphore(numRequestsAllowed, true); + this.totalAllowedRequests = numRequestsAllowed; + }*/ + + @Override + public SlotReservation handleRequest(String requestPriority) throws InterruptedException { + acquire(requestPriority); + return () -> PriorityBasedRateLimiter.this.release(requestPriority); + } + + public void acquire(String priority) throws InterruptedException { + if (priority.equals(this.priorities[0])) { + nextInQueue(); + } else if (priority.equals(this.priorities[1])) { + if (this.priorityOneRequests.get() < this.totalAllowedRequests) { + nextInQueue(); + } else { + CountDownLatch wait = new CountDownLatch(1); + this.waitingList.put(wait); + wait.await(); + nextInQueue(); + } + } + } + + private void nextInQueue() throws InterruptedException { + this.priorityOneRequests.addAndGet(1); + this.numRequestsAllowed.acquire(1); + } + + private void exitFromQueue() { + this.priorityOneRequests.addAndGet(-1); + this.numRequestsAllowed.release(1); + } + + public void release(String priority) { + if (this.priorities[0].equals(priority) || this.priorities[1].equals(priority)) { + if (this.priorityOneRequests.get() > this.totalAllowedRequests) { + // priority one request is waiting, let's inform it + this.exitFromQueue(); + } else { + // next priority + CountDownLatch waiter = this.waitingList.poll(); + if (waiter != null) { + waiter.countDown(); + } + this.exitFromQueue(); + } + } + } + + @Override + public SlotReservation allowSlotBorrowing() throws InterruptedException { + throw new RuntimeException( + "PriorityBasedRateLimiter.allowSlotBorrowing method is not implemented"); + } + + public int getRequestsAllowed() { + return this.priorityOneRequests.get(); + } +} diff --git a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java index dae744c0df4..0a4f7b082df 100644 --- a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java @@ -48,10 +48,7 @@ public QueryRateLimiter(RateLimiterConfig config) { } public static RateLimiterConfig processConfigChange( - SolrRequest.SolrRequestType requestType, - RateLimiterConfig rateLimiterConfig, - Map properties) - throws IOException { + RateLimiterConfig rateLimiterConfig, Map properties) throws IOException { byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY)); RateLimiterPayload rateLimiterMeta; @@ -61,6 +58,12 @@ public static RateLimiterConfig processConfigChange( rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); } + // default rate limiter + SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY; + if (rateLimiterConfig.priorityBasedEnabled) { + requestType = SolrRequest.SolrRequestType.PRIORITY_BASED; + } + if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) { // no prior config, or config has changed; return the new config return new RateLimiterConfig(requestType, rateLimiterMeta); diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 5a02553ceaf..90315a85c8d 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -68,13 +68,14 @@ public boolean onChange(Map properties) { try { RateLimiterConfig newConfig = QueryRateLimiter.processConfigChange( - SolrRequest.SolrRequestType.QUERY, - v == null ? null : v.getRateLimiterConfig(), - properties); + v == null ? null : v.getRateLimiterConfig(), properties); if (newConfig == null) { return v; } else { log.info("updated config: {}", newConfig); + if (newConfig.priorityBasedEnabled) { + return new PriorityBasedRateLimiter(newConfig); + } return new QueryRateLimiter(newConfig); } } catch (IOException e) { @@ -93,6 +94,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque throws InterruptedException { String requestContext = request.getHeader(SOLR_REQUEST_CONTEXT_PARAM); String typeOfRequest = request.getHeader(SOLR_REQUEST_TYPE_PARAM); + String requestPriority = typeOfRequest; if (typeOfRequest == null) { // Cannot determine if this request should be throttled @@ -105,6 +107,10 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque return RequestRateLimiter.UNLIMITED; } + if (typeOfRequest.equals(SolrRequest.RequestPriorities.FOREGROUND.toString()) + || typeOfRequest.equals(SolrRequest.RequestPriorities.BACKGROUND.toString())) { + typeOfRequest = SolrRequest.SolrRequestType.PRIORITY_BASED.toString(); + } RequestRateLimiter requestRateLimiter = requestRateLimiterMap.get(typeOfRequest); if (requestRateLimiter == null) { @@ -115,7 +121,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque // slot borrowing should be fallback behavior, so if `slotAcquisitionTimeoutInMS` // is configured it will be applied here (blocking if necessary), to make a best // effort to draw from the request's own slot pool. - RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(); + RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(requestPriority); if (result != null) { return result; diff --git a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java index 04ef0900ac5..cbd13e677bc 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java @@ -91,7 +91,7 @@ boolean isEmpty() { * Handles an incoming request. returns a metadata object representing the metadata for the * acquired slot, if acquired. If a slot is not acquired, returns a null metadata object. */ - public SlotReservation handleRequest() throws InterruptedException { + public SlotReservation handleRequest(String requestType) throws InterruptedException { if (!rateLimiterConfig.isEnabled) { return UNLIMITED; diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index 84c3a81d125..3edb552a285 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -81,7 +81,9 @@ public void testConcurrentQueries() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent here RateLimitManager.Builder builder = @@ -129,7 +131,9 @@ public void testSlotBorrowingAcquisitionTimeout() guaranteed, slotAcqTimeMillis, slotLimit /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); // set allowed/guaranteed to the same, and very low, to force it to mainly borrow. It would also // be theoretically possible to optimize a single-request-type config to bypass slot-borrowing // logic altogether, so configuring a second ratelimiter eliminates the possibility that at @@ -141,7 +145,9 @@ public void testSlotBorrowingAcquisitionTimeout() 1, slotAcqTimeMillis, 1 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); mgr.registerRequestRateLimiter( new RequestRateLimiter(queryConfig), SolrRequest.SolrRequestType.QUERY); mgr.registerRequestRateLimiter( @@ -319,7 +325,9 @@ public void testSlotBorrowing() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); RateLimiterConfig indexRateLimiterConfig = new RateLimiterConfig( SolrRequest.SolrRequestType.UPDATE, @@ -327,7 +335,9 @@ public void testSlotBorrowing() throws Exception { 1, DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent RateLimitManager.Builder builder = @@ -420,10 +430,10 @@ public MockRequestRateLimiter(RateLimiterConfig config) { } @Override - public SlotReservation handleRequest() throws InterruptedException { + public SlotReservation handleRequest(String requestType) throws InterruptedException { incomingRequestCount.getAndIncrement(); - SlotReservation response = super.handleRequest(); + SlotReservation response = super.handleRequest(requestType); if (response != null) { acceptedNewRequestCount.getAndIncrement(); @@ -498,7 +508,9 @@ public void testAdjustingConfig() throws IOException, InterruptedException { guaranteed, 20, allowed /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); RequestRateLimiter limiter = new RequestRateLimiter(config); ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool("tests"); try (Closeable c = () -> ExecutorUtil.shutdownAndAwaitTermination(exec)) { @@ -522,7 +534,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException { () -> { while (!finish.get()) { try (RequestRateLimiter.SlotReservation slotReservation = - limiterF.handleRequest()) { + limiterF.handleRequest(SolrRequest.SolrRequestType.QUERY.toString())) { if (slotReservation != null) { executed.increment(); int ct = outstanding.incrementAndGet(); @@ -601,9 +613,108 @@ public void testAdjustingConfig() throws IOException, InterruptedException { guaranteed, 20, allowed /* allowedRequests */, - true /* isSlotBorrowing */); + true /* isSlotBorrowing */, + false, + 0); limiter = new RequestRateLimiter(config); } } } + + @Test + public void testPriorityBasedRateLimiter() throws Exception { + try (CloudSolrClient client = + cluster.basicSolrClientBuilder().withDefaultCollection(FIRST_COLLECTION).build()) { + + CollectionAdminRequest.createCollection(FIRST_COLLECTION, 1, 1).process(client); + cluster.waitForActiveCollection(FIRST_COLLECTION, 1, 1); + + SolrDispatchFilter solrDispatchFilter = cluster.getJettySolrRunner(0).getSolrDispatchFilter(); + + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, + 5 /* allowedRequests */, + true /* isSlotBorrowing */, + true, + 1); + // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes + // its parent here + RateLimitManager.Builder builder = + new MockBuilder( + null /* dummy SolrZkClient */, new MockRequestRateLimiter(rateLimiterConfig)); + RateLimitManager rateLimitManager = builder.build(); + + solrDispatchFilter.replaceRateLimitManager(rateLimitManager); + + int numDocs = TEST_NIGHTLY ? 10000 : 100; + + processTest(client, numDocs, 350 /* number of queries */); + + MockRequestRateLimiter mockQueryRateLimiter = + (MockRequestRateLimiter) + rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY); + + assertEquals(350, mockQueryRateLimiter.incomingRequestCount.get()); + + assertTrue(mockQueryRateLimiter.acceptedNewRequestCount.get() > 0); + assertTrue( + (mockQueryRateLimiter.acceptedNewRequestCount.get() + == mockQueryRateLimiter.incomingRequestCount.get() + || mockQueryRateLimiter.rejectedRequestCount.get() > 0)); + assertEquals( + mockQueryRateLimiter.incomingRequestCount.get(), + mockQueryRateLimiter.acceptedNewRequestCount.get() + + mockQueryRateLimiter.rejectedRequestCount.get()); + } + } + + @Test + public void testPriorityBasedRateLimiter1() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, + 5 /* allowedRequests */, + true /* isSlotBorrowing */, + true, + 1); + + PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); + + HttpServletRequest foreground = new DummyRequest(null, "FOREGROUND"); + + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(foreground)) { + assertNotNull(allowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + } + + HttpServletRequest background = new DummyRequest(null, "BACKGROUND"); + + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(background)) { + assertNotNull(allowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + } + + HttpServletRequest unknown = new DummyRequest(null, "unknown"); + + try (final RequestRateLimiter.SlotReservation allowed = + rateLimitManager.handleRequest(unknown)) { + assertNotNull(allowed); + assertEquals(0, requestRateLimiter.getRequestsAllowed()); + } + } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java index e00ad8376dd..d056fb4ac5d 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java @@ -58,9 +58,15 @@ public enum SolrRequestType { SECURITY, ADMIN, STREAMING, - UNSPECIFIED + UNSPECIFIED, + PRIORITY_BASED, }; + public enum RequestPriorities { + FOREGROUND, + BACKGROUND + } + public enum SolrClientContext { CLIENT, SERVER diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java index 07bae33de6b..78fe897fff5 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java @@ -33,6 +33,10 @@ public class RateLimiterPayload implements ReflectMapWriter { @JsonProperty public Integer slotAcquisitionTimeoutInMS; + @JsonProperty public Boolean priorityBasedEnabled; + + @JsonProperty public Integer priorityMaxRequests; + public RateLimiterPayload copy() { RateLimiterPayload result = new RateLimiterPayload(); @@ -41,7 +45,8 @@ public RateLimiterPayload copy() { result.allowedRequests = allowedRequests; result.slotBorrowingEnabled = slotBorrowingEnabled; result.slotAcquisitionTimeoutInMS = slotAcquisitionTimeoutInMS; - + result.priorityBasedEnabled = priorityBasedEnabled; + result.priorityMaxRequests = priorityMaxRequests; return result; } @@ -53,7 +58,9 @@ public boolean equals(Object obj) { && Objects.equals(this.guaranteedSlots, that.guaranteedSlots) && Objects.equals(this.allowedRequests, that.allowedRequests) && Objects.equals(this.slotBorrowingEnabled, that.slotBorrowingEnabled) - && Objects.equals(this.slotAcquisitionTimeoutInMS, that.slotAcquisitionTimeoutInMS); + && Objects.equals(this.slotAcquisitionTimeoutInMS, that.slotAcquisitionTimeoutInMS) + && Objects.equals(this.priorityBasedEnabled, that.priorityBasedEnabled) + && Objects.equals(this.priorityMaxRequests, that.priorityMaxRequests); } return false; } @@ -65,6 +72,8 @@ public int hashCode() { guaranteedSlots, allowedRequests, slotBorrowingEnabled, - slotAcquisitionTimeoutInMS); + slotAcquisitionTimeoutInMS, + priorityBasedEnabled, + priorityMaxRequests); } } From 72541cbcaf4438155a70a64d519e2fd239606d7b Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 12:15:36 -0700 Subject: [PATCH 02/13] Updated timeout and test --- .../apache/solr/core/RateLimiterConfig.java | 16 +-- .../servlet/PriorityBasedRateLimiter.java | 61 +++++---- .../apache/solr/servlet/RateLimitManager.java | 6 +- .../solr/servlet/TestRequestRateLimiter.java | 117 ++++++++---------- 4 files changed, 94 insertions(+), 106 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index 0e851a2b345..c8f37a5eeb8 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -33,9 +33,6 @@ public class RateLimiterConfig { public final boolean isSlotBorrowingEnabled; public final int guaranteedSlotsThreshold; public final Boolean priorityBasedEnabled; - - public final int priorityMaxRequests; - /** * We store the config definition in order to determine whether anything has changed that would * call for re-initialization. @@ -53,8 +50,7 @@ public RateLimiterConfig( long waitForSlotAcquisition, int allowedRequests, boolean isSlotBorrowingEnabled, - boolean priorityBasedEnabled, - int priorityMaxRequests) { + boolean priorityBasedEnabled) { this( requestType, makePayload( @@ -63,8 +59,7 @@ public RateLimiterConfig( waitForSlotAcquisition, allowedRequests, isSlotBorrowingEnabled, - priorityBasedEnabled, - priorityMaxRequests)); + priorityBasedEnabled)); } private static RateLimiterPayload makePayload( @@ -73,8 +68,7 @@ private static RateLimiterPayload makePayload( long waitForSlotAcquisition, int allowedRequests, boolean isSlotBorrowingEnabled, - boolean priorityBasedEnabled, - int priorityMaxRequests) { + boolean priorityBasedEnabled) { RateLimiterPayload ret = new RateLimiterPayload(); ret.enabled = isEnabled; ret.allowedRequests = allowedRequests; @@ -82,7 +76,6 @@ private static RateLimiterPayload makePayload( ret.slotBorrowingEnabled = isSlotBorrowingEnabled; ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition); ret.priorityBasedEnabled = priorityBasedEnabled; - ret.priorityMaxRequests = priorityMaxRequests; return ret; } @@ -111,8 +104,6 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay priorityBasedEnabled = definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled; - priorityMaxRequests = - definition.priorityMaxRequests == null ? 0 : definition.priorityMaxRequests; this.definition = definition; } @@ -142,7 +133,6 @@ public String toString() { sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled); sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition); sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled); - sb.append(", priorityMaxRequests=").append(priorityMaxRequests); return sb.append('}').toString(); } } diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 8c77ff706c8..6c9a4906580 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -3,10 +3,18 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.core.RateLimiterConfig; +/** + * PriorityBasedRateLimiter allocates the slot based on their request priority Currently, it has two + * priorities {@link SolrRequest.RequestPriorities} FOREGROUND and {@link + * SolrRequest.RequestPriorities} BACKGROUND Requests. Client can pass the {@link + * org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate + * the foreground and background request. Foreground requests has high priority than background requests + */ public class PriorityBasedRateLimiter extends RequestRateLimiter { private final AtomicInteger priorityOneRequests = new AtomicInteger(); private final String[] priorities; @@ -16,55 +24,62 @@ public class PriorityBasedRateLimiter extends RequestRateLimiter { private final LinkedBlockingQueue waitingList = new LinkedBlockingQueue<>(); + private final long waitTimeoutInMillis; + public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { super(rateLimiterConfig); this.priorities = new String[] { - SolrRequest.RequestPriorities.FOREGROUND.toString(), - SolrRequest.RequestPriorities.BACKGROUND.toString() + SolrRequest.RequestPriorities.FOREGROUND.name(), + SolrRequest.RequestPriorities.BACKGROUND.name() }; - this.numRequestsAllowed = new Semaphore(rateLimiterConfig.priorityMaxRequests, true); - this.totalAllowedRequests = rateLimiterConfig.priorityMaxRequests; + this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true); + this.totalAllowedRequests = rateLimiterConfig.allowedRequests; + this.waitTimeoutInMillis = rateLimiterConfig.waitForSlotAcquisition; } - /* public PriorityBasedRequestLimiter(String[] priorities, int numRequestsAllowed) { - this.priorities = priorities; - this.numRequestsAllowed = new Semaphore(numRequestsAllowed, true); - this.totalAllowedRequests = numRequestsAllowed; - }*/ - @Override - public SlotReservation handleRequest(String requestPriority) throws InterruptedException { - acquire(requestPriority); + public SlotReservation handleRequest(String requestPriority) { + try { + if (!acquire(requestPriority)) { + return null; + } + }catch (InterruptedException ie) { + return null; + } return () -> PriorityBasedRateLimiter.this.release(requestPriority); } - public void acquire(String priority) throws InterruptedException { + private boolean acquire(String priority) throws InterruptedException { if (priority.equals(this.priorities[0])) { - nextInQueue(); + return nextInQueue(); } else if (priority.equals(this.priorities[1])) { if (this.priorityOneRequests.get() < this.totalAllowedRequests) { - nextInQueue(); + return nextInQueue(); } else { CountDownLatch wait = new CountDownLatch(1); this.waitingList.put(wait); - wait.await(); - nextInQueue(); + return wait.await(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS) && nextInQueue(); } } + return true; } - private void nextInQueue() throws InterruptedException { + private boolean nextInQueue() throws InterruptedException { + boolean acquired = this.numRequestsAllowed.tryAcquire(1, this.waitTimeoutInMillis, TimeUnit.MILLISECONDS); + if (!acquired) { + return false; + } this.priorityOneRequests.addAndGet(1); - this.numRequestsAllowed.acquire(1); + return true; } private void exitFromQueue() { - this.priorityOneRequests.addAndGet(-1); this.numRequestsAllowed.release(1); + this.priorityOneRequests.addAndGet(-1); } - public void release(String priority) { + private void release(String priority) { if (this.priorities[0].equals(priority) || this.priorities[1].equals(priority)) { if (this.priorityOneRequests.get() > this.totalAllowedRequests) { // priority one request is waiting, let's inform it @@ -82,8 +97,8 @@ public void release(String priority) { @Override public SlotReservation allowSlotBorrowing() throws InterruptedException { - throw new RuntimeException( - "PriorityBasedRateLimiter.allowSlotBorrowing method is not implemented"); + // if we reach here that means slot is not available + return null; } public int getRequestsAllowed() { diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 90315a85c8d..d4ce299081c 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -107,9 +107,9 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque return RequestRateLimiter.UNLIMITED; } - if (typeOfRequest.equals(SolrRequest.RequestPriorities.FOREGROUND.toString()) - || typeOfRequest.equals(SolrRequest.RequestPriorities.BACKGROUND.toString())) { - typeOfRequest = SolrRequest.SolrRequestType.PRIORITY_BASED.toString(); + if (typeOfRequest.equals(SolrRequest.RequestPriorities.FOREGROUND.name()) + || typeOfRequest.equals(SolrRequest.RequestPriorities.BACKGROUND.name())) { + typeOfRequest = SolrRequest.SolrRequestType.PRIORITY_BASED.name(); } RequestRateLimiter requestRateLimiter = requestRateLimiterMap.get(typeOfRequest); diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index 3edb552a285..bc87aa839a7 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -82,8 +82,7 @@ public void testConcurrentQueries() throws Exception { DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent here RateLimitManager.Builder builder = @@ -132,8 +131,7 @@ public void testSlotBorrowingAcquisitionTimeout() slotAcqTimeMillis, slotLimit /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); // set allowed/guaranteed to the same, and very low, to force it to mainly borrow. It would also // be theoretically possible to optimize a single-request-type config to bypass slot-borrowing // logic altogether, so configuring a second ratelimiter eliminates the possibility that at @@ -146,8 +144,7 @@ public void testSlotBorrowingAcquisitionTimeout() slotAcqTimeMillis, 1 /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); mgr.registerRequestRateLimiter( new RequestRateLimiter(queryConfig), SolrRequest.SolrRequestType.QUERY); mgr.registerRequestRateLimiter( @@ -326,8 +323,7 @@ public void testSlotBorrowing() throws Exception { DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); RateLimiterConfig indexRateLimiterConfig = new RateLimiterConfig( SolrRequest.SolrRequestType.UPDATE, @@ -336,8 +332,7 @@ public void testSlotBorrowing() throws Exception { DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes // its parent RateLimitManager.Builder builder = @@ -509,8 +504,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException { 20, allowed /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); RequestRateLimiter limiter = new RequestRateLimiter(config); ExecutorService exec = ExecutorUtil.newMDCAwareCachedThreadPool("tests"); try (Closeable c = () -> ExecutorUtil.shutdownAndAwaitTermination(exec)) { @@ -614,8 +608,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException { 20, allowed /* allowedRequests */, true /* isSlotBorrowing */, - false, - 0); + false); limiter = new RequestRateLimiter(config); } } @@ -623,57 +616,6 @@ public void testAdjustingConfig() throws IOException, InterruptedException { @Test public void testPriorityBasedRateLimiter() throws Exception { - try (CloudSolrClient client = - cluster.basicSolrClientBuilder().withDefaultCollection(FIRST_COLLECTION).build()) { - - CollectionAdminRequest.createCollection(FIRST_COLLECTION, 1, 1).process(client); - cluster.waitForActiveCollection(FIRST_COLLECTION, 1, 1); - - SolrDispatchFilter solrDispatchFilter = cluster.getJettySolrRunner(0).getSolrDispatchFilter(); - - RateLimiterConfig rateLimiterConfig = - new RateLimiterConfig( - SolrRequest.SolrRequestType.QUERY, - true, - 1, - DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, - 5 /* allowedRequests */, - true /* isSlotBorrowing */, - true, - 1); - // We are fine with a null FilterConfig here since we ensure that MockBuilder never invokes - // its parent here - RateLimitManager.Builder builder = - new MockBuilder( - null /* dummy SolrZkClient */, new MockRequestRateLimiter(rateLimiterConfig)); - RateLimitManager rateLimitManager = builder.build(); - - solrDispatchFilter.replaceRateLimitManager(rateLimitManager); - - int numDocs = TEST_NIGHTLY ? 10000 : 100; - - processTest(client, numDocs, 350 /* number of queries */); - - MockRequestRateLimiter mockQueryRateLimiter = - (MockRequestRateLimiter) - rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.QUERY); - - assertEquals(350, mockQueryRateLimiter.incomingRequestCount.get()); - - assertTrue(mockQueryRateLimiter.acceptedNewRequestCount.get() > 0); - assertTrue( - (mockQueryRateLimiter.acceptedNewRequestCount.get() - == mockQueryRateLimiter.incomingRequestCount.get() - || mockQueryRateLimiter.rejectedRequestCount.get() > 0)); - assertEquals( - mockQueryRateLimiter.incomingRequestCount.get(), - mockQueryRateLimiter.acceptedNewRequestCount.get() - + mockQueryRateLimiter.rejectedRequestCount.get()); - } - } - - @Test - public void testPriorityBasedRateLimiter1() throws Exception { RateLimitManager rateLimitManager = new RateLimitManager(); // PriorityBasedRateLimiter @@ -685,8 +627,7 @@ public void testPriorityBasedRateLimiter1() throws Exception { DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS, 5 /* allowedRequests */, true /* isSlotBorrowing */, - true, - 1); + true); PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); @@ -717,4 +658,46 @@ public void testPriorityBasedRateLimiter1() throws Exception { assertEquals(0, requestRateLimiter.getRequestsAllowed()); } } + + @Test + public void testPriorityBasedRateLimiterTimeout() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + 10, + 1 /* allowedRequests */, + true /* isSlotBorrowing */, + true); + + PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); + + HttpServletRequest firstRequest = new DummyRequest(null, "FOREGROUND"); + + RequestRateLimiter.SlotReservation firstRequestAllowed = rateLimitManager.handleRequest(firstRequest); + assertNotNull(firstRequestAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + + HttpServletRequest secondRequest = new DummyRequest(null, "FOREGROUND"); + + RequestRateLimiter.SlotReservation secondRequestNotAllowed = rateLimitManager.handleRequest(secondRequest); + assertNull(secondRequestNotAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + HttpServletRequest thirdRequest = new DummyRequest(null, "BACKGROUND"); + + RequestRateLimiter.SlotReservation thirdRequestNotAllowed = rateLimitManager.handleRequest(thirdRequest); + assertNull(thirdRequestNotAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); + + firstRequestAllowed.close(); + } } From 8d6eb7c56471742da59c1ffa8e5b51fe85242d63 Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 12:16:57 -0700 Subject: [PATCH 03/13] tidy --- .../apache/solr/core/RateLimiterConfig.java | 1 + .../servlet/PriorityBasedRateLimiter.java | 8 +++-- .../solr/servlet/TestRequestRateLimiter.java | 32 ++++++++++--------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index c8f37a5eeb8..9bd24cf4113 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -33,6 +33,7 @@ public class RateLimiterConfig { public final boolean isSlotBorrowingEnabled; public final int guaranteedSlotsThreshold; public final Boolean priorityBasedEnabled; + /** * We store the config definition in order to determine whether anything has changed that would * call for re-initialization. diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 6c9a4906580..d2985082f33 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -13,7 +13,8 @@ * priorities {@link SolrRequest.RequestPriorities} FOREGROUND and {@link * SolrRequest.RequestPriorities} BACKGROUND Requests. Client can pass the {@link * org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate - * the foreground and background request. Foreground requests has high priority than background requests + * the foreground and background request. Foreground requests has high priority than background + * requests */ public class PriorityBasedRateLimiter extends RequestRateLimiter { private final AtomicInteger priorityOneRequests = new AtomicInteger(); @@ -44,7 +45,7 @@ public SlotReservation handleRequest(String requestPriority) { if (!acquire(requestPriority)) { return null; } - }catch (InterruptedException ie) { + } catch (InterruptedException ie) { return null; } return () -> PriorityBasedRateLimiter.this.release(requestPriority); @@ -66,7 +67,8 @@ private boolean acquire(String priority) throws InterruptedException { } private boolean nextInQueue() throws InterruptedException { - boolean acquired = this.numRequestsAllowed.tryAcquire(1, this.waitTimeoutInMillis, TimeUnit.MILLISECONDS); + boolean acquired = + this.numRequestsAllowed.tryAcquire(1, this.waitTimeoutInMillis, TimeUnit.MILLISECONDS); if (!acquired) { return false; } diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index bc87aa839a7..67ea40dfb56 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -665,36 +665,38 @@ public void testPriorityBasedRateLimiterTimeout() throws Exception { // PriorityBasedRateLimiter RateLimiterConfig rateLimiterConfig = - new RateLimiterConfig( - SolrRequest.SolrRequestType.QUERY, - true, - 1, - 10, - 1 /* allowedRequests */, - true /* isSlotBorrowing */, - true); + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + 10, + 1 /* allowedRequests */, + true /* isSlotBorrowing */, + true); PriorityBasedRateLimiter requestRateLimiter = new PriorityBasedRateLimiter(rateLimiterConfig); rateLimitManager.registerRequestRateLimiter( - requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); + requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); HttpServletRequest firstRequest = new DummyRequest(null, "FOREGROUND"); - RequestRateLimiter.SlotReservation firstRequestAllowed = rateLimitManager.handleRequest(firstRequest); + RequestRateLimiter.SlotReservation firstRequestAllowed = + rateLimitManager.handleRequest(firstRequest); assertNotNull(firstRequestAllowed); assertEquals(1, requestRateLimiter.getRequestsAllowed()); - HttpServletRequest secondRequest = new DummyRequest(null, "FOREGROUND"); - RequestRateLimiter.SlotReservation secondRequestNotAllowed = rateLimitManager.handleRequest(secondRequest); - assertNull(secondRequestNotAllowed); - assertEquals(1, requestRateLimiter.getRequestsAllowed()); + RequestRateLimiter.SlotReservation secondRequestNotAllowed = + rateLimitManager.handleRequest(secondRequest); + assertNull(secondRequestNotAllowed); + assertEquals(1, requestRateLimiter.getRequestsAllowed()); HttpServletRequest thirdRequest = new DummyRequest(null, "BACKGROUND"); - RequestRateLimiter.SlotReservation thirdRequestNotAllowed = rateLimitManager.handleRequest(thirdRequest); + RequestRateLimiter.SlotReservation thirdRequestNotAllowed = + rateLimitManager.handleRequest(thirdRequest); assertNull(thirdRequestNotAllowed); assertEquals(1, requestRateLimiter.getRequestsAllowed()); From cbf0ed5afc2875f8ed9c56b228fcd2f90e4c91b5 Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 12:20:04 -0700 Subject: [PATCH 04/13] cleanup --- .../client/solrj/request/beans/RateLimiterPayload.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java index 78fe897fff5..6088cd00cee 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/beans/RateLimiterPayload.java @@ -35,8 +35,6 @@ public class RateLimiterPayload implements ReflectMapWriter { @JsonProperty public Boolean priorityBasedEnabled; - @JsonProperty public Integer priorityMaxRequests; - public RateLimiterPayload copy() { RateLimiterPayload result = new RateLimiterPayload(); @@ -46,7 +44,6 @@ public RateLimiterPayload copy() { result.slotBorrowingEnabled = slotBorrowingEnabled; result.slotAcquisitionTimeoutInMS = slotAcquisitionTimeoutInMS; result.priorityBasedEnabled = priorityBasedEnabled; - result.priorityMaxRequests = priorityMaxRequests; return result; } @@ -59,8 +56,7 @@ public boolean equals(Object obj) { && Objects.equals(this.allowedRequests, that.allowedRequests) && Objects.equals(this.slotBorrowingEnabled, that.slotBorrowingEnabled) && Objects.equals(this.slotAcquisitionTimeoutInMS, that.slotAcquisitionTimeoutInMS) - && Objects.equals(this.priorityBasedEnabled, that.priorityBasedEnabled) - && Objects.equals(this.priorityMaxRequests, that.priorityMaxRequests); + && Objects.equals(this.priorityBasedEnabled, that.priorityBasedEnabled); } return false; } @@ -73,7 +69,6 @@ public int hashCode() { allowedRequests, slotBorrowingEnabled, slotAcquisitionTimeoutInMS, - priorityBasedEnabled, - priorityMaxRequests); + priorityBasedEnabled); } } From 075875a13c46a7380259a9692a0b39ec9179fe0b Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 12:37:02 -0700 Subject: [PATCH 05/13] cleanup --- .../java/org/apache/solr/servlet/PriorityBasedRateLimiter.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index d2985082f33..fd1e0c88fbf 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -10,8 +10,7 @@ /** * PriorityBasedRateLimiter allocates the slot based on their request priority Currently, it has two - * priorities {@link SolrRequest.RequestPriorities} FOREGROUND and {@link - * SolrRequest.RequestPriorities} BACKGROUND Requests. Client can pass the {@link + * priorities FOREGROUND and BACKGROUND Requests. Client can pass the {@link * org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate * the foreground and background request. Foreground requests has high priority than background * requests From 5892b6a536fff321bbe26e586e94f74656ee82d7 Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 16:20:29 -0700 Subject: [PATCH 06/13] Updated dynamic change + test --- .../apache/solr/servlet/QueryRateLimiter.java | 12 +---- .../apache/solr/servlet/RateLimitManager.java | 26 +++++++++-- .../solr/servlet/TestRequestRateLimiter.java | 45 +++++++++++++++++++ 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java index 0a4f7b082df..91145c03653 100644 --- a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java @@ -48,19 +48,11 @@ public QueryRateLimiter(RateLimiterConfig config) { } public static RateLimiterConfig processConfigChange( - RateLimiterConfig rateLimiterConfig, Map properties) throws IOException { - byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY)); - - RateLimiterPayload rateLimiterMeta; - if (configInput == null || configInput.length == 0) { - rateLimiterMeta = null; - } else { - rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); - } + RateLimiterConfig rateLimiterConfig, RateLimiterPayload rateLimiterMeta) throws IOException { // default rate limiter SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY; - if (rateLimiterConfig.priorityBasedEnabled) { + if (rateLimiterMeta.priorityBasedEnabled) { requestType = SolrRequest.SolrRequestType.PRIORITY_BASED; } diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index d4ce299081c..7d244bc3024 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -19,7 +19,9 @@ import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_CONTEXT_PARAM; import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_TYPE_PARAM; +import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.invoke.MethodHandles; @@ -28,9 +30,12 @@ import javax.servlet.http.HttpServletRequest; import net.jcip.annotations.ThreadSafe; import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; import org.apache.solr.common.cloud.ClusterPropertiesListener; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; +import org.apache.solr.util.SolrJacksonAnnotationInspector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +51,7 @@ @ThreadSafe public class RateLimitManager implements ClusterPropertiesListener { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); public static final String ERROR_MESSAGE = "Too many requests for this request type. Please try after some time or increase the quota for this request type"; public static final int DEFAULT_CONCURRENT_REQUESTS = @@ -61,14 +66,29 @@ public RateLimitManager() { @Override public boolean onChange(Map properties) { + byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY)); + + RateLimiterPayload rateLimiterMeta; + if (configInput == null || configInput.length == 0) { + return false; + } else { + try { + rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + // Hack: We only support query rate limiting for now requestRateLimiterMap.compute( - SolrRequest.SolrRequestType.QUERY.toString(), + rateLimiterMeta.priorityBasedEnabled + ? SolrRequest.SolrRequestType.PRIORITY_BASED.name() + : SolrRequest.SolrRequestType.QUERY.name(), (k, v) -> { try { RateLimiterConfig newConfig = QueryRateLimiter.processConfigChange( - v == null ? null : v.getRateLimiterConfig(), properties); + v == null ? null : v.getRateLimiterConfig(), rateLimiterMeta); if (newConfig == null) { return v; } else { diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index 67ea40dfb56..c309ee2f3ba 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -26,7 +26,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -702,4 +704,47 @@ public void testPriorityBasedRateLimiterTimeout() throws Exception { firstRequestAllowed.close(); } + + @Test + public void testPriorityBasedRateLimiterDynamicChange() throws Exception { + RateLimitManager rateLimitManager = new RateLimitManager(); + + // PriorityBasedRateLimiter + RateLimiterConfig rateLimiterConfig = + new RateLimiterConfig( + SolrRequest.SolrRequestType.QUERY, + true, + 1, + 10, + 1 /* allowedRequests */, + true /* isSlotBorrowing */, + false); + + QueryRateLimiter requestRateLimiter = new QueryRateLimiter(rateLimiterConfig); + + rateLimitManager.registerRequestRateLimiter( + requestRateLimiter, SolrRequest.SolrRequestType.QUERY); + + RequestRateLimiter rateLimiter = + rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.PRIORITY_BASED); + assertNull(rateLimiter); + + Map properties = new HashMap<>(); + Map rateLimiterProps = new HashMap<>(); + rateLimiterProps.put("enabled", true); + rateLimiterProps.put("guaranteedSlots", 1); + rateLimiterProps.put("allowedRequests", 1); + rateLimiterProps.put("slotBorrowingEnabled", false); + rateLimiterProps.put("slotAcquisitionTimeoutInMS", 100); + rateLimiterProps.put("priorityBasedEnabled", true); + properties.put("rate-limiters", rateLimiterProps); + + // updating rate limiter + rateLimitManager.onChange(properties); + + rateLimiter = + rateLimitManager.getRequestRateLimiter(SolrRequest.SolrRequestType.PRIORITY_BASED); + + assertEquals(true, rateLimiter.getRateLimiterConfig().priorityBasedEnabled); + } } From 5d5c47d949b6cc2ff4cca98a8376c277ec5a416e Mon Sep 17 00:00:00 2001 From: hitesh Date: Thu, 24 Oct 2024 18:13:41 -0700 Subject: [PATCH 07/13] refactor init code --- .../servlet/PriorityBasedRateLimiter.java | 3 ++ .../apache/solr/servlet/QueryRateLimiter.java | 45 ---------------- .../apache/solr/servlet/RateLimitManager.java | 52 ++++++++++++++++++- .../solr/servlet/RequestRateLimiter.java | 2 +- 4 files changed, 54 insertions(+), 48 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index fd1e0c88fbf..d70e3b4611b 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -40,6 +40,9 @@ public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { @Override public SlotReservation handleRequest(String requestPriority) { + if (!rateLimiterConfig.isEnabled) { + return UNLIMITED; + } try { if (!acquire(requestPriority)) { return null; diff --git a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java index 91145c03653..11e5c10692f 100644 --- a/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java @@ -17,20 +17,12 @@ package org.apache.solr.servlet; -import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY; - import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; -import java.util.Map; import org.apache.solr.client.solrj.SolrRequest; import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; -import org.apache.solr.common.cloud.SolrZkClient; -import org.apache.solr.common.cloud.ZkStateReader; -import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; import org.apache.solr.util.SolrJacksonAnnotationInspector; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.data.Stat; /** * Implementation of RequestRateLimiter specific to query request types. Most of the actual work is @@ -39,10 +31,6 @@ public class QueryRateLimiter extends RequestRateLimiter { private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper(); - public QueryRateLimiter(SolrZkClient solrZkClient) { - super(constructQueryRateLimiterConfig(solrZkClient)); - } - public QueryRateLimiter(RateLimiterConfig config) { super(config); } @@ -63,37 +51,4 @@ public static RateLimiterConfig processConfigChange( return null; } } - - // To be used in initialization - @SuppressWarnings({"unchecked"}) - private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) { - try { - - if (zkClient == null) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } - - Map clusterPropsJson = - (Map) - Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)); - byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); - - if (configInput.length == 0) { - // No Rate Limiter configuration defined in clusterprops.json. Return default configuration - // values - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } - - RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class); - - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta); - } catch (KeeperException.NoNodeException e) { - return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); - } catch (KeeperException | InterruptedException e) { - throw new RuntimeException( - "Error reading cluster property", SolrZkClient.checkInterrupted(e)); - } catch (IOException e) { - throw new RuntimeException("Encountered an IOException " + e.getMessage()); - } - } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 7d244bc3024..8799d1008fc 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -33,9 +33,12 @@ import org.apache.solr.client.solrj.request.beans.RateLimiterPayload; import org.apache.solr.common.cloud.ClusterPropertiesListener; import org.apache.solr.common.cloud.SolrZkClient; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.util.Utils; import org.apache.solr.core.RateLimiterConfig; import org.apache.solr.util.SolrJacksonAnnotationInspector; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -214,10 +217,55 @@ public Builder(SolrZkClient solrZkClient) { public RateLimitManager build() { RateLimitManager rateLimitManager = new RateLimitManager(); - rateLimitManager.registerRequestRateLimiter( - new QueryRateLimiter(solrZkClient), SolrRequest.SolrRequestType.QUERY); + RateLimiterConfig rateLimiterConfig = constructQueryRateLimiterConfig(solrZkClient); + + if (rateLimiterConfig.priorityBasedEnabled) { + rateLimitManager.registerRequestRateLimiter( + new PriorityBasedRateLimiter(rateLimiterConfig), + SolrRequest.SolrRequestType.PRIORITY_BASED); + } else { + rateLimitManager.registerRequestRateLimiter( + new QueryRateLimiter(rateLimiterConfig), SolrRequest.SolrRequestType.QUERY); + } return rateLimitManager; } + + // To be used in initialization + @SuppressWarnings({"unchecked"}) + private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) { + try { + + if (zkClient == null) { + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } + + Map clusterPropsJson = + (Map) + Utils.fromJSON( + zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true)); + byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY)); + + if (configInput.length == 0) { + // No Rate Limiter configuration defined in clusterprops.json. Return default + // configuration + // values + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } + + RateLimiterPayload rateLimiterMeta = + mapper.readValue(configInput, RateLimiterPayload.class); + return rateLimiterMeta.priorityBasedEnabled + ? new RateLimiterConfig(SolrRequest.SolrRequestType.PRIORITY_BASED, rateLimiterMeta) + : new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta); + } catch (KeeperException.NoNodeException e) { + return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY); + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException( + "Error reading cluster property", SolrZkClient.checkInterrupted(e)); + } catch (IOException e) { + throw new RuntimeException("Encountered an IOException " + e.getMessage()); + } + } } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java index cbd13e677bc..3db9af73036 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java @@ -46,7 +46,7 @@ public class RequestRateLimiter { private final AtomicInteger nativeReservations; - private final RateLimiterConfig rateLimiterConfig; + protected final RateLimiterConfig rateLimiterConfig; public static final SlotReservation UNLIMITED = () -> { // no-op From adc9d9224a870fda310944a48338fe136991cb78 Mon Sep 17 00:00:00 2001 From: hitesh Date: Mon, 28 Oct 2024 11:25:21 -0700 Subject: [PATCH 08/13] review feedback --- .../apache/solr/core/RateLimiterConfig.java | 2 +- .../servlet/PriorityBasedRateLimiter.java | 70 +++++++++++-------- .../apache/solr/servlet/RateLimitManager.java | 7 +- .../solr/servlet/RequestRateLimiter.java | 3 +- .../solr/servlet/TestRequestRateLimiter.java | 38 +++++++--- .../apache/solr/client/solrj/SolrRequest.java | 5 -- 6 files changed, 75 insertions(+), 50 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java index 9bd24cf4113..d90a754f426 100644 --- a/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java +++ b/solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java @@ -32,7 +32,7 @@ public class RateLimiterConfig { public final int allowedRequests; public final boolean isSlotBorrowingEnabled; public final int guaranteedSlotsThreshold; - public final Boolean priorityBasedEnabled; + public final boolean priorityBasedEnabled; /** * We store the config definition in order to determine whether anything has changed that would diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index d70e3b4611b..942d854334d 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -5,7 +5,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.solr.client.solrj.SolrRequest; +import javax.servlet.http.HttpServletRequest; import org.apache.solr.core.RateLimiterConfig; /** @@ -16,8 +16,8 @@ * requests */ public class PriorityBasedRateLimiter extends RequestRateLimiter { - private final AtomicInteger priorityOneRequests = new AtomicInteger(); - private final String[] priorities; + public static final String SOLR_REQUEST_PRIORITY_PARAM = "Solr-Request-Priority"; + private final AtomicInteger activeRequests = new AtomicInteger(); private final Semaphore numRequestsAllowed; private final int totalAllowedRequests; @@ -28,21 +28,20 @@ public class PriorityBasedRateLimiter extends RequestRateLimiter { public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { super(rateLimiterConfig); - this.priorities = - new String[] { - SolrRequest.RequestPriorities.FOREGROUND.name(), - SolrRequest.RequestPriorities.BACKGROUND.name() - }; this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true); this.totalAllowedRequests = rateLimiterConfig.allowedRequests; this.waitTimeoutInMillis = rateLimiterConfig.waitForSlotAcquisition; } @Override - public SlotReservation handleRequest(String requestPriority) { + public SlotReservation handleRequest(HttpServletRequest request) { if (!rateLimiterConfig.isEnabled) { return UNLIMITED; } + RequestPriorities requestPriority = getRequestPriority(request); + if (requestPriority == null) { + return UNLIMITED; + } try { if (!acquire(requestPriority)) { return null; @@ -53,11 +52,11 @@ public SlotReservation handleRequest(String requestPriority) { return () -> PriorityBasedRateLimiter.this.release(requestPriority); } - private boolean acquire(String priority) throws InterruptedException { - if (priority.equals(this.priorities[0])) { + private boolean acquire(RequestPriorities priority) throws InterruptedException { + if (priority.equals(RequestPriorities.FOREGROUND)) { return nextInQueue(); - } else if (priority.equals(this.priorities[1])) { - if (this.priorityOneRequests.get() < this.totalAllowedRequests) { + } else if (priority.equals(RequestPriorities.BACKGROUND)) { + if (this.activeRequests.get() < this.totalAllowedRequests) { return nextInQueue(); } else { CountDownLatch wait = new CountDownLatch(1); @@ -74,28 +73,26 @@ private boolean nextInQueue() throws InterruptedException { if (!acquired) { return false; } - this.priorityOneRequests.addAndGet(1); + this.activeRequests.addAndGet(1); return true; } private void exitFromQueue() { this.numRequestsAllowed.release(1); - this.priorityOneRequests.addAndGet(-1); + this.activeRequests.addAndGet(-1); } - private void release(String priority) { - if (this.priorities[0].equals(priority) || this.priorities[1].equals(priority)) { - if (this.priorityOneRequests.get() > this.totalAllowedRequests) { - // priority one request is waiting, let's inform it - this.exitFromQueue(); - } else { - // next priority - CountDownLatch waiter = this.waitingList.poll(); - if (waiter != null) { - waiter.countDown(); - } - this.exitFromQueue(); + private void release(RequestPriorities priority) { + if (this.activeRequests.get() > this.totalAllowedRequests) { + // priority one request is waiting, let's inform it + this.exitFromQueue(); + } else { + // next priority + CountDownLatch waiter = this.waitingList.poll(); + if (waiter != null) { + waiter.countDown(); } + this.exitFromQueue(); } } @@ -106,6 +103,23 @@ public SlotReservation allowSlotBorrowing() throws InterruptedException { } public int getRequestsAllowed() { - return this.priorityOneRequests.get(); + return this.activeRequests.get(); + } + + private RequestPriorities getRequestPriority(HttpServletRequest request) { + String requestPriority = request.getHeader(SOLR_REQUEST_PRIORITY_PARAM); + try { + return RequestPriorities.valueOf(requestPriority); + } catch (IllegalArgumentException iae) { + + } + return null; + } + + public enum RequestPriorities { + // this has high priority + FOREGROUND, + // this has low priority + BACKGROUND } } diff --git a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java index 8799d1008fc..d339cafb5c3 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java +++ b/solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java @@ -117,7 +117,6 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque throws InterruptedException { String requestContext = request.getHeader(SOLR_REQUEST_CONTEXT_PARAM); String typeOfRequest = request.getHeader(SOLR_REQUEST_TYPE_PARAM); - String requestPriority = typeOfRequest; if (typeOfRequest == null) { // Cannot determine if this request should be throttled @@ -130,10 +129,6 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque return RequestRateLimiter.UNLIMITED; } - if (typeOfRequest.equals(SolrRequest.RequestPriorities.FOREGROUND.name()) - || typeOfRequest.equals(SolrRequest.RequestPriorities.BACKGROUND.name())) { - typeOfRequest = SolrRequest.SolrRequestType.PRIORITY_BASED.name(); - } RequestRateLimiter requestRateLimiter = requestRateLimiterMap.get(typeOfRequest); if (requestRateLimiter == null) { @@ -144,7 +139,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque // slot borrowing should be fallback behavior, so if `slotAcquisitionTimeoutInMS` // is configured it will be applied here (blocking if necessary), to make a best // effort to draw from the request's own slot pool. - RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(requestPriority); + RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(request); if (result != null) { return result; diff --git a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java index 3db9af73036..c8a7d587882 100644 --- a/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/RequestRateLimiter.java @@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.http.HttpServletRequest; import net.jcip.annotations.ThreadSafe; import org.apache.solr.core.RateLimiterConfig; @@ -91,7 +92,7 @@ boolean isEmpty() { * Handles an incoming request. returns a metadata object representing the metadata for the * acquired slot, if acquired. If a slot is not acquired, returns a null metadata object. */ - public SlotReservation handleRequest(String requestType) throws InterruptedException { + public SlotReservation handleRequest(HttpServletRequest request) throws InterruptedException { if (!rateLimiterConfig.isEnabled) { return UNLIMITED; diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index c309ee2f3ba..9dae9c361e9 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -19,6 +19,7 @@ import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_CONTEXT_PARAM; import static org.apache.solr.common.params.CommonParams.SOLR_REQUEST_TYPE_PARAM; +import static org.apache.solr.servlet.PriorityBasedRateLimiter.SOLR_REQUEST_PRIORITY_PARAM; import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; @@ -288,10 +289,20 @@ private static class DummyRequest extends Request { private final String ctx; private final String type; + private final String priority; + public DummyRequest(String ctx, String type) { super(null, null); this.ctx = ctx; this.type = type; + this.priority = null; + } + + public DummyRequest(String ctx, String type, String priority) { + super(null, null); + this.ctx = ctx; + this.type = type; + this.priority = priority; } @Override @@ -301,6 +312,8 @@ public String getHeader(String name) { return ctx; case SOLR_REQUEST_TYPE_PARAM: return type; + case SOLR_REQUEST_PRIORITY_PARAM: + return priority; default: throw new IllegalArgumentException(); } @@ -427,10 +440,10 @@ public MockRequestRateLimiter(RateLimiterConfig config) { } @Override - public SlotReservation handleRequest(String requestType) throws InterruptedException { + public SlotReservation handleRequest(HttpServletRequest request) throws InterruptedException { incomingRequestCount.getAndIncrement(); - SlotReservation response = super.handleRequest(requestType); + SlotReservation response = super.handleRequest(request); if (response != null) { acceptedNewRequestCount.getAndIncrement(); @@ -493,6 +506,7 @@ public RateLimitManager build() { @Test @SuppressWarnings("try") public void testAdjustingConfig() throws IOException, InterruptedException { + DummyRequest dr = new DummyRequest(null, SolrRequest.SolrRequestType.QUERY.toString()); Random r = random(); int maxAllowed = 32; int allowed = r.nextInt(maxAllowed) + 1; @@ -530,7 +544,7 @@ public void testAdjustingConfig() throws IOException, InterruptedException { () -> { while (!finish.get()) { try (RequestRateLimiter.SlotReservation slotReservation = - limiterF.handleRequest(SolrRequest.SolrRequestType.QUERY.toString())) { + limiterF.handleRequest(dr)) { if (slotReservation != null) { executed.increment(); int ct = outstanding.incrementAndGet(); @@ -636,7 +650,8 @@ public void testPriorityBasedRateLimiter() throws Exception { rateLimitManager.registerRequestRateLimiter( requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); - HttpServletRequest foreground = new DummyRequest(null, "FOREGROUND"); + HttpServletRequest foreground = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); try (final RequestRateLimiter.SlotReservation allowed = rateLimitManager.handleRequest(foreground)) { @@ -644,7 +659,8 @@ public void testPriorityBasedRateLimiter() throws Exception { assertEquals(1, requestRateLimiter.getRequestsAllowed()); } - HttpServletRequest background = new DummyRequest(null, "BACKGROUND"); + HttpServletRequest background = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "BACKGROUND"); try (final RequestRateLimiter.SlotReservation allowed = rateLimitManager.handleRequest(background)) { @@ -652,7 +668,8 @@ public void testPriorityBasedRateLimiter() throws Exception { assertEquals(1, requestRateLimiter.getRequestsAllowed()); } - HttpServletRequest unknown = new DummyRequest(null, "unknown"); + HttpServletRequest unknown = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "unknown"); try (final RequestRateLimiter.SlotReservation allowed = rateLimitManager.handleRequest(unknown)) { @@ -681,21 +698,24 @@ public void testPriorityBasedRateLimiterTimeout() throws Exception { rateLimitManager.registerRequestRateLimiter( requestRateLimiter, SolrRequest.SolrRequestType.PRIORITY_BASED); - HttpServletRequest firstRequest = new DummyRequest(null, "FOREGROUND"); + HttpServletRequest firstRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); RequestRateLimiter.SlotReservation firstRequestAllowed = rateLimitManager.handleRequest(firstRequest); assertNotNull(firstRequestAllowed); assertEquals(1, requestRateLimiter.getRequestsAllowed()); - HttpServletRequest secondRequest = new DummyRequest(null, "FOREGROUND"); + HttpServletRequest secondRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "FOREGROUND"); RequestRateLimiter.SlotReservation secondRequestNotAllowed = rateLimitManager.handleRequest(secondRequest); assertNull(secondRequestNotAllowed); assertEquals(1, requestRateLimiter.getRequestsAllowed()); - HttpServletRequest thirdRequest = new DummyRequest(null, "BACKGROUND"); + HttpServletRequest thirdRequest = + new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "BACKGROUND"); RequestRateLimiter.SlotReservation thirdRequestNotAllowed = rateLimitManager.handleRequest(thirdRequest); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java index d056fb4ac5d..cfd32af318c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrRequest.java @@ -62,11 +62,6 @@ public enum SolrRequestType { PRIORITY_BASED, }; - public enum RequestPriorities { - FOREGROUND, - BACKGROUND - } - public enum SolrClientContext { CLIENT, SERVER From 23c56d3285b17528428ffa7314452f28d162e44f Mon Sep 17 00:00:00 2001 From: hitesh Date: Mon, 28 Oct 2024 11:41:26 -0700 Subject: [PATCH 09/13] cleanup --- .../org/apache/solr/servlet/PriorityBasedRateLimiter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 942d854334d..262dc338252 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -49,7 +49,7 @@ public SlotReservation handleRequest(HttpServletRequest request) { } catch (InterruptedException ie) { return null; } - return () -> PriorityBasedRateLimiter.this.release(requestPriority); + return () -> PriorityBasedRateLimiter.this.release(); } private boolean acquire(RequestPriorities priority) throws InterruptedException { @@ -82,7 +82,7 @@ private void exitFromQueue() { this.activeRequests.addAndGet(-1); } - private void release(RequestPriorities priority) { + private void release() { if (this.activeRequests.get() > this.totalAllowedRequests) { // priority one request is waiting, let's inform it this.exitFromQueue(); From 7bbab4282dba737435541f28aa2bca35c36d7ef8 Mon Sep 17 00:00:00 2001 From: hitesh Date: Mon, 28 Oct 2024 11:46:08 -0700 Subject: [PATCH 10/13] if priority header is not set properly then throw 400 error --- .../org/apache/solr/servlet/PriorityBasedRateLimiter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 262dc338252..3f7cbf1ab7c 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -6,6 +6,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; + +import org.apache.solr.common.SolrException; import org.apache.solr.core.RateLimiterConfig; /** @@ -40,7 +42,7 @@ public SlotReservation handleRequest(HttpServletRequest request) { } RequestPriorities requestPriority = getRequestPriority(request); if (requestPriority == null) { - return UNLIMITED; + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Request priority header is not defined or not set properly"); } try { if (!acquire(requestPriority)) { @@ -111,7 +113,6 @@ private RequestPriorities getRequestPriority(HttpServletRequest request) { try { return RequestPriorities.valueOf(requestPriority); } catch (IllegalArgumentException iae) { - } return null; } From ad5031631ff7304f924c0b41f824636d01bbb89b Mon Sep 17 00:00:00 2001 From: hitesh Date: Mon, 28 Oct 2024 12:00:06 -0700 Subject: [PATCH 11/13] tidy --- .../org/apache/solr/servlet/PriorityBasedRateLimiter.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 3f7cbf1ab7c..bf7b241be78 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -6,7 +6,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.http.HttpServletRequest; - import org.apache.solr.common.SolrException; import org.apache.solr.core.RateLimiterConfig; @@ -42,7 +41,9 @@ public SlotReservation handleRequest(HttpServletRequest request) { } RequestPriorities requestPriority = getRequestPriority(request); if (requestPriority == null) { - throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Request priority header is not defined or not set properly"); + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, + "Request priority header is not defined or not set properly"); } try { if (!acquire(requestPriority)) { From 68a4e64351a03dbed9bd83c16f4baaeac3b4b95d Mon Sep 17 00:00:00 2001 From: hitesh Date: Tue, 29 Oct 2024 11:18:08 -0700 Subject: [PATCH 12/13] Review feedback --- .../servlet/PriorityBasedRateLimiter.java | 26 +++++++++---------- .../solr/servlet/TestRequestRateLimiter.java | 7 +++-- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index bf7b241be78..3b9ce49a16b 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -25,13 +25,13 @@ public class PriorityBasedRateLimiter extends RequestRateLimiter { private final LinkedBlockingQueue waitingList = new LinkedBlockingQueue<>(); - private final long waitTimeoutInMillis; + private final long waitTimeoutInNanos; public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) { super(rateLimiterConfig); this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true); this.totalAllowedRequests = rateLimiterConfig.allowedRequests; - this.waitTimeoutInMillis = rateLimiterConfig.waitForSlotAcquisition; + this.waitTimeoutInNanos = rateLimiterConfig.waitForSlotAcquisition * 1000000l; } @Override @@ -57,26 +57,29 @@ public SlotReservation handleRequest(HttpServletRequest request) { private boolean acquire(RequestPriorities priority) throws InterruptedException { if (priority.equals(RequestPriorities.FOREGROUND)) { - return nextInQueue(); + return nextInQueue(this.waitTimeoutInNanos); } else if (priority.equals(RequestPriorities.BACKGROUND)) { if (this.activeRequests.get() < this.totalAllowedRequests) { - return nextInQueue(); + return nextInQueue(this.waitTimeoutInNanos); } else { CountDownLatch wait = new CountDownLatch(1); this.waitingList.put(wait); - return wait.await(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS) && nextInQueue(); + long startTime = System.nanoTime(); + return wait.await(this.waitTimeoutInNanos, TimeUnit.NANOSECONDS) + && nextInQueue(this.waitTimeoutInNanos - (System.nanoTime() - startTime)); } } return true; } - private boolean nextInQueue() throws InterruptedException { + private boolean nextInQueue(long waitTimeoutInNanos) throws InterruptedException { + this.activeRequests.addAndGet(1); boolean acquired = - this.numRequestsAllowed.tryAcquire(1, this.waitTimeoutInMillis, TimeUnit.MILLISECONDS); + this.numRequestsAllowed.tryAcquire(1, waitTimeoutInNanos, TimeUnit.NANOSECONDS); if (!acquired) { + this.activeRequests.addAndGet(-1); return false; } - this.activeRequests.addAndGet(1); return true; } @@ -86,16 +89,13 @@ private void exitFromQueue() { } private void release() { - if (this.activeRequests.get() > this.totalAllowedRequests) { - // priority one request is waiting, let's inform it - this.exitFromQueue(); - } else { + this.exitFromQueue(); + if (this.activeRequests.get() < this.totalAllowedRequests) { // next priority CountDownLatch waiter = this.waitingList.poll(); if (waiter != null) { waiter.countDown(); } - this.exitFromQueue(); } } diff --git a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java index 9dae9c361e9..e4cc923787d 100644 --- a/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java +++ b/solr/core/src/test/org/apache/solr/servlet/TestRequestRateLimiter.java @@ -671,11 +671,14 @@ public void testPriorityBasedRateLimiter() throws Exception { HttpServletRequest unknown = new DummyRequest(null, SolrRequest.SolrRequestType.PRIORITY_BASED.name(), "unknown"); + boolean gotException = false; try (final RequestRateLimiter.SlotReservation allowed = rateLimitManager.handleRequest(unknown)) { - assertNotNull(allowed); - assertEquals(0, requestRateLimiter.getRequestsAllowed()); + assertNull(allowed); + } catch (SolrException se) { + gotException = true; } + assertTrue(gotException); } @Test From 51a370899fddc52da66477def533dd452c4b014f Mon Sep 17 00:00:00 2001 From: hitesh Date: Wed, 30 Oct 2024 09:12:01 -0700 Subject: [PATCH 13/13] fixed request leak --- .../apache/solr/servlet/PriorityBasedRateLimiter.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java index 3b9ce49a16b..b4d17afaae5 100644 --- a/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java +++ b/solr/core/src/java/org/apache/solr/servlet/PriorityBasedRateLimiter.java @@ -65,8 +65,13 @@ private boolean acquire(RequestPriorities priority) throws InterruptedException CountDownLatch wait = new CountDownLatch(1); this.waitingList.put(wait); long startTime = System.nanoTime(); - return wait.await(this.waitTimeoutInNanos, TimeUnit.NANOSECONDS) - && nextInQueue(this.waitTimeoutInNanos - (System.nanoTime() - startTime)); + if (wait.await(this.waitTimeoutInNanos, TimeUnit.NANOSECONDS)) { + return nextInQueue(this.waitTimeoutInNanos - (System.nanoTime() - startTime)); + } else { + // remove from the queue; this/other requests already waited long enough; thus best effort + this.waitingList.poll(); + return false; + } } } return true;