From 91492eca70beec9c916662d08f339b2a2629a05f Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 26 Nov 2024 17:13:14 -0800 Subject: [PATCH 1/5] add retryable response message Signed-off-by: Sean Kao --- docs/index.md | 1 + .../flint/core/http/FlintRetryOptions.java | 17 ++++- .../HttpResponseMessageResultPredicate.java | 70 +++++++++++++++++++ .../http/handler/HttpResultPredicate.java | 29 ++++++++ .../http/RetryableHttpAsyncClientSuite.scala | 19 +++++ .../sql/flint/config/FlintSparkConf.scala | 7 ++ .../flint/config/FlintSparkConfSuite.scala | 3 + 7 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java diff --git a/docs/index.md b/docs/index.md index abc801bde..33e57c401 100644 --- a/docs/index.md +++ b/docs/index.md @@ -538,6 +538,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway). +- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception". - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown. - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 8f6e2c07e..c8731b7c2 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -16,6 +16,8 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; +import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate; +import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -40,6 +42,9 @@ public class FlintRetryOptions implements Serializable { public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502"; public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes"; + public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception"; + public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages"; + /** * Retryable exception class name */ @@ -72,7 +77,10 @@ public RetryPolicy getRetryPolicy() { // Failure handling config from Flint options .withMaxRetries(getMaxRetries()) .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) - .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) + .handleResultIf( + new HttpResultPredicate<>( + new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), + new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages()))) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) .onRetry(FlintRetryOptions::onRetry) @@ -116,6 +124,13 @@ public String getRetryableHttpStatusCodes() { return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES); } + /** + * @return retryable HTTP response message list + */ + public String getRetryableHttpResponseMessages() { + return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES); + } + /** * @return retryable exception class name list */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java new file mode 100644 index 000000000..86d21ac5c --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.Arrays; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Failure handler based on content in HTTP response. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpResponseMessageResultPredicate implements CheckedPredicate { + + private static final Logger LOG = Logger.getLogger(HttpResponseMessageResultPredicate.class.getName()); + + /** + * Retryable HTTP response message list + */ + private final Set retryableResponseMessages; + + public HttpResponseMessageResultPredicate(String messages) { + this.retryableResponseMessages = + Arrays.stream(messages.split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } + + @Override + public boolean test(T result) throws Throwable { + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity != null) { + // Buffer the entity so it can be read multiple times + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(entity); + LOG.info("Checking if response message is retryable"); + + boolean isRetryable = retryableResponseMessages.stream() + .anyMatch(responseContent::contains); + + LOG.info("Check retryable result: " + isRetryable); + + // Reset the entity's content + bufferedEntity.getContent().reset(); + + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. Retryable result: false"); + return false; + } + } + LOG.info("No response entity found. Retryable result: false"); + return false; + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java new file mode 100644 index 000000000..0a59a9c0a --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; + +/** + * Failure handler based on HTTP response. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpResultPredicate implements CheckedPredicate { + + private final HttpStatusCodeResultPredicate statusCodePredicate; + private final HttpResponseMessageResultPredicate responseMessagePredicate; + + public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpResponseMessageResultPredicate responseMessagePredicate) { + this.statusCodePredicate = statusCodePredicate; + this.responseMessagePredicate = responseMessagePredicate; + } + + @Override + public boolean test(T result) throws Throwable { + return statusCodePredicate.test(result) || responseMessagePredicate.test(result); + } +} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 7d3b79a9e..42f6f64a1 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future} import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.apache.http.HttpEntity import org.apache.http.HttpResponse import org.apache.http.concurrent.FutureCallback import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder} import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer} import org.apache.http.protocol.HttpContext +import org.apache.http.util.EntityUtils import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ import org.mockito.verification.VerificationMode @@ -81,6 +83,13 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with } } + it should "retry if response message contains retryable message" in { + retryableClient + .whenResponseMessage( + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + it should "not retry if response code is not on the retryable status code list" in { retryableClient .whenStatusCode(400) @@ -175,6 +184,16 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } + def whenResponseMessage(responseMessage: String): AssertionHelper = { + val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) + mockStatic(classOf[EntityUtils]) + when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) + val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getEntity).thenReturn(entity) + when(future.get()).thenReturn(response) + this + } + def shouldExecute(expectExecuteTimes: VerificationMode): Unit = { shouldExecute(expectExecuteTimes, expectExecuteTimes) } diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 364a8a1de..538476091 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -148,6 +148,12 @@ object FlintSparkConf { .doc("retryable HTTP response status code list") .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES) + val RETRYABLE_HTTP_RESPONSE_MESSAGES = + FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}") + .datasourceOption() + .doc("retryable HTTP response message list") + .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES) + val RETRYABLE_EXCEPTION_CLASS_NAMES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}") .datasourceOption() @@ -338,6 +344,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable AUTH, MAX_RETRIES, RETRYABLE_HTTP_STATUS_CODES, + RETRYABLE_HTTP_RESPONSE_MESSAGES, BULK_REQUEST_RATE_LIMIT_PER_NODE, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 594322bae..298e51224 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -47,6 +47,7 @@ class FlintSparkConfSuite extends FlintSuite { val retryOptions = FlintSparkConf().flintOptions().getRetryOptions retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES + retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty } @@ -55,12 +56,14 @@ class FlintSparkConfSuite extends FlintSuite { Map( "retry.max_retries" -> "5", "retry.http_status_codes" -> "429,502,503,504", + "retry.http_response_messages" -> "message1,message2", "retry.exception_class_names" -> "java.net.ConnectException").asJava) .flintOptions() .getRetryOptions retryOptions.getMaxRetries shouldBe 5 retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504" + retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2" retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" } From aec283652c94d6b47c0a96c8e2d8daaa86f802af Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 4 Dec 2024 15:03:14 -0800 Subject: [PATCH 2/5] check retryable response only if 400 Signed-off-by: Sean Kao --- .../flint/core/http/FlintRetryOptions.java | 14 +--- .../http/handler/HttpAOSSResultPredicate.java | 68 ++++++++++++++++++ .../HttpResponseMessageResultPredicate.java | 70 ------------------- .../http/handler/HttpResultPredicate.java | 4 +- .../http/RetryableHttpAsyncClientSuite.scala | 6 +- .../sql/flint/config/FlintSparkConf.scala | 7 -- .../flint/config/FlintSparkConfSuite.scala | 3 - 7 files changed, 76 insertions(+), 96 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java delete mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index c8731b7c2..818ebe9d1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -16,7 +16,7 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; -import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate; +import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -42,9 +42,6 @@ public class FlintRetryOptions implements Serializable { public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502"; public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes"; - public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception"; - public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages"; - /** * Retryable exception class name */ @@ -80,7 +77,7 @@ public RetryPolicy getRetryPolicy() { .handleResultIf( new HttpResultPredicate<>( new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), - new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages()))) + new HttpAOSSResultPredicate<>())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) .onRetry(FlintRetryOptions::onRetry) @@ -124,13 +121,6 @@ public String getRetryableHttpStatusCodes() { return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES); } - /** - * @return retryable HTTP response message list - */ - public String getRetryableHttpResponseMessages() { - return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES); - } - /** * @return retryable exception class name list */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java new file mode 100644 index 000000000..06b957725 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.Arrays; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Failure handler based on HTTP response from AOSS. + * + * @param result type (supposed to be HttpResponse for OS client) + */ +public class HttpAOSSResultPredicate implements CheckedPredicate { + + private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName()); + + public static final int BAD_REQUEST_STATUS_CODE = 400; + public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception"; + + public HttpAOSSResultPredicate() { } + + @Override + public boolean test(T result) throws Throwable { + LOG.info("Checking if response is retryable"); + + int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode(); + if (statusCode != BAD_REQUEST_STATUS_CODE) { + LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false"); + return false; + } + + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity == null) { + LOG.info("No response entity found. Check result: false"); + return false; + } + + // Buffer the entity to make it repeatable + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(bufferedEntity); + // Reset the entity's content + bufferedEntity.getContent().reset(); + + boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); + + LOG.info("Check retryable response result: " + isRetryable); + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. Check result: false"); + return false; + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java deleted file mode 100644 index 86d21ac5c..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.http.handler; - -import dev.failsafe.function.CheckedPredicate; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.entity.BufferedHttpEntity; -import org.apache.http.util.EntityUtils; - -import java.util.Arrays; -import java.util.Set; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * Failure handler based on content in HTTP response. - * - * @param result type (supposed to be HttpResponse for OS client) - */ -public class HttpResponseMessageResultPredicate implements CheckedPredicate { - - private static final Logger LOG = Logger.getLogger(HttpResponseMessageResultPredicate.class.getName()); - - /** - * Retryable HTTP response message list - */ - private final Set retryableResponseMessages; - - public HttpResponseMessageResultPredicate(String messages) { - this.retryableResponseMessages = - Arrays.stream(messages.split(",")) - .map(String::trim) - .collect(Collectors.toSet()); - } - - @Override - public boolean test(T result) throws Throwable { - HttpResponse response = (HttpResponse) result; - HttpEntity entity = response.getEntity(); - if (entity != null) { - // Buffer the entity so it can be read multiple times - BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); - response.setEntity(bufferedEntity); - - try { - String responseContent = EntityUtils.toString(entity); - LOG.info("Checking if response message is retryable"); - - boolean isRetryable = retryableResponseMessages.stream() - .anyMatch(responseContent::contains); - - LOG.info("Check retryable result: " + isRetryable); - - // Reset the entity's content - bufferedEntity.getContent().reset(); - - return isRetryable; - } catch (Exception e) { - LOG.info("Unable to parse response body. Retryable result: false"); - return false; - } - } - LOG.info("No response entity found. Retryable result: false"); - return false; - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java index 0a59a9c0a..a991acf0a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java @@ -15,9 +15,9 @@ public class HttpResultPredicate implements CheckedPredicate { private final HttpStatusCodeResultPredicate statusCodePredicate; - private final HttpResponseMessageResultPredicate responseMessagePredicate; + private final HttpAOSSResultPredicate responseMessagePredicate; - public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpResponseMessageResultPredicate responseMessagePredicate) { + public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpAOSSResultPredicate responseMessagePredicate) { this.statusCodePredicate = statusCodePredicate; this.responseMessagePredicate = responseMessagePredicate; } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 42f6f64a1..a1dd9dfb3 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -85,7 +85,8 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with it should "retry if response message contains retryable message" in { retryableClient - .whenResponseMessage( + .whenResponse( + 400, "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) } @@ -184,11 +185,12 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } - def whenResponseMessage(responseMessage: String): AssertionHelper = { + def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = { val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) mockStatic(classOf[EntityUtils]) when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getStatusLine.getStatusCode).thenReturn(statusCode) when(response.getEntity).thenReturn(entity) when(future.get()).thenReturn(response) this diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 538476091..364a8a1de 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -148,12 +148,6 @@ object FlintSparkConf { .doc("retryable HTTP response status code list") .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES) - val RETRYABLE_HTTP_RESPONSE_MESSAGES = - FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}") - .datasourceOption() - .doc("retryable HTTP response message list") - .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES) - val RETRYABLE_EXCEPTION_CLASS_NAMES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}") .datasourceOption() @@ -344,7 +338,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable AUTH, MAX_RETRIES, RETRYABLE_HTTP_STATUS_CODES, - RETRYABLE_HTTP_RESPONSE_MESSAGES, BULK_REQUEST_RATE_LIMIT_PER_NODE, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 298e51224..594322bae 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -47,7 +47,6 @@ class FlintSparkConfSuite extends FlintSuite { val retryOptions = FlintSparkConf().flintOptions().getRetryOptions retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES - retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty } @@ -56,14 +55,12 @@ class FlintSparkConfSuite extends FlintSuite { Map( "retry.max_retries" -> "5", "retry.http_status_codes" -> "429,502,503,504", - "retry.http_response_messages" -> "message1,message2", "retry.exception_class_names" -> "java.net.ConnectException").asJava) .flintOptions() .getRetryOptions retryOptions.getMaxRetries shouldBe 5 retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504" - retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2" retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" } From b216d9e106b73ba635e88fc32fa3682015fb638b Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 4 Dec 2024 15:31:08 -0800 Subject: [PATCH 3/5] add handler for aoss only Signed-off-by: Sean Kao --- docs/index.md | 1 - .../flint/core/http/FlintRetryOptions.java | 23 ++++++++++----- .../http/handler/HttpAOSSResultPredicate.java | 4 --- .../http/handler/HttpResultPredicate.java | 29 ------------------- .../http/RetryableHttpAsyncClientSuite.scala | 25 +++++++++++----- 5 files changed, 32 insertions(+), 50 deletions(-) delete mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java diff --git a/docs/index.md b/docs/index.md index 33e57c401..abc801bde 100644 --- a/docs/index.md +++ b/docs/index.md @@ -538,7 +538,6 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration. - `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry. - `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway). -- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception". - `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown. - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 818ebe9d1..0def1e3d8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -6,8 +6,12 @@ package org.opensearch.flint.core.http; import static java.time.temporal.ChronoUnit.SECONDS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS; +import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES; import dev.failsafe.RetryPolicy; +import dev.failsafe.RetryPolicyBuilder; import dev.failsafe.event.ExecutionAttemptedEvent; import dev.failsafe.function.CheckedPredicate; import java.time.Duration; @@ -17,7 +21,6 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; -import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -67,21 +70,21 @@ public boolean isRetryEnabled() { * @return Failsafe retry policy */ public RetryPolicy getRetryPolicy() { - return RetryPolicy.builder() + RetryPolicyBuilder builder = RetryPolicy.builder() // Backoff strategy config (can be configurable as needed in future) .withBackoff(1, 30, SECONDS) .withJitter(Duration.ofMillis(100)) // Failure handling config from Flint options .withMaxRetries(getMaxRetries()) .handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames())) - .handleResultIf( - new HttpResultPredicate<>( - new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), - new HttpAOSSResultPredicate<>())) + .handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) - .onRetry(FlintRetryOptions::onRetry) - .build(); + .onRetry(FlintRetryOptions::onRetry); + if (getServiceName() == SERVICE_NAME_AOSS) { + builder.handleResultIf(new HttpAOSSResultPredicate<>()); + } + return builder.build(); } public RetryPolicy getBulkRetryPolicy(CheckedPredicate resultPredicate) { @@ -106,6 +109,10 @@ private static void onRetry(ExecutionAttemptedEvent event) { LOG.warning("Retrying failed request at #" + event.getAttemptCount()); } + private String getServiceName() { + return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES); + } + /** * @return maximum retry option value */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java index 06b957725..c5432eda9 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -11,10 +11,7 @@ import org.apache.http.entity.BufferedHttpEntity; import org.apache.http.util.EntityUtils; -import java.util.Arrays; -import java.util.Set; import java.util.logging.Logger; -import java.util.stream.Collectors; /** * Failure handler based on HTTP response from AOSS. @@ -53,7 +50,6 @@ public boolean test(T result) throws Throwable { try { String responseContent = EntityUtils.toString(bufferedEntity); - // Reset the entity's content bufferedEntity.getContent().reset(); boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java deleted file mode 100644 index a991acf0a..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.http.handler; - -import dev.failsafe.function.CheckedPredicate; - -/** - * Failure handler based on HTTP response. - * - * @param result type (supposed to be HttpResponse for OS client) - */ -public class HttpResultPredicate implements CheckedPredicate { - - private final HttpStatusCodeResultPredicate statusCodePredicate; - private final HttpAOSSResultPredicate responseMessagePredicate; - - public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpAOSSResultPredicate responseMessagePredicate) { - this.statusCodePredicate = statusCodePredicate; - this.responseMessagePredicate = responseMessagePredicate; - } - - @Override - public boolean test(T result) throws Throwable { - return statusCodePredicate.test(result) || responseMessagePredicate.test(result); - } -} diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index a1dd9dfb3..8a8927920 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -83,14 +83,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with } } - it should "retry if response message contains retryable message" in { - retryableClient - .whenResponse( - 400, - "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") - .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) - } - it should "not retry if response code is not on the retryable status code list" in { retryableClient .whenStatusCode(400) @@ -163,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with expectFutureGetTimes = times(0)) } + it should "retry if AOSS response is retryable" in { + retryableClient + .withOption("auth.servicename", "aoss") + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) + } + + it should "not apply retry policy for AOSS response if service is not AOSS" in { + retryableClient + .whenResponse( + 400, + "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") + .shouldExecute(times(1)) + } + private def retryableClient: AssertionHelper = new AssertionHelper class AssertionHelper { From ef9174658ed5cacecdea66ac38dcf367fdb21d21 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 4 Dec 2024 16:23:32 -0800 Subject: [PATCH 4/5] edit comment Signed-off-by: Sean Kao --- .../flint/core/http/handler/HttpAOSSResultPredicate.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java index c5432eda9..8bfb05fa3 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -44,12 +44,14 @@ public boolean test(T result) throws Throwable { return false; } - // Buffer the entity to make it repeatable + // Buffer the entity to make it repeatable, so that this retry test does not consume the content stream, + // resulting in the request caller getting empty response BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); response.setEntity(bufferedEntity); try { String responseContent = EntityUtils.toString(bufferedEntity); + // Effectively restores the content stream of the response bufferedEntity.getContent().reset(); boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); From 1942b80fa2fd865cd826be8c8c4e4ec187e72dee Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Sat, 7 Dec 2024 17:37:14 -0800 Subject: [PATCH 5/5] bugfix: aoss result predicate not used Signed-off-by: Sean Kao --- .../scala/org/opensearch/flint/core/http/FlintRetryOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index 0def1e3d8..597f441ec 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -81,7 +81,7 @@ public RetryPolicy getRetryPolicy() { // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) .onRetry(FlintRetryOptions::onRetry); - if (getServiceName() == SERVICE_NAME_AOSS) { + if (SERVICE_NAME_AOSS.equals(getServiceName())) { builder.handleResultIf(new HttpAOSSResultPredicate<>()); } return builder.build();