Skip to content

Commit

Permalink
Remote reindex: Add support for configurable retry mechanism
Browse files Browse the repository at this point in the history
Signed-off-by: Ankit Kala <[email protected]>
  • Loading branch information
ankitkala committed Mar 11, 2024
1 parent 11836d0 commit 2161a4f
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public List<Setting<?>> getSettings() {
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -141,7 +142,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
assigningClient,
threadPool,
scriptService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
Function.identity(),
Property.NodeScope
);

public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
"reindex.remote.retry.initial_backoff",
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(50),
TimeValue.timeValueMillis(5000),
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
"reindex.remote.retry.max_count",
15,
1,
100,
Property.Dynamic,
Property.NodeScope
);

public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;

private final ClusterService clusterService;

@Inject
public TransportReindexAction(
Settings settings,
Expand All @@ -92,10 +114,16 @@ public TransportReindexAction(
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getRemoteInfo() != null) {
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
}

reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -99,23 +102,31 @@ public RemoteScrollableHitSource(

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
logger.trace("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
remoteVersion = version;
execute(
logger.trace("Starting initial search");
executeWithRetries(
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER,
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
);
}));
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll)
// level.
}, searchListener::onFailure, searchListener::onFailure));
}

void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
logger.trace("Checking version for remote domain");
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
// instead of retrying for longer duration.
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
}

private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
logger.trace("On initial search response");
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
logger.trace("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
} else {
searchListener.onResponse(response);
Expand All @@ -124,16 +135,18 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
logger.trace("Starting next scroll call");
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
logger.trace("Clearing the scrollID {}", scrollId);
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId);
logger.trace("Successfully cleared [{}]", scrollId);
onCompletion.run();
}

Expand Down Expand Up @@ -171,7 +184,7 @@ protected void cleanup(Runnable onCompletion) {
threadPool.generic().submit(() -> {
try {
client.close();
logger.debug("Shut down remote connection");
logger.trace("Shut down remote connection");
} catch (IOException e) {
logger.error("Failed to shutdown the remote connection", e);
} finally {
Expand All @@ -180,17 +193,30 @@ protected void cleanup(Runnable onCompletion) {
});
}

private void executeWithRetries(
Request request,
BiFunction<XContentParser, MediaType, Response> parser,
RejectAwareActionListener<Response> childListener
) {
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
logger.trace("Retrying execute request");
execute(request, parser, r);
}, childListener));
}

private <T> void execute(
Request request,
BiFunction<XContentParser, MediaType, T> parser,
RejectAwareActionListener<? super T> listener
) {
logger.trace("Executing http request to remote cluster");
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
try {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.trace("Successfully got response from the remote");
// Restore the thread context to get the precious headers
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
Expand All @@ -205,7 +231,7 @@ public void onSuccess(org.opensearch.client.Response response) {
}
if (mediaType == null) {
try {
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
throw new OpenSearchException(
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
);
Expand Down Expand Up @@ -237,22 +263,28 @@ public void onSuccess(org.opensearch.client.Response response) {
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
logger.trace("Received response failure {}", e.getMessage());
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
int statusCode = re.getResponse().getStatusLine().getStatusCode();
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
// retry all 5xx & 429s.
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
listener.onRejection(e);
return;
}
} else if (e instanceof ConnectException) {
listener.onRejection(e);
return;
} else if (e instanceof ContentTooLongException) {
e = new IllegalArgumentException(
"Remote responded with a chunk that was too large. Use a smaller batch size.",
e
);
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
Expand All @@ -57,6 +60,7 @@
import org.opensearch.Version;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.http.HttpUriRequestProducer;
import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer;
Expand All @@ -83,17 +87,21 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.mockito.Mockito;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -702,4 +710,105 @@ private static ClassicHttpRequest getRequest(AsyncRequestProducer requestProduce
assertThat(requestProducer, instanceOf(HttpUriRequestProducer.class));
return ((HttpUriRequestProducer) requestProducer).getRequest();
}

RemoteScrollableHitSource createRemoteSourceWithFailure(
boolean shouldMockRemoteVersion,
Exception failure,
AtomicInteger invocationCount
) {
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {

@Override
public void close() throws IOException {}

@Override
public void close(CloseMode closeMode) {}

@Override
public void start() {}

@Override
public void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier) {}

@Override
public void initiateShutdown() {}

@Override
public IOReactorStatus getStatus() {
return null;
}

@Override
protected <T> Future<T> doExecute(
HttpHost target,
AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context,
FutureCallback<T> callback
) {
invocationCount.getAndIncrement();
callback.failed(failure);
return null;
}

@Override
public void awaitShutdown(org.apache.hc.core5.util.TimeValue waitTime) throws InterruptedException {}
};
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
}

void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
retriesAllowed = 5;
AtomicInteger invocations = new AtomicInteger();
invocations.set(0);
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);

Throwable e = expectThrows(RuntimeException.class, source::start);
int expectedInvocations = 0;
if (shouldMockRemoteVersion) {
expectedInvocations += 1; // first search
if (expectedToRetry) expectedInvocations += retriesAllowed;
} else {
expectedInvocations = 1; // the first should fail and not trigger any retry.
}

assertEquals(expectedInvocations, invocations.get());

// Unwrap the some artifacts from the test
while (e.getMessage().equals("failed")) {
e = e.getCause();
}
// There is an additional wrapper for ResponseException.
if (failureResponse instanceof ResponseException) {
e = e.getCause();
}

assertSame(failureResponse, e);
}

ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
Mockito.when(mockResponse.getStatusLine()).thenReturn(new StatusLine(new BasicClassicHttpResponse(statusCode, errorMsg)));
Mockito.when(mockResponse.getRequestLine()).thenReturn(new RequestLine("GET", "/", new ProtocolVersion("https", 1, 1)));
return new ResponseException(mockResponse);
}

public void testRetryOnCallFailure() throws Exception {
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
verifyRetries(true, withResponseCode(400, "Client Error"), false);

// First call succeeds. Search call failed with exceptions other than ResponseException
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
verifyRetries(true, new RuntimeException("foobar"), false);

// First call(remote version lookup) failed and no retries expected
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
verifyRetries(false, withResponseCode(400, "Client Error"), false);
verifyRetries(false, new ConnectException("blah"), false);
}
}
Loading

0 comments on commit 2161a4f

Please sign in to comment.