From 41ee84de2b6b61a45315c6c4909fe2e93fe422a0 Mon Sep 17 00:00:00 2001 From: Gaurav Bafna <gbbafna@amazon.com> Date: Wed, 17 Jan 2024 10:16:21 +0530 Subject: [PATCH] Introducing mixed mode support for remote store migration Signed-off-by: Gaurav Bafna <gbbafna@amazon.com> --- .../RemoteStoreMigrationTestCase.java | 109 ++++++++++++++++++ .../coordination/JoinTaskExecutor.java | 87 +++++++++++--- .../common/settings/ClusterSettings.java | 1 + .../common/settings/FeatureFlagSettings.java | 3 +- .../opensearch/common/util/FeatureFlags.java | 11 ++ .../remotestore/RemoteStoreNodeService.java | 60 +++++++++- .../opensearch/test/InternalTestCluster.java | 26 ++--- 7 files changed, 263 insertions(+), 34 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java new file mode 100644 index 0000000000000..918157587efb3 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreMigrationTestCase.java @@ -0,0 +1,109 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.client.Client; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; + +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) +public class RemoteStoreMigrationTestCase extends OpenSearchIntegTestCase { + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + + protected Path segmentRepoPath; + protected Path translogRepoPath; + + static boolean addRemote = false; + + protected Settings nodeSettings(int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + if (addRemote) { + logger.info("Adding remote ndoes node"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .put("discovery.initial_state_timeout", "500ms") + .build(); + } else { + logger.info("Adding non remote ndoes node"); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("discovery.initial_state_timeout", "500ms") + .build(); + } + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + } + + public void testAddRemoteNode() throws IOException { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List<String> cmNodes = internalCluster().startNodes(1); + Client client = internalCluster().client(cmNodes.get(0)); + addRemote = true; + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + internalCluster().startNode(); + internalCluster().startNode(); + internalCluster().validateClusterFormed(); + + // add incompatible remote node in remote mixed cluster + Settings.Builder badSettings = Settings.builder() + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, "REPOSITORY_2_NAME", translogRepoPath)) + .put("discovery.initial_state_timeout", "500ms"); + String badNode = internalCluster().startNode(badSettings); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(badNode)); + + + // add remote node in docrep cluster + updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep")); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String wrongNode = internalCluster().startNode(); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode)); + } + + public void testAddDocRepNode() throws Exception { + internalCluster().setBootstrapClusterManagerNodeIndex(0); + List<String> cmNodes = internalCluster().startNodes(1); + addRemote = false; + Client client = internalCluster().client(cmNodes.get(0)); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed").put(DIRECTION_SETTING.getKey(), "remote_store")); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + String wrongNode = internalCluster().startNode(); + String[] allNodes = internalCluster().getNodeNames(); + assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < allNodes.length); + + updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep")); + assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + assertBusy(() -> assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() == allNodes.length)); + internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode)); + } + +} diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index f701a2f52277d..fcc074b95d7b0 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -58,6 +58,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -66,6 +67,7 @@ import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; /** @@ -176,12 +178,18 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes()); - // TODO: We are using one of the existing node to build the repository metadata, this will need to be updated - // once we start supporting mixed compatibility mode. An optimization can be done as this will get invoked + // An optimization can be done as this will get invoked // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. + Optional<DiscoveryNode> remoteDN = newState.nodes() + .getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - (currentNodes.getNodes().values()).stream().findFirst().get(), + dn, currentState.getMetadata().custom(RepositoriesMetadata.TYPE) ); @@ -212,6 +220,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo // would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); + + if (remoteDN.isEmpty()) { + // This is hit only on cases where we encounter first remote node in remote store migration + logger.info("Updating system repository now for remote store migration"); + repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + node, + currentState.getMetadata().custom(RepositoriesMetadata.TYPE) + ); + } + nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -498,37 +516,68 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod // TODO: The below check is valid till we don't support migration, once we start supporting migration a remote // store node will be able to join a non remote store cluster and vice versa. #7986 CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings()); + RemoteStoreNodeService.Direction direction = DIRECTION_SETTING.get(metadata.settings()); if (STRICT.equals(remoteStoreCompatibilityMode)) { DiscoveryNode existingNode = existingNodes.get(0); if (joiningNode.isRemoteStoreNode()) { + ensureRemoteStoreNodesCompatibility(joiningNode, existingNode); + } else { if (existingNode.isRemoteStoreNode()) { - RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode); - RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode); - if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) { - throw new IllegalStateException( - "a remote store node [" - + joiningNode - + "] is trying to join a remote store cluster with incompatible node attributes in " - + "comparison with existing node [" - + existingNode - + "]" - ); - } - } else { throw new IllegalStateException( - "a remote store node [" + joiningNode + "] is trying to join a non remote store cluster" + "a non remote store node [" + joiningNode + "] is trying to join a remote store cluster" ); } - } else { - if (existingNode.isRemoteStoreNode()) { + } + } else { + if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) { + /* + don't allow remote nodes if direction is DOCREP . + don't allow doc nodes if direction is REMOTE . + */ + if (joiningNode.isRemoteStoreNode() == false && direction == RemoteStoreNodeService.Direction.REMOTE_STORE) { throw new IllegalStateException( "a non remote store node [" + joiningNode + "] is trying to join a remote store cluster" ); + } else if (joiningNode.isRemoteStoreNode()) { + if (joiningNode.isRemoteStoreNode() && direction == RemoteStoreNodeService.Direction.DOCREP) { + throw new IllegalStateException( + "a remote store node [" + joiningNode + "] is trying to join a non remote store cluster" + ); + } + + Optional<DiscoveryNode> remoteDN = existingNodes + .stream() + .filter(DiscoveryNode::isRemoteStoreNode) + .findFirst(); + remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode)); } } } } + private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) { + if (joiningNode.isRemoteStoreNode()) { + if (existingNode.isRemoteStoreNode()) { + RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode); + RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode); + if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) { + throw new IllegalStateException( + "a remote store node [" + + joiningNode + + "] is trying to join a remote store cluster with incompatible node attributes in " + + "comparison with existing node [" + + existingNode + + "]" + ); + } + } else { + throw new IllegalStateException( + "a remote store node [" + joiningNode + "] is trying to join a non remote store cluster" + ); + } + } + } + public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators( Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators ) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0c97d62c44a5e..c6371be3d86ee 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -698,6 +698,7 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, + RemoteStoreNodeService.DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index 44dc4161f093a..78923aafd8b70 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -34,6 +34,7 @@ protected FeatureFlagSettings( FeatureFlags.IDENTITY_SETTING, FeatureFlags.TELEMETRY_SETTING, FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING, - FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING + FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING, + FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index c88a795501ca6..6c4cd66d7af57 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -20,6 +20,11 @@ * @opensearch.internal */ public class FeatureFlags { + /** + * Gates the visibility of the remote store migration support from docrep . + */ + public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.migration.enabled"; + /** * Gates the ability for Searchable Snapshots to read snapshots that are older than the * guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis. @@ -93,6 +98,12 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) { } } + public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting( + REMOTE_STORE_MIGRATION_EXPERIMENTAL, + false, + Property.NodeScope + ); + public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope); diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index ca2413a057a6b..c683166ba050c 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.settings.Setting; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.RepositoryException; @@ -27,6 +28,8 @@ import java.util.Map; import java.util.function.Supplier; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; + /** * Contains all the method needed for a remote store backed node lifecycle. */ @@ -39,6 +42,33 @@ public class RemoteStoreNodeService { "remote_store.compatibility_mode", CompatibilityMode.STRICT.name(), CompatibilityMode::parseString, + value -> { + if (value == CompatibilityMode.MIXED + && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) { + throw new IllegalArgumentException( + " mixed mode is under an experimental feature and can be activated only by enabling " + + REMOTE_STORE_MIGRATION_EXPERIMENTAL + + " feature flag in the JVM options " + ); + } + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting<Direction> DIRECTION_SETTING = new Setting<>( + "direction", + Direction.NONE.name(), + Direction::parseString, + value -> { + if (value != Direction.NONE && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) { + throw new IllegalArgumentException( + " direction is under an experimental feature and can be activated only by enabling " + + REMOTE_STORE_MIGRATION_EXPERIMENTAL + + " feature flag in the JVM options " + ); + } + }, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -49,7 +79,8 @@ public class RemoteStoreNodeService { * @opensearch.internal */ public enum CompatibilityMode { - STRICT("strict"); + STRICT("strict"), + MIXED("mixed"); public final String mode; @@ -73,6 +104,33 @@ public static CompatibilityMode parseString(String compatibilityMode) { } } + public enum Direction { + REMOTE_STORE("remote_store"), + NONE("none"), + DOCREP("docrep"); + + public final String direction; + + Direction(String d) { + this.direction = d; + } + + public static Direction parseString(String direction) { + try { + return Direction.valueOf(direction.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "[" + + direction + + "] direction is not supported. " + + "supported modes are [" + + CompatibilityMode.values().toString() + + "]" + ); + } + } + } + public RemoteStoreNodeService(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) { this.repositoriesService = repositoriesService; this.threadPool = threadPool; diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index c2b964aa96212..92419494d6616 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -36,7 +36,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomStrings; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; @@ -153,6 +152,18 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; +import static org.apache.lucene.tests.util.LuceneTestCase.rarely; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.opensearch.cluster.coordination.ClusterBootstrapService.INITIAL_CLUSTER_MANAGER_NODES_SETTING; import static org.opensearch.common.unit.TimeValue.timeValueMillis; import static org.opensearch.common.unit.TimeValue.timeValueSeconds; @@ -166,18 +177,6 @@ import static org.opensearch.test.NodeRoles.removeRoles; import static org.opensearch.test.OpenSearchTestCase.assertBusy; import static org.opensearch.test.OpenSearchTestCase.randomFrom; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.apache.lucene.tests.util.LuceneTestCase.TEST_NIGHTLY; -import static org.apache.lucene.tests.util.LuceneTestCase.rarely; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * InternalTestCluster manages a set of JVM private nodes and allows convenient access to them. @@ -1319,6 +1318,7 @@ public synchronized void validateClusterFormed() { assertTrue("Expected node to exist: " + expectedNode + debugString, discoveryNodes.nodeExists(expectedNode)); } }); + //ToDo Fix me states.forEach(cs -> { if (cs.nodes().getNodes().values().stream().findFirst().get().isRemoteStoreNode()) { RepositoriesMetadata repositoriesMetadata = cs.metadata().custom(RepositoriesMetadata.TYPE);