Skip to content

Commit

Permalink
add handler for aoss only
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az committed Dec 4, 2024
1 parent 4212dac commit b9e5aad
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 49 deletions.
1 change: 0 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.read.scroll_duration`: default value is 5 minutes. scroll context keep alive duration.
- `spark.datasource.flint.retry.max_retries`: max retries on failed HTTP request. default value is 3. Use 0 to disable retry.
- `spark.datasource.flint.retry.http_status_codes`: retryable HTTP response status code list. default value is "429,502" (429 Too Many Request and 502 Bad Gateway).
- `spark.datasource.flint.retry.http_response_messages`: retryable HTTP response message list. default value is "resource_already_exists_exception".
- `spark.datasource.flint.retry.exception_class_names`: retryable exception class name list. by default no retry on any exception thrown.
- `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version.
- `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedPredicate;
import java.time.Duration;
Expand All @@ -17,7 +21,6 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
import org.opensearch.flint.core.http.handler.HttpResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

Expand Down Expand Up @@ -67,21 +70,21 @@ public boolean isRetryEnabled() {
* @return Failsafe retry policy
*/
public <T> RetryPolicy<T> getRetryPolicy() {
return RetryPolicy.<T>builder()
RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder()
// Backoff strategy config (can be configurable as needed in future)
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
// Failure handling config from Flint options
.withMaxRetries(getMaxRetries())
.handleIf(ExceptionClassNameFailurePredicate.create(getRetryableExceptionClassNames()))
.handleResultIf(
new HttpResultPredicate<>(
new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()),
new HttpAOSSResultPredicate<>()))
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
.build();
.onRetry(FlintRetryOptions::onRetry);
if (getServiceName() == SERVICE_NAME_AOSS) {
builder.handleResultIf(new HttpAOSSResultPredicate<>());
}
return builder.build();
}

public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
Expand All @@ -106,6 +109,10 @@ private static <T> void onRetry(ExecutionAttemptedEvent<T> event) {
LOG.warning("Retrying failed request at #" + event.getAttemptCount());
}

private String getServiceName() {
return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
}

/**
* @return maximum retry option value
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.util.EntityUtils;

import java.util.Arrays;
import java.util.Set;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
* Failure handler based on HTTP response from AOSS.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,6 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
}
}

it should "retry if response message contains retryable message" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not retry if response code is not on the retryable status code list" in {
retryableClient
.whenStatusCode(400)
Expand Down Expand Up @@ -163,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
expectFutureGetTimes = times(0))
}

it should "retry if AOSS response is retryable" in {
retryableClient
.withOption("auth.servicename", "aoss")
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not apply retry policy for AOSS response if service is not AOSS" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(1))
}

private def retryableClient: AssertionHelper = new AssertionHelper

class AssertionHelper {
Expand Down

0 comments on commit b9e5aad

Please sign in to comment.