Skip to content

Commit

Permalink
Add basic thorttler/exponential backoff policy for retry/Defination o… (
Browse files Browse the repository at this point in the history
#3527)

* Add basic thorttler/exponential backoff policy for retry/Defination of throttling exception

Signed-off-by: Dhwanil Patel <[email protected]>

* Incorporated comments

Signed-off-by: Dhwanil Patel <[email protected]>

* Incorporated minor comments

Signed-off-by: Dhwanil Patel <[email protected]>

* Removed overall thorttlingEnabled flag from Throttler

Signed-off-by: Dhwanil Patel <[email protected]>

* Corrected Java doc for Throttler

Signed-off-by: Dhwanil Patel <[email protected]>

* Incorporated comments

Signed-off-by: Dhwanil Patel <[email protected]>

* Changed the default behaviour of Throttler to return Optional

Signed-off-by: Dhwanil Patel <[email protected]>

* Removed generics from Throttler and used String as key

Signed-off-by: Dhwanil Patel <[email protected]>

* Ignore backport / autocut / dependabot branches for gradle checks on push

Signed-off-by: Peter Zhu <[email protected]>

Co-authored-by: Peter Zhu <[email protected]>
  • Loading branch information
dhwanilpatel and peterzhuamazon authored Jul 11, 2022
1 parent 6c543fa commit b8db5bf
Show file tree
Hide file tree
Showing 11 changed files with 445 additions and 81 deletions.
9 changes: 9 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,15 @@ private enum OpenSearchExceptionHandle {
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
V_2_1_0
),
/**
* TODO: Change the version number of check as per version in which this change will be merged.
*/
MASTER_TASK_THROTTLED_EXCEPTION(
org.opensearch.cluster.service.MasterTaskThrottlingException.class,
org.opensearch.cluster.service.MasterTaskThrottlingException::new,
162,
Version.V_3_0_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
73 changes: 73 additions & 0 deletions server/src/main/java/org/opensearch/action/bulk/BackoffPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.action.bulk;

import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;

import java.util.Iterator;
Expand Down Expand Up @@ -105,6 +106,19 @@ public static BackoffPolicy exponentialBackoff(TimeValue initialDelay, int maxNu
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}

/**
* It provides exponential backoff between retries until it reaches maxDelayForRetry.
* It uses equal jitter scheme as it is being used for throttled exceptions.
* It will make random distribution and also guarantees a minimum delay.
*
* @param baseDelay BaseDelay for exponential Backoff
* @param maxDelayForRetry MaxDelay that can be returned from backoff policy
* @return A backoff policy with exponential backoff with equal jitter which can't return delay more than given max delay
*/
public static BackoffPolicy exponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
return new ExponentialEqualJitterBackoff(baseDelay, maxDelayForRetry);
}

/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
Expand Down Expand Up @@ -197,6 +211,65 @@ public TimeValue next() {
}
}

private static class ExponentialEqualJitterBackoff extends BackoffPolicy {
private final int maxDelayForRetry;
private final int baseDelay;

private ExponentialEqualJitterBackoff(int baseDelay, int maxDelayForRetry) {
this.maxDelayForRetry = maxDelayForRetry;
this.baseDelay = baseDelay;
}

@Override
public Iterator<TimeValue> iterator() {
return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry);
}
}

private static class ExponentialEqualJitterBackoffIterator implements Iterator<TimeValue> {
/**
* Retry limit to avoids integer overflow issues.
* Post this limit, max delay will be returned with Equal Jitter.
*
* NOTE: If the value is greater than 30, there can be integer overflow
* issues during delay calculation.
**/
private final int RETRIES_TILL_JITTER_INCREASE = 30;

/**
* Exponential increase in delay will happen till it reaches maxDelayForRetry.
* Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only
* and not increase the delay.
*/
private final int maxDelayForRetry;
private final int baseDelay;
private int retriesAttempted;

private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) {
this.baseDelay = baseDelay;
this.maxDelayForRetry = maxDelayForRetry;
}

/**
* There is not any limit for this BackOff.
* This Iterator will always return back off delay.
*
* @return true
*/
@Override
public boolean hasNext() {
return true;
}

@Override
public TimeValue next() {
int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE);
int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry);
retriesAttempted++;
return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1));
}
}

/**
* Concrete Constant Back Off Policy
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.client.Client;
import org.opensearch.client.IndicesAdminClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.AdjustableSemaphore;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -53,8 +54,6 @@
import org.opensearch.index.Index;
import org.opensearch.index.mapper.Mapping;

import java.util.concurrent.Semaphore;

/**
* Called by shards in the cluster when their mapping was dynamically updated and it needs to be updated
* in the cluster state meta data (and broadcast to all members).
Expand Down Expand Up @@ -170,35 +169,4 @@ private static RuntimeException unwrapEsException(OpenSearchException esEx) {
}
return new UncategorizedExecutionException("Failed execution", root);
}

/**
* An adjustable semaphore
*
* @opensearch.internal
*/
static class AdjustableSemaphore extends Semaphore {

private final Object maxPermitsMutex = new Object();
private int maxPermits;

AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}

maxPermits = permits;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;

