Skip to content

Commit

Permalink
Introducing mixed mode support for remote store migration
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <[email protected]>
  • Loading branch information
gbbafna committed Feb 5, 2024
1 parent 6b2c2f2 commit 41ee84d
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.remotestore.RemoteStoreBaseIntegTestCase.remoteStoreClusterSettings;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false)
public class RemoteStoreMigrationTestCase extends OpenSearchIntegTestCase {
protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";

protected Path segmentRepoPath;
protected Path translogRepoPath;

static boolean addRemote = false;

protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
if (addRemote) {
logger.info("Adding remote ndoes node");
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put("discovery.initial_state_timeout", "500ms")
.build();
} else {
logger.info("Adding non remote ndoes node");
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("discovery.initial_state_timeout", "500ms")
.build();
}
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
}

public void testAddRemoteNode() throws IOException {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
Client client = internalCluster().client(cmNodes.get(0));
addRemote = true;
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
internalCluster().startNode();
internalCluster().startNode();
internalCluster().validateClusterFormed();

// add incompatible remote node in remote mixed cluster
Settings.Builder badSettings = Settings.builder()
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, "REPOSITORY_2_NAME", translogRepoPath))
.put("discovery.initial_state_timeout", "500ms");
String badNode = internalCluster().startNode(badSettings);
assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length);
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(badNode));


// add remote node in docrep cluster
updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep"));
assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String wrongNode = internalCluster().startNode();
assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < internalCluster().getNodeNames().length);
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode));
}

public void testAddDocRepNode() throws Exception {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
List<String> cmNodes = internalCluster().startNodes(1);
addRemote = false;
Client client = internalCluster().client(cmNodes.get(0));
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed").put(DIRECTION_SETTING.getKey(), "remote_store"));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
String wrongNode = internalCluster().startNode();
String[] allNodes = internalCluster().getNodeNames();
assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() < allNodes.length);

updateSettingsRequest.persistentSettings(Settings.builder().put(DIRECTION_SETTING.getKey(), "docrep"));
assertAcked(client.admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertBusy(() -> assertTrue(client.admin().cluster().prepareClusterStats().get().getNodes().size() == allNodes.length));
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(wrongNode));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
Expand All @@ -66,6 +67,7 @@
import static org.opensearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.DIRECTION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;

/**
Expand Down Expand Up @@ -176,12 +178,18 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo

DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());

// TODO: We are using one of the existing node to build the repository metadata, this will need to be updated
// once we start supporting mixed compatibility mode. An optimization can be done as this will get invoked
// An optimization can be done as this will get invoked
// for every set of node join task which we can optimize to not compute if cluster state already has
// repository information.
Optional<DiscoveryNode> remoteDN = newState.nodes()
.getNodes()
.values()
.stream()
.filter(DiscoveryNode::isRemoteStoreNode)
.findFirst();
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
(currentNodes.getNodes().values()).stream().findFirst().get(),
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);

Expand Down Expand Up @@ -212,6 +220,16 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
// would guarantee that a decommissioned node would never be able to join the cluster and ensures correctness
ensureNodeCommissioned(node, currentState.metadata());
nodesBuilder.add(node);

if (remoteDN.isEmpty()) {
// This is hit only on cases where we encounter first remote node in remote store migration
logger.info("Updating system repository now for remote store migration");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
);
}

nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -498,37 +516,68 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
// TODO: The below check is valid till we don't support migration, once we start supporting migration a remote
// store node will be able to join a non remote store cluster and vice versa. #7986
CompatibilityMode remoteStoreCompatibilityMode = REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(metadata.settings());
RemoteStoreNodeService.Direction direction = DIRECTION_SETTING.get(metadata.settings());
if (STRICT.equals(remoteStoreCompatibilityMode)) {
DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
} else {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
);
}
} else {
if (existingNode.isRemoteStoreNode()) {
}
} else {
if (remoteStoreCompatibilityMode == CompatibilityMode.MIXED) {
/*
don't allow remote nodes if direction is DOCREP .
don't allow doc nodes if direction is REMOTE .
*/
if (joiningNode.isRemoteStoreNode() == false && direction == RemoteStoreNodeService.Direction.REMOTE_STORE) {
throw new IllegalStateException(
"a non remote store node [" + joiningNode + "] is trying to join a remote store cluster"
);
} else if (joiningNode.isRemoteStoreNode()) {
if (joiningNode.isRemoteStoreNode() && direction == RemoteStoreNodeService.Direction.DOCREP) {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
);
}

