Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retryable AOSS HTTP response #951

Merged
merged 5 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedPredicate;
import java.time.Duration;
Expand All @@ -16,6 +20,7 @@
import java.util.logging.Logger;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

Expand Down Expand Up @@ -65,7 +70,7 @@ public boolean isRetryEnabled() {
* @return Failsafe retry policy
*/
public <T> RetryPolicy<T> getRetryPolicy() {
return RetryPolicy.<T>builder()
RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder()
// Backoff strategy config (can be configurable as needed in future)
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
Expand All @@ -75,8 +80,11 @@ public <T> RetryPolicy<T> getRetryPolicy() {
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
.build();
.onRetry(FlintRetryOptions::onRetry);
if (SERVICE_NAME_AOSS.equals(getServiceName())) {
builder.handleResultIf(new HttpAOSSResultPredicate<>());
}
return builder.build();
}

public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
Expand All @@ -101,6 +109,10 @@ private static <T> void onRetry(ExecutionAttemptedEvent<T> event) {
LOG.warning("Retrying failed request at #" + event.getAttemptCount());
}

private String getServiceName() {
return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
}

/**
* @return maximum retry option value
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.util.EntityUtils;

import java.util.logging.Logger;

/**
* Failure handler based on HTTP response from AOSS.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpAOSSResultPredicate<T> implements CheckedPredicate<T> {

private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName());

public static final int BAD_REQUEST_STATUS_CODE = 400;
public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception";

public HttpAOSSResultPredicate() { }

@Override
public boolean test(T result) throws Throwable {
LOG.info("Checking if response is retryable");

int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode();
if (statusCode != BAD_REQUEST_STATUS_CODE) {
LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false");
return false;
}

HttpResponse response = (HttpResponse) result;
HttpEntity entity = response.getEntity();
if (entity == null) {
LOG.info("No response entity found. Check result: false");
return false;
}

// Buffer the entity to make it repeatable, so that this retry test does not consume the content stream,
// resulting in the request caller getting empty response
BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity);
response.setEntity(bufferedEntity);

try {
String responseContent = EntityUtils.toString(bufferedEntity);
// Effectively restores the content stream of the response
bufferedEntity.getContent().reset();

boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE);

LOG.info("Check retryable response result: " + isRetryable);
return isRetryable;
} catch (Exception e) {
LOG.info("Unable to parse response body. Check result: false");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future}

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.http.HttpEntity
import org.apache.http.HttpResponse
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder}
import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer}
import org.apache.http.protocol.HttpContext
import org.apache.http.util.EntityUtils
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.verification.VerificationMode
Expand Down Expand Up @@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
expectFutureGetTimes = times(0))
}

it should "retry if AOSS response is retryable" in {
retryableClient
.withOption("auth.servicename", "aoss")
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not apply retry policy for AOSS response if service is not AOSS" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(1))
}

private def retryableClient: AssertionHelper = new AssertionHelper

class AssertionHelper {
Expand All @@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
this
}

def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = {
val entity = mock[HttpEntity](RETURNS_DEEP_STUBS)
mockStatic(classOf[EntityUtils])
when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage)
val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
when(response.getStatusLine.getStatusCode).thenReturn(statusCode)
when(response.getEntity).thenReturn(entity)
when(future.get()).thenReturn(response)
this
}

def shouldExecute(expectExecuteTimes: VerificationMode): Unit = {
shouldExecute(expectExecuteTimes, expectExecuteTimes)
}
Expand Down
Loading