diff --git a/docs/index.md b/docs/index.md index 5dbd02eb5..82c147de2 100644 --- a/docs/index.md +++ b/docs/index.md @@ -538,7 +538,6 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway). -- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception". - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown. - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 818ebe9d1..0def1e3d8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -6,8 +6,12 @@ package org.opensearch.flint.core.http; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES; import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; import dev.failsafe.event.ExecutionAttemptedEvent; import dev.failsafe.function.CheckedPredicate; import java.time.Duration; @@ -17,7 +21,6 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; -import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -67,21 +70,21 @@ public boolean isRetryEnabled() { * @return Failsafe retry policy */ public RetryPolicy getRetryPolicy() { - return RetryPolicy.builder() + RetryPolicyBuilder builder = RetryPolicy.builder() // Backoff strategy config (can be configurable as needed in future) .withBackoff(1, 30, SECONDS) .withJitter(Duration.ofMillis(100)) // Failure handling config from Flint options .withMaxRetries(getMaxRetries()) .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) - .handleResultIf( - new HttpResultPredicate<>( - new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), - new HttpAOSSResultPredicate<>())) + .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) - .onRetry(FlintRetryOptions::onRetry) - .build(); + .onRetry(FlintRetryOptions::onRetry); + if (getServiceName() == SERVICE_NAME_AOSS) { + builder.handleResultIf(new HttpAOSSResultPredicate<>()); + } + return builder.build(); } public RetryPolicy getBulkRetryPolicy(CheckedPredicate resultPredicate) { @@ -106,6 +109,10 @@ private static void onRetry(ExecutionAttemptedEvent event) { LOG.warning("Retrying failed request at #" + event.getAttemptCount()); } + private String getServiceName() { + return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES); + } + /** * @return maximum retry option value */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java index 06b957725..5c2ba3651 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -11,10 +11,7 @@ import org.apache.http.entity.BufferedHttpEntity; import org.apache.http.util.EntityUtils; -import java.util.Arrays; -import java.util.Set; import java.util.logging.Logger; -import java.util.stream.Collectors; /** * Failure handler based on HTTP response from AOSS. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java deleted file mode 100644 index a991acf0a..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.http.handler; - -import dev.failsafe.function.CheckedPredicate; - -/** - * Failure handler based on HTTP response. - * - * @param result type (supposed to be HttpResponse for OS client) - */ -public class HttpResultPredicate implements CheckedPredicate { - - private final HttpStatusCodeResultPredicate statusCodePredicate; - private final HttpAOSSResultPredicate responseMessagePredicate; - - public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpAOSSResultPredicate responseMessagePredicate) { - this.statusCodePredicate = statusCodePredicate; - this.responseMessagePredicate = responseMessagePredicate; - } - - @Override - public boolean test(T result) throws Throwable { - return statusCodePredicate.test(result) || responseMessagePredicate.test(result); - } -} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index a1dd9dfb3..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -83,14 +83,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with } } - it should "retry if response message contains retryable message" in { - retryableClient - .whenResponse( - 400, - "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") - .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) - } - it should "not retry if response code is not on the retryable status code list" in { retryableClient .whenStatusCode(400) @@ -163,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper {