diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 8f6e2c07e..597f441ec 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -6,8 +6,12 @@ package org.opensearch.flint.core.http; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES; import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; import dev.failsafe.event.ExecutionAttemptedEvent; import dev.failsafe.function.CheckedPredicate; import java.time.Duration; @@ -16,6 +20,7 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; +import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -65,7 +70,7 @@ public boolean isRetryEnabled() { * @return Failsafe retry policy */ public RetryPolicy getRetryPolicy() { - return RetryPolicy.builder() + RetryPolicyBuilder builder = RetryPolicy.builder() // Backoff strategy config (can be configurable as needed in future) .withBackoff(1, 30, SECONDS) .withJitter(Duration.ofMillis(100)) @@ -75,8 +80,11 @@ public RetryPolicy getRetryPolicy() { .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) - .onRetry(FlintRetryOptions::onRetry) - .build(); + .onRetry(FlintRetryOptions::onRetry); + if (SERVICE_NAME_AOSS.equals(getServiceName())) { + builder.handleResultIf(new HttpAOSSResultPredicate<>()); + } + return builder.build(); } public RetryPolicy getBulkRetryPolicy(CheckedPredicate resultPredicate) { @@ -101,6 +109,10 @@ private static void onRetry(ExecutionAttemptedEvent event) { LOG.warning("Retrying failed request at #" + event.getAttemptCount()); } + private String getServiceName() { + return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES); + } + /** * @return maximum retry option value */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java new file mode 100644 index 000000000..8bfb05fa3 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.logging.Logger; + +/** + * Failure handler based on HTTP response from AOSS. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpAOSSResultPredicate implements CheckedPredicate { + + private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName()); + + public static final int BAD_REQUEST_STATUS_CODE = 400; + public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception"; + + public HttpAOSSResultPredicate() { } + + @Override + public boolean test(T result) throws Throwable { + LOG.info("Checking if response is retryable"); + + int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode(); + if (statusCode != BAD_REQUEST_STATUS_CODE) { + LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false"); + return false; + } + + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity == null) { + LOG.info("No response entity found. Check result: false"); + return false; + } + + // Buffer the entity to make it repeatable, so that this retry test does not consume the content stream, + // resulting in the request caller getting empty response + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(bufferedEntity); + // Effectively restores the content stream of the response + bufferedEntity.getContent().reset(); + + boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); + + LOG.info("Check retryable response result: " + isRetryable); + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. Check result: false"); + return false; + } + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 7d3b79a9e..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future} import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder} import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} import org.apache.http.protocol.HttpContext +import org.apache.http.util.EntityUtils import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.verification.VerificationMode @@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper { @@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } + def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = { + val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) + mockStatic(classOf[EntityUtils]) + when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) + val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getStatusLine.getStatusCode).thenReturn(statusCode) + when(response.getEntity).thenReturn(entity) + when(future.get()).thenReturn(response) + this + } + def shouldExecute(expectExecuteTimes: VerificationMode): Unit = { shouldExecute(expectExecuteTimes, expectExecuteTimes) }