Skip to content

Commit

Permalink
Added initial implementation of priority based rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
hiteshk25 committed Oct 24, 2024
1 parent 10837a3 commit d115c54
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 26 deletions.
24 changes: 21 additions & 3 deletions solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class RateLimiterConfig {
public final int allowedRequests;
public final boolean isSlotBorrowingEnabled;
public final int guaranteedSlotsThreshold;
public final Boolean priorityBasedEnabled;

public final int priorityMaxRequests;

/**
* We store the config definition in order to determine whether anything has changed that would
Expand All @@ -49,29 +52,37 @@ public RateLimiterConfig(
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled,
int priorityMaxRequests) {
this(
requestType,
makePayload(
isEnabled,
guaranteedSlotsThreshold,
waitForSlotAcquisition,
allowedRequests,
isSlotBorrowingEnabled));
isSlotBorrowingEnabled,
priorityBasedEnabled,
priorityMaxRequests));
}

private static RateLimiterPayload makePayload(
boolean isEnabled,
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled,
int priorityMaxRequests) {
RateLimiterPayload ret = new RateLimiterPayload();
ret.enabled = isEnabled;
ret.allowedRequests = allowedRequests;
ret.guaranteedSlots = guaranteedSlotsThreshold;
ret.slotBorrowingEnabled = isSlotBorrowingEnabled;
ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition);
ret.priorityBasedEnabled = priorityBasedEnabled;
ret.priorityMaxRequests = priorityMaxRequests;
return ret;
}

Expand All @@ -98,6 +109,11 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay
? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS
: definition.slotAcquisitionTimeoutInMS.longValue();

priorityBasedEnabled =
definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled;
priorityMaxRequests =
definition.priorityMaxRequests == null ? 0 : definition.priorityMaxRequests;

this.definition = definition;
}

Expand Down Expand Up @@ -125,6 +141,8 @@ public String toString() {
sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold);
sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled);
sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition);
sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled);
sb.append(", priorityMaxRequests=").append(priorityMaxRequests);
return sb.append('}').toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.apache.solr.servlet;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.core.RateLimiterConfig;

public class PriorityBasedRateLimiter extends RequestRateLimiter {
private final AtomicInteger priorityOneRequests = new AtomicInteger();
private final String[] priorities;
private final Semaphore numRequestsAllowed;

private final int totalAllowedRequests;

private final LinkedBlockingQueue<CountDownLatch> waitingList = new LinkedBlockingQueue<>();

public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);
this.priorities =
new String[] {
SolrRequest.RequestPriorities.FOREGROUND.toString(),
SolrRequest.RequestPriorities.BACKGROUND.toString()
};
this.numRequestsAllowed = new Semaphore(rateLimiterConfig.priorityMaxRequests, true);
this.totalAllowedRequests = rateLimiterConfig.priorityMaxRequests;
}

/* public PriorityBasedRequestLimiter(String[] priorities, int numRequestsAllowed) {
this.priorities = priorities;
this.numRequestsAllowed = new Semaphore(numRequestsAllowed, true);
this.totalAllowedRequests = numRequestsAllowed;
}*/

@Override
public SlotReservation handleRequest(String requestPriority) throws InterruptedException {
acquire(requestPriority);
return () -> PriorityBasedRateLimiter.this.release(requestPriority);
}

public void acquire(String priority) throws InterruptedException {
if (priority.equals(this.priorities[0])) {
nextInQueue();
} else if (priority.equals(this.priorities[1])) {
if (this.priorityOneRequests.get() < this.totalAllowedRequests) {
nextInQueue();
} else {
CountDownLatch wait = new CountDownLatch(1);
this.waitingList.put(wait);
wait.await();
nextInQueue();
}
}
}

private void nextInQueue() throws InterruptedException {
this.priorityOneRequests.addAndGet(1);
this.numRequestsAllowed.acquire(1);
}

private void exitFromQueue() {
this.priorityOneRequests.addAndGet(-1);
this.numRequestsAllowed.release(1);
}

public void release(String priority) {
if (this.priorities[0].equals(priority) || this.priorities[1].equals(priority)) {
if (this.priorityOneRequests.get() > this.totalAllowedRequests) {
// priority one request is waiting, let's inform it
this.exitFromQueue();
} else {
// next priority
CountDownLatch waiter = this.waitingList.poll();
if (waiter != null) {
waiter.countDown();
}
this.exitFromQueue();
}
}
}

@Override
public SlotReservation allowSlotBorrowing() throws InterruptedException {
throw new RuntimeException(
"PriorityBasedRateLimiter.allowSlotBorrowing method is not implemented");
}

public int getRequestsAllowed() {
return this.priorityOneRequests.get();
}
}
11 changes: 7 additions & 4 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ public QueryRateLimiter(RateLimiterConfig config) {
}

