Skip to content

Commit

Permalink
Replace class name check with instanceOf check
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 21, 2023
1 parent d1ecfd6 commit 9d5c7b2
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Collections.newSetFromMap;
import static java.util.logging.Level.SEVERE;

import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedPredicate;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Flint options related to HTTP request retry.
Expand All @@ -40,6 +39,8 @@ public class FlintRetryOptions {
/**
* Retryable exception class name
*/
public static final String DEFAULT_RETRYABLE_EXCEPTION_CLASS_NAMES =
"java.net.ConnectException";
public static final String RETRYABLE_EXCEPTION_CLASS_NAMES = "retry.exception_class_names";

public FlintRetryOptions(Map<String, String> options) {
Expand Down Expand Up @@ -68,32 +69,42 @@ private int getMaxRetries() {
options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES)));
}

private String getRetryableExceptionClassNames() {
return options.getOrDefault(
RETRYABLE_EXCEPTION_CLASS_NAMES,
DEFAULT_RETRYABLE_EXCEPTION_CLASS_NAMES);
}

private CheckedPredicate<? extends Throwable> isRetryableException() {
// Populate configured exception class name from options
Set<String> retryableClassNames;
if (options.containsKey(RETRYABLE_EXCEPTION_CLASS_NAMES)) {
retryableClassNames =
Arrays.stream(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES).split(","))
.map(String::trim)
.collect(Collectors.toSet());
} else {
retryableClassNames = new HashSet<>();
// Use weak collection avoids blocking class unloading
Set<Class<? extends Throwable>> retryableExceptions = newSetFromMap(new WeakHashMap<>());
String[] optClassNames = getRetryableExceptionClassNames().split(",");
for (String className : optClassNames) {
retryableExceptions.add(loadClass(className.trim()));
}

// Add default retryable exception class names
retryableClassNames.add(ConnectException.class.getName());

// Consider retryable if found anywhere on error stacktrace
return throwable -> {
// Handle nested exception to avoid dead loop
Set<Throwable> seen = new HashSet<>();
while (throwable != null && seen.add(throwable)) {
if (retryableClassNames.contains(throwable.getClass().getName())) {
return true;
for (Class<? extends Throwable> retryable : retryableExceptions) {
if (retryable.isInstance(throwable)) {
return true;
}
}
throwable = throwable.getCause();
}
return false;
};
}

private Class<? extends Throwable> loadClass(String className) {
try {
//noinspection unchecked
return (Class<? extends Throwable>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Failed to load class " + className, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public <T> Future<T> execute(HttpAsyncRequestProducer requestProducer,
HttpContext context,
FutureCallback<T> callback) {
return new Future<>() {
/** Delegate future object created per execution */
private Future<T> delegate;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
val internalClient: CloseableHttpAsyncClient = mock[CloseableHttpAsyncClient]
val future: Future[HttpResponse] = mock[Future[HttpResponse]]

/** Retryable client being tested */
// var retryableClient: CloseableHttpAsyncClient = _

behavior of "Retryable HTTP async client"

before {
Expand Down Expand Up @@ -76,8 +73,7 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
}

it should "retry if exception is configured in Flint options" in {
// Should not impact built-in exception class name list
Seq(new ConnectException, new SocketTimeoutException).foreach { ex =>
Seq(new SocketTimeoutException).foreach { ex =>
retryableClient
.withOption("retry.exception_class_names", "java.net.SocketTimeoutException")
.whenThrow(ex)
Expand Down

0 comments on commit 9d5c7b2

Please sign in to comment.