Skip to content

Commit

Permalink
porting multiple rate limiters to main
Browse files Browse the repository at this point in the history
  • Loading branch information
noblepaul committed Aug 5, 2024
1 parent ddfe2f1 commit b0876c0
Show file tree
Hide file tree
Showing 7 changed files with 690 additions and 153 deletions.
93 changes: 76 additions & 17 deletions solr/core/src/java/org/apache/solr/core/RateLimiterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,26 @@
import static org.apache.solr.servlet.RateLimitManager.DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS;

import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;

public class RateLimiterConfig {
public static final String RL_CONFIG_KEY = "rate-limiters";

public SolrRequest.SolrRequestType requestType;
public boolean isEnabled;
public long waitForSlotAcquisition;
public int allowedRequests;
public boolean isSlotBorrowingEnabled;
public int guaranteedSlotsThreshold;
public final SolrRequest.SolrRequestType requestType;
public final boolean isEnabled;
public final long waitForSlotAcquisition;
public final int allowedRequests;
public final boolean isSlotBorrowingEnabled;
public final int guaranteedSlotsThreshold;

/**
* We store the config definition in order to determine whether anything has changed that would
* call for re-initialization.
*/
public final RateLimiterPayload definition;

public RateLimiterConfig(SolrRequest.SolrRequestType requestType) {
this.requestType = requestType;
this.isEnabled = false;
this.allowedRequests = DEFAULT_CONCURRENT_REQUESTS;
this.isSlotBorrowingEnabled = false;
this.guaranteedSlotsThreshold = this.allowedRequests / 2;
this.waitForSlotAcquisition = DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS;
this(requestType, EMPTY);
}

public RateLimiterConfig(
Expand All @@ -48,11 +50,68 @@ public RateLimiterConfig(
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
this(
requestType,
makePayload(
isEnabled,
guaranteedSlotsThreshold,
waitForSlotAcquisition,
allowedRequests,
isSlotBorrowingEnabled));
}

private static RateLimiterPayload makePayload(
boolean isEnabled,
int guaranteedSlotsThreshold,
long waitForSlotAcquisition,
int allowedRequests,
boolean isSlotBorrowingEnabled) {
RateLimiterPayload ret = new RateLimiterPayload();
ret.enabled = isEnabled;
ret.allowedRequests = allowedRequests;
ret.guaranteedSlots = guaranteedSlotsThreshold;
ret.slotBorrowingEnabled = isSlotBorrowingEnabled;
ret.slotAcquisitionTimeoutInMS = Math.toIntExact(waitForSlotAcquisition);
return ret;
}

public RateLimiterConfig(SolrRequest.SolrRequestType requestType, RateLimiterPayload definition) {
this.requestType = requestType;
this.isEnabled = isEnabled;
this.guaranteedSlotsThreshold = guaranteedSlotsThreshold;
this.waitForSlotAcquisition = waitForSlotAcquisition;
this.allowedRequests = allowedRequests;
this.isSlotBorrowingEnabled = isSlotBorrowingEnabled;
if (definition == null) {
definition = EMPTY;
}
allowedRequests =
definition.allowedRequests == null
? DEFAULT_CONCURRENT_REQUESTS
: definition.allowedRequests;

isEnabled = definition.enabled == null ? false : definition.enabled; // disabled by default

guaranteedSlotsThreshold =
definition.guaranteedSlots == null ? this.allowedRequests / 2 : definition.guaranteedSlots;

isSlotBorrowingEnabled =
definition.slotBorrowingEnabled == null ? false : definition.slotBorrowingEnabled;

waitForSlotAcquisition =
definition.slotAcquisitionTimeoutInMS == null
? DEFAULT_SLOT_ACQUISITION_TIMEOUT_MS
: definition.slotAcquisitionTimeoutInMS.longValue();

this.definition = definition;
}

private static final RateLimiterPayload EMPTY = new RateLimiterPayload(); // use defaults;

public boolean shouldUpdate(RateLimiterPayload definition) {
if (definition == null) {
definition = EMPTY; // use defaults
}

if (definition.equals(this.definition)) {
return false;
}

return true;
}
}
105 changes: 55 additions & 50 deletions solr/core/src/java/org/apache/solr/servlet/QueryRateLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.request.beans.RateLimiterPayload;
Expand All @@ -31,60 +34,65 @@
import org.apache.solr.util.SolrJacksonAnnotationInspector;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of RequestRateLimiter specific to query request types. Most of the actual work is
* delegated to the parent class but specific configurations and parsing are handled by this class.
*/
public class QueryRateLimiter extends RequestRateLimiter {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final ObjectMapper mapper = SolrJacksonAnnotationInspector.createObjectMapper();
private static QueryRateLimiter DEFAULT =
new QueryRateLimiter(new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY), null);

public QueryRateLimiter(SolrZkClient solrZkClient) {
super(constructQueryRateLimiterConfig(solrZkClient));
public QueryRateLimiter(RateLimiterConfig config, byte[] configData) {
super(config);
this.configData = configData;
}

public void processConfigChange(Map<String, Object> properties) throws IOException {
RateLimiterConfig rateLimiterConfig = getRateLimiterConfig();
public static RateLimiterConfig processConfigChange(
SolrRequest.SolrRequestType requestType,
RateLimiterConfig rateLimiterConfig,
Map<String, Object> properties)
throws IOException {
byte[] configInput = Utils.toJSON(properties.get(RL_CONFIG_KEY));

RateLimiterPayload rateLimiterMeta;
if (configInput == null || configInput.length == 0) {
return;
rateLimiterMeta = null;
} else {
rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);
rateLimiterMeta.configBytes = configInput;
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig);
if (rateLimiterConfig == null || rateLimiterConfig.shouldUpdate(rateLimiterMeta)) {
// no prior config, or config has changed; return the new config
return new RateLimiterConfig(requestType, rateLimiterMeta);
} else {
return null;
}
}

// To be used in initialization
@SuppressWarnings({"unchecked"})
private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zkClient) {
public static Map<SolrRequest.SolrRequestType, QueryRateLimiter> read(SolrZkClient zkClient) {
try {

if (zkClient == null) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
return Map.of(SolrRequest.SolrRequestType.QUERY, DEFAULT);
}

RateLimiterConfig rateLimiterConfig =
new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
Map<String, Object> clusterPropsJson =
(Map<String, Object>)
Object obj =
Utils.fromJSON(zkClient.getData(ZkStateReader.CLUSTER_PROPS, null, new Stat(), true));
byte[] configInput = Utils.toJSON(clusterPropsJson.get(RL_CONFIG_KEY));

if (configInput.length == 0) {
// No Rate Limiter configuration defined in clusterprops.json. Return default configuration
// values
return rateLimiterConfig;
if (obj == null) return Map.of(SolrRequest.SolrRequestType.QUERY, DEFAULT);
else {
return parseRateLimiterConfig((Map<String, Object>) obj);
}

RateLimiterPayload rateLimiterMeta = mapper.readValue(configInput, RateLimiterPayload.class);

constructQueryRateLimiterConfigInternal(rateLimiterMeta, rateLimiterConfig);

return rateLimiterConfig;
} catch (KeeperException.NoNodeException e) {
return new RateLimiterConfig(SolrRequest.SolrRequestType.QUERY);
return Map.of(SolrRequest.SolrRequestType.QUERY, DEFAULT);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(
"Error reading cluster property", SolrZkClient.checkInterrupted(e));
Expand All @@ -93,33 +101,30 @@ private static RateLimiterConfig constructQueryRateLimiterConfig(SolrZkClient zk
}
}

private static void constructQueryRateLimiterConfigInternal(
RateLimiterPayload rateLimiterMeta, RateLimiterConfig rateLimiterConfig) {

if (rateLimiterMeta == null) {
// No Rate limiter configuration defined in clusterprops.json
return;
public static Map<SolrRequest.SolrRequestType, QueryRateLimiter> parseRateLimiterConfig(
Map<String, Object> cfg) throws IOException {
Object obj = cfg.get(RL_CONFIG_KEY);
if (obj == null) return Map.of(SolrRequest.SolrRequestType.QUERY, DEFAULT);
Map<SolrRequest.SolrRequestType, QueryRateLimiter> result = new HashMap<>();
List<Map<String, Object>> list = null;
if (obj instanceof List) {
list = (List<Map<String, Object>>) obj;
} else {
list = List.of((Map<String, Object>) obj);
}

if (rateLimiterMeta.allowedRequests != null) {
rateLimiterConfig.allowedRequests = rateLimiterMeta.allowedRequests.intValue();
if (list.isEmpty()) return Map.of(SolrRequest.SolrRequestType.QUERY, DEFAULT);
for (Map<String, Object> rl : list) {
byte[] cfgData = Utils.toJSON(rl);
RateLimiterPayload rateLimiterPayload = mapper.readValue(cfgData, RateLimiterPayload.class);
QueryRateLimiter qrl =
new QueryRateLimiter(
new RateLimiterConfig(
SolrRequest.SolrRequestType.parse(rateLimiterPayload.type), rateLimiterPayload),
cfgData);
result.put(qrl.getRateLimiterConfig().requestType, qrl);
}

if (rateLimiterMeta.enabled != null) {
rateLimiterConfig.isEnabled = rateLimiterMeta.enabled;
}

if (rateLimiterMeta.guaranteedSlots != null) {
rateLimiterConfig.guaranteedSlotsThreshold = rateLimiterMeta.guaranteedSlots;
}

if (rateLimiterMeta.slotBorrowingEnabled != null) {
rateLimiterConfig.isSlotBorrowingEnabled = rateLimiterMeta.slotBorrowingEnabled;
}

if (rateLimiterMeta.slotAcquisitionTimeoutInMS != null) {
rateLimiterConfig.waitForSlotAcquisition =
rateLimiterMeta.slotAcquisitionTimeoutInMS.longValue();
}
return result;
}
}
Loading

0 comments on commit b0876c0

Please sign in to comment.