Skip to content

Commit

Permalink
Separate failure and result handler class
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 22, 2023
1 parent 948aea6 commit fdf09cf
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,15 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Collections.newSetFromMap;
import static java.util.logging.Level.SEVERE;

import dev.failsafe.RetryPolicy;
import dev.failsafe.function.CheckedPredicate;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.http.HttpResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameHandler;
import org.opensearch.flint.core.http.handler.HttpStatusCodeHandler;

/**
* Flint options related to HTTP request retry.
Expand Down Expand Up @@ -74,8 +68,8 @@ public <T> RetryPolicy<T> getRetryPolicy() {
.withJitter(Duration.ofMillis(100))
// Failure handling config from Flint options
.withMaxRetries(getMaxRetries())
.handleIf(getRetryableExceptionHandler())
.handleResultIf(getRetryableResultHandler())
.handleIf(ExceptionClassNameHandler.create(getRetryableExceptionClassNames()))
.handleResultIf(new HttpStatusCodeHandler<>(getRetryableHttpStatusCodes()))
// Logging listener
.onFailedAttempt(ex ->
LOG.log(SEVERE, "Attempt to execute request failed", ex.getLastException()))
Expand Down Expand Up @@ -105,56 +99,6 @@ public Optional<String> getRetryableExceptionClassNames() {
return Optional.ofNullable(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES));
}

private <T> CheckedPredicate<T> getRetryableResultHandler() {
Set<Integer> retryableStatusCodes =
Arrays.stream(getRetryableHttpStatusCodes().split(","))
.map(String::trim)
.map(Integer::valueOf)
.collect(Collectors.toSet());

return result -> retryableStatusCodes.contains(
((HttpResponse) result).getStatusLine().getStatusCode());
}

private CheckedPredicate<? extends Throwable> getRetryableExceptionHandler() {
// By default, Failsafe handles any Exception
Optional<String> exceptionClassNames = getRetryableExceptionClassNames();
if (exceptionClassNames.isEmpty()) {
return ex -> false;
}

// Use weak collection avoids blocking class unloading
Set<Class<? extends Throwable>> retryableExceptions = newSetFromMap(new WeakHashMap<>());
Arrays.stream(exceptionClassNames.get().split(","))
.map(String::trim)
.map(this::loadClass)
.forEach(retryableExceptions::add);

// Consider retryable if exception found anywhere on stacktrace.
// Meanwhile, handle nested exception to avoid dead loop by seen hash set.
return throwable -> {
Set<Throwable> seen = new HashSet<>();
while (throwable != null && seen.add(throwable)) {
for (Class<? extends Throwable> retryable : retryableExceptions) {
if (retryable.isInstance(throwable)) {
return true;
}
}
throwable = throwable.getCause();
}
return false;
};
}

private Class<? extends Throwable> loadClass(String className) {
try {
//noinspection unchecked
return (Class<? extends Throwable>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Failed to load class " + className, e);
}
}

@Override
public String toString() {
return "FlintRetryOptions{" +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import static java.util.Collections.newSetFromMap;

import dev.failsafe.function.CheckedPredicate;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;

/**
* Failure handler based on exception class type check.
*/
public class ExceptionClassNameHandler implements CheckedPredicate<Throwable> {

/**
* Retryable exception class types.
*/
private final Set<Class<? extends Throwable>> retryableExceptions;

/**
* @return exception class handler or empty handler (treat any exception non-retryable)
*/
public static CheckedPredicate<? extends Throwable> create(
Optional<String> exceptionClassNames) {
// By default, Failsafe handles any Exception
if (exceptionClassNames.isEmpty()) {
return ex -> false;
}
return new ExceptionClassNameHandler(exceptionClassNames.get());
}

public ExceptionClassNameHandler(String exceptionClassNames) {
// Use weak collection avoids blocking class unloading
this.retryableExceptions = newSetFromMap(new WeakHashMap<>());
Arrays.stream(exceptionClassNames.split(","))
.map(String::trim)
.map(this::loadClass)
.forEach(retryableExceptions::add);
}

@Override
public boolean test(Throwable throwable) throws Throwable {
// Consider retryable if exception found anywhere on stacktrace.
// Meanwhile, handle nested exception to avoid dead loop by seen hash set.
Set<Throwable> seen = new HashSet<>();
while (throwable != null && seen.add(throwable)) {
for (Class<? extends Throwable> retryable : retryableExceptions) {
if (retryable.isInstance(throwable)) {
return true;
}
}
throwable = throwable.getCause();
}
return false;
}

private Class<? extends Throwable> loadClass(String className) {
try {
//noinspection unchecked
return (Class<? extends Throwable>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Failed to load class " + className, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.http.HttpResponse;

/**
* Failure handler based on status code in HTTP response.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpStatusCodeHandler<T> implements CheckedPredicate<T> {

/**
* Retryable HTTP status code list
*/
private final String[] httpStatusCodes;

public HttpStatusCodeHandler(String httpStatusCodes) {
this.httpStatusCodes = httpStatusCodes.split(",");
}

@Override
public boolean test(T result) throws Throwable {
Set<Integer> retryableStatusCodes =
Arrays.stream(httpStatusCodes)
.map(String::trim)
.map(Integer::valueOf)
.collect(Collectors.toSet());

return retryableStatusCodes.contains(
((HttpResponse) result).getStatusLine().getStatusCode());
}
}

0 comments on commit fdf09cf

Please sign in to comment.