import java.io.IOException;

/**
* Exception raised from master node due to task throttling.
*/
public class MasterTaskThrottlingException extends OpenSearchException {

public MasterTaskThrottlingException(String msg, Object... args) {
super(msg, args);
}

public MasterTaskThrottlingException(StreamInput in) throws IOException {
super(in);
}
}
93 changes: 93 additions & 0 deletions server/src/main/java/org/opensearch/cluster/service/Throttler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.common.AdjustableSemaphore;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Base class for Throttling logic.
* It provides throttling functionality over multiple keys.
*/
public class Throttler {
protected ConcurrentMap<String, AdjustableSemaphore> semaphores = new ConcurrentHashMap<String, AdjustableSemaphore>();

/**
* Method to acquire permits for a key type.
* It will return true if permits can be acquired within threshold limits else false.
*
* If Throttler is not configured for the key then it will return Optional.empty().
* calling function need to handle this for determining the default behavior.
*
* @param key Key for which we want to acquire permits.
* @param permits Number of permits to acquire.
* @return Optional(Boolean) True/False - Throttler is configured for key and is able to acquire the permits or not
* Optional.empty() - Throttler is not configured for key
*/
public Optional<Boolean> acquire(final String key, final int permits) {
assert permits > 0;
AdjustableSemaphore semaphore = semaphores.get(key);
if (semaphore != null) {
return Optional.of(semaphore.tryAcquire(permits));
}
return Optional.empty();
}

/**
* Release the given permits for given type.
*
* @param key key for which we want to release permits.
* @param permits number of permits to release.
*/
public void release(final String key, final int permits) {
assert permits > 0;
AdjustableSemaphore semaphore = semaphores.get(key);
if (semaphore != null) {
semaphore.release(permits);
assert semaphore.availablePermits() <= semaphore.getMaxPermits();
}
}

/**
* Update the Threshold for throttling for given type.
*
* @param key Key for which we want to update limit.
* @param newLimit Updated limit.
*/
public void updateThrottlingLimit(final String key, final Integer newLimit) {
assert newLimit >= 0;
AdjustableSemaphore semaphore = semaphores.get(key);
if (semaphore == null) {
semaphore = semaphores.computeIfAbsent(key, k -> new AdjustableSemaphore(newLimit, true));
}
semaphore.setMaxPermits(newLimit);
}

/**
* Remove the threshold for given key.
* Throttler will no longer do throttling for given key.
*
* @param key Key for which we want to remove throttling.
*/
public void removeThrottlingLimit(final String key) {
assert semaphores.containsKey(key);
semaphores.remove(key);
}

public Integer getThrottlingLimit(final String key) {
AdjustableSemaphore semaphore = semaphores.get(key);
if (semaphore != null) {
return semaphore.getMaxPermits();
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import java.util.concurrent.Semaphore;

/**
* AdjustableSemaphore is Extended Semphore where we can change maxPermits.
*/
public class AdjustableSemaphore extends Semaphore {
private final Object maxPermitsMutex = new Object();
private int maxPermits;

public AdjustableSemaphore(int maxPermits, boolean fair) {
super(maxPermits, fair);
this.maxPermits = maxPermits;
}

/**
* Update the maxPermits in semaphore
*/
public void setMaxPermits(int permits) {
synchronized (maxPermitsMutex) {
final int diff = Math.subtractExact(permits, maxPermits);
if (diff > 0) {
// add permits
release(diff);
} else if (diff < 0) {
// remove permits
reducePermits(Math.negateExact(diff));
}
maxPermits = permits;
}
}

/**
* Returns maxPermits.
*/
public int getMaxPermits() {
return maxPermits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.TestShardRouting;
import org.opensearch.cluster.service.MasterTaskThrottlingException;
import org.opensearch.common.ParsingException;
import org.opensearch.common.Strings;
import org.opensearch.common.UUIDs;
Expand Down Expand Up @@ -851,6 +852,7 @@ public void testIds() {
ids.put(159, NodeHealthCheckFailureException.class);
ids.put(160, NoSeedNodeLeftException.class);
ids.put(161, ReplicationFailedException.class);
ids.put(162, MasterTaskThrottlingException.class);

Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,27 @@ public void testWrapBackoffPolicy() {
assertEquals(expectedRetries, retries.get());
}
}

public void testEqualJitterExponentialBackOffPolicy() {
int baseDelay = 10;
int maxDelay = 10000;
BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay);
Iterator<TimeValue> iterator = policy.iterator();

// Assert equal jitter
int retriesTillMaxDelay = 10;
for (int i = 0; i < retriesTillMaxDelay; i++) {
TimeValue delay = iterator.next();
assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2);
assertTrue(delay.getMillis() <= baseDelay * (1L << i));
}

// Now policy should return max delay for next retries.
int retriesAfterMaxDelay = randomInt(10);
for (int i = 0; i < retriesAfterMaxDelay; i++) {
TimeValue delay = iterator.next();
assertTrue(delay.getMillis() >= maxDelay / 2);
assertTrue(delay.getMillis() <= maxDelay);
}
}
}
Loading

0 comments on commit b8db5bf

Please sign in to comment.