Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x]Remote reindex: Add support for configurable retry mechanism #12634

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835))
- [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880))
- The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586))
- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561))
- Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public List<Setting<?>> getSettings() {
final List<Setting<?>> settings = new ArrayList<>();
settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST);
settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF);
settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT);
settings.addAll(ReindexSslConfig.getSettings());
return settings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -139,7 +140,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(
task,
logger,
// Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging.
Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())),
assigningClient,
threadPool,
scriptService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.reindex.spi.RemoteReindexExtension;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
Function.identity(),
Property.NodeScope
);

public static final Setting<TimeValue> REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting(
"reindex.remote.retry.initial_backoff",
TimeValue.timeValueMillis(500),
TimeValue.timeValueMillis(50),
TimeValue.timeValueMillis(5000),
Property.Dynamic,
Property.NodeScope
);

public static final Setting<Integer> REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting(
"reindex.remote.retry.max_count",
15,
1,
100,
Property.Dynamic,
Property.NodeScope
);

public static Optional<RemoteReindexExtension> remoteExtension = Optional.empty();

private final ReindexValidator reindexValidator;
private final Reindexer reindexer;

private final ClusterService clusterService;

@Inject
public TransportReindexAction(
Settings settings,
Expand All @@ -92,10 +114,16 @@ public TransportReindexAction(
super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new);
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension);
this.clusterService = clusterService;
}

@Override
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
if (request.getRemoteInfo() != null) {
request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT));
request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF));
}

reindexValidator.initialValidation(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.reindex.RejectAwareActionListener;
import org.opensearch.index.reindex.RetryListener;
import org.opensearch.index.reindex.ScrollableHitSource;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;
import java.util.Arrays;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -98,21 +101,29 @@

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices()));
lookupRemoteVersion(RejectAwareActionListener.wrap(version -> {
remoteVersion = version;
execute(
logger.trace("Starting initial search");
executeWithRetries(
RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
RESPONSE_PARSER,
RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r))
);
}));
// Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll)
// level.
}, searchListener::onFailure, searchListener::onFailure));
}

void lookupRemoteVersion(RejectAwareActionListener<Version> listener) {
logger.trace("Checking version for remote domain");
// We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately
// instead of retrying for longer duration.
execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener);
}

private void onStartResponse(RejectAwareActionListener<Response> searchListener, Response response) {
logger.trace("On initial search response");
if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) {
logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId());
doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener);
Expand All @@ -123,12 +134,14 @@

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
logger.trace("Starting next scroll call");
TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos());
execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
logger.debug("Clearing the scrollID {}", scrollId);
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
Expand Down Expand Up @@ -179,17 +192,31 @@
});
}

private void executeWithRetries(
Request request,
BiFunction<XContentParser, MediaType, Response> parser,
RejectAwareActionListener<Response> childListener
) {
execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> {
logger.debug("Retrying execute request {}", request.getEndpoint());
countSearchRetry.run();
execute(request, parser, r);
}, childListener));
}

