Skip to content

Commit

Permalink
Add retryable AOSS HTTP response (#951)
Browse files Browse the repository at this point in the history
* add retryable response message

Signed-off-by: Sean Kao <[email protected]>

* check retryable response only if 400

Signed-off-by: Sean Kao <[email protected]>

* add handler for aoss only

Signed-off-by: Sean Kao <[email protected]>

* edit comment

Signed-off-by: Sean Kao <[email protected]>

* bugfix: aoss result predicate not used

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Dec 9, 2024
1 parent 9902a3f commit 5720e54
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package org.opensearch.flint.core.http;

import static java.time.temporal.ChronoUnit.SECONDS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_AOSS;
import static org.opensearch.flint.core.FlintOptions.SERVICE_NAME_ES;

import dev.failsafe.RetryPolicy;
import dev.failsafe.RetryPolicyBuilder;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedPredicate;
import java.time.Duration;
Expand All @@ -16,6 +20,7 @@
import java.util.logging.Logger;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.flint.core.http.handler.ExceptionClassNameFailurePredicate;
import org.opensearch.flint.core.http.handler.HttpAOSSResultPredicate;
import org.opensearch.flint.core.http.handler.HttpStatusCodeResultPredicate;
import java.io.Serializable;

Expand Down Expand Up @@ -65,7 +70,7 @@ public boolean isRetryEnabled() {
* @return Failsafe retry policy
*/
public <T> RetryPolicy<T> getRetryPolicy() {
return RetryPolicy.<T>builder()
RetryPolicyBuilder<T> builder = RetryPolicy.<T>builder()
// Backoff strategy config (can be configurable as needed in future)
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
Expand All @@ -75,8 +80,11 @@ public <T> RetryPolicy<T> getRetryPolicy() {
.handleResultIf(new HttpStatusCodeResultPredicate<>(getRetryableHttpStatusCodes()))
// Logging listener
.onFailedAttempt(FlintRetryOptions::onFailure)
.onRetry(FlintRetryOptions::onRetry)
.build();
.onRetry(FlintRetryOptions::onRetry);
if (SERVICE_NAME_AOSS.equals(getServiceName())) {
builder.handleResultIf(new HttpAOSSResultPredicate<>());
}
return builder.build();
}

public RetryPolicy<BulkResponse> getBulkRetryPolicy(CheckedPredicate<BulkResponse> resultPredicate) {
Expand All @@ -101,6 +109,10 @@ private static <T> void onRetry(ExecutionAttemptedEvent<T> event) {
LOG.warning("Retrying failed request at #" + event.getAttemptCount());
}

private String getServiceName() {
return options.getOrDefault(SERVICE_NAME, SERVICE_NAME_ES);
}

/**
* @return maximum retry option value
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.http.handler;

import dev.failsafe.function.CheckedPredicate;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.util.EntityUtils;

import java.util.logging.Logger;

/**
* Failure handler based on HTTP response from AOSS.
*
* @param <T> result type (supposed to be HttpResponse for OS client)
*/
public class HttpAOSSResultPredicate<T> implements CheckedPredicate<T> {

private static final Logger LOG = Logger.getLogger(HttpAOSSResultPredicate.class.getName());

public static final int BAD_REQUEST_STATUS_CODE = 400;
public static final String RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE = "resource_already_exists_exception";

public HttpAOSSResultPredicate() { }

@Override
public boolean test(T result) throws Throwable {
LOG.info("Checking if response is retryable");

int statusCode = ((HttpResponse) result).getStatusLine().getStatusCode();
if (statusCode != BAD_REQUEST_STATUS_CODE) {
LOG.info("Status code " + statusCode + " is not " + BAD_REQUEST_STATUS_CODE + ". Check result: false");
return false;
}

HttpResponse response = (HttpResponse) result;
HttpEntity entity = response.getEntity();
if (entity == null) {
LOG.info("No response entity found. Check result: false");
return false;
}

// Buffer the entity to make it repeatable, so that this retry test does not consume the content stream,
// resulting in the request caller getting empty response
BufferedHttpEntity bufferedEntity = new BufferedHttpEntity(entity);
response.setEntity(bufferedEntity);

try {
String responseContent = EntityUtils.toString(bufferedEntity);
// Effectively restores the content stream of the response
bufferedEntity.getContent().reset();

boolean isRetryable = responseContent.contains(RESOURCE_ALREADY_EXISTS_EXCEPTION_MESSAGE);

LOG.info("Check retryable response result: " + isRetryable);
return isRetryable;
} catch (Exception e) {
LOG.info("Unable to parse response body. Check result: false");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import java.util.concurrent.{ExecutionException, Future}

import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.apache.http.HttpEntity
import org.apache.http.HttpResponse
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.{CloseableHttpAsyncClient, HttpAsyncClientBuilder}
import org.apache.http.nio.protocol.{HttpAsyncRequestProducer, HttpAsyncResponseConsumer}
import org.apache.http.protocol.HttpContext
import org.apache.http.util.EntityUtils
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import org.mockito.verification.VerificationMode
Expand Down Expand Up @@ -153,6 +155,23 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
expectFutureGetTimes = times(0))
}

it should "retry if AOSS response is retryable" in {
retryableClient
.withOption("auth.servicename", "aoss")
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(DEFAULT_MAX_RETRIES + 1))
}

it should "not apply retry policy for AOSS response if service is not AOSS" in {
retryableClient
.whenResponse(
400,
"OpenSearchStatusException[OpenSearch exception [type=resource_already_exists_exception,")
.shouldExecute(times(1))
}

private def retryableClient: AssertionHelper = new AssertionHelper

class AssertionHelper {
Expand All @@ -175,6 +194,17 @@ class RetryableHttpAsyncClientSuite extends AnyFlatSpec with BeforeAndAfter with
this
}

def whenResponse(statusCode: Int, responseMessage: String): AssertionHelper = {
val entity = mock[HttpEntity](RETURNS_DEEP_STUBS)
mockStatic(classOf[EntityUtils])
when(EntityUtils.toString(any[HttpEntity])).thenReturn(responseMessage)
val response = mock[HttpResponse](RETURNS_DEEP_STUBS)
when(response.getStatusLine.getStatusCode).thenReturn(statusCode)
when(response.getEntity).thenReturn(entity)
when(future.get()).thenReturn(response)
this
}

def shouldExecute(expectExecuteTimes: VerificationMode): Unit = {
shouldExecute(expectExecuteTimes, expectExecuteTimes)
}
Expand Down

0 comments on commit 5720e54

Please sign in to comment.