forked from opensearch-project/opensearch-spark
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add backoff retry capability in rest client (opensearch-project#170)
* Add retry http client, builder and future Signed-off-by: Chen Dai <[email protected]> * Add UT Signed-off-by: Chen Dai <[email protected]> * Add more UT Signed-off-by: Chen Dai <[email protected]> * Delete IT Signed-off-by: Chen Dai <[email protected]> * Refactor UT Signed-off-by: Chen Dai <[email protected]> * Replace class name check with instanceOf check Signed-off-by: Chen Dai <[email protected]> * Make retryable exception list optional Signed-off-by: Chen Dai <[email protected]> * Add retryable status code option and handler Signed-off-by: Chen Dai <[email protected]> * Add retry enabled check Signed-off-by: Chen Dai <[email protected]> * Add Spark conf and user manual Signed-off-by: Chen Dai <[email protected]> * Add exception class name to Spark conf Signed-off-by: Chen Dai <[email protected]> * Separate failure and result handler class Signed-off-by: Chen Dai <[email protected]> * Refactor failure and result predicate Signed-off-by: Chen Dai <[email protected]> * Add more UT Signed-off-by: Chen Dai <[email protected]> * Reword user manual Signed-off-by: Chen Dai <[email protected]> --------- Signed-off-by: Chen Dai <[email protected]>
- Loading branch information
Showing
12 changed files
with
727 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
110 changes: 110 additions & 0 deletions
110
flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.http; | ||
|
||
import static java.time.temporal.ChronoUnit.SECONDS; | ||
|
||
import dev.failsafe.RetryPolicy; | ||
import java.time.Duration; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.logging.Logger; | ||
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; | ||
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; | ||
|
||
/** | ||
* Flint options related to HTTP request retry. | ||
*/ | ||
public class FlintRetryOptions { | ||
|
||
private static final Logger LOG = Logger.getLogger(FlintRetryOptions.class.getName()); | ||
|
||
/** | ||
* All Flint options. | ||
*/ | ||
private final Map<String, String> options; | ||
|
||
/** | ||
* Maximum retry attempt | ||
*/ | ||
public static final int DEFAULT_MAX_RETRIES = 3; | ||
public static final String MAX_RETRIES = "retry.max_retries"; | ||
|
||
public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502"; | ||
public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes"; | ||
|
||
/** | ||
* Retryable exception class name | ||
*/ | ||
public static final String RETRYABLE_EXCEPTION_CLASS_NAMES = "retry.exception_class_names"; | ||
|
||
public FlintRetryOptions(Map<String, String> options) { | ||
this.options = options; | ||
} | ||
|
||
/** | ||
* Is auto retry capability enabled. | ||
* | ||
* @return true if enabled, otherwise false. | ||
*/ | ||
public boolean isRetryEnabled() { | ||
return getMaxRetries() > 0; | ||
} | ||
|
||
/** | ||
* Build retry policy based on the given Flint options. | ||
* | ||
* @param <T> success execution result type | ||
* @return Failsafe retry policy | ||
*/ | ||
public <T> RetryPolicy<T> getRetryPolicy() { | ||
return RetryPolicy.<T>builder() | ||
// Backoff strategy config (can be configurable as needed in future) | ||
.withBackoff(1, 30, SECONDS) | ||
.withJitter(Duration.ofMillis(100)) | ||
// Failure handling config from Flint options | ||
.withMaxRetries(getMaxRetries()) | ||
.handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) | ||
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) | ||
// Logging listener | ||
.onFailedAttempt(event -> | ||
LOG.severe("Attempt to execute request failed: " + event)) | ||
.onRetry(ex -> | ||
LOG.warning("Retrying failed request at #" + ex.getAttemptCount())) | ||
.build(); | ||
} | ||
|
||
/** | ||
* @return maximum retry option value | ||
*/ | ||
public int getMaxRetries() { | ||
return Integer.parseInt( | ||
options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES))); | ||
} | ||
|
||
/** | ||
* @return retryable HTTP status code list | ||
*/ | ||
public String getRetryableHttpStatusCodes() { | ||
return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES); | ||
} | ||
|
||
/** | ||
* @return retryable exception class name list | ||
*/ | ||
public Optional<String> getRetryableExceptionClassNames() { | ||
return Optional.ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES)); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "FlintRetryOptions{" + | ||
"maxRetries=" + getMaxRetries() + | ||
", retryableStatusCodes=" + getRetryableHttpStatusCodes() + | ||
", retryableExceptionClassNames=" + getRetryableExceptionClassNames() + | ||
'}'; | ||
} | ||
} |
143 changes: 143 additions & 0 deletions
143
flint-core/src/main/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.http; | ||
|
||
import dev.failsafe.Failsafe; | ||
import dev.failsafe.FailsafeException; | ||
import java.io.IOException; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.logging.Logger; | ||
import org.apache.http.concurrent.FutureCallback; | ||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; | ||
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; | ||
import org.apache.http.nio.protocol.HttpAsyncRequestProducer; | ||
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; | ||
import org.apache.http.protocol.HttpContext; | ||
import org.opensearch.flint.core.FlintOptions; | ||
|
||
/** | ||
* HTTP client that retries request to tolerant transient fault. | ||
*/ | ||
public class RetryableHttpAsyncClient extends CloseableHttpAsyncClient { | ||
|
||
private static final Logger LOG = Logger.getLogger(RetryableHttpAsyncClient.class.getName()); | ||
|
||
/** | ||
* Delegated internal HTTP client that execute the request underlying. | ||
*/ | ||
private final CloseableHttpAsyncClient internalClient; | ||
|
||
/** | ||
* Flint retry options. | ||
*/ | ||
private final FlintRetryOptions options; | ||
|
||
public RetryableHttpAsyncClient(CloseableHttpAsyncClient internalClient, | ||
FlintRetryOptions options) { | ||
this.internalClient = internalClient; | ||
this.options = options; | ||
} | ||
|
||
@Override | ||
public boolean isRunning() { | ||
return internalClient.isRunning(); | ||
} | ||
|
||
@Override | ||
public void start() { | ||
internalClient.start(); | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
internalClient.close(); | ||
} | ||
|
||
@Override | ||
public <T> Future<T> execute(HttpAsyncRequestProducer requestProducer, | ||
HttpAsyncResponseConsumer<T> responseConsumer, | ||
HttpContext context, | ||
FutureCallback<T> callback) { | ||
return new Future<>() { | ||
/** | ||
* Delegated future object created per doExecuteAndFutureGetWithRetry() call which creates initial object too. | ||
* In this way, we avoid the duplicate logic of first call and subsequent retry calls. | ||
* Here the assumption is cancel, isCancelled and isDone never called before get(). | ||
* (OpenSearch RestClient seems only call get() API) | ||
*/ | ||
private Future<T> delegate; | ||
|
||
@Override | ||
public boolean cancel(boolean mayInterruptIfRunning) { | ||
return delegate.cancel(mayInterruptIfRunning); | ||
} | ||
|
||
@Override | ||
public boolean isCancelled() { | ||
return delegate.isCancelled(); | ||
} | ||
|
||
@Override | ||
public boolean isDone() { | ||
return delegate.isDone(); | ||
} | ||
|
||
@Override | ||
public T get() throws InterruptedException, ExecutionException { | ||
return doExecuteAndFutureGetWithRetry(() -> delegate.get()); | ||
} | ||
|
||
@Override | ||
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException { | ||
return doExecuteAndFutureGetWithRetry(() -> delegate.get(timeout, unit)); | ||
} | ||
|
||
private T doExecuteAndFutureGetWithRetry(Callable<T> futureGet) throws InterruptedException, ExecutionException { | ||
try { | ||
// Retry by creating a new Future object (as new delegate) and get its result again | ||
return Failsafe | ||
.with(options.getRetryPolicy()) | ||
.get(() -> { | ||
this.delegate = internalClient.execute(requestProducer, responseConsumer, context, callback); | ||
return futureGet.call(); | ||
}); | ||
} catch (FailsafeException ex) { | ||
LOG.severe("Request failed permanently. Re-throwing original exception."); | ||
|
||
// Failsafe will wrap checked exception, such as ExecutionException | ||
// So here we have to unwrap failsafe exception and rethrow it | ||
Throwable cause = ex.getCause(); | ||
if (cause instanceof InterruptedException) { | ||
throw (InterruptedException) cause; | ||
} else if (cause instanceof ExecutionException) { | ||
throw (ExecutionException) cause; | ||
} else { | ||
throw ex; | ||
} | ||
} | ||
} | ||
}; | ||
} | ||
|
||
public static HttpAsyncClientBuilder builder(HttpAsyncClientBuilder delegate, FlintOptions options) { | ||
FlintRetryOptions retryOptions = options.getRetryOptions(); | ||
if (!retryOptions.isRetryEnabled()) { | ||
return delegate; | ||
} | ||
|
||
// Wrap original builder so created client will be wrapped by retryable client too | ||
return new HttpAsyncClientBuilder() { | ||
@Override | ||
public CloseableHttpAsyncClient build() { | ||
LOG.info("Building retryable http async client with options: " + retryOptions); | ||
return new RetryableHttpAsyncClient(delegate.build(), retryOptions); | ||
} | ||
}; | ||
} | ||
} |
47 changes: 47 additions & 0 deletions
47
...rc/main/scala/org/opensearch/flint/core/http/handler/ErrorStacktraceFailurePredicate.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.flint.core.http.handler; | ||
|
||
import dev.failsafe.function.CheckedPredicate; | ||
import java.util.HashSet; | ||
import java.util.Set; | ||
import java.util.logging.Logger; | ||
|
||
/** | ||
* Failure predicate that determines if retryable based on error stacktrace iteration. | ||
*/ | ||
public abstract class ErrorStacktraceFailurePredicate implements CheckedPredicate<Throwable> { | ||
|
||
private static final Logger LOG = Logger.getLogger(ErrorStacktraceFailurePredicate.class.getName()); | ||
|
||
/** | ||
* This base class implementation iterates the stacktrace and pass each exception | ||
* to subclass for retryable decision. | ||
*/ | ||
@Override | ||
public boolean test(Throwable throwable) throws Throwable { | ||
// Use extra set to Handle nested exception to avoid dead loop | ||
Set<Throwable> seen = new HashSet<>(); | ||
|
||
while (throwable != null && seen.add(throwable)) { | ||
LOG.info("Checking if exception retryable: " + throwable); | ||
|
||
if (isRetryable(throwable)) { | ||
LOG.info("Exception is retryable: " + throwable); | ||
return true; | ||
} | ||
throwable = throwable.getCause(); | ||
} | ||
|
||
LOG.info("No retryable exception found on the stacktrace"); | ||
return false; | ||
} | ||
|
||
/** | ||
* Is exception retryable decided by subclass implementation | ||
*/ | ||
protected abstract boolean isRetryable(Throwable throwable); | ||
} |
Oops, something went wrong.