From 2161a4fd8392050c5facb1773cb53311ac4f5db2 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Thu, 7 Mar 2024 20:05:49 +0530 Subject: [PATCH] Remote reindex: Add support for configurable retry mechanism Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 + .../index/reindex/ReindexModulePlugin.java | 2 + .../opensearch/index/reindex/Reindexer.java | 4 +- .../index/reindex/TransportReindexAction.java | 28 +++++ .../remote/RemoteScrollableHitSource.java | 52 +++++++-- .../RemoteScrollableHitSourceTests.java | 109 ++++++++++++++++++ .../index/reindex/RetryListener.java | 6 +- 7 files changed, 188 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e91e766b6ab38..e581915ffd64e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028)) - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957)) - Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986)) +- Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java index c211f937c1dd9..aa48da4cb2421 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/ReindexModulePlugin.java @@ -132,6 +132,8 @@ public List> getSettings() { final List> settings = new ArrayList<>(); settings.add(TransportReindexAction.REMOTE_CLUSTER_WHITELIST); settings.add(TransportReindexAction.REMOTE_CLUSTER_ALLOWLIST); + settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_INITIAL_BACKOFF); + settings.add(TransportReindexAction.REMOTE_REINDEX_RETRY_MAX_COUNT); settings.addAll(ReindexSslConfig.getSettings()); return settings; } diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java index 7181a512428ab..c553effc65ab5 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/Reindexer.java @@ -54,6 +54,7 @@ import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.logging.Loggers; import org.opensearch.common.lucene.uid.Versions; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; @@ -141,7 +142,8 @@ public void execute(BulkByScrollTask task, ReindexRequest request, ActionListene ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction( task, - logger, + // Added prefix based logger(destination index) to distinguish multiple reindex jobs for easier debugging. + Loggers.getLogger(Reindexer.class, String.valueOf(request.getDestination().index())), assigningClient, threadPool, scriptService, diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java index e624b0619a26e..c9a970a4118b3 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/TransportReindexAction.java @@ -42,6 +42,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.index.reindex.spi.RemoteReindexExtension; import org.opensearch.script.ScriptService; @@ -71,11 +72,32 @@ public class TransportReindexAction extends HandledTransportAction REMOTE_REINDEX_RETRY_INITIAL_BACKOFF = Setting.timeSetting( + "reindex.remote.retry.initial_backoff", + TimeValue.timeValueMillis(500), + TimeValue.timeValueMillis(50), + TimeValue.timeValueMillis(5000), + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting REMOTE_REINDEX_RETRY_MAX_COUNT = Setting.intSetting( + "reindex.remote.retry.max_count", + 15, + 1, + 100, + Property.Dynamic, + Property.NodeScope + ); + public static Optional remoteExtension = Optional.empty(); private final ReindexValidator reindexValidator; private final Reindexer reindexer; + private final ClusterService clusterService; + @Inject public TransportReindexAction( Settings settings, @@ -92,10 +114,16 @@ public TransportReindexAction( super(ReindexAction.NAME, transportService, actionFilters, ReindexRequest::new); this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex); this.reindexer = new Reindexer(clusterService, client, threadPool, scriptService, sslConfig, remoteExtension); + this.clusterService = clusterService; } @Override protected void doExecute(Task task, ReindexRequest request, ActionListener listener) { + if (request.getRemoteInfo() != null) { + request.setMaxRetries(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_MAX_COUNT)); + request.setRetryBackoffInitialTime(clusterService.getClusterSettings().get(REMOTE_REINDEX_RETRY_INITIAL_BACKOFF)); + } + reindexValidator.initialValidation(request); BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task; reindexer.initTask(bulkByScrollTask, request, new ActionListener() { diff --git a/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java index 5b95ed4b9915b..c5f8b072b7573 100644 --- a/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -61,11 +61,14 @@ import org.opensearch.core.xcontent.XContentParseException; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.index.reindex.RejectAwareActionListener; +import org.opensearch.index.reindex.RetryListener; import org.opensearch.index.reindex.ScrollableHitSource; import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; +import java.net.ConnectException; +import java.util.Arrays; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -99,23 +102,31 @@ public RemoteScrollableHitSource( @Override protected void doStart(RejectAwareActionListener searchListener) { - lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> { + logger.trace("Starting remote reindex for {}", Arrays.toString(searchRequest.indices())); + lookupRemoteVersion(RejectAwareActionListener.wrap(version -> { remoteVersion = version; - execute( + logger.trace("Starting initial search"); + executeWithRetries( RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion), RESPONSE_PARSER, RejectAwareActionListener.withResponseHandler(searchListener, r -> onStartResponse(searchListener, r)) ); - })); + // Skipping searchListener::onRejection(used for retries) for remote source as we've configured retries at request(scroll) + // level. + }, searchListener::onFailure, searchListener::onFailure)); } void lookupRemoteVersion(RejectAwareActionListener listener) { + logger.trace("Checking version for remote domain"); + // We're skipping retries for the first call to remote cluster so that we fail fast & respond back immediately + // instead of retrying for longer duration. execute(new Request("GET", ""), MAIN_ACTION_PARSER, listener); } private void onStartResponse(RejectAwareActionListener searchListener, Response response) { + logger.trace("On initial search response"); if (Strings.hasLength(response.getScrollId()) && response.getHits().isEmpty()) { - logger.debug("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId()); + logger.trace("First response looks like a scan response. Jumping right to the second. scroll=[{}]", response.getScrollId()); doStartNextScroll(response.getScrollId(), timeValueMillis(0), searchListener); } else { searchListener.onResponse(response); @@ -124,16 +135,18 @@ private void onStartResponse(RejectAwareActionListener searchListener, @Override protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { + logger.trace("Starting next scroll call"); TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()); - execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener); + executeWithRetries(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener); } @Override protected void clearScroll(String scrollId, Runnable onCompletion) { + logger.trace("Clearing the scrollID {}", scrollId); client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() { @Override public void onSuccess(org.opensearch.client.Response response) { - logger.debug("Successfully cleared [{}]", scrollId); + logger.trace("Successfully cleared [{}]", scrollId); onCompletion.run(); } @@ -171,7 +184,7 @@ protected void cleanup(Runnable onCompletion) { threadPool.generic().submit(() -> { try { client.close(); - logger.debug("Shut down remote connection"); + logger.trace("Shut down remote connection"); } catch (IOException e) { logger.error("Failed to shutdown the remote connection", e); } finally { @@ -180,17 +193,30 @@ protected void cleanup(Runnable onCompletion) { }); } + private void executeWithRetries( + Request request, + BiFunction parser, + RejectAwareActionListener childListener + ) { + execute(request, parser, new RetryListener(logger, threadPool, backoffPolicy, r -> { + logger.trace("Retrying execute request"); + execute(request, parser, r); + }, childListener)); + } + private void execute( Request request, BiFunction parser, RejectAwareActionListener listener ) { + logger.trace("Executing http request to remote cluster"); // Preserve the thread context so headers survive after the call java.util.function.Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(true); try { client.performRequestAsync(request, new ResponseListener() { @Override public void onSuccess(org.opensearch.client.Response response) { + logger.trace("Successfully got response from the remote"); // Restore the thread context to get the precious headers try (ThreadContext.StoredContext ctx = contextSupplier.get()) { assert ctx != null; // eliminates compiler warning @@ -205,7 +231,7 @@ public void onSuccess(org.opensearch.client.Response response) { } if (mediaType == null) { try { - logger.debug("Response didn't include Content-Type: " + bodyMessage(response.getEntity())); + logger.error("Response didn't include Content-Type: " + bodyMessage(response.getEntity())); throw new OpenSearchException( "Response didn't include supported Content-Type, remote is likely not an OpenSearch instance" ); @@ -237,22 +263,28 @@ public void onSuccess(org.opensearch.client.Response response) { public void onFailure(Exception e) { try (ThreadContext.StoredContext ctx = contextSupplier.get()) { assert ctx != null; // eliminates compiler warning + logger.trace("Received response failure {}", e.getMessage()); if (e instanceof ResponseException) { ResponseException re = (ResponseException) e; int statusCode = re.getResponse().getStatusLine().getStatusCode(); e = wrapExceptionToPreserveStatus(statusCode, re.getResponse().getEntity(), re); - if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode) { + // retry all 5xx & 429s. + if (RestStatus.TOO_MANY_REQUESTS.getStatus() == statusCode + || statusCode >= RestStatus.INTERNAL_SERVER_ERROR.getStatus()) { listener.onRejection(e); return; } + } else if (e instanceof ConnectException) { + listener.onRejection(e); + return; } else if (e instanceof ContentTooLongException) { e = new IllegalArgumentException( "Remote responded with a chunk that was too large. Use a smaller batch size.", e ); } - listener.onFailure(e); } + listener.onFailure(e); } }); } catch (Exception e) { diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index f15d0a3c23a5e..b5ac9b9f5369d 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -42,9 +42,12 @@ import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.HttpEntity; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.io.entity.InputStreamEntity; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.http.message.BasicClassicHttpResponse; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer; @@ -57,6 +60,7 @@ import org.opensearch.Version; import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.search.SearchRequest; +import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.client.http.HttpUriRequestProducer; import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer; @@ -83,6 +87,7 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.UncheckedIOException; +import java.net.ConnectException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Queue; @@ -90,10 +95,13 @@ import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Stream; +import org.mockito.Mockito; + import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueMinutes; import static org.hamcrest.Matchers.empty; @@ -702,4 +710,105 @@ private static ClassicHttpRequest getRequest(AsyncRequestProducer requestProduce assertThat(requestProducer, instanceOf(HttpUriRequestProducer.class)); return ((HttpUriRequestProducer) requestProducer).getRequest(); } + + RemoteScrollableHitSource createRemoteSourceWithFailure( + boolean shouldMockRemoteVersion, + Exception failure, + AtomicInteger invocationCount + ) { + CloseableHttpAsyncClient httpClient = new CloseableHttpAsyncClient() { + + @Override + public void close() throws IOException {} + + @Override + public void close(CloseMode closeMode) {} + + @Override + public void start() {} + + @Override + public void register(String hostname, String uriPattern, Supplier supplier) {} + + @Override + public void initiateShutdown() {} + + @Override + public IOReactorStatus getStatus() { + return null; + } + + @Override + protected Future doExecute( + HttpHost target, + AsyncRequestProducer requestProducer, + AsyncResponseConsumer responseConsumer, + HandlerFactory pushHandlerFactory, + HttpContext context, + FutureCallback callback + ) { + invocationCount.getAndIncrement(); + callback.failed(failure); + return null; + } + + @Override + public void awaitShutdown(org.apache.hc.core5.util.TimeValue waitTime) throws InterruptedException {} + }; + return sourceWithMockedClient(shouldMockRemoteVersion, httpClient); + } + + void verifyRetries(boolean shouldMockRemoteVersion, Exception failureResponse, boolean expectedToRetry) { + retriesAllowed = 5; + AtomicInteger invocations = new AtomicInteger(); + invocations.set(0); + RemoteScrollableHitSource source = createRemoteSourceWithFailure(shouldMockRemoteVersion, failureResponse, invocations); + + Throwable e = expectThrows(RuntimeException.class, source::start); + int expectedInvocations = 0; + if (shouldMockRemoteVersion) { + expectedInvocations += 1; // first search + if (expectedToRetry) expectedInvocations += retriesAllowed; + } else { + expectedInvocations = 1; // the first should fail and not trigger any retry. + } + + assertEquals(expectedInvocations, invocations.get()); + + // Unwrap the some artifacts from the test + while (e.getMessage().equals("failed")) { + e = e.getCause(); + } + // There is an additional wrapper for ResponseException. + if (failureResponse instanceof ResponseException) { + e = e.getCause(); + } + + assertSame(failureResponse, e); + } + + ResponseException withResponseCode(int statusCode, String errorMsg) throws IOException { + org.opensearch.client.Response mockResponse = Mockito.mock(org.opensearch.client.Response.class); + Mockito.when(mockResponse.getEntity()).thenReturn(new StringEntity(errorMsg, ContentType.TEXT_PLAIN)); + Mockito.when(mockResponse.getStatusLine()).thenReturn(new StatusLine(new BasicClassicHttpResponse(statusCode, errorMsg))); + Mockito.when(mockResponse.getRequestLine()).thenReturn(new RequestLine("GET", "/", new ProtocolVersion("https", 1, 1))); + return new ResponseException(mockResponse); + } + + public void testRetryOnCallFailure() throws Exception { + // First call succeeds. Search calls failing with 5xxs and 429s should be retried but not 400s. + verifyRetries(true, withResponseCode(500, "Internal Server Error"), true); + verifyRetries(true, withResponseCode(429, "Too many requests"), true); + verifyRetries(true, withResponseCode(400, "Client Error"), false); + + // First call succeeds. Search call failed with exceptions other than ResponseException + verifyRetries(true, new ConnectException("blah"), true); // should retry connect exceptions. + verifyRetries(true, new RuntimeException("foobar"), false); + + // First call(remote version lookup) failed and no retries expected + verifyRetries(false, withResponseCode(500, "Internal Server Error"), false); + verifyRetries(false, withResponseCode(429, "Too many requests"), false); + verifyRetries(false, withResponseCode(400, "Client Error"), false); + verifyRetries(false, new ConnectException("blah"), false); + } } diff --git a/server/src/main/java/org/opensearch/index/reindex/RetryListener.java b/server/src/main/java/org/opensearch/index/reindex/RetryListener.java index 2b092539555e6..b064709720bf7 100644 --- a/server/src/main/java/org/opensearch/index/reindex/RetryListener.java +++ b/server/src/main/java/org/opensearch/index/reindex/RetryListener.java @@ -47,7 +47,7 @@ * * @opensearch.internal */ -class RetryListener implements RejectAwareActionListener { +public class RetryListener implements RejectAwareActionListener { private final Logger logger; private final Iterator retries; private final ThreadPool threadPool; @@ -55,7 +55,7 @@ class RetryListener implements RejectAwareActionListener delegate; private int retryCount = 0; - RetryListener( + public RetryListener( Logger logger, ThreadPool threadPool, BackoffPolicy backoffPolicy, @@ -84,7 +84,7 @@ public void onRejection(Exception e) { if (retries.hasNext()) { retryCount += 1; TimeValue delay = retries.next(); - logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); + logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay, e.getMessage())); schedule(() -> retryScrollHandler.accept(this), delay); } else { logger.warn(() -> new ParameterizedMessage("giving up on search because we retried [{}] times without success", retryCount), e);