diff --git a/docs/index.md b/docs/index.md index 82c147de2..5dbd02eb5 100644 --- a/docs/index.md +++ b/docs/index.md @@ -538,6 +538,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway). +- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception". - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown. - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 8f6e2c07e..c8731b7c2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -16,6 +16,8 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; +import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate; +import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -40,6 +42,9 @@ public class FlintRetryOptions implements Serializable { public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502"; public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes"; + public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception"; + public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages"; + /** * Retryable exception class name */ @@ -72,7 +77,10 @@ public RetryPolicy getRetryPolicy() { // Failure handling config from Flint options .withMaxRetries(getMaxRetries()) .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) - .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) + .handleResultIf( + new HttpResultPredicate<>( + new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), + new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages()))) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) .onRetry(FlintRetryOptions::onRetry) @@ -116,6 +124,13 @@ public String getRetryableHttpStatusCodes() { return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES); } + /** + * @return retryable HTTP response message list + */ + public String getRetryableHttpResponseMessages() { + return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES); + } + /** * @return retryable exception class name list */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java new file mode 100644 index 000000000..86d21ac5c --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.Arrays; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Failure handler based on content in HTTP response. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpResponseMessageResultPredicate implements CheckedPredicate { + + private static final Logger LOG = Logger.getLogger(HttpResponseMessageResultPredicate.class.getName()); + + /** + * Retryable HTTP response message list + */ + private final Set retryableResponseMessages; + + public HttpResponseMessageResultPredicate(String messages) { + this.retryableResponseMessages = + Arrays.stream(messages.split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } + + @Override + public boolean test(T result) throws Throwable { + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity != null) { + // Buffer the entity so it can be read multiple times + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(entity); + LOG.info("Checking if response message is retryable"); + + boolean isRetryable = retryableResponseMessages.stream() + .anyMatch(responseContent::contains); + + LOG.info("Check retryable result: " + isRetryable); + + // Reset the entity's content + bufferedEntity.getContent().reset(); + + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. Retryable result: false"); + return false; + } + } + LOG.info("No response entity found. Retryable result: false"); + return false; + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java new file mode 100644 index 000000000..0a59a9c0a --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; + +/** + * Failure handler based on HTTP response. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpResultPredicate implements CheckedPredicate { + + private final HttpStatusCodeResultPredicate statusCodePredicate; + private final HttpResponseMessageResultPredicate responseMessagePredicate; + + public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpResponseMessageResultPredicate responseMessagePredicate) { + this.statusCodePredicate = statusCodePredicate; + this.responseMessagePredicate = responseMessagePredicate; + } + + @Override + public boolean test(T result) throws Throwable { + return statusCodePredicate.test(result) || responseMessagePredicate.test(result); + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 7d3b79a9e..42f6f64a1 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future} import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder} import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} import org.apache.http.protocol.HttpContext +import org.apache.http.util.EntityUtils import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.verification.VerificationMode @@ -81,6 +83,13 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with } } + it should "retry if response message contains retryable message" in { + retryableClient + .whenResponseMessage( + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + it should "not retry if response code is not on the retryable status code list" in { retryableClient .whenStatusCode(400) @@ -175,6 +184,16 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } + def whenResponseMessage(responseMessage: String): AssertionHelper = { + val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) + mockStatic(classOf[EntityUtils]) + when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) + val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getEntity).thenReturn(entity) + when(future.get()).thenReturn(response) + this + } + def shouldExecute(expectExecuteTimes: VerificationMode): Unit = { shouldExecute(expectExecuteTimes, expectExecuteTimes) } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index bdcc120c0..fa41cfb4d 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -148,6 +148,12 @@ object FlintSparkConf { .doc("retryable HTTP response status code list") .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES) + val RETRYABLE_HTTP_RESPONSE_MESSAGES = + FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}") + .datasourceOption() + .doc("retryable HTTP response message list") + .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES) + val RETRYABLE_EXCEPTION_CLASS_NAMES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}") .datasourceOption() @@ -333,6 +339,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable AUTH, MAX_RETRIES, RETRYABLE_HTTP_STATUS_CODES, + RETRYABLE_HTTP_RESPONSE_MESSAGES, BULK_REQUEST_RATE_LIMIT_PER_NODE, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 0cde6ab0f..569bbb90f 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -47,6 +47,7 @@ class FlintSparkConfSuite extends FlintSuite { val retryOptions = FlintSparkConf().flintOptions().getRetryOptions retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES + retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty } @@ -55,12 +56,14 @@ class FlintSparkConfSuite extends FlintSuite { Map( "retry.max_retries" -> "5", "retry.http_status_codes" -> "429,502,503,504", + "retry.http_response_messages" -> "message1,message2", "retry.exception_class_names" -> "java.net.ConnectException").asJava) .flintOptions() .getRetryOptions retryOptions.getMaxRetries shouldBe 5 retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504" + retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2" retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" }