Skip to content

Commit

Permalink
add retryable response message
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Nov 27, 2024
1 parent 3ff2ef2 commit d05fe0c
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception".
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version.
- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import java.util.logging.Logger;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpResponseMessageResultPredicate;
import org.opensearch.flint.core.http.handler.HttpResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

Expand All @@ -40,6 +42,9 @@ public class FlintRetryOptions implements Serializable {
public static final String DEFAULT_RETRYABLE_HTTP_STATUS_CODES = "429,502";
public static final String RETRYABLE_HTTP_STATUS_CODES = "retry.http_status_codes";

public static final String DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES = "resource_already_exists_exception";
public static final String RETRYABLE_HTTP_RESPONSE_MESSAGES = "retry.http_response_messages";

/**
* Retryable exception class name
*/
Expand Down Expand Up @@ -72,7 +77,10 @@ public <T> RetryPolicy<T> getRetryPolicy() {
// Failure handling config from Flint options
.withMaxRetries(getMaxRetries())
.handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames()))
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
.handleResultIf(
new HttpResultPredicate<>(
new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()),
new HttpResponseMessageResultPredicate<>(getRetryableHttpResponseMessages())))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
Expand Down Expand Up @@ -116,6 +124,13 @@ public String getRetryableHttpStatusCodes() {
return options.getOrDefault(RETRYABLE_HTTP_STATUS_CODES, DEFAULT_RETRYABLE_HTTP_STATUS_CODES);
}

/**
* @return retryable HTTP response message list
*/
public String getRetryableHttpResponseMessages() {
return options.getOrDefault(RETRYABLE_HTTP_RESPONSE_MESSAGES, DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES);
}

/**
* @return retryable exception class name list
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Failure handler based on content in HTTP response.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpResponseMessageResultPredicate<T> implements CheckedPredicate<T> {

private static final Logger LOG = Logger.getLogger(HttpResponseMessageResultPredicate.class.getName());

/**
* Retryable HTTP response message list
*/
private final Set<String> retryableResponseMessages;

public HttpResponseMessageResultPredicate(String messages) {
this.retryableResponseMessages =
Arrays.stream(messages.split(","))
.map(String::trim)
.collect(Collectors.toSet());
}

@Override
public boolean test(T result) throws Throwable {
HttpResponse response = (HttpResponse) result;
HttpEntity entity = response.getEntity();
if (entity != null) {
try {
String responseContent = EntityUtils.toString(entity);
LOG.info("Checking if response message is retryable: " + responseContent);

boolean isRetryable = retryableResponseMessages.stream()
.anyMatch(retryableMessage -> responseContent.contains(retryableMessage));

LOG.info("Check retryable result: " + isRetryable);
return isRetryable;
} catch (IOException e) {
LOG.warning("Unable to parse response body. Retryable result: false");
return false;
}
}
LOG.info("No response entity found. Retryable result: false");
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;

/**
* Failure handler based on HTTP response.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpResultPredicate<T> implements CheckedPredicate<T> {

private final HttpStatusCodeResultPredicate<T> statusCodePredicate;
private final HttpResponseMessageResultPredicate<T> responseMessagePredicate;

public HttpResultPredicate(HttpStatusCodeResultPredicate<T> statusCodePredicate, HttpResponseMessageResultPredicate<T> responseMessagePredicate) {
this.statusCodePredicate = statusCodePredicate;
this.responseMessagePredicate = responseMessagePredicate;
}

@Override
public boolean test(T result) throws Throwable {
return statusCodePredicate.test(result) || responseMessagePredicate.test(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future}

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.http.HttpEntity
import org.apache.http.HttpResponse
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder}
import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer}
import org.apache.http.protocol.HttpContext
import org.apache.http.util.EntityUtils
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.verification.VerificationMode
Expand Down Expand Up @@ -81,6 +83,13 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
}
}

it should "retry if response message contains retryable message" in {
retryableClient
.whenResponseMessage(
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not retry if response code is not on the retryable status code list" in {
retryableClient
.whenStatusCode(400)
Expand Down Expand Up @@ -175,6 +184,16 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
this
}

def whenResponseMessage(responseMessage: String): AssertionHelper = {
val entity = mock[HttpEntity](RETURNS_DEEP_STUBS)
mockStatic(classOf[EntityUtils])
when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage)
val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
when(response.getEntity).thenReturn(entity)
when(future.get()).thenReturn(response)
this
}

def shouldExecute(expectExecuteTimes: VerificationMode): Unit = {
shouldExecute(expectExecuteTimes, expectExecuteTimes)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ object FlintSparkConf {
.doc("retryable HTTP response status code list")
.createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_STATUS_CODES)

val RETRYABLE_HTTP_RESPONSE_MESSAGES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_HTTP_RESPONSE_MESSAGES}")
.datasourceOption()
.doc("retryable HTTP response message list")
.createWithDefault(FlintRetryOptions.DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES)

val RETRYABLE_EXCEPTION_CLASS_NAMES =
FlintConfig(s"spark.datasource.flint.${FlintRetryOptions.RETRYABLE_EXCEPTION_CLASS_NAMES}")
.datasourceOption()
Expand Down Expand Up @@ -333,6 +339,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
AUTH,
MAX_RETRIES,
RETRYABLE_HTTP_STATUS_CODES,
RETRYABLE_HTTP_RESPONSE_MESSAGES,
BULK_REQUEST_RATE_LIMIT_PER_NODE,
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class FlintSparkConfSuite extends FlintSuite {
val retryOptions = FlintSparkConf().flintOptions().getRetryOptions
retryOptions.getMaxRetries shouldBe DEFAULT_MAX_RETRIES
retryOptions.getRetryableHttpStatusCodes shouldBe DEFAULT_RETRYABLE_HTTP_STATUS_CODES
retryOptions.getRetryableHttpResponseMessages shouldBe DEFAULT_RETRYABLE_HTTP_RESPONSE_MESSAGES
retryOptions.getRetryableExceptionClassNames shouldBe Optional.empty
}

Expand All @@ -55,12 +56,14 @@ class FlintSparkConfSuite extends FlintSuite {
Map(
"retry.max_retries" -> "5",
"retry.http_status_codes" -> "429,502,503,504",
"retry.http_response_messages" -> "message1,message2",
"retry.exception_class_names" -> "java.net.ConnectException").asJava)
.flintOptions()
.getRetryOptions

retryOptions.getMaxRetries shouldBe 5
retryOptions.getRetryableHttpStatusCodes shouldBe "429,502,503,504"
retryOptions.getRetryableHttpResponseMessages shouldBe "message1,message2"
retryOptions.getRetryableExceptionClassNames.get() shouldBe "java.net.ConnectException"
}

Expand Down

0 comments on commit d05fe0c

Please sign in to comment.