Skip to content

Commit

Permalink
Remote reindex: Add support for configurable retry mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed Mar 13, 2024
1 parent 1235756 commit 3b4e417
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))

### Dependencies
- Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public List<Setting<?>> getSettings() {
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -139,7 +140,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
assigningClient,
threadPool,
scriptService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
Function.identity(),
Property.NodeScope
);

public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
"reindex.remote.retry.initial_backoff",
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(50),
TimeValue.timeValueMillis(5000),
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
"reindex.remote.retry.max_count",
15,
1,
100,
Property.Dynamic,
Property.NodeScope
);

public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;

private final ClusterService clusterService;

@Inject
public TransportReindexAction(
Settings settings,
Expand All @@ -92,10 +114,16 @@ public TransportReindexAction(
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getRemoteInfo() != null) {
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
}

reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -98,21 +101,29 @@ public RemoteScrollableHitSource(

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
remoteVersion = version;
execute(
logger.trace("Starting initial search");
executeWithRetries(
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER,
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
);
}));
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll)
// level.
}, searchListener::onFailure, searchListener::onFailure));
}

void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
logger.trace("Checking version for remote domain");
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
// instead of retrying for longer duration.
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
}

private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
logger.trace("On initial search response");
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
Expand All @@ -123,12 +134,14 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
logger.trace("Starting next scroll call");
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
logger.debug("Clearing the scrollID {}", scrollId);
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
Expand Down Expand Up @@ -179,17 +192,31 @@ protected void cleanup(Runnable onCompletion) {
});
}

private void executeWithRetries(
Request request,
BiFunction<XContentParser, MediaType, Response> parser,
RejectAwareActionListener<Response> childListener
) {
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
logger.debug("Retrying execute request {}", request.getEndpoint());
countSearchRetry.run();
execute(request, parser, r);
}, childListener));
}

private <T> void execute(
Request request,
BiFunction<XContentParser, MediaType, T> parser,
RejectAwareActionListener<? super T> listener
) {
logger.trace("Executing http request to remote cluster {}", request.getEndpoint());
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
try {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.trace("Successfully got response from the remote");
// Restore the thread context to get the precious headers
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
Expand All @@ -204,7 +231,7 @@ public void onSuccess(org.opensearch.client.Response response) {
}
if (mediaType == null) {
try {
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
throw new OpenSearchException(
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
);
Expand Down Expand Up @@ -236,22 +263,28 @@ public void onSuccess(org.opensearch.client.Response response) {
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
logger.debug("Received response failure {}", e.getMessage());
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
int statusCode = re.getResponse().getStatusLine().getStatusCode();
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
// retry all 5xx & 429s.
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
listener.onRejection(e);
return;
}
} else if (e instanceof ConnectException) {
listener.onRejection(e);
return;
} else if (e instanceof ContentTooLongException) {
e = new IllegalArgumentException(
"Remote responded with a chunk that was too large. Use a smaller batch size.",
e
);
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.StatusLine;
Expand All @@ -47,15 +48,18 @@
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.Version;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.HeapBufferedAsyncResponseConsumer;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.common.io.Streams;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -79,19 +83,23 @@

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.Mockito;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
Expand Down Expand Up @@ -490,7 +498,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand All @@ -499,7 +507,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand Down Expand Up @@ -650,4 +658,86 @@ private <T extends Exception, V> T expectListenerFailure(Class<T> expectedExcept
assertNotNull(exception.get());
return exception.get();
}

RemoteScrollableHitSource createRemoteSourceWithFailure(
boolean shouldMockRemoteVersion,
Exception failure,
AtomicInteger invocationCount
) {
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {
@Override
public <T> Future<T> execute(HttpAsyncRequestProducer requestProducer, HttpAsyncResponseConsumer<T> responseConsumer, HttpContext context, FutureCallback<T> callback) {
invocationCount.getAndIncrement();
callback.failed(failure);
return null;
}

@Override
public void close() throws IOException {}

@Override
public boolean isRunning() {
return false;
}

@Override
public void start() {}
};
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
}

void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
retriesAllowed = 5;
AtomicInteger invocations = new AtomicInteger();
invocations.set(0);
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);

Throwable e = expectThrows(RuntimeException.class, source::start);
int expectedInvocations = 0;
if (shouldMockRemoteVersion) {
expectedInvocations += 1; // first search
if (expectedToRetry) expectedInvocations += retriesAllowed;
} else {
expectedInvocations = 1; // the first should fail and not trigger any retry.
}

assertEquals(expectedInvocations, invocations.get());

// Unwrap the some artifacts from the test
while (e.getMessage().equals("failed")) {
e = e.getCause();
}
// There is an additional wrapper for ResponseException.
if (failureResponse instanceof ResponseException) {
e = e.getCause();
}

assertSame(failureResponse, e);
}

ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
ProtocolVersion protocolVersion = new ProtocolVersion("https", 1, 1);
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
Mockito.when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(protocolVersion, statusCode, errorMsg));
Mockito.when(mockResponse.getRequestLine()).thenReturn(new BasicRequestLine("GET", "/", protocolVersion));
return new ResponseException(mockResponse);
}

public void testRetryOnCallFailure() throws Exception {
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
verifyRetries(true, withResponseCode(400, "Client Error"), false);

// First call succeeds. Search call failed with exceptions other than ResponseException
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
verifyRetries(true, new RuntimeException("foobar"), false);

// First call(remote version lookup) failed and no retries expected
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
verifyRetries(false, withResponseCode(400, "Client Error"), false);
verifyRetries(false, new ConnectException("blah"), false);
}
}
Loading

0 comments on commit 3b4e417

Please sign in to comment.