Optional<DiscoveryNode> remoteDN = existingNodes
.stream()
.filter(DiscoveryNode::isRemoteStoreNode)
.findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
+ "] is trying to join a remote store cluster with incompatible node attributes in "
+ "comparison with existing node ["
+ existingNode
+ "]"
);
}
} else {
throw new IllegalStateException(
"a remote store node [" + joiningNode + "] is trying to join a non remote store cluster"
);
}
}
}

public static Collection<BiConsumer<DiscoveryNode, ClusterState>> addBuiltInJoinValidators(
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ protected FeatureFlagSettings(
FeatureFlags.IDENTITY_SETTING,
FeatureFlags.TELEMETRY_SETTING,
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING
FeatureFlags.WRITEABLE_REMOTE_INDEX_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING
);
}
11 changes: 11 additions & 0 deletions server/src/main/java/org/opensearch/common/util/FeatureFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* @opensearch.internal
*/
public class FeatureFlags {
/**
* Gates the visibility of the remote store migration support from docrep .
*/
public static final String REMOTE_STORE_MIGRATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.migration.enabled";

/**
* Gates the ability for Searchable Snapshots to read snapshots that are older than the
* guaranteed backward compatibility for OpenSearch (one prior major version) on a best effort basis.
Expand Down Expand Up @@ -93,6 +98,12 @@ public static boolean isEnabled(Setting<Boolean> featureFlag) {
}
}

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Property.NodeScope
);

public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope);

public static final Setting<Boolean> IDENTITY_SETTING = Setting.boolSetting(IDENTITY, false, Property.NodeScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.metadata.RepositoryMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryException;
Expand All @@ -27,6 +28,8 @@
import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;

/**
* Contains all the method needed for a remote store backed node lifecycle.
*/
Expand All @@ -39,6 +42,33 @@ public class RemoteStoreNodeService {
"remote_store.compatibility_mode",
CompatibilityMode.STRICT.name(),
CompatibilityMode::parseString,
value -> {
if (value == CompatibilityMode.MIXED
&& FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) {
throw new IllegalArgumentException(
" mixed mode is under an experimental feature and can be activated only by enabling "
+ REMOTE_STORE_MIGRATION_EXPERIMENTAL
+ " feature flag in the JVM options "
);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<Direction> DIRECTION_SETTING = new Setting<>(
"direction",
Direction.NONE.name(),
Direction::parseString,
value -> {
if (value != Direction.NONE && FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING) == false) {
throw new IllegalArgumentException(
" direction is under an experimental feature and can be activated only by enabling "
+ REMOTE_STORE_MIGRATION_EXPERIMENTAL
+ " feature flag in the JVM options "
);
}
},
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand All @@ -49,7 +79,8 @@ public class RemoteStoreNodeService {
* @opensearch.internal
*/
public enum CompatibilityMode {
STRICT("strict");
STRICT("strict"),
MIXED("mixed");

public final String mode;

Expand All @@ -73,6 +104,33 @@ public static CompatibilityMode parseString(String compatibilityMode) {
}
}

public enum Direction {
REMOTE_STORE("remote_store"),
NONE("none"),
DOCREP("docrep");

public final String direction;

Direction(String d) {
this.direction = d;
}

public static Direction parseString(String direction) {
try {
return Direction.valueOf(direction.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"["
+ direction
+ "] direction is not supported. "
+ "supported modes are ["
+ CompatibilityMode.values().toString()
+ "]"
);
}
}
}

public RemoteStoreNodeService(Supplier<RepositoriesService> repositoriesService, ThreadPool threadPool) {
this.repositoriesService = repositoriesService;
this.threadPool = threadPool;
Expand Down
Loading

0 comments on commit 41ee84d

Please sign in to comment.