From 3b4e417a5a22252f9a152b3edb71d171e117d376 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 7 Mar 2024 20:05:49 +0530 Subject: [PATCH] Remote reindex: Add support for configurable retry mechanism Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 + .../index/reindex/ReindexPlugin.java | 2 + .../opensearch/index/reindex/Reindexer.java | 4 +- .../index/reindex/TransportReindexAction.java | 28 ++++++ .../remote/RemoteScrollableHitSource.java | 47 ++++++++-- .../RemoteScrollableHitSourceTests.java | 94 ++++++++++++++++++- .../index/reindex/RetryListener.java | 4 +- 7 files changed, 168 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc7d6b7537261..211ceca1acb9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add kuromoji_completion analyzer and filter ([#4835](https://github.com/opensearch-project/OpenSearch/issues/4835)) - [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) - The org.opensearch.bootstrap.Security should support codebase for JAR files with classifiers ([#12586](https://github.com/opensearch-project/OpenSearch/issues/12586)) +- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) ### Dependencies - Bump `com.squareup.okio:okio` from 3.7.0 to 3.8.0 ([#12290](https://github.com/opensearch-project/OpenSearch/pull/12290)) diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexPlugin.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexPlugin.java index cab1a1dedb3b8..56bb09317cddd 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexPlugin.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexPlugin.java @@ -133,6 +133,8 @@ public List> getSettings() { final List> settings = new ArrayList<>(); settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST); settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST); + settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF); + settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT); settings.addAll(ReindexSslConfig.getSettings()); return settings; } diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java index 8e338dedc4ec8..b57cd411d7b20 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java @@ -52,6 +52,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; @@ -139,7 +140,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction( task, - logger, + // Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging. + Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())), assigningClient, threadPool, scriptService, diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java index e624b0619a26e..c9a970a4118b3 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java @@ -42,6 +42,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.reindex.spi.RemoteReindexExtension; import org.opensearch.script.ScriptService; @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting( + "reindex.remote.retry.initial_backoff", + TimeValue.timeValueMillis(500), + TimeValue.timeValueMillis(50), + TimeValue.timeValueMillis(5000), + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting( + "reindex.remote.retry.max_count", + 15, + 1, + 100, + Property.Dynamic, + Property.NodeScope + ); + public static Optional remoteExtension = Optional.empty(); private final ReindexValidator reindexValidator; private final Reindexer reindexer; + private final ClusterService clusterService; + @Inject public TransportReindexAction( Settings settings, @@ -92,10 +114,16 @@ public TransportReindexAction( super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new); this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex); this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension); + this.clusterService = clusterService; } @Override protected void doExecute(Task task, ReindexRequest request, ActionListener listener) { + if (request.getRemoteInfo() != null) { + request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT)); + request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF)); + } + reindexValidator.initialValidation(request); BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; reindexer.initTask(bulkByScrollTask, request, new ActionListener() { diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java index dc1cddca4bd55..5aefbd8c998e3 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -60,11 +60,14 @@ import org.opensearch.core.xcontent.XContentParseException; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.reindex.RejectAwareActionListener; +import org.opensearch.index.reindex.RetryListener; import org.opensearch.index.reindex.ScrollableHitSource; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.net.ConnectException; +import java.util.Arrays; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -98,21 +101,29 @@ public RemoteScrollableHitSource( @Override protected void doStart(RejectAwareActionListener searchListener) { - lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> { + logger.info("Starting remote reindex for {}", Arrays.toString(searchRequest.indices())); + lookupRemoteVersion(RejectAwareActionListener.wrap(version -> { remoteVersion = version; - execute( + logger.trace("Starting initial search"); + executeWithRetries( RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion), RESPONSE_PARSER, RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r)) ); - })); + // Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll) + // level. + }, searchListener::onFailure, searchListener::onFailure)); } void lookupRemoteVersion(RejectAwareActionListener listener) { + logger.trace("Checking version for remote domain"); + // We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately + // instead of retrying for longer duration. execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener); } private void onStartResponse(RejectAwareActionListener searchListener, Response response) { + logger.trace("On initial search response"); if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) { logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId()); doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener); @@ -123,12 +134,14 @@ private void onStartResponse(RejectAwareActionListener searchListener, @Override protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { + logger.trace("Starting next scroll call"); TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()); - execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener); + executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener); } @Override protected void clearScroll(String scrollId, Runnable onCompletion) { + logger.debug("Clearing the scrollID {}", scrollId); client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() { @Override public void onSuccess(org.opensearch.client.Response response) { @@ -179,17 +192,31 @@ protected void cleanup(Runnable onCompletion) { }); } + private void executeWithRetries( + Request request, + BiFunction parser, + RejectAwareActionListener childListener + ) { + execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> { + logger.debug("Retrying execute request {}", request.getEndpoint()); + countSearchRetry.run(); + execute(request, parser, r); + }, childListener)); + } + private void execute( Request request, BiFunction parser, RejectAwareActionListener listener ) { + logger.trace("Executing http request to remote cluster {}", request.getEndpoint()); // Preserve the thread context so headers survive after the call java.util.function.Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(true); try { client.performRequestAsync(request, new ResponseListener() { @Override public void onSuccess(org.opensearch.client.Response response) { + logger.trace("Successfully got response from the remote"); // Restore the thread context to get the precious headers try (ThreadContext.StoredContext ctx = contextSupplier.get()) { assert ctx != null; // eliminates compiler warning @@ -204,7 +231,7 @@ public void onSuccess(org.opensearch.client.Response response) { } if (mediaType == null) { try { - logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity())); + logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity())); throw new OpenSearchException( "Response didn't include supported Content-Type, remote is likely not an OpenSearch instance" ); @@ -236,22 +263,28 @@ public void onSuccess(org.opensearch.client.Response response) { public void onFailure(Exception e) { try (ThreadContext.StoredContext ctx = contextSupplier.get()) { assert ctx != null; // eliminates compiler warning + logger.debug("Received response failure {}", e.getMessage()); if (e instanceof ResponseException) { ResponseException re = (ResponseException) e; int statusCode = re.getResponse().getStatusLine().getStatusCode(); e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re); - if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) { + // retry all 5xx & 429s. + if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode + || statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) { listener.onRejection(e); return; } + } else if (e instanceof ConnectException) { + listener.onRejection(e); + return; } else if (e instanceof ContentTooLongException) { e = new IllegalArgumentException( "Remote responded with a chunk that was too large. Use a smaller batch size.", e ); } - listener.onFailure(e); } + listener.onFailure(e); } }); } catch (Exception e) { diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index aabd1100859d7..cb38fffb5ab7b 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -36,6 +36,7 @@ import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.ProtocolVersion; import org.apache.http.StatusLine; @@ -47,15 +48,18 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.message.BasicHttpResponse; +import org.apache.http.message.BasicRequestLine; import org.apache.http.message.BasicStatusLine; import org.apache.http.nio.protocol.HttpAsyncRequestProducer; import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.protocol.HttpContext; import org.opensearch.LegacyESVersion; import org.opensearch.OpenSearchStatusException; import org.opensearch.Version; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.search.SearchRequest; import org.opensearch.client.HeapBufferedAsyncResponseConsumer; +import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.common.io.Streams; import org.opensearch.common.unit.TimeValue; @@ -79,6 +83,8 @@ import java.io.IOException; import java.io.InputStreamReader; +import java.io.UncheckedIOException; +import java.net.ConnectException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Queue; @@ -86,12 +92,14 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.mockito.Mockito; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; @@ -490,7 +498,7 @@ public void testInvalidJsonThinksRemoteIsNotES() throws IOException { Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("some_text.txt").start()); assertEquals( "Error parsing the response, remote is likely not an OpenSearch instance", - e.getCause().getCause().getCause().getMessage() + e.getCause().getCause().getCause().getCause().getMessage() ); } @@ -499,7 +507,7 @@ public void testUnexpectedJsonThinksRemoteIsNotES() throws IOException { Exception e = expectThrows(RuntimeException.class, () -> sourceWithMockedRemoteCall("main/2_3_3.json").start()); assertEquals( "Error parsing the response, remote is likely not an OpenSearch instance", - e.getCause().getCause().getCause().getMessage() + e.getCause().getCause().getCause().getCause().getMessage() ); } @@ -650,4 +658,86 @@ private T expectListenerFailure(Class expectedExcept assertNotNull(exception.get()); return exception.get(); } + + RemoteScrollableHitSource createRemoteSourceWithFailure( + boolean shouldMockRemoteVersion, + Exception failure, + AtomicInteger invocationCount + ) { + CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() { + @Override + public Future execute(HttpAsyncRequestProducer requestProducer, HttpAsyncResponseConsumer responseConsumer, HttpContext context, FutureCallback callback) { + invocationCount.getAndIncrement(); + callback.failed(failure); + return null; + } + + @Override + public void close() throws IOException {} + + @Override + public boolean isRunning() { + return false; + } + + @Override + public void start() {} + }; + return sourceWithMockedClient(shouldMockRemoteVersion, httpClient); + } + + void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) { + retriesAllowed = 5; + AtomicInteger invocations = new AtomicInteger(); + invocations.set(0); + RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations); + + Throwable e = expectThrows(RuntimeException.class, source::start); + int expectedInvocations = 0; + if (shouldMockRemoteVersion) { + expectedInvocations += 1; // first search + if (expectedToRetry) expectedInvocations += retriesAllowed; + } else { + expectedInvocations = 1; // the first should fail and not trigger any retry. + } + + assertEquals(expectedInvocations, invocations.get()); + + // Unwrap the some artifacts from the test + while (e.getMessage().equals("failed")) { + e = e.getCause(); + } + // There is an additional wrapper for ResponseException. + if (failureResponse instanceof ResponseException) { + e = e.getCause(); + } + + assertSame(failureResponse, e); + } + + ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException { + org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class); + ProtocolVersion protocolVersion = new ProtocolVersion("https", 1, 1); + Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN)); + Mockito.when(mockResponse.getStatusLine()).thenReturn(new BasicStatusLine(protocolVersion, statusCode, errorMsg)); + Mockito.when(mockResponse.getRequestLine()).thenReturn(new BasicRequestLine("GET", "/", protocolVersion)); + return new ResponseException(mockResponse); + } + + public void testRetryOnCallFailure() throws Exception { + // First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s. + verifyRetries(true, withResponseCode(500, "Internal Server Error"), true); + verifyRetries(true, withResponseCode(429, "Too many requests"), true); + verifyRetries(true, withResponseCode(400, "Client Error"), false); + + // First call succeeds. Search call failed with exceptions other than ResponseException + verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions. + verifyRetries(true, new RuntimeException("foobar"), false); + + // First call(remote version lookup) failed and no retries expected + verifyRetries(false, withResponseCode(500, "Internal Server Error"), false); + verifyRetries(false, withResponseCode(429, "Too many requests"), false); + verifyRetries(false, withResponseCode(400, "Client Error"), false); + verifyRetries(false, new ConnectException("blah"), false); + } } diff --git a/server/src/main/java/org/opensearch/index/reindex/RetryListener.java b/server/src/main/java/org/opensearch/index/reindex/RetryListener.java index 2b092539555e6..e46e675977268 100644 --- a/server/src/main/java/org/opensearch/index/reindex/RetryListener.java +++ b/server/src/main/java/org/opensearch/index/reindex/RetryListener.java @@ -47,7 +47,7 @@ * * @opensearch.internal */ -class RetryListener implements RejectAwareActionListener { +public class RetryListener implements RejectAwareActionListener { private final Logger logger; private final Iterator retries; private final ThreadPool threadPool; @@ -55,7 +55,7 @@ class RetryListener implements RejectAwareActionListener delegate; private int retryCount = 0; - RetryListener( + public RetryListener( Logger logger, ThreadPool threadPool, BackoffPolicy backoffPolicy,