private <T> void execute(
Request request,
BiFunction<XContentParser, MediaType, T> parser,
RejectAwareActionListener<? super T> listener
) {
logger.trace("Executing http request to remote cluster {}", request.getEndpoint());
// Preserve the thread context so headers survive after the call
java.util.function.Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
try {
client.performRequestAsync(request, new ResponseListener() {
@Override
public void onSuccess(org.opensearch.client.Response response) {
logger.trace("Successfully got response from the remote");
// Restore the thread context to get the precious headers
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
Expand All @@ -204,7 +231,7 @@
}
if (mediaType == null) {
try {
logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity()));
throw new OpenSearchException(
"Response didn't include supported Content-Type, remote is likely not an OpenSearch instance"
);
Expand Down Expand Up @@ -236,22 +263,28 @@
public void onFailure(Exception e) {
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
assert ctx != null; // eliminates compiler warning
logger.debug("Received response failure {}", e.getMessage());
if (e instanceof ResponseException) {
ResponseException re = (ResponseException) e;
int statusCode = re.getResponse().getStatusLine().getStatusCode();
e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re);
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) {
// retry all 5xx & 429s.
if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode
|| statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) {
listener.onRejection(e);
return;
}
} else if (e instanceof ConnectException) {
listener.onRejection(e);
return;

Check warning on line 279 in modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java

View check run for this annotation

Codecov / codecov/patch

modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java#L278-L279

Added lines #L278 - L279 were not covered by tests
} else if (e instanceof ContentTooLongException) {
e = new IllegalArgumentException(
"Remote responded with a chunk that was too large. Use a smaller batch size.",
e
);
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicRequestLine;
import org.apache.http.message.BasicStatusLine;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.opensearch.LegacyESVersion;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.Version;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.client.HeapBufferedAsyncResponseConsumer;
import org.opensearch.client.ResponseException;
import org.opensearch.client.RestClient;
import org.opensearch.common.io.Streams;
import org.opensearch.common.unit.TimeValue;
Expand All @@ -79,17 +82,20 @@

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

Expand Down Expand Up @@ -490,7 +496,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand All @@ -499,7 +505,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException {
Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start());
assertEquals(
"Error parsing the response, remote is likely not an OpenSearch instance",
e.getCause().getCause().getCause().getMessage()
e.getCause().getCause().getCause().getCause().getMessage()
);
}

Expand Down Expand Up @@ -650,4 +656,91 @@ private <T extends Exception, V> T expectListenerFailure(Class<T> expectedExcept
assertNotNull(exception.get());
return exception.get();
}

RemoteScrollableHitSource createRemoteSourceWithFailure(
boolean shouldMockRemoteVersion,
Exception failure,
AtomicInteger invocationCount
) {
CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() {
@Override
public <T> Future<T> execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer<T> responseConsumer,
HttpContext context,
FutureCallback<T> callback
) {
invocationCount.getAndIncrement();
callback.failed(failure);
return null;
}

@Override
public void close() throws IOException {}

@Override
public boolean isRunning() {
return false;
}

@Override
public void start() {}
};
return sourceWithMockedClient(shouldMockRemoteVersion, httpClient);
}

void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) {
retriesAllowed = 5;
AtomicInteger invocations = new AtomicInteger();
invocations.set(0);
RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations);

Throwable e = expectThrows(RuntimeException.class, source::start);
int expectedInvocations = 0;
if (shouldMockRemoteVersion) {
expectedInvocations += 1; // first search
if (expectedToRetry) expectedInvocations += retriesAllowed;
} else {
expectedInvocations = 1; // the first should fail and not trigger any retry.
}

assertEquals(expectedInvocations, invocations.get());

// Unwrap the some artifacts from the test
while (e.getMessage().equals("failed")) {
e = e.getCause();
}
// There is an additional wrapper for ResponseException.
if (failureResponse instanceof ResponseException) {
e = e.getCause();
}

assertSame(failureResponse, e);
}

ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException {
org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class);
ProtocolVersion protocolVersion = new ProtocolVersion("https", 1, 1);
Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN));
Mockito.when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(protocolVersion, statusCode, errorMsg));
Mockito.when(mockResponse.getRequestLine()).thenReturn(new BasicRequestLine("GET", "/", protocolVersion));
return new ResponseException(mockResponse);
}

public void testRetryOnCallFailure() throws Exception {
// First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s.
verifyRetries(true, withResponseCode(500, "Internal Server Error"), true);
verifyRetries(true, withResponseCode(429, "Too many requests"), true);
verifyRetries(true, withResponseCode(400, "Client Error"), false);

// First call succeeds. Search call failed with exceptions other than ResponseException
verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions.
verifyRetries(true, new RuntimeException("foobar"), false);

// First call(remote version lookup) failed and no retries expected
verifyRetries(false, withResponseCode(500, "Internal Server Error"), false);
verifyRetries(false, withResponseCode(429, "Too many requests"), false);
verifyRetries(false, withResponseCode(400, "Client Error"), false);
verifyRetries(false, new ConnectException("blah"), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
*
* @opensearch.internal
*/
class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
public class RetryListener implements RejectAwareActionListener<ScrollableHitSource.Response> {
private final Logger logger;
private final Iterator<TimeValue> retries;
private final ThreadPool threadPool;
private final Consumer<RejectAwareActionListener<ScrollableHitSource.Response>> retryScrollHandler;
private final ActionListener<ScrollableHitSource.Response> delegate;
private int retryCount = 0;

RetryListener(
public RetryListener(
Logger logger,
ThreadPool threadPool,
BackoffPolicy backoffPolicy,
Expand Down
Loading