Skip to content

Commit

Permalink
add handler for aoss only
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Dec 4, 2024
1 parent 4212dac commit 309bcd7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedPredicate;
import java.time.Duration;
Expand All @@ -17,7 +21,6 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
import org.opensearch.flint.core.http.handler.HttpResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

Expand All @@ -33,6 +36,8 @@ public class FlintRetryOptions implements Serializable {
*/
private final Map<String, String> options;

private final boolean isAOSS;

/**
* Maximum retry attempt
*/
Expand All @@ -49,6 +54,7 @@ public class FlintRetryOptions implements Serializable {

public FlintRetryOptions(Map<String, String> options) {
this.options = options;
this.isAOSS = options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES) == SERVICE_NAME_AOSS;
}

/**
Expand All @@ -67,21 +73,21 @@ public boolean isRetryEnabled() {
* @return Failsafe retry policy
*/
public <T> RetryPolicy<T> getRetryPolicy() {
return RetryPolicy.<T>builder()
RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder()
// Backoff strategy config (can be configurable as needed in future)
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
// Failure handling config from Flint options
.withMaxRetries(getMaxRetries())
.handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames()))
.handleResultIf(
new HttpResultPredicate<>(
new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()),
new HttpAOSSResultPredicate<>()))
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
.build();
.onRetry(FlintRetryOptions::onRetry);
if (isAOSS) {
builder.handleResultIf(new HttpAOSSResultPredicate<>());
}
return builder.build();
}

public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
}
}

it should "retry if response message contains retryable message" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not retry if response code is not on the retryable status code list" in {
retryableClient
.whenStatusCode(400)
Expand Down Expand Up @@ -163,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
expectFutureGetTimes = times(0))
}

it should "retry if AOSS response is retryable" in {
retryableClient
.withOption("auth.servicename", "aoss")
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not apply retry policy for AOSS response if service is not AOSS" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(1))
}

private def retryableClient: AssertionHelper = new AssertionHelper

class AssertionHelper {
Expand Down

0 comments on commit 309bcd7

Please sign in to comment.