From 4212dac4755a9ddbe0fb88ccf17c078a070f5cf3 Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Wed, 4 Dec 2024 15:03:14 -0800 Subject: [PATCH] check retryable response only if 400 Signed-off-by: Sean Kao --- .../flint/core/http/FlintRetryOptions.java | 14 +--- .../http/handler/HttpAOSSResultPredicate.java | 68 ++++++++++++++++++ .../HttpResponseMessageResultPredicate.java | 70 ------------------- .../http/handler/HttpResultPredicate.java | 4 +- .../http/RetryableHttpAsyncClientSuite.scala | 6 +- .../sql/flint/config/FlintSparkConf.scala | 7 -- .../flint/config/FlintSparkConfSuite.scala | 3 - 7 files changed, 76 insertions(+), 96 deletions(-) create mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java delete mode 100644 flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java index c8731b7c2..818ebe9d1 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/FlintRetryOptions.java @@ -16,7 +16,7 @@ import java.util.logging.Logger; import org.opensearch.action.bulk.BulkResponse; import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate; -import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate; +import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate; import org.opensearch.flint.core.http.handler.HttpResultPredicate; import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate; import java.io.Serializable; @@ -42,9 +42,6 @@ public class FlintRetryOptions implements Serializable { public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502"; public static final String RETRYABLE_HTTP_STATUS_CODES = 
"retry.http_status_codes"; - public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception"; - public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages"; - /** * Retryable exception class name */ @@ -80,7 +77,7 @@ public RetryPolicy getRetryPolicy() { .handleResultIf( new HttpResultPredicate<>( new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()), - new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages()))) + new HttpAOSSResultPredicate<>())) // Logging listener .onFailedAttempt(FlintRetryOptions::onFailure) .onRetry(FlintRetryOptions::onRetry) @@ -124,13 +121,6 @@ public String getRetryableHttpStatusCodes() { return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES); } - /** - * @return retryable HTTP response message list - */ - public String getRetryableHttpResponseMessages() { - return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES); - } - /** * @return retryable exception class name list */ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java new file mode 100644 index 000000000..06b957725 --- /dev/null +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpAOSSResultPredicate.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.http.handler; + +import dev.failsafe.function.CheckedPredicate; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.entity.BufferedHttpEntity; +import org.apache.http.util.EntityUtils; + +import java.util.Arrays; +import java.util.Set; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * Failure handler based 
on HTTP response from AOSS. + * + * @param <T> result type (supposed to be HttpResponse for OS client) + */ +public class HttpAOSSResultPredicate implements CheckedPredicate { + + private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName()); + + public static final int BAD_REQUEST_STATUS_CODE = 400; + public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception"; + + public HttpAOSSResultPredicate() { } + + @Override + public boolean test(T result) throws Throwable { + LOG.info("Checking if response is retryable"); + + int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode(); + if (statusCode != BAD_REQUEST_STATUS_CODE) { + LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false"); + return false; + } + + HttpResponse response = (HttpResponse) result; + HttpEntity entity = response.getEntity(); + if (entity == null) { + LOG.info("No response entity found. Check result: false"); + return false; + } + + // Buffer the entity to make it repeatable + BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); + response.setEntity(bufferedEntity); + + try { + String responseContent = EntityUtils.toString(bufferedEntity); + // Reset the entity's content + bufferedEntity.getContent().reset(); + + boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE); + + LOG.info("Check retryable response result: " + isRetryable); + return isRetryable; + } catch (Exception e) { + LOG.info("Unable to parse response body. 
Check result: false"); + return false; + } + } +} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java deleted file mode 100644 index 86d21ac5c..000000000 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResponseMessageResultPredicate.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.http.handler; - -import dev.failsafe.function.CheckedPredicate; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.entity.BufferedHttpEntity; -import org.apache.http.util.EntityUtils; - -import java.util.Arrays; -import java.util.Set; -import java.util.logging.Logger; -import java.util.stream.Collectors; - -/** - * Failure handler based on content in HTTP response. 
- * - * @param <T> result type (supposed to be HttpResponse for OS client) - */ -public class HttpResponseMessageResultPredicate implements CheckedPredicate { - - private static final Logger LOG = Logger.getLogger(HttpResponseMessageResultPredicate.class.getName()); - - /** - * Retryable HTTP response message list - */ - private final Set retryableResponseMessages; - - public HttpResponseMessageResultPredicate(String messages) { - this.retryableResponseMessages = - Arrays.stream(messages.split(",")) - .map(String::trim) - .collect(Collectors.toSet()); - } - - @Override - public boolean test(T result) throws Throwable { - HttpResponse response = (HttpResponse) result; - HttpEntity entity = response.getEntity(); - if (entity != null) { - // Buffer the entity so it can be read multiple times - BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity); - response.setEntity(bufferedEntity); - - try { - String responseContent = EntityUtils.toString(entity); - LOG.info("Checking if response message is retryable"); - - boolean isRetryable = retryableResponseMessages.stream() - .anyMatch(responseContent::contains); - - LOG.info("Check retryable result: " + isRetryable); - - // Reset the entity's content - bufferedEntity.getContent().reset(); - - return isRetryable; - } catch (Exception e) { - LOG.info("Unable to parse response body. Retryable result: false"); - return false; - } - } - LOG.info("No response entity found. 
Retryable result: false"); - return false; - } -} diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java index 0a59a9c0a..a991acf0a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/http/handler/HttpResultPredicate.java @@ -15,9 +15,9 @@ public class HttpResultPredicate implements CheckedPredicate { private final HttpStatusCodeResultPredicate statusCodePredicate; - private final HttpResponseMessageResultPredicate responseMessagePredicate; + private final HttpAOSSResultPredicate responseMessagePredicate; - public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpResponseMessageResultPredicate responseMessagePredicate) { + public HttpResultPredicate(HttpStatusCodeResultPredicate statusCodePredicate, HttpAOSSResultPredicate responseMessagePredicate) { this.statusCodePredicate = statusCodePredicate; this.responseMessagePredicate = responseMessagePredicate; } diff --git a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala index 42f6f64a1..a1dd9dfb3 100644 --- a/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala +++ b/flint-core/src/test/scala/org/opensearch/flint/core/http/RetryableHttpAsyncClientSuite.scala @@ -85,7 +85,8 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with it should "retry if response message contains retryable message" in { retryableClient - .whenResponseMessage( + .whenResponse( + 400, "OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,") .shouldExecute(times(DEFAULT_MAX_RETRIES + 1)) } @@ -184,11 +185,12 @@ class 
RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with this } - def whenResponseMessage(responseMessage: String): AssertionHelper = { + def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = { val entity = mock[HttpEntity](RETURNS_DEEP_STUBS) mockStatic(classOf[EntityUtils]) when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage) val response = mock[HttpResponse](RETURNS_DEEP_STUBS) + when(response.getStatusLine.getStatusCode).thenReturn(statusCode) when(response.getEntity).thenReturn(entity) when(future.get()).thenReturn(response) this diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index fa41cfb4d..bdcc120c0 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -148,12 +148,6 @@ object FlintSparkConf { .doc("retryable HTTP response status code list") .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES) - val RETRYABLE_HTTP_RESPONSE_MESSAGES = - FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}") - .datasourceOption() - .doc("retryable HTTP response message list") - .createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES) - val RETRYABLE_EXCEPTION_CLASS_NAMES = FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}") .datasourceOption() @@ -339,7 +333,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable AUTH, MAX_RETRIES, RETRYABLE_HTTP_STATUS_CODES, - RETRYABLE_HTTP_RESPONSE_MESSAGES, BULK_REQUEST_RATE_LIMIT_PER_NODE, REGION, CUSTOM_AWS_CREDENTIALS_PROVIDER, diff --git 
a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala index 569bbb90f..0cde6ab0f 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala @@ -47,7 +47,6 @@ class FlintSparkConfSuite extends FlintSuite { val retryOptions = FlintSparkConf().flintOptions().getRetryOptions retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES - retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty } @@ -56,14 +55,12 @@ class FlintSparkConfSuite extends FlintSuite { Map( "retry.max_retries" -> "5", "retry.http_status_codes" -> "429,502,503,504", - "retry.http_response_messages" -> "message1,message2", "retry.exception_class_names" -> "java.net.ConnectException").asJava) .flintOptions() .getRetryOptions retryOptions.getMaxRetries shouldBe 5 retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504" - retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2" retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException" }