Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote reindex: Add support for configurable retry mechanism #12561

Merged
merged 6 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public List<Setting<?>> getSettings() {
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -141,7 +142,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
assigningClient,
threadPool,
scriptService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
Function.identity(),
Property.NodeScope
);

public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
"reindex.remote.retry.initial_backoff",
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(50),
TimeValue.timeValueMillis(5000),
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
"reindex.remote.retry.max_count",
15,
1,
100,
Property.Dynamic,
Property.NodeScope
);

public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;

private final ClusterService clusterService;

@Inject
public TransportReindexAction(
Settings settings,
Expand All @@ -92,10 +114,16 @@ public TransportReindexAction(
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getRemoteInfo() != null) {
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
}

reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -99,21 +102,29 @@

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
remoteVersion = version;
execute(
logger.trace("Starting initial search");
executeWithRetries(
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER,
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
);
}));
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll)
// level.
}, searchListener::onFailure, searchListener::onFailure));
}

void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
logger.trace("Checking version for remote domain");
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
// instead of retrying for longer duration.
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
}

private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
logger.trace("On initial search response");
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
Expand All @@ -124,12 +135,14 @@

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
logger.trace("Starting next scroll call");
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
logger.debug("Clearing the scrollID {}", scrollId);
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
Expand Down Expand Up @@ -180,17 +193,31 @@
});
}

private void executeWithRetries(
Request request,
BiFunction<XContentParser, MediaType, Response> parser,
RejectAwareActionListener<Response> childListener
) {
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
logger.debug("Retrying execute request {}", request.getEndpoint());
countSearchRetry.run();
execute(request, parser, r);
}, childListener));
}

private <T> void execute(
Request request,
BiFunction<XContentParser, MediaType, T> parser,
RejectAwareActionListener<? super T> listener
) {
logger.trace("Executing http request to remote cluster {}", request.getEndpoint());
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
try {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.trace("Successfully got response from the remote");
// Restore the thread context to get the precious headers
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
Expand All @@ -205,7 +232,7 @@
}
if (mediaType == null) {
try {
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
throw new OpenSearchException(
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
);
Expand Down Expand Up @@ -237,22 +264,28 @@
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
logger.debug("Received response failure {}", e.getMessage());
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
int statusCode = re.getResponse().getStatusLine().getStatusCode();
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
// retry all 5xx & 429s.
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
listener.onRejection(e);
return;
}
} else if (e instanceof ConnectException) {
ankitkala marked this conversation as resolved.
Show resolved Hide resolved
listener.onRejection(e);
return;

Check warning on line 280 in modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java

View check run for this annotation

Codecov / codecov/patch

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java#L279-L280

Added lines #L279 - L280 were not covered by tests
} else if (e instanceof ContentTooLongException) {
e = new IllegalArgumentException(
"Remote responded with a chunk that was too large. Use a smaller batch size.",
e
);
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.io.entity.InputStreamEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
Expand All @@ -57,6 +60,7 @@
import org.opensearch.Version;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.client.http.HttpUriRequestProducer;
import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer;
Expand All @@ -83,17 +87,21 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.mockito.Mockito;

import static org.opensearch.common.unit.TimeValue.timeValueMillis;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -515,7 +523,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand All @@ -524,7 +532,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand Down Expand Up @@ -702,4 +710,105 @@ private static ClassicHttpRequest getRequest(AsyncRequestProducer requestProduce
assertThat(requestProducer, instanceOf(HttpUriRequestProducer.class));
return ((HttpUriRequestProducer) requestProducer).getRequest();
}

RemoteScrollableHitSource createRemoteSourceWithFailure(
boolean shouldMockRemoteVersion,
Exception failure,
AtomicInteger invocationCount
) {
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {

@Override
public void close() throws IOException {}

@Override
public void close(CloseMode closeMode) {}

@Override
public void start() {}

@Override
public void register(String hostname, String uriPattern, Supplier<AsyncPushConsumer> supplier) {}

@Override
public void initiateShutdown() {}

@Override
public IOReactorStatus getStatus() {
return null;
}

@Override
protected <T> Future<T> doExecute(
HttpHost target,
AsyncRequestProducer requestProducer,
AsyncResponseConsumer<T> responseConsumer,
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
HttpContext context,
FutureCallback<T> callback
) {
invocationCount.getAndIncrement();
callback.failed(failure);
return null;
}

@Override
public void awaitShutdown(org.apache.hc.core5.util.TimeValue waitTime) throws InterruptedException {}
};
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
}

void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
retriesAllowed = 5;
AtomicInteger invocations = new AtomicInteger();
invocations.set(0);
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);

Throwable e = expectThrows(RuntimeException.class, source::start);
int expectedInvocations = 0;
if (shouldMockRemoteVersion) {
expectedInvocations += 1; // first search
if (expectedToRetry) expectedInvocations += retriesAllowed;
} else {
expectedInvocations = 1; // the first should fail and not trigger any retry.
}

assertEquals(expectedInvocations, invocations.get());

// Unwrap the some artifacts from the test
while (e.getMessage().equals("failed")) {
e = e.getCause();
}
// There is an additional wrapper for ResponseException.
if (failureResponse instanceof ResponseException) {
e = e.getCause();
}

assertSame(failureResponse, e);
}

ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
Mockito.when(mockResponse.getStatusLine()).thenReturn(new StatusLine(new BasicClassicHttpResponse(statusCode, errorMsg)));
Mockito.when(mockResponse.getRequestLine()).thenReturn(new RequestLine("GET", "/", new ProtocolVersion("https", 1, 1)));
return new ResponseException(mockResponse);
}

public void testRetryOnCallFailure() throws Exception {
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
verifyRetries(true, withResponseCode(400, "Client Error"), false);

// First call succeeds. Search call failed with exceptions other than ResponseException
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
verifyRetries(true, new RuntimeException("foobar"), false);

// First call(remote version lookup) failed and no retries expected
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
verifyRetries(false, withResponseCode(400, "Client Error"), false);
verifyRetries(false, new ConnectException("blah"), false);
}
}
Loading
Loading