Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Prioritybased rate limiter #235

Merged
merged 13 commits into from
Oct 30, 2024
15 changes: 12 additions & 3 deletions solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class RateLimiterConfig {
public final int allowedRequests;
public final boolean isSlotBorrowingEnabled;
public final int guaranteedSlotsThreshold;
public final boolean priorityBasedEnabled;

/**
* We store the config definition in order to determine whether anything has changed that would
Expand All @@ -49,29 +50,33 @@ public RateLimiterConfig(
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
this(
requestType,
makePayload(
isEnabled,
guaranteedSlotsThreshold,
waitForSlotAcquisition,
allowedRequests,
isSlotBorrowingEnabled));
isSlotBorrowingEnabled,
priorityBasedEnabled));
}

private static RateLimiterPayload makePayload(
boolean isEnabled,
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
RateLimiterPayload ret = new RateLimiterPayload();
ret.enabled = isEnabled;
ret.allowedRequests = allowedRequests;
ret.guaranteedSlots = guaranteedSlotsThreshold;
ret.slotBorrowingEnabled = isSlotBorrowingEnabled;
ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition);
ret.priorityBasedEnabled = priorityBasedEnabled;
return ret;
}

Expand All @@ -98,6 +103,9 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay
? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS
: definition.slotAcquisitionTimeoutInMS.longValue();

priorityBasedEnabled =
definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled;

this.definition = definition;
}

Expand Down Expand Up @@ -125,6 +133,7 @@ public String toString() {
sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold);
sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled);
sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition);
sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled);
return sb.append('}').toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package org.apache.solr.servlet;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.RateLimiterConfig;

/**
* PriorityBasedRateLimiter allocates the slot based on their request priority Currently, it has two
* priorities FOREGROUND and BACKGROUND Requests. Client can pass the {@link
* org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate
* the foreground and background request. Foreground requests has high priority than background
* requests
*/
public class PriorityBasedRateLimiter extends RequestRateLimiter {
public static final String SOLR_REQUEST_PRIORITY_PARAM = "Solr-Request-Priority";
private final AtomicInteger activeRequests = new AtomicInteger();
private final Semaphore numRequestsAllowed;

private final int totalAllowedRequests;

private final LinkedBlockingQueue<CountDownLatch> waitingList = new LinkedBlockingQueue<>();

private final long waitTimeoutInMillis;

public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);
this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true);
this.totalAllowedRequests = rateLimiterConfig.allowedRequests;
this.waitTimeoutInMillis = rateLimiterConfig.waitForSlotAcquisition;
}

@Override
public SlotReservation handleRequest(HttpServletRequest request) {
if (!rateLimiterConfig.isEnabled) {
return UNLIMITED;
}
RequestPriorities requestPriority = getRequestPriority(request);
if (requestPriority == null) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Request priority header is not defined or not set properly");
}
try {
if (!acquire(requestPriority)) {
return null;
}
} catch (InterruptedException ie) {
return null;
}
return () -> PriorityBasedRateLimiter.this.release();
}

private boolean acquire(RequestPriorities priority) throws InterruptedException {
if (priority.equals(RequestPriorities.FOREGROUND)) {
return nextInQueue();
} else if (priority.equals(RequestPriorities.BACKGROUND)) {
if (this.activeRequests.get() < this.totalAllowedRequests) {
return nextInQueue();
} else {
CountDownLatch wait = new CountDownLatch(1);
this.waitingList.put(wait);
return wait.await(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS) && nextInQueue();
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return true;
}

private boolean nextInQueue() throws InterruptedException {
boolean acquired =
this.numRequestsAllowed.tryAcquire(1, this.waitTimeoutInMillis, TimeUnit.MILLISECONDS);
if (!acquired) {
return false;
}
this.activeRequests.addAndGet(1);
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
return true;
}

private void exitFromQueue() {
this.numRequestsAllowed.release(1);
this.activeRequests.addAndGet(-1);
}

private void release() {
if (this.activeRequests.get() > this.totalAllowedRequests) {
// priority one request is waiting, let's inform it
this.exitFromQueue();
} else {
// next priority
CountDownLatch waiter = this.waitingList.poll();
if (waiter != null) {
waiter.countDown();
}
this.exitFromQueue();
}
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public SlotReservation allowSlotBorrowing() throws InterruptedException {
// if we reach here that means slot is not available
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

public int getRequestsAllowed() {
return this.activeRequests.get();
}

private RequestPriorities getRequestPriority(HttpServletRequest request) {
String requestPriority = request.getHeader(SOLR_REQUEST_PRIORITY_PARAM);
try {
return RequestPriorities.valueOf(requestPriority);
} catch (IllegalArgumentException iae) {
}
return null;
}

public enum RequestPriorities {
// this has high priority
FOREGROUND,
// this has low priority
BACKGROUND
}
}
60 changes: 5 additions & 55 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@

package org.apache.solr.servlet;

import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.apache.solr.util.SolrJacksonAnnotationInspector;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
* Implementation of RequestRateLimiter specific to query request types. Most of the actual work is
Expand All @@ -39,26 +31,17 @@
public class QueryRateLimiter extends RequestRateLimiter {
private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();

public QueryRateLimiter(SolrZkClient solrZkClient) {
super(constructQueryRateLimiterConfig(solrZkClient));
}

public QueryRateLimiter(RateLimiterConfig config) {
super(config);
}

public static RateLimiterConfig processConfigChange(
SolrRequest.SolrRequestType requestType,
RateLimiterConfig rateLimiterConfig,
Map<String, Object> properties)
throws IOException {
byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY));
RateLimiterConfig rateLimiterConfig, RateLimiterPayload rateLimiterMeta) throws IOException {

RateLimiterPayload rateLimiterMeta;
if (configInput == null || configInput.length == 0) {
rateLimiterMeta = null;
} else {
rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);
// default rate limiter
SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY;
if (rateLimiterMeta.priorityBasedEnabled) {
requestType = SolrRequest.SolrRequestType.PRIORITY_BASED;
}

if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) {
Expand All @@ -68,37 +51,4 @@ public static RateLimiterConfig processConfigChange(
return null;
}
}

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
try {

if (zkClient == null) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

Map<String, Object> clusterPropsJson =
(Map<String, Object>)
Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY));

if (configInput.length == 0) {
// No Rate Limiter configuration defined in clusterprops.json. Return default configuration
// values
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta);
} catch (KeeperException.NoNodeException e) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(
"Error reading cluster property", SolrZkClient.checkInterrupted(e));
} catch (IOException e) {
throw new RuntimeException("Encountered an IOException " + e.getMessage());
}
}
}
Loading
Loading