Skip to content

Commit

Permalink
Add retryable status code option and handler
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Nov 21, 2023
1 parent 612f619 commit ceb56a7
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
import static java.util.logging.Level.SEVERE;

import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import dev.failsafe.function.CheckedPredicate;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.http.HttpResponse;

/**
* Flint options related to HTTP request retry.
Expand All @@ -39,6 +39,9 @@ public class FlintRetryOptions {
public static final int DEFAULT_MAX_RETRIES = 3;
public static final String MAX_RETRIES = "retry.max_retries";

public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";

/**
* Retryable exception class name
*/
Expand All @@ -56,43 +59,54 @@ public FlintRetryOptions(Map<String, String> options) {
*/
public <T> RetryPolicy<T> getRetryPolicy() {
LOG.info("Building HTTP request retry policy with retry options: " + this);
RetryPolicyBuilder<T> builder =
RetryPolicy.<T>builder()
.withMaxRetries(getMaxRetries())
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
// .handleResultIf(resp -> ((HttpResponse<T>) resp).statusCode() == 200)
.onFailedAttempt(ex ->
LOG.log(SEVERE, "Attempt to execute request failed", ex.getLastException()))
.onRetry(ex ->
LOG.warning("Retrying failed request at #" + ex.getAttemptCount()));

// Add optional retryable exception handler
if (options.containsKey(RETRYABLE_EXCEPTION_CLASS_NAMES)) {
builder.handleIf(isRetryableException());
} else {
// By default, Failsafe handles any Exception
builder.handleIf(ex -> false);
}
return builder.build();
return RetryPolicy.<T>builder()
.withMaxRetries(getMaxRetries())
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
.handleIf(getRetryableExceptionHandler())
.handleResultIf(getRetryableResultHandler())
.onFailedAttempt(ex ->
LOG.log(SEVERE, "Attempt to execute request failed", ex.getLastException()))
.onRetry(ex ->
LOG.warning("Retrying failed request at #" + ex.getAttemptCount())).build();
}

private int getMaxRetries() {
return Integer.parseInt(
options.getOrDefault(MAX_RETRIES, String.valueOf(DEFAULT_MAX_RETRIES)));
}

private CheckedPredicate<? extends Throwable> isRetryableException() {
private <T> CheckedPredicate<T> getRetryableResultHandler() {
Set<Integer> retryableStatusCodes =
Arrays.stream(
options.getOrDefault(
RETRYABLE_HTTP_STATUS_CODES,
DEFAULT_RETRYABLE_HTTP_STATUS_CODES).split(","))
.map(String::trim)
.map(Integer::valueOf)
.collect(Collectors.toSet());

return result -> retryableStatusCodes.contains(
((HttpResponse) result).getStatusLine().getStatusCode());
}

private CheckedPredicate<? extends Throwable> getRetryableExceptionHandler() {
// By default, Failsafe handles any Exception
String exceptionClassNames = options.get(RETRYABLE_EXCEPTION_CLASS_NAMES);
if (exceptionClassNames == null || exceptionClassNames.isEmpty()) {
return ex -> false;
}

// Use weak collection avoids blocking class unloading
Set<Class<? extends Throwable>> retryableExceptions = newSetFromMap(new WeakHashMap<>());
Arrays.stream(options.get(RETRYABLE_EXCEPTION_CLASS_NAMES).split(","))
Arrays.stream(exceptionClassNames.split(","))
.map(String::trim)
.map(this::loadClass)
.forEach(retryableExceptions::add);

// Consider retryable if found anywhere on error stacktrace
// Consider retryable if exception found anywhere on stacktrace.
// Meanwhile, handle nested exception to avoid dead loop by seen hash set.
return throwable -> {
// Handle nested exception to avoid dead loop
Set<Throwable> seen = new HashSet<>();
while (throwable != null && seen.add(throwable)) {
for (Class<? extends Throwable> retryable : retryableExceptions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
reset(internalClient, future)
}

ignore should "retry with configured max attempt count" in {
retryableClient
.withOption("retry.max_retries", "1")
.whenThrow(new ConnectException)
.shouldExecute(times(2))
}

ignore should "retry if response code is on the retryable status code list" in {
Seq(new SocketTimeoutException).foreach { ex =>
it should "retry if response code is on the retryable status code list" in {
Seq(429, 502).foreach { statusCode =>
retryableClient
.withOption("retry.exception_class_names", "java.net.SocketTimeoutException")
.whenThrow(ex)
.whenStatusCode(statusCode)
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}
}

it should "not retry if response code is not on the retryable status code list" in {
retryableClient
.whenStatusCode(400)
.shouldExecute(times(1))
}

it should "retry if exception is on the retryable exception list" in {
Seq(new ConnectException, new SocketTimeoutException).foreach { ex =>
retryableClient
Expand All @@ -85,6 +83,13 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
.shouldExecute(times(1))
}

it should "retry with configured max attempt count" in {
retryableClient
.withOption("retry.max_retries", "1")
.whenStatusCode(429)
.shouldExecute(times(2))
}

private def retryableClient: AssertionHelper = new AssertionHelper

class AssertionHelper {
Expand All @@ -100,22 +105,32 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
this
}

def whenStatusCode(statusCode: Int): AssertionHelper = {
val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
when(response.getStatusLine.getStatusCode).thenReturn(statusCode)
when(future.get()).thenReturn(response)
this
}

def shouldExecute(expectTimes: VerificationMode): Unit = {
val client =
new RetryableHttpAsyncClient(internalClient, new FlintOptions(options))

assertThrows[ExecutionException] {
try {
client.execute(null, null, null, null).get()
} catch {
case _: Throwable => // Ignore because we're testing error case
} finally {
verify(internalClient, expectTimes)
.execute(
any[HttpAsyncRequestProducer],
any[HttpAsyncResponseConsumer[HttpResponse]],
any[HttpContext],
any[FutureCallback[HttpResponse]])

reset(future)
clearInvocations(internalClient)
}
verify(internalClient, expectTimes)
.execute(
any[HttpAsyncRequestProducer],
any[HttpAsyncResponseConsumer[HttpResponse]],
any[HttpContext],
any[FutureCallback[HttpResponse]])

reset(future)
clearInvocations(internalClient)
}
}
}

0 comments on commit ceb56a7

Please sign in to comment.