public static RateLimiterConfig processConfigChange(
SolrRequest.SolrRequestType requestType,
RateLimiterConfig rateLimiterConfig,
Map<String, Object> properties)
throws IOException {
RateLimiterConfig rateLimiterConfig, Map<String, Object> properties) throws IOException {
byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY));

RateLimiterPayload rateLimiterMeta;
Expand All @@ -61,6 +58,12 @@ public static RateLimiterConfig processConfigChange(
rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);
}

// default rate limiter
SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY;
if (rateLimiterConfig.priorityBasedEnabled) {
requestType = SolrRequest.SolrRequestType.PRIORITY_BASED;
}

if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) {
// no prior config, or config has changed; return the new config
return new RateLimiterConfig(requestType, rateLimiterMeta);
Expand Down
14 changes: 10 additions & 4 deletions solr/core/src/java/org/apache/solr/servlet/RateLimitManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ public boolean onChange(Map<String, Object> properties) {
try {
RateLimiterConfig newConfig =
QueryRateLimiter.processConfigChange(
SolrRequest.SolrRequestType.QUERY,
v == null ? null : v.getRateLimiterConfig(),
properties);
v == null ? null : v.getRateLimiterConfig(), properties);
if (newConfig == null) {
return v;
} else {
log.info("updated config: {}", newConfig);
if (newConfig.priorityBasedEnabled) {
return new PriorityBasedRateLimiter(newConfig);
}
return new QueryRateLimiter(newConfig);
}
} catch (IOException e) {
Expand All @@ -93,6 +94,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque
throws InterruptedException {
String requestContext = request.getHeader(SOLR_REQUEST_CONTEXT_PARAM);
String typeOfRequest = request.getHeader(SOLR_REQUEST_TYPE_PARAM);
String requestPriority = typeOfRequest;

if (typeOfRequest == null) {
// Cannot determine if this request should be throttled
Expand All @@ -105,6 +107,10 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque
return RequestRateLimiter.UNLIMITED;
}

if (typeOfRequest.equals(SolrRequest.RequestPriorities.FOREGROUND.toString())
|| typeOfRequest.equals(SolrRequest.RequestPriorities.BACKGROUND.toString())) {
typeOfRequest = SolrRequest.SolrRequestType.PRIORITY_BASED.toString();
}
RequestRateLimiter requestRateLimiter = requestRateLimiterMap.get(typeOfRequest);

if (requestRateLimiter == null) {
Expand All @@ -115,7 +121,7 @@ public RequestRateLimiter.SlotReservation handleRequest(HttpServletRequest reque
// slot borrowing should be fallback behavior, so if `slotAcquisitionTimeoutInMS`
// is configured it will be applied here (blocking if necessary), to make a best
// effort to draw from the request's own slot pool.
RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest();
RequestRateLimiter.SlotReservation result = requestRateLimiter.handleRequest(requestPriority);

if (result != null) {
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ boolean isEmpty() {
* Handles an incoming request. returns a metadata object representing the metadata for the
* acquired slot, if acquired. If a slot is not acquired, returns a null metadata object.
*/
public SlotReservation handleRequest() throws InterruptedException {
public SlotReservation handleRequest(String requestType) throws InterruptedException {

if (!rateLimiterConfig.isEnabled) {
return UNLIMITED;
Expand Down
Loading

0 comments on commit d115c54

Please sign in to comment.