From b1e69ff21bc7f92dd6f14175ac05dd6cb6a67b18 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Sep 2023 14:13:00 -0700 Subject: [PATCH 01/14] AwaitFix EsqlActionTaskIT Relates #99589 --- .../org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 92b81057ec698..ff6a7b15c3462 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.esql.action; +import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; @@ -65,6 +66,7 @@ value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.tasks.TaskCancellationService:TRACE", reason = "These tests are failing frequently; we need logs before muting them" ) +@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/99589") public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase { private static int PAGE_SIZE; private static int NUM_DOCS; From bba26ae388a706c0de0458f43ae6cea9fbef6540 Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Thu, 14 Sep 2023 14:32:28 -0700 Subject: [PATCH 02/14] Add rest filtering to RestExtension (#99559) This commit modifies the recently added RestExtension so that it can also filter rest handlers. The construction of the extension must change slightly so that the extension may bind to elements from a plugin (eg ThreadPool to get ThreadContext) which it would not have in a pure static environment of Java's SPI. 
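Illustration only (this sketch is not part of the patch below): a plugin could supply a RestExtension along these lines to restrict which REST handlers get registered. The class name is hypothetical; the serverless-scope check mirrors the logic this change removes from ActionModule, and the interface methods are the ones added to RestExtension.java below.

    import org.elasticsearch.plugins.internal.RestExtension;
    import org.elasticsearch.rest.RestHandler;
    import org.elasticsearch.rest.action.cat.AbstractCatAction;

    import java.util.function.Predicate;

    // Hypothetical plugin-provided extension; it would be picked up via
    // PluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll).
    public class ServerlessOnlyRestExtension implements RestExtension {
        @Override
        public Predicate<AbstractCatAction> getCatActionsFilter() {
            return action -> true; // expose every cat action
        }

        @Override
        public Predicate<RestHandler> getActionsFilter() {
            // register only handlers that declare a serverless scope
            return handler -> handler.getServerlessScope() != null;
        }
    }

When no such extension is present, Node falls back to RestExtension.allowAll(), which lets every handler through.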
--- .../elasticsearch/action/ActionModule.java | 24 ++++--------- .../java/org/elasticsearch/node/Node.java | 4 ++- .../elasticsearch/plugins/PluginsService.java | 22 ++++++++++++ .../plugins/internal/RestExtension.java | 36 +++++++++++++------ .../action/ActionModuleTests.java | 16 ++++++--- .../AbstractHttpServerTransportTests.java | 4 ++- .../xpack/security/SecurityTests.java | 4 ++- 7 files changed, 73 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 5683ace63ba3b..31355aac21d67 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -510,6 +510,7 @@ public class ActionModule extends AbstractModule { private final ThreadPool threadPool; private final ReservedClusterStateService reservedClusterStateService; private final boolean serverlessEnabled; + private final RestExtension restExtension; public ActionModule( Settings settings, @@ -525,7 +526,8 @@ public ActionModule( SystemIndices systemIndices, Tracer tracer, ClusterService clusterService, - List> reservedStateHandlers + List> reservedStateHandlers, + RestExtension restExtension ) { this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver; @@ -572,6 +574,7 @@ public ActionModule( restController = new RestController(restInterceptor, nodeClient, circuitBreakerService, usageService, tracer); } reservedClusterStateService = new ReservedClusterStateService(clusterService, reservedStateHandlers); + this.restExtension = restExtension; } private static T getRestServerComponent( @@ -851,15 +854,10 @@ private static ActionFilters setupActionFilters(List actionPlugins public void initRestHandlers(Supplier nodesInCluster) { List catActions = new ArrayList<>(); - var restExtension = RestExtension.load(() -> new RestExtension() { - @Override - public Predicate getCatActionsFilter() { - return action -> true; - } - }); Predicate catActionsFilter = restExtension.getCatActionsFilter(); + Predicate restFilter = restExtension.getActionsFilter(); Consumer registerHandler = handler -> { - if (shouldKeepRestHandler(handler)) { + if (restFilter.test(handler)) { if (handler instanceof AbstractCatAction catAction && catActionsFilter.test(catAction)) { catActions.add(catAction); } @@ -1066,16 +1064,6 @@ public Predicate getCatActionsFilter() { registerHandler.accept(new RestDeleteSynonymRuleAction()); } - /** - * This method is used to determine whether a RestHandler ought to be kept in memory or not. Returns true if serverless mode is - * disabled, or if there is any ServlerlessScope annotation on the RestHandler. 
- * @param handler - * @return - */ - private boolean shouldKeepRestHandler(final RestHandler handler) { - return serverlessEnabled == false || handler.getServerlessScope() != null; - } - @Override protected void configure() { bind(ActionFilters.class).toInstance(actionFilters); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 111dc5ec72165..1ae3aaa9e09db 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -183,6 +183,7 @@ import org.elasticsearch.plugins.internal.DocumentParsingObserver; import org.elasticsearch.plugins.internal.DocumentParsingObserverPlugin; import org.elasticsearch.plugins.internal.ReloadAwarePlugin; +import org.elasticsearch.plugins.internal.RestExtension; import org.elasticsearch.plugins.internal.SettingsExtension; import org.elasticsearch.readiness.ReadinessService; import org.elasticsearch.repositories.RepositoriesModule; @@ -814,7 +815,8 @@ protected Node( systemIndices, tracer, clusterService, - reservedStateHandlers + reservedStateHandlers, + pluginsService.loadSingletonServiceProvider(RestExtension.class, RestExtension::allowAll) ); modules.add(actionModule); diff --git a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java index be8f9b303f4eb..96f3eedde165c 100644 --- a/server/src/main/java/org/elasticsearch/plugins/PluginsService.java +++ b/server/src/main/java/org/elasticsearch/plugins/PluginsService.java @@ -59,6 +59,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -345,6 +346,27 @@ public List loadServiceProviders(Class service) { return Collections.unmodifiableList(result); } + /** + * Loads a single SPI extension. + * + * There should be no more than one extension found. If no service providers + * are found, the supplied fallback is used. 
+ * + * @param service the SPI class that should be loaded + * @param fallback a supplier for an instance if no providers are found + * @return an instance of the service + * @param the SPI service type + */ + public T loadSingletonServiceProvider(Class service, Supplier fallback) { + var services = loadServiceProviders(service); + if (services.size() > 1) { + throw new IllegalStateException(String.format(Locale.ROOT, "More than one extension found for %s", service.getSimpleName())); + } else if (services.size() == 0) { + return fallback.get(); + } + return services.get(0); + } + private static void loadExtensionsForPlugin(ExtensiblePlugin extensiblePlugin, List extendingPlugins) { ExtensiblePlugin.ExtensionLoader extensionLoader = new ExtensiblePlugin.ExtensionLoader() { @Override diff --git a/server/src/main/java/org/elasticsearch/plugins/internal/RestExtension.java b/server/src/main/java/org/elasticsearch/plugins/internal/RestExtension.java index da5de4f784a22..4864e6bf31222 100644 --- a/server/src/main/java/org/elasticsearch/plugins/internal/RestExtension.java +++ b/server/src/main/java/org/elasticsearch/plugins/internal/RestExtension.java @@ -8,11 +8,10 @@ package org.elasticsearch.plugins.internal; +import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.action.cat.AbstractCatAction; -import java.util.ServiceLoader; import java.util.function.Predicate; -import java.util.function.Supplier; public interface RestExtension { /** @@ -23,14 +22,29 @@ public interface RestExtension { */ Predicate getCatActionsFilter(); - static RestExtension load(Supplier fallback) { - var loader = ServiceLoader.load(RestExtension.class); - var extensions = loader.stream().toList(); - if (extensions.size() > 1) { - throw new IllegalStateException("More than one rest extension found"); - } else if (extensions.size() == 0) { - return fallback.get(); - } - return extensions.get(0).get(); + /** + * Returns a filter that determines which rest actions are exposed. + * + * The filter should return {@code false} if an action should be included, + * or {@code false} if the paths + * @return + */ + Predicate getActionsFilter(); + + /** + * Returns a rest extension which allows all rest endpoints through. 
+ */ + static RestExtension allowAll() { + return new RestExtension() { + @Override + public Predicate getCatActionsFilter() { + return action -> true; + } + + @Override + public Predicate getActionsFilter() { + return handler -> true; + } + }; } } diff --git a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java index 6975cc22adaa9..1279ea810f0a6 100644 --- a/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java +++ b/server/src/test/java/org/elasticsearch/action/ActionModuleTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin.ActionHandler; import org.elasticsearch.plugins.interceptor.RestServerActionPlugin; +import org.elasticsearch.plugins.internal.RestExtension; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -123,7 +124,8 @@ public void testSetupRestHandlerContainsKnownBuiltin() { null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -182,7 +184,8 @@ public String getName() { null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ); Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null)); assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET")); @@ -234,7 +237,8 @@ public List getRestHandlers( null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ); actionModule.initRestHandlers(null); // At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail @@ -281,7 +285,8 @@ public void test3rdPartyHandlerIsNotInstalled() { null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ) ); assertThat( @@ -319,7 +324,8 @@ public void test3rdPartyRestControllerIsNotInstalled() { null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ) ); assertThat( diff --git a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java index e32fa310ec5c8..b8d102db7e8ae 100644 --- a/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java +++ b/server/src/test/java/org/elasticsearch/http/AbstractHttpServerTransportTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.internal.RestExtension; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestControllerTests; import org.elasticsearch.rest.RestHeaderDefinition; @@ -1152,7 +1153,8 @@ public Collection getRestHeaders() { null, null, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 72f3c2be7087f..fa7ff7b12994c 100644 --- 
a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.plugins.ExtensiblePlugin; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.internal.DocumentParsingObserver; +import org.elasticsearch.plugins.internal.RestExtension; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestHandler; import org.elasticsearch.rest.RestRequest; @@ -775,7 +776,8 @@ public void testSecurityRestHandlerInterceptorCanBeInstalled() throws IllegalAcc null, Tracer.NOOP, mock(ClusterService.class), - List.of() + List.of(), + RestExtension.allowAll() ); actionModule.initRestHandlers(null); From 460c2ee0f6e6f915f3266994206c894ee05d73d9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 14 Sep 2023 14:58:33 -0700 Subject: [PATCH 03/14] Avoid using query pragmas in ESQL tests (#99600) This PR avoids enabling QueryPragmas in ESQL tests since it's only available in the snapshot build. --- .../xpack/esql/action/AbstractEsqlIntegTestCase.java | 6 ++++-- .../elasticsearch/xpack/esql/action/EsqlActionIT.java | 3 +-- .../xpack/esql/action/EsqlActionTaskIT.java | 1 + .../elasticsearch/xpack/esql/action/ManyShardsIT.java | 10 +++++----- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java index 9598d8d4498cb..2c904a39129db 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractEsqlIntegTestCase.java @@ -85,8 +85,7 @@ protected EsqlQueryResponse run(EsqlQueryRequest request) { protected static QueryPragmas randomPragmas() { Settings.Builder settings = Settings.builder(); - // pragmas are only enabled on snapshot builds - if (Build.current().isSnapshot()) { + if (canUseQueryPragmas()) { if (randomBoolean()) { settings.put("task_concurrency", randomLongBetween(1, 10)); } @@ -118,4 +117,7 @@ protected static QueryPragmas randomPragmas() { return new QueryPragmas(settings.build()); } + protected static boolean canUseQueryPragmas() { + return Build.current().isSnapshot(); + } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index c567edda97018..fdc83161dcd06 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -445,8 +445,7 @@ public void testFromEvalStats() { assertEquals(0.034d, (double) getValuesList(results).get(0).get(0), 0.001d); } - public void testFromStatsEvalWithPragma() { - assumeTrue("pragmas only enabled on snapshot builds", Build.current().isSnapshot()); + public void testFromStatsThenEval() { EsqlQueryResponse results = run("from test | stats avg_count = avg(count) | eval x = avg_count + 7"); logger.info(results); Assert.assertEquals(1, getValuesList(results).size()); diff --git 
a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index ff6a7b15c3462..16f704aa8f7c3 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -82,6 +82,7 @@ protected Collection> nodePlugins() { @Before public void setupIndex() throws IOException { + assumeTrue("requires query pragmas", canUseQueryPragmas()); PAGE_SIZE = between(10, 100); NUM_DOCS = between(4 * PAGE_SIZE, 5 * PAGE_SIZE); READ_DESCRIPTION = """ diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java index a95f601d88ca0..c1476c8c52de5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ManyShardsIT.java @@ -59,11 +59,11 @@ public void testConcurrentQueries() throws Exception { } catch (InterruptedException e) { throw new AssertionError(e); } - var pragmas = Settings.builder() - .put(randomPragmas().getSettings()) - .put("exchange_concurrent_clients", between(1, 2)) - .build(); - run("from test-* | stats count(user) by tags", new QueryPragmas(pragmas)); + final var pragmas = Settings.builder(); + if (canUseQueryPragmas()) { + pragmas.put(randomPragmas().getSettings()).put("exchange_concurrent_clients", between(1, 2)); + } + run("from test-* | stats count(user) by tags", new QueryPragmas(pragmas.build())); }); } for (Thread thread : threads) { From 89d11c66ebbe4959ee6a5a6ec9fb9b5435b9d58a Mon Sep 17 00:00:00 2001 From: Ryan Ernst Date: Thu, 14 Sep 2023 21:58:27 -0700 Subject: [PATCH 04/14] Do not run die-with-dignity test on release builds (#99440) The test external modules only exist in snapshot builds. This commit fixes the module test to only run on snapshot builds. fixes #99402 --- test/external-modules/die-with-dignity/build.gradle | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/external-modules/die-with-dignity/build.gradle b/test/external-modules/die-with-dignity/build.gradle index 93582c9decb86..999b81af027b3 100644 --- a/test/external-modules/die-with-dignity/build.gradle +++ b/test/external-modules/die-with-dignity/build.gradle @@ -19,3 +19,7 @@ tasks.named("test").configure { tasks.named("yamlRestTest").configure { enabled = false } + +tasks.named('javaRestTest').configure { + it.onlyIf("snapshot build") { BuildParams.isSnapshotBuild() } +} From cb42b964e53f725bb41ac93108d5afe8abe7be7a Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Fri, 15 Sep 2023 11:06:42 +0100 Subject: [PATCH 05/14] [DSL] Add fixed_interval min value validation for a downsampling round (#99544) This adds validation for the downsampling round to have a minimum `fixed_interval` of `5m`.
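Illustration only (not part of this change's diff, which follows): with the new constraint in DataStreamLifecycle.Downsampling.Round, an interval below five minutes is rejected at construction time, as exercised by the test added in DataStreamLifecycleTests below.

    // Sketch of the behaviour added by this change; the types are the ones used in the diff below.
    new DataStreamLifecycle.Downsampling.Round(
        TimeValue.timeValueDays(10),
        new DownsampleConfig(new DateHistogramInterval("5m"))  // accepted: fixed_interval >= 5 minutes
    );
    new DataStreamLifecycle.Downsampling.Round(
        TimeValue.timeValueDays(10),
        new DownsampleConfig(new DateHistogramInterval("2m"))  // throws IllegalArgumentException:
        // "A downsampling round must have a fixed interval of at least five minutes but found: 2m"
    );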
--- .../DataStreamLifecycleServiceTests.java | 8 +++--- .../cluster/metadata/DataStreamLifecycle.java | 10 +++++++ .../metadata/DataStreamLifecycleTests.java | 18 ++++++++++++ .../cluster/metadata/DataStreamTests.java | 12 ++++---- ...StreamLifecycleDownsampleDisruptionIT.java | 4 +-- .../DataStreamLifecycleDownsampleIT.java | 28 +++++++++---------- ...StreamLifecycleDownsamplingSecurityIT.java | 16 +++++------ 7 files changed, 62 insertions(+), 34 deletions(-) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 3a5afd2042565..5b73d94be578a 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -923,7 +923,7 @@ public void testDownsampling() throws Exception { DataStreamLifecycle.newBuilder() .downsampling( new Downsampling( - List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s")))) + List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")))) ) ) .dataRetention(TimeValue.MAX_VALUE) @@ -977,7 +977,7 @@ public void testDownsampling() throws Exception { String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName( DOWNSAMPLED_INDEX_PREFIX, state.metadata().index(firstGenIndex), - new DateHistogramInterval("1s") + new DateHistogramInterval("5m") ); { // let's simulate the in-progress downsampling @@ -1100,7 +1100,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc DataStreamLifecycle.newBuilder() .downsampling( new Downsampling( - List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s")))) + List.of(new Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m")))) ) ) .dataRetention(TimeValue.MAX_VALUE) @@ -1129,7 +1129,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName( DOWNSAMPLED_INDEX_PREFIX, state.metadata().index(firstGenIndexName), - new DateHistogramInterval("1s") + new DateHistogramInterval("5m") ); Metadata.Builder newMetadata = Metadata.builder(state.metadata()) .put( diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java index b768b468204fc..65de7a7cab265 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamLifecycle.java @@ -322,6 +322,8 @@ public void writeTo(StreamOutput out) throws IOException { */ public record Downsampling(@Nullable List rounds) implements Writeable, ToXContentFragment { + public static final long FIVE_MINUTES_MILLIS = TimeValue.timeValueMinutes(5).getMillis(); + /** * A round represents the configuration for when and how elasticsearch will downsample a backing index. 
* @param after is a TimeValue configuring how old (based on generation age) should a backing index be before downsampling @@ -356,6 +358,14 @@ public static Round read(StreamInput in) throws IOException { return new Round(in.readTimeValue(), new DownsampleConfig(in)); } + public Round { + if (config.getFixedInterval().estimateMillis() < FIVE_MINUTES_MILLIS) { + throw new IllegalArgumentException( + "A downsampling round must have a fixed interval of at least five minutes but found: " + config.getFixedInterval() + ); + } + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeTimeValue(after); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java index e45e0efda636f..441e8491b4b92 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamLifecycleTests.java @@ -233,6 +233,24 @@ public void testInvalidDownsamplingConfiguration() { ); assertThat(exception.getMessage(), equalTo("Downsampling configuration supports maximum 10 configured rounds. Found: 12")); } + + { + IllegalArgumentException exception = expectThrows( + IllegalArgumentException.class, + () -> new DataStreamLifecycle.Downsampling( + List.of( + new DataStreamLifecycle.Downsampling.Round( + TimeValue.timeValueDays(10), + new DownsampleConfig(new DateHistogramInterval("2m")) + ) + ) + ) + ); + assertThat( + exception.getMessage(), + equalTo("A downsampling round must have a fixed interval of at least five minutes but found: 2m") + ); + } } @Nullable diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index d10e44e3dd1fa..1b1e512113712 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -1299,15 +1299,15 @@ public void testGetDownsampleRounds() { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(2000), - new DownsampleConfig(new DateHistogramInterval("10s")) + new DownsampleConfig(new DateHistogramInterval("10m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(3200), - new DownsampleConfig(new DateHistogramInterval("100s")) + new DownsampleConfig(new DateHistogramInterval("100m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(3500), - new DownsampleConfig(new DateHistogramInterval("1000s")) + new DownsampleConfig(new DateHistogramInterval("1000m")) ) ) ) @@ -1360,15 +1360,15 @@ public void testGetDownsampleRounds() { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(2000), - new DownsampleConfig(new DateHistogramInterval("10s")) + new DownsampleConfig(new DateHistogramInterval("10m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(3200), - new DownsampleConfig(new DateHistogramInterval("100s")) + new DownsampleConfig(new DateHistogramInterval("100m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(3500), - new DownsampleConfig(new DateHistogramInterval("1000s")) + new DownsampleConfig(new DateHistogramInterval("1000m")) ) ) ) diff --git 
a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index cef748dfded3c..166f41fa063ca 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -71,7 +71,7 @@ public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("1s")) + new DownsampleConfig(new DateHistogramInterval("5m")) ) ) ) @@ -121,7 +121,7 @@ public boolean validateClusterForming() { ); ensureStableCluster(cluster.numDataAndMasterNodes()); - final String targetIndex = "downsample-1s-" + sourceIndex; + final String targetIndex = "downsample-5m-" + sourceIndex; assertBusy(() -> { try { GetSettingsResponse getSettingsResponse = client().admin() diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java index faa67479cc0d5..cf5e79982d836 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleIT.java @@ -61,8 +61,8 @@ public void testDownsampling() throws Exception { .downsampling( new Downsampling( List.of( - new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))), - new Downsampling.Round(TimeValue.timeValueSeconds(10), new DownsampleConfig(new DateHistogramInterval("10s"))) + new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))), + new Downsampling.Round(TimeValue.timeValueSeconds(10), new DownsampleConfig(new DateHistogramInterval("10m"))) ) ) ) @@ -72,8 +72,8 @@ public void testDownsampling() throws Exception { List backingIndices = getBackingIndices(client(), dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); - String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex; + String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { @@ -119,10 +119,10 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception .downsampling( new Downsampling( List.of( - new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))), + new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))), // data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at // least 2 seconds since rollover. only the 10 seconds round should be executed. 
- new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10s"))) + new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10m"))) ) ) ) @@ -131,8 +131,8 @@ public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception List backingIndices = getBackingIndices(client(), dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); - String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex; + String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { @@ -173,10 +173,10 @@ public void testUpdateDownsampleRound() throws Exception { .downsampling( new Downsampling( List.of( - new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("1s"))), + new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))), // data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at // least 2 seconds since rollover. only the 10 seconds round should be executed. - new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10s"))) + new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10m"))) ) ) ) @@ -186,8 +186,8 @@ public void testUpdateDownsampleRound() throws Exception { List backingIndices = getBackingIndices(client(), dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0); - String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex; + String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { @@ -222,7 +222,7 @@ public void testUpdateDownsampleRound() throws Exception { DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.newBuilder() .downsampling( new Downsampling( - List.of(new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("30s")))) + List.of(new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("20m")))) ) ) .build(); @@ -232,7 +232,7 @@ public void testUpdateDownsampleRound() throws Exception { new PutDataStreamLifecycleAction.Request(new String[] { dataStreamName }, updatedLifecycle) ); - String thirtySecondsDownsampleIndex = "downsample-30s-" + firstGenerationBackingIndex; + String thirtySecondsDownsampleIndex = "downsample-20m-" + firstGenerationBackingIndex; assertBusy(() -> { assertThat(indexExists(tenSecondsDownsampleIndex), is(false)); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java index cb3a032f44127..49ac36b854298 100644 --- 
a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java @@ -120,11 +120,11 @@ public void testDownsamplingAuthorized() throws Exception { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("1s")) + new DownsampleConfig(new DateHistogramInterval("5m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10s")) + new DownsampleConfig(new DateHistogramInterval("10m")) ) ) ) @@ -144,11 +144,11 @@ public void testConfiguringLifecycleWithDownsamplingForSystemDataStreamFails() { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("1s")) + new DownsampleConfig(new DateHistogramInterval("5m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10s")) + new DownsampleConfig(new DateHistogramInterval("10m")) ) ) ) @@ -188,8 +188,8 @@ public void testExplicitSystemDataStreamConfigurationWithDownsamplingFails() { private void waitAndAssertDownsamplingCompleted(String dataStreamName) throws Exception { List backingIndices = getDataStreamBackingIndices(dataStreamName); String firstGenerationBackingIndex = backingIndices.get(0).getName(); - String oneSecondDownsampleIndex = "downsample-1s-" + firstGenerationBackingIndex; - String tenSecondsDownsampleIndex = "downsample-10s-" + firstGenerationBackingIndex; + String oneSecondDownsampleIndex = "downsample-5m-" + firstGenerationBackingIndex; + String tenSecondsDownsampleIndex = "downsample-10m-" + firstGenerationBackingIndex; Set witnessedDownsamplingIndices = new HashSet<>(); clusterService().addListener(event -> { @@ -439,11 +439,11 @@ public Collection getSystemDataStreamDescriptors() { List.of( new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueMillis(0), - new DownsampleConfig(new DateHistogramInterval("1s")) + new DownsampleConfig(new DateHistogramInterval("5m")) ), new DataStreamLifecycle.Downsampling.Round( TimeValue.timeValueSeconds(10), - new DownsampleConfig(new DateHistogramInterval("10s")) + new DownsampleConfig(new DateHistogramInterval("10m")) ) ) ) From 29f0a233ff5d6b3d139d8d5c672e574e32896a71 Mon Sep 17 00:00:00 2001 From: Adrien Grand Date: Fri, 15 Sep 2023 21:50:46 +0200 Subject: [PATCH 06/14] Move test to the correct test case. (#99586) `oe.common.lucene.uid.VersionsTests` is actually about testing the `_version` field, but the test that is being moved tests `IndexVersion`. 
--- .../common/lucene/uid/VersionsTests.java | 23 ------------------- .../index/IndexVersionTests.java | 22 ++++++++++++++++++ 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java index 976add854c584..d6c5fe812140f 100644 --- a/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java +++ b/server/src/test/java/org/elasticsearch/common/lucene/uid/VersionsTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.index.IndexVersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -198,28 +197,6 @@ public void testCacheFilterReader() throws Exception { dir.close(); } - public void testLuceneVersionOnUnknownVersions() { - // between two known versions, should use the lucene version of the previous version - IndexVersion version = IndexVersionUtils.getPreviousVersion(); - final IndexVersion nextVersion = IndexVersion.fromId(version.id() + 100); - if (IndexVersionUtils.allReleasedVersions().contains(nextVersion) == false) { - // the version is not known, we make an assumption the Lucene version stays the same - assertThat(version.luceneVersion(), equalTo(nextVersion.luceneVersion())); - } else { - // the version is known, the most we can assert is that the Lucene version is not earlier - // Version does not implement Comparable :( - assertTrue(nextVersion.luceneVersion().onOrAfter(version.luceneVersion())); - } - - // too old version, major should be the oldest supported lucene version minus 1 - version = IndexVersion.fromId(5020199); - assertThat(version.luceneVersion().major, equalTo(IndexVersionUtils.getFirstVersion().luceneVersion().major - 1)); - - // future version, should be the same version as today - version = IndexVersion.fromId(IndexVersion.current().id() + 100); - assertThat(version.luceneVersion(), equalTo(IndexVersion.current().luceneVersion())); - } - public void testTimeSeriesLoadDocIdAndVersion() throws Exception { Directory dir = newDirectory(); IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)); diff --git a/server/src/test/java/org/elasticsearch/index/IndexVersionTests.java b/server/src/test/java/org/elasticsearch/index/IndexVersionTests.java index 4ffd98eb4d8a9..452da5279f4c1 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexVersionTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexVersionTests.java @@ -196,4 +196,26 @@ public void testParseLenient() { assertThat(luceneVersion, Matchers.equalTo(Lucene.parseVersionLenient(string, null))); } } + + public void testLuceneVersionOnUnknownVersions() { + // between two known versions, should use the lucene version of the previous version + IndexVersion version = IndexVersionUtils.getPreviousVersion(); + final IndexVersion nextVersion = IndexVersion.fromId(version.id() + 100); + if (IndexVersionUtils.allReleasedVersions().contains(nextVersion) == false) { + // the version is not known, we make an assumption the Lucene version stays the same + assertThat(version.luceneVersion(), equalTo(nextVersion.luceneVersion())); + } else { + // the version is known, the most we can assert is that the Lucene version is not earlier + // Version does not implement Comparable :( + 
assertTrue(nextVersion.luceneVersion().onOrAfter(version.luceneVersion())); + } + + // too old version, major should be the oldest supported lucene version minus 1 + version = IndexVersion.fromId(5020199); + assertThat(version.luceneVersion().major, equalTo(IndexVersionUtils.getFirstVersion().luceneVersion().major - 1)); + + // future version, should be the same version as today + version = IndexVersion.fromId(IndexVersion.current().id() + 100); + assertThat(version.luceneVersion(), equalTo(IndexVersion.current().luceneVersion())); + } } From 397330e099f364263552ca2fa9d75ae2a663ac4c Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 18 Sep 2023 07:51:44 +0100 Subject: [PATCH 07/14] Adjust other links to unstable cluster docs (#99363) Relates #99354, #99287. --- .../org/elasticsearch/common/reference-docs-links.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json b/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json index ed1041acf3f54..4de327d203d16 100644 --- a/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json +++ b/server/src/main/resources/org/elasticsearch/common/reference-docs-links.json @@ -2,8 +2,8 @@ "INITIAL_MASTER_NODES": "important-settings.html#initial_master_nodes", "DISCOVERY_TROUBLESHOOTING": "discovery-troubleshooting.html", "UNSTABLE_CLUSTER_TROUBLESHOOTING": "troubleshooting-unstable-cluster.html", - "LAGGING_NODE_TROUBLESHOOTING": "cluster-fault-detection.html#_diagnosing_lagging_nodes", - "SHARD_LOCK_TROUBLESHOOTING": "cluster-fault-detection.html#_diagnosing_shardlockobtainfailedexception_failures", + "LAGGING_NODE_TROUBLESHOOTING": "troubleshooting-unstable-cluster.html#_diagnosing_lagging_nodes_2", + "SHARD_LOCK_TROUBLESHOOTING": "troubleshooting-unstable-cluster.html#_diagnosing_shardlockobtainfailedexception_failures_2", "CONCURRENT_REPOSITORY_WRITERS": "add-repository.html", "ARCHIVE_INDICES": "archive-indices.html", "HTTP_TRACER": "modules-network.html#http-rest-request-tracer" From 5bd64e46fd59b3eb14ad35be47fa45b83680b135 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 18 Sep 2023 08:27:01 +0100 Subject: [PATCH 08/14] Remove exception-mangling from RecoverySourceHandler (#99607) `RecoverySourceHandler` uses `Listenable*Future` in several places that do not need the exception-mangling behaviour, so this commit replaces them with `SubscribableListener`. 
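Illustration only (not part of the diff below): the pattern this change applies throughout RecoverySourceHandler is that each recovery step is a SubscribableListener which the next step subscribes to, so a failure reaches the wrapped failure handler as-is rather than going through the future's exception wrapping.

    // Simplified sketch; the real steps (sendFileStep, prepareEngineStep, ...) appear in the diff below.
    final SubscribableListener<Void> step = new SubscribableListener<>();
    step.addListener(ActionListener.wrap(
        ignored -> System.out.println("next recovery step runs here"),
        e -> System.out.println("failure propagated to onFailure: " + e)
    ));
    step.onResponse(null); // completing the step triggers the subscribed listener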
--- .../recovery/RecoverySourceHandler.java | 70 +++++++++++-------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index d00a7e32c1ae1..a3170815251a6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -19,7 +19,6 @@ import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -33,7 +32,6 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.IOUtils; @@ -84,6 +82,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.IntSupplier; @@ -120,7 +119,7 @@ public class RecoverySourceHandler { private final RecoveryPlannerService recoveryPlannerService; private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); - private final ListenableActionFuture future = new ListenableActionFuture<>(); + private final SubscribableListener future = new SubscribableListener<>(); public RecoverySourceHandler( IndexShard shard, @@ -231,10 +230,13 @@ && isTargetSameHistory() logger.trace("history is retained by retention lock"); } - final ListenableFuture sendFileStep = new ListenableFuture<>(); - final ListenableFuture prepareEngineStep = new ListenableFuture<>(); - final ListenableFuture sendSnapshotStep = new ListenableFuture<>(); - final ListenableFuture finalizeStep = new ListenableFuture<>(); + final SubscribableListener sendFileStep = new SubscribableListener<>(); + final SubscribableListener prepareEngineStep = new SubscribableListener<>(); + final SubscribableListener sendSnapshotStep = new SubscribableListener<>(); + final SubscribableListener finalizeStep = new SubscribableListener<>(); + final AtomicReference sendSnapshotStepResult = new AtomicReference<>(); + final AtomicReference sendFileStepResult = new AtomicReference<>(); + final AtomicLong prepareEngineTimeMillisRef = new AtomicLong(); if (isSequenceNumberBasedRecovery) { logger.trace("performing sequence numbers based recovery. 
starting at [{}]", request.startingSeqNo()); @@ -294,12 +296,14 @@ && isTargetSameHistory() sendFileStep.addListener(ActionListener.wrap(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); + sendFileStepResult.set(r); // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); }, onFailure)); prepareEngineStep.addListener(ActionListener.wrap(prepareEngineTime -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]"); + prepareEngineTimeMillisRef.set(prepareEngineTime.millis()); /* * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. * This means that any document indexed into the primary after this will be replicated to this replica as well @@ -346,14 +350,15 @@ && isTargetSameHistory() // Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2 final long trimAboveSeqNo = startingSeqNo - 1; - sendSnapshotStep.addListener( - ActionListener.wrap(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure) - ); + sendSnapshotStep.addListener(ActionListener.wrap(r -> { + sendSnapshotStepResult.set(r); + finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep); + }, onFailure)); finalizeStep.addListener(ActionListener.wrap(r -> { final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle time - final SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result(); - final SendFileResult sendFileResult = sendFileStep.result(); + final SendSnapshotResult sendSnapshotResult = sendSnapshotStepResult.get(); + final SendFileResult sendFileResult = sendFileStepResult.get(); final RecoveryResponse response = new RecoveryResponse( sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes, @@ -363,7 +368,7 @@ && isTargetSameHistory() sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime, - prepareEngineStep.result().millis(), + prepareEngineTimeMillisRef.get(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis() ); @@ -553,7 +558,7 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); // but we must still create a retention lease - final ListenableFuture createRetentionLeaseStep = new ListenableFuture<>(); + final SubscribableListener createRetentionLeaseStep = new SubscribableListener<>(); createRetentionLease(startingSeqNo, createRetentionLeaseStep); createRetentionLeaseStep.addListener(listener.delegateFailureAndWrap((l, retentionLease) -> { final TimeValue took = stopWatch.totalTime(); @@ -661,11 +666,12 @@ void recoverFilesFromSourceAndSnapshot( // since the plan can change after a failure recovering files from the snapshots that cannot be // recovered from the source node, in that case we have to start from scratch using the fallback // recovery plan that would be used in subsequent steps. 
- final ListenableFuture sendFileInfoStep = new ListenableFuture<>(); - final ListenableFuture>> recoverSnapshotFilesStep = new ListenableFuture<>(); - final ListenableFuture sendFilesStep = new ListenableFuture<>(); - final ListenableFuture> createRetentionLeaseStep = new ListenableFuture<>(); - final ListenableFuture cleanFilesStep = new ListenableFuture<>(); + final SubscribableListener sendFileInfoStep = new SubscribableListener<>(); + final SubscribableListener>> recoverSnapshotFilesStep = + new SubscribableListener<>(); + final SubscribableListener sendFilesStep = new SubscribableListener<>(); + final SubscribableListener> createRetentionLeaseStep = new SubscribableListener<>(); + final SubscribableListener cleanFilesStep = new SubscribableListener<>(); final int translogOps = shardRecoveryPlan.getTranslogOps(); recoveryTarget.receiveFileInfo( @@ -790,7 +796,9 @@ private class SnapshotRecoverFileRequestsSender { private final CountDown countDown; private final BlockingQueue pendingSnapshotFilesToRecover; private final AtomicBoolean cancelled = new AtomicBoolean(); - private final Set> outstandingRequests = Sets.newHashSetWithExpectedSize(maxConcurrentSnapshotFileDownloads); + private final Set> outstandingRequests = Sets.newHashSetWithExpectedSize( + maxConcurrentSnapshotFileDownloads + ); private List filesFailedToDownloadFromSnapshot; SnapshotRecoverFileRequestsSender(ShardRecoveryPlan shardRecoveryPlan, ActionListener> listener) { @@ -813,7 +821,7 @@ void sendRequest() { return; } - final ListenableFuture requestFuture = new ListenableFuture<>(); + final SubscribableListener requestFuture = new SubscribableListener<>(); try { cancellableThreads.checkForCancel(); @@ -899,7 +907,7 @@ synchronized List getFilesFailedToRecoverFromSnapshot() { return Objects.requireNonNullElse(filesFailedToDownloadFromSnapshot, Collections.emptyList()); } - private void trackOutstandingRequest(ListenableFuture future) { + private void trackOutstandingRequest(SubscribableListener future) { boolean cancelled; synchronized (outstandingRequests) { cancelled = cancellableThreads.isCancelled() || this.cancelled.get(); @@ -919,7 +927,7 @@ private void trackOutstandingRequest(ListenableFuture future) { } } - private void unTrackOutstandingRequest(ListenableFuture future) { + private void unTrackOutstandingRequest(SubscribableListener future) { synchronized (outstandingRequests) { outstandingRequests.remove(future); } @@ -928,7 +936,7 @@ private void unTrackOutstandingRequest(ListenableFuture future) { private void notifyFailureOnceAllOutstandingRequestAreDone(Exception e) { assert cancelled.get(); - final Set> pendingRequests; + final Set> pendingRequests; synchronized (outstandingRequests) { pendingRequests = new HashSet<>(outstandingRequests); } @@ -941,7 +949,7 @@ private void notifyFailureOnceAllOutstandingRequestAreDone(Exception e) { // new requests and therefore we can safely use to wait until all the pending requests complete // to notify the listener about the cancellation final CountDown pendingRequestsCountDown = new CountDown(pendingRequests.size()); - for (ListenableFuture outstandingFuture : pendingRequests) { + for (SubscribableListener outstandingFuture : pendingRequests) { outstandingFuture.addListener(ActionListener.running(() -> { if (pendingRequestsCountDown.countDown()) { listener.onFailure(e); @@ -1111,7 +1119,7 @@ void phase2( } logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]"); final StopWatch stopWatch = new 
StopWatch().start(); - final ListenableFuture sendListener = new ListenableFuture<>(); + final SubscribableListener sendListener = new SubscribableListener<>(); final OperationBatchSender sender = new OperationBatchSender( startingSeqNo, endingSeqNo, @@ -1252,7 +1260,7 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis * marking the shard as in-sync. If the relocation handoff holds all the permits then after the handoff completes and we acquire * the permit then the state of the shard will be relocated and this recovery will fail. */ - final ListenableFuture markInSyncStep = new ListenableFuture<>(); + final SubscribableListener markInSyncStep = new SubscribableListener<>(); runUnderPrimaryPermit( () -> shard.markAllocationIdAsInSync(request.targetAllocationId(), targetLocalCheckpoint), shard, @@ -1260,14 +1268,14 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis markInSyncStep ); - final ListenableFuture finalizeListener = new ListenableFuture<>(); + final SubscribableListener finalizeListener = new SubscribableListener<>(); markInSyncStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> { final long globalCheckpoint = shard.getLastKnownGlobalCheckpoint(); // this global checkpoint is persisted in finalizeRecovery cancellableThreads.checkForCancel(); recoveryTarget.finalizeRecovery(globalCheckpoint, trimAboveSeqNo, finalizeListener.map(ignored2 -> globalCheckpoint)); })); - final ListenableFuture updateGlobalCheckpointStep = new ListenableFuture<>(); + final SubscribableListener updateGlobalCheckpointStep = new SubscribableListener<>(); finalizeListener.addListener( listener.delegateFailureAndWrap( (l, globalCheckpoint) -> runUnderPrimaryPermit( @@ -1279,9 +1287,9 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis ) ); - final ListenableFuture finalStep; + final SubscribableListener finalStep; if (request.isPrimaryRelocation()) { - finalStep = new ListenableFuture<>(); + finalStep = new SubscribableListener<>(); updateGlobalCheckpointStep.addListener(listener.delegateFailureAndWrap((l, ignored) -> { logger.trace("performing relocation hand-off"); cancellableThreads.execute( From cafa545786423446f0ff070312d51ef769afda2d Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Mon, 18 Sep 2023 08:46:19 +0100 Subject: [PATCH 09/14] Read index version from the old deployed cluster instead of inferring it in full restart tests (#99524) The index version is needed to check the output against an upgraded cluster. 
Now the junit tests share a JVM between the old & upgraded clusters, we can store the old index version in the class to be read when needed --- ...rameterizedFullClusterRestartTestCase.java | 50 ++++++++++++++++--- 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/ParameterizedFullClusterRestartTestCase.java b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/ParameterizedFullClusterRestartTestCase.java index 4609b27db2909..eef8f62eedd98 100644 --- a/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/ParameterizedFullClusterRestartTestCase.java +++ b/qa/full-cluster-restart/src/javaRestTest/java/org/elasticsearch/upgrades/ParameterizedFullClusterRestartTestCase.java @@ -12,24 +12,30 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import com.carrotsearch.randomizedtesting.annotations.TestCaseOrdering; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.util.Version; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import org.junit.AfterClass; import org.junit.Before; import java.util.Arrays; import java.util.Locale; +import java.util.Map; import static org.elasticsearch.upgrades.FullClusterRestartUpgradeStatus.OLD; import static org.elasticsearch.upgrades.FullClusterRestartUpgradeStatus.UPGRADED; -import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; @TestCaseOrdering(FullClusterRestartTestOrdering.class) public abstract class ParameterizedFullClusterRestartTestCase extends ESRestTestCase { private static final Version MINIMUM_WIRE_COMPATIBLE_VERSION = Version.fromString("7.17.0"); private static final Version OLD_CLUSTER_VERSION = Version.fromString(System.getProperty("tests.old_cluster_version")); + private static IndexVersion oldIndexVersion; private static boolean upgradeFailed = false; private static boolean upgraded = false; private final FullClusterRestartUpgradeStatus requestedUpgradeStatus; @@ -43,6 +49,39 @@ public static Iterable parameters() throws Exception { return Arrays.stream(FullClusterRestartUpgradeStatus.values()).map(v -> new Object[] { v }).toList(); } + @Before + public void extractOldIndexVersion() throws Exception { + if (upgraded == false) { + IndexVersion indexVersion = null; // these should all be the same version + + Request request = new Request("GET", "_nodes"); + request.addParameter("filter_path", "nodes.*.index_version,nodes.*.name"); + Response response = client().performRequest(request); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + Map nodeMap = objectPath.evaluate("nodes"); + for (String id : nodeMap.keySet()) { + Number ix = objectPath.evaluate("nodes." + id + ".index_version"); + IndexVersion version; + if (ix != null) { + version = IndexVersion.fromId(ix.intValue()); + } else { + // it doesn't have index version (pre 8.11) - just infer it from the release version + version = IndexVersion.fromId(getOldClusterVersion().id); + } + + if (indexVersion == null) { + indexVersion = version; + } else { + String name = objectPath.evaluate("nodes." 
+ id + ".name"); + assertThat("Node " + name + " has a different index version to other nodes", version, equalTo(indexVersion)); + } + } + + assertThat("Index version could not be read", indexVersion, notNullValue()); + oldIndexVersion = indexVersion; + } + } + @Before public void maybeUpgrade() throws Exception { if (upgraded == false && requestedUpgradeStatus == UPGRADED) { @@ -81,13 +120,8 @@ public static org.elasticsearch.Version getOldClusterVersion() { } public static IndexVersion getOldClusterIndexVersion() { - var version = getOldClusterVersion(); - if (version.equals(org.elasticsearch.Version.CURRENT)) { - return IndexVersion.current(); - } else { - assertThat("Index version needs to be added to restart test parameters", version, lessThan(org.elasticsearch.Version.V_8_11_0)); - return IndexVersion.fromId(version.id); - } + assert oldIndexVersion != null; + return oldIndexVersion; } public static Version getOldClusterTestVersion() { From 2d786c05f6e2b4ce928f58c6e2776c37864870bc Mon Sep 17 00:00:00 2001 From: Alexander Spies Date: Mon, 18 Sep 2023 09:46:43 +0200 Subject: [PATCH 10/14] ESQL: Improve log messages (#99470) - Demote another compute service log message to DEBUG to avoid clutter in production logs. - Format ESQL query logs more consistently with other production log messages. --- docs/changelog/99470.yaml | 5 +++++ .../xpack/esql/action/EsqlResponseListener.java | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/99470.yaml diff --git a/docs/changelog/99470.yaml b/docs/changelog/99470.yaml new file mode 100644 index 0000000000000..3e784595cc6ac --- /dev/null +++ b/docs/changelog/99470.yaml @@ -0,0 +1,5 @@ +pr: 99470 +summary: "ESQL: Improve log messages" +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java index 0e0e9a4599780..5ccaeec436f70 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java @@ -143,12 +143,12 @@ public ActionListener wrapWithLogging() { return ActionListener.wrap(r -> { onResponse(r); // At this point, the StopWatch should already have been stopped, so we log a consistent time. - LOGGER.info("Successfully executed ESQL query in {}ms:\n{}", stopWatch.stop().getMillis(), esqlQuery); + LOGGER.info("Successfully executed ESQL query in [{}]ms: [{}]", stopWatch.stop().getMillis(), esqlQuery); }, ex -> { // In case of failure, stop the time manually before sending out the response. 
long timeMillis = stopWatch.stop().getMillis(); onFailure(ex); - LOGGER.info("Failed executing ESQL query in {}ms:\n{}", timeMillis, esqlQuery); + LOGGER.info("Failed executing ESQL query in [{}]ms: [{}]", timeMillis, esqlQuery); }); } } From 876827711250b2785d25c0562a255f17eadbc2c0 Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Mon, 18 Sep 2023 10:30:36 +0100 Subject: [PATCH 11/14] DSL Disable yaml tests for <8.11 as the parser was not enabled (#99621) --- .../test/data_stream/lifecycle/10_explain_lifecycle.yml | 4 ++-- .../lifecycle/190_create_data_stream_with_lifecycle.yml | 4 ++-- .../test/data_stream/lifecycle/30_not_found.yml | 4 ++-- .../test/cluster.component_template/10_basic.yml | 8 ++++---- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml index 66901f93f522d..51f2980671add 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/10_explain_lifecycle.yml @@ -1,8 +1,8 @@ --- "Explain backing index lifecycle": - skip: - version: " - 8.9.99" - reason: "Explain data stream lifecycle API was updated in 8.10" + version: " - 8.10.99" + reason: "Data stream lifecycle was GA in 8.11" features: allowed_warnings - do: allowed_warnings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml index 7238cebaf1d7b..0e4bbd795c18a 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/190_create_data_stream_with_lifecycle.yml @@ -1,8 +1,8 @@ --- "Create data stream with lifecycle": - skip: - version: " - 8.9.99" - reason: "data stream lifecycle in index templates was added after 8.10" + version: " - 8.10.99" + reason: "Data stream lifecycle was GA in 8.11" features: allowed_warnings - do: allowed_warnings: diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml index 41f698327bf8c..e0646ba27751e 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/lifecycle/30_not_found.yml @@ -1,8 +1,8 @@ setup: - skip: features: allowed_warnings - version: " - 8.9.99" - reason: "Data stream lifecycles only supported in 8.10+" + version: " - 8.10.99" + reason: "Data stream lifecycle was GA in 8.11" - do: allowed_warnings: - "index template [my-lifecycle] has index patterns [my-data-stream-1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-lifecycle] will take precedence during new index creation" diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml 
b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml index ab2f522eb3631..500207e969146 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.component_template/10_basic.yml @@ -117,8 +117,8 @@ --- "Add data stream lifecycle": - skip: - version: " - 8.9.99" - reason: "Data stream lifecycle template was updated after 8.9" + version: " - 8.10.99" + reason: "Data stream lifecycle was GA in 8.11" - do: cluster.put_component_template: @@ -145,8 +145,8 @@ --- "Get data stream lifecycle with default rollover": - skip: - version: " - 8.9.99" - reason: "Data stream lifecycle template was added after 8.9" + version: " - 8.10.99" + reason: "Data stream lifecycle was GA in 8.11" - do: cluster.put_component_template: From 81b1097bdfaa1439d0f115df64cc979b51e19819 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 18 Sep 2023 10:32:20 +0100 Subject: [PATCH 12/14] Improve async chaining in RecoverySourceHandler (#99619) Introduces `SubscribableListener#andThen` which encapsulates a very common pattern for chaining a sequence of async actions together. With this utility you can write the actions in their logical order, without needing to introduce extra local variables for the back-references. Also adapts some of the async action chains in `RecoverySourceHandler` to use this new utility. --- .../action/support/SubscribableListener.java | 48 +++ .../recovery/RecoverySourceHandler.java | 286 +++++++++--------- .../support/SubscribableListenerTests.java | 112 +++++++ 3 files changed, 302 insertions(+), 144 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java index 5ba43111b4f03..cebb4ed6e06e6 100644 --- a/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; @@ -333,6 +334,53 @@ public void complete(ActionListener listener) { } } + /** + * Creates and returns a new {@link SubscribableListener} {@code L} and subscribes {@code nextStep} to this listener such that if this + * listener is completed successfully with result {@code R} then {@code nextStep} is invoked with arguments {@code L} and {@code R}. If + * this listener is completed with exception {@code E} then so is {@code L}. + *

+     * This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
+     * <pre>
+     * l.andThen((l1, o1) -> forkAction1(o1, args1, l1)).andThen((l2, o2) -> forkAction2(o2, args2, l2)).addListener(finalListener);
+     * </pre>
+ * After creating this chain, completing {@code l} with a successful response will pass the response to {@code forkAction1}, which will + * on completion pass its response to {@code forkAction2}, which will in turn pass its response to {@code finalListener}. A failure of + * any step will bypass the remaining steps and ultimately fail {@code finalListener}. + *

+     * The threading of the {@code nextStep} callback is the same as for listeners added with {@link #addListener}: if this listener is
+     * already complete then {@code nextStep} is invoked on the thread calling {@link #andThen} and in its thread context, but if this
+     * listener is incomplete then {@code nextStep} is invoked on the completing thread and in its thread context.
+     */
+    public <U> SubscribableListener<U> andThen(CheckedBiConsumer<ActionListener<U>, T, ? extends Exception> nextStep) {
+        return andThen(EsExecutors.DIRECT_EXECUTOR_SERVICE, null, nextStep);
+    }
+
+    /**
+     * Creates and returns a new {@link SubscribableListener} {@code L} and subscribes {@code nextStep} to this listener such that if this
+     * listener is completed successfully with result {@code R} then {@code nextStep} is invoked with arguments {@code L} and {@code R}. If
+     * this listener is completed with exception {@code E} then so is {@code L}.
+     * <p>

+     * This can be used to construct a sequence of async actions, each invoked with the result of the previous one:
+     * <pre>
+     * l.andThen(x, t, (l1,o1) -> forkAction1(o1,args1,l1)).andThen(x, t, (l2,o2) -> forkAction2(o2,args2,l2)).addListener(finalListener);
+     * </pre>
+ * After creating this chain, completing {@code l} with a successful response will pass the response to {@code forkAction1}, which will + * on completion pass its response to {@code forkAction2}, which will in turn pass its response to {@code finalListener}. A failure of + * any step will bypass the remaining steps and ultimately fail {@code finalListener}. + *

+ * The threading of the {@code nextStep} callback is the same as for listeners added with {@link #addListener}: if this listener is + * already complete then {@code nextStep} is invoked on the thread calling {@link #andThen} and in its thread context, but if this + * listener is incomplete then {@code nextStep} is invoked using {@code executor}, in a thread context captured when {@link #andThen} + * was called. + */ + public SubscribableListener andThen( + Executor executor, + @Nullable ThreadContext threadContext, + CheckedBiConsumer, T, ? extends Exception> nextStep + ) { + return newForked(l -> addListener(l.delegateFailureAndWrap(nextStep), executor, threadContext)); + } + /** * Adds a timeout to this listener, such that if the timeout elapses before the listener is completed then it will be completed with an * {@link ElasticsearchTimeoutException}. diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a3170815251a6..1acc0f1041e39 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -39,7 +39,6 @@ import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.core.Tuple; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.RecoveryEngineException; @@ -541,41 +540,49 @@ void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, A // TODO: is this still relevant today? if (hasSameLegacySyncId(recoverySourceMetadata, request.metadataSnapshot()) == false) { cancellableThreads.checkForCancel(); - final boolean canUseSnapshots = canUseSnapshots(); - recoveryPlannerService.computeRecoveryPlan( - shard.shardId(), - shardStateIdentifier, - recoverySourceMetadata, - request.metadataSnapshot(), - startingSeqNo, - translogOps.getAsInt(), - getRequest().targetNode().getMaxIndexVersion(), - canUseSnapshots, - request.isPrimaryRelocation(), - listener.delegateFailureAndWrap((l, plan) -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, l)) - ); + SubscribableListener + // compute the plan + .newForked( + l -> recoveryPlannerService.computeRecoveryPlan( + shard.shardId(), + shardStateIdentifier, + recoverySourceMetadata, + request.metadataSnapshot(), + startingSeqNo, + translogOps.getAsInt(), + getRequest().targetNode().getMaxIndexVersion(), + canUseSnapshots(), + request.isPrimaryRelocation(), + l + ) + ) + // perform the file recovery + .andThen((l, plan) -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, l)) + // and respond + .addListener(listener); } else { logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId()); - - // but we must still create a retention lease - final SubscribableListener createRetentionLeaseStep = new SubscribableListener<>(); - createRetentionLease(startingSeqNo, createRetentionLeaseStep); - createRetentionLeaseStep.addListener(listener.delegateFailureAndWrap((l, retentionLease) -> { - final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); - l.onResponse( - new SendFileResult( - Collections.emptyList(), - Collections.emptyList(), - 0L, - Collections.emptyList(), - Collections.emptyList(), - 0L, - took - 
) - ); - })); - + SubscribableListener + // but we must still create a retention lease + .newForked(leaseListener -> createRetentionLease(startingSeqNo, leaseListener)) + // and then compute the result of sending no files + .andThen((l, ignored) -> { + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + l.onResponse( + new SendFileResult( + Collections.emptyList(), + Collections.emptyList(), + 0L, + Collections.emptyList(), + Collections.emptyList(), + 0L, + took + ) + ); + }) + // and finally respond + .addListener(listener); } } catch (Exception e) { throw new RecoverFilesRecoveryException(request.shardId(), 0, ByteSizeValue.ZERO, e); @@ -594,15 +601,6 @@ void recoverFilesFromSourceAndSnapshot( StopWatch stopWatch, ActionListener listener ) { - cancellableThreads.checkForCancel(); - - final List filesToRecoverNames = shardRecoveryPlan.getFilesToRecoverNames(); - final List filesToRecoverSizes = shardRecoveryPlan.getFilesToRecoverSizes(); - final List phase1ExistingFileNames = shardRecoveryPlan.getFilesPresentInTargetNames(); - final List phase1ExistingFileSizes = shardRecoveryPlan.getFilesPresentInTargetSizes(); - final long totalSize = shardRecoveryPlan.getTotalSize(); - final long existingTotalSize = shardRecoveryPlan.getExistingSize(); - if (logger.isTraceEnabled()) { for (StoreFileMetadata md : shardRecoveryPlan.getFilesPresentInTarget()) { logger.trace( @@ -640,6 +638,9 @@ void recoverFilesFromSourceAndSnapshot( } } + final long totalSize = shardRecoveryPlan.getTotalSize(); + final long existingTotalSize = shardRecoveryPlan.getExistingSize(); + logger.trace( """ recovery [phase1]: total_size[{}], \ @@ -657,121 +658,118 @@ void recoverFilesFromSourceAndSnapshot( .mapToLong(BlobStoreIndexShardSnapshot.FileInfo::length) .sum() ), - phase1ExistingFileNames.size(), + shardRecoveryPlan.getFilesPresentInTarget().size(), ByteSizeValue.ofBytes(existingTotalSize) ); } - // We need to pass the ShardRecovery plan between steps instead of capturing it in the closures + new FileBasedRecoveryContext(store, stopWatch, shardRecoveryPlan).run(listener); + } + + private class FileBasedRecoveryContext { + private final Store store; + private final StopWatch stopWatch; + private final int translogOps; + + // We need to mutate the ShardRecovery plan instead of capturing it in the closures // since the plan can change after a failure recovering files from the snapshots that cannot be // recovered from the source node, in that case we have to start from scratch using the fallback // recovery plan that would be used in subsequent steps. 
- final SubscribableListener sendFileInfoStep = new SubscribableListener<>(); - final SubscribableListener>> recoverSnapshotFilesStep = - new SubscribableListener<>(); - final SubscribableListener sendFilesStep = new SubscribableListener<>(); - final SubscribableListener> createRetentionLeaseStep = new SubscribableListener<>(); - final SubscribableListener cleanFilesStep = new SubscribableListener<>(); - - final int translogOps = shardRecoveryPlan.getTranslogOps(); - recoveryTarget.receiveFileInfo( - filesToRecoverNames, - filesToRecoverSizes, - phase1ExistingFileNames, - phase1ExistingFileSizes, - translogOps, - sendFileInfoStep - ); - - sendFileInfoStep.addListener( - listener.delegateFailureAndWrap((l, unused) -> recoverSnapshotFiles(shardRecoveryPlan, new ActionListener<>() { - @Override - public void onResponse(List filesFailedToRecoverFromSnapshot) { - recoverSnapshotFilesStep.onResponse(Tuple.tuple(shardRecoveryPlan, filesFailedToRecoverFromSnapshot)); - } - - @Override - public void onFailure(Exception e) { - if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false - && e instanceof CancellableThreads.ExecutionCancelledException == false) { - ShardRecoveryPlan fallbackPlan = shardRecoveryPlan.getFallbackPlan(); - recoveryTarget.receiveFileInfo( - fallbackPlan.getFilesToRecoverNames(), - fallbackPlan.getFilesToRecoverSizes(), - fallbackPlan.getFilesPresentInTargetNames(), - fallbackPlan.getFilesPresentInTargetSizes(), - fallbackPlan.getTranslogOps(), - recoverSnapshotFilesStep.map(r -> Tuple.tuple(fallbackPlan, Collections.emptyList())) - ); - } else { - recoverSnapshotFilesStep.onFailure(e); - } - } - })) - ); + private ShardRecoveryPlan shardRecoveryPlan; - recoverSnapshotFilesStep.addListener(listener.delegateFailureAndWrap((l, planAndFilesFailedToRecoverFromSnapshot) -> { - ShardRecoveryPlan recoveryPlan = planAndFilesFailedToRecoverFromSnapshot.v1(); - List filesFailedToRecoverFromSnapshot = planAndFilesFailedToRecoverFromSnapshot.v2(); - final List filesToRecoverFromSource; - if (filesFailedToRecoverFromSnapshot.isEmpty()) { - filesToRecoverFromSource = recoveryPlan.getSourceFilesToRecover(); - } else { - filesToRecoverFromSource = concatLists(recoveryPlan.getSourceFilesToRecover(), filesFailedToRecoverFromSnapshot); - } + FileBasedRecoveryContext(Store store, StopWatch stopWatch, ShardRecoveryPlan shardRecoveryPlan) { + this.store = store; + this.stopWatch = stopWatch; + this.translogOps = shardRecoveryPlan.getTranslogOps(); + this.shardRecoveryPlan = shardRecoveryPlan; + } - sendFiles( - store, - filesToRecoverFromSource.toArray(new StoreFileMetadata[0]), - recoveryPlan::getTranslogOps, - sendFilesStep.map(unused -> recoveryPlan) + private void sendShardRecoveryPlanFileInfo(ActionListener fileInfoListener) { + recoveryTarget.receiveFileInfo( + shardRecoveryPlan.getFilesToRecoverNames(), + shardRecoveryPlan.getFilesToRecoverSizes(), + shardRecoveryPlan.getFilesPresentInTargetNames(), + shardRecoveryPlan.getFilesPresentInTargetSizes(), + shardRecoveryPlan.getTranslogOps(), + fileInfoListener ); - })); + } - sendFilesStep.addListener( - listener.delegateFailureAndWrap( - (l, recoveryPlan) -> createRetentionLease( - recoveryPlan.getStartingSeqNo(), - createRetentionLeaseStep.map(retentionLease -> Tuple.tuple(recoveryPlan, retentionLease)) - ) - ) - ); + void run(ActionListener listener) { + cancellableThreads.checkForCancel(); - createRetentionLeaseStep.addListener(listener.delegateFailureAndWrap((l, recoveryPlanAndRetentionLease) -> { - final ShardRecoveryPlan 
recoveryPlan = recoveryPlanAndRetentionLease.v1(); - final RetentionLease retentionLease = recoveryPlanAndRetentionLease.v2(); - final Store.MetadataSnapshot recoverySourceMetadata = recoveryPlan.getSourceMetadataSnapshot(); - final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); - assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint - : retentionLease + " vs " + lastKnownGlobalCheckpoint; - // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want - // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica - // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on - // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. - cleanFiles( - store, - recoverySourceMetadata, - () -> translogOps, - lastKnownGlobalCheckpoint, - cleanFilesStep.map(unused -> recoveryPlan) - ); - })); + SubscribableListener + // send the original plan + .newForked(this::sendShardRecoveryPlanFileInfo) + // instruct the target to recover files from snapshot, possibly updating the plan on failure + .>andThen( + (l, ignored) -> recoverSnapshotFiles(shardRecoveryPlan, l.delegateResponse((recoverSnapshotFilesListener, e) -> { + if (shardRecoveryPlan.canRecoverSnapshotFilesFromSourceNode() == false + && e instanceof CancellableThreads.ExecutionCancelledException == false) { + shardRecoveryPlan = shardRecoveryPlan.getFallbackPlan(); + sendShardRecoveryPlanFileInfo(recoverSnapshotFilesListener.map(r -> Collections.emptyList())); + } else { + recoverSnapshotFilesListener.onFailure(e); + } + })) + ) + // send local files which either aren't in the snapshot, or which failed to be recovered from the snapshot for some reason + .andThen((sendFilesListener, filesFailedToRecoverFromSnapshot) -> { + final List filesToRecoverFromSource; + if (filesFailedToRecoverFromSnapshot.isEmpty()) { + filesToRecoverFromSource = shardRecoveryPlan.getSourceFilesToRecover(); + } else { + filesToRecoverFromSource = concatLists( + shardRecoveryPlan.getSourceFilesToRecover(), + filesFailedToRecoverFromSnapshot + ); + } - cleanFilesStep.addListener(listener.delegateFailureAndWrap((l, recoveryPlan) -> { - final TimeValue took = stopWatch.totalTime(); - logger.trace("recovery [phase1]: took [{}]", took); - l.onResponse( - new SendFileResult( - recoveryPlan.getFilesToRecoverNames(), - recoveryPlan.getFilesToRecoverSizes(), - recoveryPlan.getTotalSize(), - recoveryPlan.getFilesPresentInTargetNames(), - recoveryPlan.getFilesPresentInTargetSizes(), - recoveryPlan.getExistingSize(), - took + sendFiles( + store, + filesToRecoverFromSource.toArray(new StoreFileMetadata[0]), + shardRecoveryPlan::getTranslogOps, + sendFilesListener + ); + }) + // create a retention lease + .andThen( + (createRetentionLeaseListener, ignored) -> createRetentionLease( + shardRecoveryPlan.getStartingSeqNo(), + createRetentionLeaseListener + ) ) - ); - })); + // run cleanFiles, renaming temp files, removing surplus ones, creating an empty translog and so on + .andThen((finalRecoveryPlanListener, retentionLease) -> { + final Store.MetadataSnapshot recoverySourceMetadata = shardRecoveryPlan.getSourceMetadataSnapshot(); + final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint(); + assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= 
lastKnownGlobalCheckpoint + : retentionLease + " vs " + lastKnownGlobalCheckpoint; + // Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want + // the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica + // to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on + // the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint. + cleanFiles(store, recoverySourceMetadata, () -> translogOps, lastKnownGlobalCheckpoint, finalRecoveryPlanListener); + }) + // compute the result + .andThen((resultListener, ignored) -> { + final TimeValue took = stopWatch.totalTime(); + logger.trace("recovery [phase1]: took [{}]", took); + resultListener.onResponse( + new SendFileResult( + shardRecoveryPlan.getFilesToRecoverNames(), + shardRecoveryPlan.getFilesToRecoverSizes(), + shardRecoveryPlan.getTotalSize(), + shardRecoveryPlan.getFilesPresentInTargetNames(), + shardRecoveryPlan.getFilesPresentInTargetSizes(), + shardRecoveryPlan.getExistingSize(), + took + ) + ); + }) + // and finally respond + .addListener(listener); + } } /** diff --git a/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java index 9ef5e40e1f932..419cac9a0255d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/SubscribableListenerTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -398,4 +399,115 @@ public void testCreateUtils() throws Exception { assertTrue(forkFailed.isDone()); assertEquals("simulated fork failure", expectThrows(ElasticsearchException.class, forkFailed::rawResult).getMessage()); } + + public void testAndThenSuccess() { + final var initialListener = new SubscribableListener<>(); + final var forked = new AtomicReference>(); + final var result = new AtomicReference<>(); + + final var chainedListener = initialListener.andThen((l, o) -> { + forked.set(l); + result.set(o); + }); + assertNull(forked.get()); + assertNull(result.get()); + + final var o1 = new Object(); + initialListener.onResponse(o1); + assertSame(o1, result.get()); + assertSame(chainedListener, forked.get()); + assertFalse(chainedListener.isDone()); + } + + public void testAndThenFailure() { + final var initialListener = new SubscribableListener<>(); + + final var chainedListener = initialListener.andThen((l, o) -> fail("should not be called")); + assertFalse(chainedListener.isDone()); + + initialListener.onFailure(new ElasticsearchException("simulated")); + assertComplete(chainedListener, "simulated"); + } + + public void testAndThenThreading() { + runAndThenThreadingTest(true); + runAndThenThreadingTest(false); + } + + private static void runAndThenThreadingTest(boolean testSuccess) { + final var completeListener = testSuccess + ? 
SubscribableListener.newSucceeded(new Object()) + : SubscribableListener.newFailed(new ElasticsearchException("immediate failure")); + + assertComplete( + completeListener.andThen(r -> fail("should not fork"), null, ActionListener::onResponse), + testSuccess ? null : "immediate failure" + ); + + final var threadContext = new ThreadContext(Settings.EMPTY); + final var headerName = "test-header"; + final var expectedHeader = randomAlphaOfLength(10); + + final SubscribableListener deferredListener = new SubscribableListener<>(); + final SubscribableListener chainedListener; + final AtomicReference forkedRunnable = new AtomicReference<>(); + + try (var ignored = threadContext.stashContext()) { + threadContext.putHeader(headerName, expectedHeader); + chainedListener = deferredListener.andThen( + r -> assertTrue(forkedRunnable.compareAndSet(null, r)), + threadContext, + (l, response) -> { + assertEquals(expectedHeader, threadContext.getHeader(headerName)); + l.onResponse(response); + } + ); + } + + assertFalse(chainedListener.isDone()); + assertNull(forkedRunnable.get()); + + final AtomicBoolean isComplete = new AtomicBoolean(); + + try (var ignored = threadContext.stashContext()) { + threadContext.putHeader(headerName, randomAlphaOfLength(10)); + chainedListener.addListener(ActionListener.running(() -> { + assertEquals(expectedHeader, threadContext.getHeader(headerName)); + assertTrue(isComplete.compareAndSet(false, true)); + })); + } + + try (var ignored = threadContext.stashContext()) { + threadContext.putHeader(headerName, randomAlphaOfLength(10)); + if (testSuccess) { + deferredListener.onResponse(new Object()); + } else { + deferredListener.onFailure(new ElasticsearchException("simulated failure")); + } + } + + assertFalse(chainedListener.isDone()); + assertFalse(isComplete.get()); + + try (var ignored = threadContext.stashContext()) { + threadContext.putHeader(headerName, randomAlphaOfLength(10)); + forkedRunnable.get().run(); + } + + assertComplete(chainedListener, testSuccess ? null : "simulated failure"); + assertTrue(isComplete.get()); + } + + private static void assertComplete(SubscribableListener listener, @Nullable String expectedFailureMessage) { + assertTrue(listener.isDone()); + if (expectedFailureMessage == null) { + try { + listener.rawResult(); + } catch (Exception e) { + throw new AssertionError("unexpected", e); + } + } else { + assertEquals(expectedFailureMessage, expectThrows(ElasticsearchException.class, listener::rawResult).getMessage()); + } + } } From 986b53732789a051584a2a0449923d5d7b109d2a Mon Sep 17 00:00:00 2001 From: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Date: Mon, 18 Sep 2023 13:29:48 +0300 Subject: [PATCH 13/14] Prepare downsample plugin rest tests for serverless (#99622) Following instructions in [README](https://github.com/elastic/elasticsearch-serverless/blob/main/qa/stateful/README.md) as well as mimicking changes in #99413: * Use the new test-clusters framework using JUnit rules, not the "legacy" Gradle plugin. * Ensure that the elasticsearch.internal-test-artifact, plugin is applied to the project build script. * Remove setting the number of replicas in yaml tests. 
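For reference, the JUnit-rule pattern described in the bullets above looks roughly like the sketch below. ExampleYamlSuiteIT is a placeholder name and the settings are illustrative; the real suite class is in the diff that follows.

    import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

    import org.elasticsearch.test.cluster.ElasticsearchCluster;
    import org.elasticsearch.test.cluster.local.distribution.DistributionType;
    import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
    import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
    import org.junit.ClassRule;

    public class ExampleYamlSuiteIT extends ESClientYamlSuiteTestCase {

        // The cluster is owned by the test JVM (JUnit rule) rather than by the Gradle test-clusters plugin.
        @ClassRule
        public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
            .distribution(DistributionType.DEFAULT)
            .setting("xpack.license.self_generated.type", "basic")
            .setting("xpack.security.enabled", "false")
            .build();

        public ExampleYamlSuiteIT(final ClientYamlTestCandidate testCandidate) {
            super(testCandidate);
        }

        @ParametersFactory
        public static Iterable<Object[]> parameters() throws Exception {
            return createParameters();
        }

        @Override
        protected String getTestRestCluster() {
            // Wires the YAML test runner to the HTTP addresses of the managed cluster.
            return cluster.getHttpAddresses();
        }
    }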
--- x-pack/plugin/downsample/qa/rest/build.gradle | 19 ++++++++++++------- .../xpack/downsample/DownsampleRestIT.java | 15 +++++++++++++++ .../test/downsample/10_basic.yml | 14 -------------- .../test/downsample/20_unsupported_aggs.yml | 1 - .../test/downsample/30_date_histogram.yml | 1 - .../test/downsample/40_runtime_fields.yml | 3 --- .../downsample/50_auto_date_histogram.yml | 2 -- .../test/downsample/60_settings.yml | 1 - 8 files changed, 27 insertions(+), 29 deletions(-) diff --git a/x-pack/plugin/downsample/qa/rest/build.gradle b/x-pack/plugin/downsample/qa/rest/build.gradle index 1cffa7fcafc6a..603d69d695cac 100644 --- a/x-pack/plugin/downsample/qa/rest/build.gradle +++ b/x-pack/plugin/downsample/qa/rest/build.gradle @@ -5,13 +5,14 @@ * 2.0. */ -import org.elasticsearch.gradle.Version import org.elasticsearch.gradle.internal.info.BuildParams -apply plugin: 'elasticsearch.legacy-yaml-rest-test' -apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test' +apply plugin: 'elasticsearch.internal-yaml-rest-test' +apply plugin: 'elasticsearch.yaml-rest-compat-test' +apply plugin: 'elasticsearch.internal-test-artifact' dependencies { + testImplementation project(path: ':test:test-clusters') yamlRestTestImplementation project(path: xpackModule('rollup')) } @@ -21,12 +22,16 @@ restResources { } } -testClusters.configureEach { - testDistribution = 'DEFAULT' - setting 'xpack.license.self_generated.type', 'basic' - setting 'xpack.security.enabled', 'false' +artifacts { + restXpackTests(new File(projectDir, "src/yamlRestTest/resources/rest-api-spec/test")) } +tasks.named('yamlRestTest') { + usesDefaultDistribution() +} +tasks.named('yamlRestTestV7CompatTest') { + usesDefaultDistribution() +} if (BuildParams.inFipsJvm){ // This test cluster is using a BASIC license and FIPS 140 mode is not supported in BASIC tasks.named("yamlRestTest").configure{enabled = false } diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java index 5a66a97d16e6e..504326f1bd4b1 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/java/org/elasticsearch/xpack/downsample/DownsampleRestIT.java @@ -9,11 +9,26 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.ClassRule; public class DownsampleRestIT extends ESClientYamlSuiteTestCase { + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .distribution(DistributionType.DEFAULT) + .setting("xpack.license.self_generated.type", "basic") + .setting("xpack.security.enabled", "false") + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + public DownsampleRestIT(final ClientYamlTestCandidate testCandidate) { super(testCandidate); } diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml index 
70076562b27dd..f4dd187b82445 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml @@ -9,7 +9,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [metricset, k8s.pod.uid] @@ -92,7 +91,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -159,7 +157,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -225,7 +222,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -343,7 +339,6 @@ setup: - match: { test-downsample.settings.index.routing_path: [ "metricset", "k8s.pod.uid"] } - match: { test-downsample.settings.index.downsample.source.name: test } - match: { test-downsample.settings.index.number_of_shards: "1" } - - match: { test-downsample.settings.index.number_of_replicas: "0" } # Assert rollup index mapping - do: @@ -373,7 +368,6 @@ setup: indices.segments: index: test-downsample - - match: { _shards.total: 1} - match: { indices.test-downsample.shards.0.0.num_committed_segments: 1} - match: { indices.test-downsample.shards.0.0.num_search_segments: 1} @@ -424,7 +418,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -438,7 +431,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -515,7 +507,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [metricset] @@ -609,7 +600,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [metricset] @@ -935,7 +925,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -1038,7 +1027,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -1141,7 +1129,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -1377,7 +1364,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [metricset, k8s.pod.uid] diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/20_unsupported_aggs.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/20_unsupported_aggs.yml index f58985189b9e0..bf5f92f628444 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/20_unsupported_aggs.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/20_unsupported_aggs.yml @@ -9,7 +9,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ uid ] diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/30_date_histogram.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/30_date_histogram.yml 
index b0a6f699a6ac0..b7f3ec7b8f384 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/30_date_histogram.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/30_date_histogram.yml @@ -9,7 +9,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ uid ] diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml index 1d5221fb8d1f1..06d74494e89c7 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/40_runtime_fields.yml @@ -11,7 +11,6 @@ body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -173,7 +172,6 @@ body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] @@ -295,7 +293,6 @@ body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ metricset, k8s.pod.uid ] diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/50_auto_date_histogram.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/50_auto_date_histogram.yml index d314be1f54739..050b16f0674ca 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/50_auto_date_histogram.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/50_auto_date_histogram.yml @@ -9,7 +9,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ uid ] @@ -33,7 +32,6 @@ setup: body: settings: number_of_shards: 1 - number_of_replicas: 0 index: mode: time_series routing_path: [ uid ] diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/60_settings.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/60_settings.yml index a9b72f7f73c6a..6a33cc47e5c51 100644 --- a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/60_settings.yml +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/60_settings.yml @@ -25,7 +25,6 @@ body: settings: number_of_shards: 1 - number_of_replicas: 0 index: default_pipeline: "pipeline" final_pipeline: "pipeline" From 3691312acad38d63d8fe62655d2fd7f56fbeacb8 Mon Sep 17 00:00:00 2001 From: David Turner Date: Mon, 18 Sep 2023 13:26:03 +0100 Subject: [PATCH 14/14] Slightly adjust docs about S3 incompatibilities (#99624) It's often useful to quote these docs to users encountering problems with their not-quite-S3-compatible storage system. In practice we don't need to quote the bits in the middle but we do need the last sentence about working with the supplier to address incompatibilities. This commit reorders things so that the most commonly quoted sentences form a standalone paragraph. 
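As an aside, the docs being reordered point users at the repository analysis API for these basic compatibility checks. A rough sketch of driving that check from the low-level REST client is shown below; the repository name and parameters are illustrative only, and a repository is assumed to be registered already.

    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.Response;
    import org.elasticsearch.client.RestClient;

    public final class RepositoryAnalysisExample {
        // Runs a small repository analysis against a repository named "my_repository".
        public static Response analyzeRepository(RestClient client) throws Exception {
            Request request = new Request("POST", "/_snapshot/my_repository/_analyze");
            request.addParameter("blob_count", "100");     // number of test blobs to write
            request.addParameter("max_blob_size", "10mb"); // upper bound on individual blob size
            // An error response or a timeout suggests the storage system is not fully S3-compatible.
            return client.performRequest(request);
        }
    }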
--- docs/reference/snapshot-restore/repository-s3.asciidoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/reference/snapshot-restore/repository-s3.asciidoc b/docs/reference/snapshot-restore/repository-s3.asciidoc index c34b295f9bd1a..16171e2e71631 100644 --- a/docs/reference/snapshot-restore/repository-s3.asciidoc +++ b/docs/reference/snapshot-restore/repository-s3.asciidoc @@ -226,16 +226,16 @@ emulate S3's behaviour in full. The `repository-s3` type requires full compatibility with S3. In particular it must support the same set of API endpoints, return the same errors in case of failures, and offer consistency and performance at least as good as S3 even when accessed concurrently by -multiple nodes. Incompatible error codes, consistency or performance may be -particularly hard to track down since errors, consistency failures, and -performance issues are usually rare and hard to reproduce. +multiple nodes. You will need to work with the supplier of your storage system +to address any incompatibilities you encounter. You can perform some basic checks of the suitability of your storage system using the {ref}/repo-analysis-api.html[repository analysis API]. If this API does not complete successfully, or indicates poor performance, then your storage system is not fully compatible with AWS S3 and therefore unsuitable for -use as a snapshot repository. You will need to work with the supplier of your -storage system to address any incompatibilities you encounter. +use as a snapshot repository. However, these checks do not guarantee full +compatibility. Incompatible error codes and consistency or performance issues +may be rare and hard to reproduce. [[repository-s3-repository]] ==== Repository settings