Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Prioritybased rate limiter #235

Merged
merged 13 commits into from
Oct 30, 2024
15 changes: 12 additions & 3 deletions solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class RateLimiterConfig {
public final int allowedRequests;
public final boolean isSlotBorrowingEnabled;
public final int guaranteedSlotsThreshold;
public final Boolean priorityBasedEnabled;
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved

/**
* We store the config definition in order to determine whether anything has changed that would
Expand All @@ -49,29 +50,33 @@ public RateLimiterConfig(
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
this(
requestType,
makePayload(
isEnabled,
guaranteedSlotsThreshold,
waitForSlotAcquisition,
allowedRequests,
isSlotBorrowingEnabled));
isSlotBorrowingEnabled,
priorityBasedEnabled));
}

private static RateLimiterPayload makePayload(
boolean isEnabled,
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
boolean isSlotBorrowingEnabled,
boolean priorityBasedEnabled) {
RateLimiterPayload ret = new RateLimiterPayload();
ret.enabled = isEnabled;
ret.allowedRequests = allowedRequests;
ret.guaranteedSlots = guaranteedSlotsThreshold;
ret.slotBorrowingEnabled = isSlotBorrowingEnabled;
ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition);
ret.priorityBasedEnabled = priorityBasedEnabled;
return ret;
}

Expand All @@ -98,6 +103,9 @@ public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPay
? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS
: definition.slotAcquisitionTimeoutInMS.longValue();

priorityBasedEnabled =
definition.priorityBasedEnabled == null ? false : definition.priorityBasedEnabled;

this.definition = definition;
}

Expand Down Expand Up @@ -125,6 +133,7 @@ public String toString() {
sb.append(", guaranteedSlots=").append(guaranteedSlotsThreshold);
sb.append(", borrowEnabled=").append(isSlotBorrowingEnabled);
sb.append(", waitForSlotMillis=").append(waitForSlotAcquisition);
sb.append(", priorityBasedEnabled=").append(priorityBasedEnabled);
return sb.append('}').toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.apache.solr.servlet;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.core.RateLimiterConfig;

/**
* PriorityBasedRateLimiter allocates slots based on request priority. Currently, it has two
* priorities: FOREGROUND and BACKGROUND requests. Clients can pass the {@link
* org.apache.solr.common.params.CommonParams} SOLR_REQUEST_TYPE_PARAM request header to indicate
* whether a request is foreground or background. Foreground requests have higher priority than
* background requests.
*/
public class PriorityBasedRateLimiter extends RequestRateLimiter {
// Number of slots currently held. Despite the name, nextInQueue() increments it and
// exitFromQueue() decrements it for every request that obtained a permit, whatever its priority.
private final AtomicInteger priorityOneRequests = new AtomicInteger();
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
// Priority names compared against the caller-supplied priority: [0] FOREGROUND, [1] BACKGROUND.
private final String[] priorities;
// Permit pool for the slots; created in the constructor as a fair semaphore.
private final Semaphore numRequestsAllowed;

// Slot count copied from the config; acquire() parks BACKGROUND requests once the held-slot
// counter reaches this value.
private final int totalAllowedRequests;

// Latches of BACKGROUND requests parked in acquire(); release() polls one and counts it down.
private final LinkedBlockingQueue<CountDownLatch> waitingList = new LinkedBlockingQueue<>();

// Upper bound, in milliseconds, for waiting on a latch or on a semaphore permit.
private final long waitTimeoutInMillis;

public PriorityBasedRateLimiter(RateLimiterConfig rateLimiterConfig) {
super(rateLimiterConfig);
this.priorities =
new String[] {
SolrRequest.RequestPriorities.FOREGROUND.name(),
SolrRequest.RequestPriorities.BACKGROUND.name()
};
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
this.numRequestsAllowed = new Semaphore(rateLimiterConfig.allowedRequests, true);
this.totalAllowedRequests = rateLimiterConfig.allowedRequests;
this.waitTimeoutInMillis = rateLimiterConfig.waitForSlotAcquisition;
}

@Override
public SlotReservation handleRequest(String requestPriority) {
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
if (!rateLimiterConfig.isEnabled) {
return UNLIMITED;
}
try {
if (!acquire(requestPriority)) {
return null;
}
} catch (InterruptedException ie) {
return null;
}
return () -> PriorityBasedRateLimiter.this.release(requestPriority);
}

private boolean acquire(String priority) throws InterruptedException {
if (priority.equals(this.priorities[0])) {
return nextInQueue();
} else if (priority.equals(this.priorities[1])) {
if (this.priorityOneRequests.get() < this.totalAllowedRequests) {
return nextInQueue();
} else {
CountDownLatch wait = new CountDownLatch(1);
this.waitingList.put(wait);
return wait.await(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS) && nextInQueue();
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
}
}
return true;
}

/**
 * Waits up to the configured timeout for one semaphore permit and, on success, bumps the
 * held-slot counter.
 *
 * @return {@code true} if a permit was obtained, {@code false} if the wait timed out
 * @throws InterruptedException if the thread is interrupted while waiting for the permit
 */
private boolean nextInQueue() throws InterruptedException {
  if (this.numRequestsAllowed.tryAcquire(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS)) {
    this.priorityOneRequests.incrementAndGet();
    return true;
  }
  return false;
}

/** Hands one permit back to the semaphore, then lowers the held-slot counter by one. */
private void exitFromQueue() {
  this.numRequestsAllowed.release();
  this.priorityOneRequests.decrementAndGet();
}

private void release(String priority) {
if (this.priorities[0].equals(priority) || this.priorities[1].equals(priority)) {
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
if (this.priorityOneRequests.get() > this.totalAllowedRequests) {
// priority one request is waiting, let's inform it
this.exitFromQueue();
} else {
// next priority
CountDownLatch waiter = this.waitingList.poll();
if (waiter != null) {
waiter.countDown();
}
this.exitFromQueue();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a race condition here that could leak CountDownLatch instances into the waitingList. (because we check priorityOneRequests count before inserting into waitingList, it's possible that by the time we insert, another thread would have decremented priorityOneRequests and polled the waitingList).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could synchronize the release/acquire methods, but I was trying to avoid that.

  1. Each release call polls the waiting list and then notifies any waiting request.
  2. Each release call also decrements the activeRequests count.

I'm assuming this is enough to cover any race condition, but let me know if I'm missing something here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's definitely a race condition here.

      if (this.activeRequests.get() < this.totalAllowedRequests) {
        return nextInQueue();
      } else {
        // RACE CONDITION SOMEWHERE IN HERE
        CountDownLatch wait = new CountDownLatch(1);
        this.waitingList.put(wait);
        return wait.await(this.waitTimeoutInMillis, TimeUnit.MILLISECONDS) && nextInQueue();

It is possible for a ton of BACKGROUND requests to come in and find activeRequests full, and then for all (or some) of the active requests to complete and poll waitingList before the BACKGROUND requests have been added to it.

I think it's an edge case, but it's definitely there, and could yield unpredictable behavior.

}
}

/**
 * Slot borrowing hook; this implementation never lends a slot.
 *
 * @return always {@code null}
 * @throws InterruptedException declared by the signature; this implementation does not throw it
 */
@Override
public SlotReservation allowSlotBorrowing() throws InterruptedException {
// if we reach here that means slot is not available
hiteshk25 marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

/**
 * Reports the current value of the held-slot counter.
 *
 * @return the number of slots currently held, as tracked by {@code priorityOneRequests}
 */
public int getRequestsAllowed() {
  return priorityOneRequests.intValue();
}
}
60 changes: 5 additions & 55 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,12 @@

package org.apache.solr.servlet;

import static org.apache.solr.core.RateLimiterConfig.RL_CONFIG_KEY;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.RateLimiterConfig;
import org.apache.solr.util.SolrJacksonAnnotationInspector;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/**
* Implementation of RequestRateLimiter specific to query request types. Most of the actual work is
Expand All @@ -39,26 +31,17 @@
public class QueryRateLimiter extends RequestRateLimiter {
private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();

public QueryRateLimiter(SolrZkClient solrZkClient) {
super(constructQueryRateLimiterConfig(solrZkClient));
}

public QueryRateLimiter(RateLimiterConfig config) {
super(config);
}

public static RateLimiterConfig processConfigChange(
SolrRequest.SolrRequestType requestType,
RateLimiterConfig rateLimiterConfig,
Map<String, Object> properties)
throws IOException {
byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY));
RateLimiterConfig rateLimiterConfig, RateLimiterPayload rateLimiterMeta) throws IOException {

RateLimiterPayload rateLimiterMeta;
if (configInput == null || configInput.length == 0) {
rateLimiterMeta = null;
} else {
rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);
// default rate limiter
SolrRequest.SolrRequestType requestType = SolrRequest.SolrRequestType.QUERY;
if (rateLimiterMeta.priorityBasedEnabled) {
requestType = SolrRequest.SolrRequestType.PRIORITY_BASED;
}

if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) {
Expand All @@ -68,37 +51,4 @@ public static RateLimiterConfig processConfigChange(
return null;
}
}

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
try {

if (zkClient == null) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

Map<String, Object> clusterPropsJson =
(Map<String, Object>)
Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY));

if (configInput.length == 0) {
// No Rate Limiter configuration defined in clusterprops.json. Return default configuration
// values
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY, rateLimiterMeta);
} catch (KeeperException.NoNodeException e) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(
"Error reading cluster property", SolrZkClient.checkInterrupted(e));
} catch (IOException e) {
throw new RuntimeException("Encountered an IOException " + e.getMessage());
}
}
}
Loading
Loading