Skip to content

Commit

Permalink
Compatibility mode switching checks
Browse files Browse the repository at this point in the history
Signed-off-by: Lakshya Taragi <[email protected]>
  • Loading branch information
ltaragi committed Apr 8, 2024
1 parent 8bc1b40 commit 238ed34
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,6 @@ public void initializeCluster(boolean remoteClusterManager) {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startNodes(1);
client = internalCluster().client();
setClusterMode(STRICT.mode);
setDirection(NONE.direction);
}

// set the compatibility mode of cluster [strict, mixed]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Optional;
Expand All @@ -23,7 +25,6 @@
import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.MIXED;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode.STRICT;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.NONE;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.Direction.REMOTE_STORE;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.assertNodeInCluster;
import static org.opensearch.remotemigration.RemoteStoreMigrationAllocationIT.prepareIndexWithoutReplica;
Expand Down Expand Up @@ -73,6 +74,37 @@ public void testNewIndexIsRemoteStoreBackedForRemoteStoreDirectionAndMixedMode()
assertRemoteStoreBackedIndex(indexName2);
}

// compatibility mode setting test

public void testSwitchToStrictMode() throws Exception {
logger.info(" --> initialize cluster");
initializeCluster(false);

logger.info(" --> create a mixed mode cluster");
setClusterMode(MIXED.mode);
addRemote = true;
String remoteNodeName = internalCluster().startNode();
addRemote = false;
String nonRemoteNodeName = internalCluster().startNode();
internalCluster().validateClusterFormed();
assertNodeInCluster(remoteNodeName);
assertNodeInCluster(nonRemoteNodeName);

logger.info(" --> attempt switching to strict mode");
SettingsException exception = assertThrows(SettingsException.class, () -> setClusterMode(STRICT.mode));
assertEquals(
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes",
exception.getMessage()
);

logger.info(" --> stop remote node so that cluster had only non-remote nodes");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(remoteNodeName));
ensureStableCluster(2);

logger.info(" --> attempt switching to strict mode");
setClusterMode(STRICT.mode);
}

// verify that the created index is not remote store backed
private void assertNonRemoteStoreBackedIndex(String indexName) {
Settings indexSettings = client.admin().indices().prepareGetIndex().execute().actionGet().getSettings().get(indexName);
Expand Down Expand Up @@ -100,8 +132,6 @@ private void initializeCluster(boolean remoteClusterManager) {
internalCluster().setBootstrapClusterManagerNodeIndex(0);
internalCluster().startNodes(1);
client = internalCluster().client();
setClusterMode(STRICT.mode);
setDirection(NONE.direction);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.AckedClusterStateUpdateTask;
Expand All @@ -53,12 +54,19 @@
import org.opensearch.common.Priority;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Transport action for updating cluster settings
Expand Down Expand Up @@ -137,6 +145,7 @@ protected void clusterManagerOperation(
final ClusterState state,
final ActionListener<ClusterUpdateSettingsResponse> listener
) {
validateCompatibilityModeSettingRequest(request, state);
final SettingsUpdater updater = new SettingsUpdater(clusterSettings);
clusterService.submitStateUpdateTask(
"cluster_update_settings",
Expand Down Expand Up @@ -264,4 +273,52 @@ public ClusterState execute(final ClusterState currentState) {
);
}

/**
* Runs various checks associated with changing cluster compatibility mode
* @param request cluster settings update request, for settings to be updated and new values
* @param clusterState current state of cluster, for information on nodes
*/
public static void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(request.persistentSettings())) {
String value = request.persistentSettings()
.get(RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey())
.toLowerCase();
List<DiscoveryNode> discoveryNodeList = new ArrayList<>(clusterState.nodes().getNodes().values());
validateAllNodesOfSameVersion(discoveryNodeList);
if (value.equals(RemoteStoreNodeService.CompatibilityMode.STRICT.mode)) {
validateAllNodesOfSameType(discoveryNodeList);
}
}
}

/**
* Verifies that while trying to change the compatibility mode, all nodes must have the same version.
* If not, it throws SettingsException error
* @param discoveryNodeList list of the current discovery nodes in the cluster
*/
private static void validateAllNodesOfSameVersion(List<DiscoveryNode> discoveryNodeList) {
Set<Version> versions = discoveryNodeList.stream().map(DiscoveryNode::getVersion).collect(Collectors.toSet());
if (versions.size() != 1) {
throw new SettingsException(
"can not change the compatibility mode when all the nodes in cluster are not of the same version. Present versions: "
+ versions
);
}
}

/**
* Verifies that while trying to switch to STRICT compatibility mode, all nodes must be of the
* same type (all remote or all non-remote). If not, it throws SettingsException error
* @param discoveryNodeList list of the current discovery nodes in the cluster
*/
private static void validateAllNodesOfSameType(List<DiscoveryNode> discoveryNodeList) {
Optional<DiscoveryNode> remoteNode = discoveryNodeList.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
Optional<DiscoveryNode> nonRemoteNode = discoveryNodeList.stream().filter(dn -> dn.isRemoteStoreNode() == false).findFirst();
if (remoteNode.isPresent() && nonRemoteNode.isPresent()) {
throw new SettingsException(
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes"
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ClusterStateCreationUtils;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.NotClusterManagerException;
import org.opensearch.cluster.block.ClusterBlock;
Expand All @@ -28,14 +31,18 @@
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.FailedToCommitClusterStateException;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterManagerThrottlingException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -44,6 +51,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
Expand All @@ -68,10 +76,16 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.*;
import static org.opensearch.cluster.coordination.JoinTaskExecutorTests.*;
import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getNonRemoteNode;
import static org.opensearch.cluster.routing.allocation.RemoteStoreMigrationAllocationDeciderTests.getRemoteNode;
import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING;
import static org.opensearch.test.ClusterServiceUtils.createClusterService;
import static org.opensearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.opensearch.test.VersionUtils.randomCompatibleVersion;
import static org.opensearch.test.VersionUtils.randomOpenSearchVersion;

public class TransportClusterManagerNodeActionTests extends OpenSearchTestCase {
private static ThreadPool threadPool;
Expand Down Expand Up @@ -692,4 +706,148 @@ protected void masterOperation(Task task, Request request, ClusterState state, A
assertFalse(retried.get());
assertFalse(exception.get());
}

public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() {
Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

// request to change cluster compatibility mode to STRICT
Settings currentCompatibilityModeSettings = Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
.build();
Settings intendedCompatibilityModeSettings = Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.STRICT)
.build();
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(intendedCompatibilityModeSettings);

// mixed cluster (containing both remote and non-remote nodes)
DiscoveryNode nonRemoteNode1 = getNonRemoteNode();
DiscoveryNode remoteNode1 = getRemoteNode();

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(nonRemoteNode1)
.localNodeId(nonRemoteNode1.getId())
.add(remoteNode1)
.localNodeId(remoteNode1.getId())
.build();

Metadata metadata = Metadata.builder().persistentSettings(currentCompatibilityModeSettings).build();

ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).nodes(discoveryNodes).build();

final SettingsException exception = expectThrows(
SettingsException.class,
() -> TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterState)
);
assertEquals(
"can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes",
exception.getMessage()
);

DiscoveryNode nonRemoteNode2 = getNonRemoteNode();
DiscoveryNode remoteNode2 = getRemoteNode();

// cluster with only non-remote nodes
discoveryNodes = DiscoveryNodes.builder()
.add(nonRemoteNode1)
.localNodeId(nonRemoteNode1.getId())
.add(nonRemoteNode2)
.localNodeId(nonRemoteNode2.getId())
.build();
ClusterState sameTypeClusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).build();
TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState);

// cluster with only non-remote nodes
discoveryNodes = DiscoveryNodes.builder()
.add(remoteNode1)
.localNodeId(remoteNode1.getId())
.add(remoteNode2)
.localNodeId(remoteNode2.getId())
.build();
sameTypeClusterState = ClusterState.builder(sameTypeClusterState).nodes(discoveryNodes).build();
TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState);
}

public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersions() {
Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build();
FeatureFlags.initializeFeatureFlags(nodeSettings);

// request to change cluster compatibility mode
boolean toStrictMode = randomBoolean();
Settings currentCompatibilityModeSettings = Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED)
.build();
Settings intendedCompatibilityModeSettings = Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), toStrictMode ? RemoteStoreNodeService.CompatibilityMode.STRICT : RemoteStoreNodeService.CompatibilityMode.MIXED)
.build();
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.persistentSettings(intendedCompatibilityModeSettings);

// two different but compatible open search versions for the discovery nodes
final Version version1 = randomOpenSearchVersion(random());
final Version version2 = randomCompatibleVersion(random(), version1);

assert version1.equals(version2) == false : "current nodes in the cluster must be of different versions";

DiscoveryNode discoveryNode1 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
toStrictMode ? remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES,
version1
);
DiscoveryNode discoveryNode2 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
toStrictMode ? remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES,
version2 // not same as discoveryNode1
);

DiscoveryNodes discoveryNodes = DiscoveryNodes.builder()
.add(discoveryNode1)
.localNodeId(discoveryNode1.getId())
.add(discoveryNode2)
.localNodeId(discoveryNode2.getId())
.build();

Metadata metadata = Metadata.builder().persistentSettings(currentCompatibilityModeSettings).build();

ClusterState differentVersionClusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.nodes(discoveryNodes)
.build();

// changing compatibility mode when all nodes are not of the same version
final SettingsException exception = expectThrows(
SettingsException.class,
() -> TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, differentVersionClusterState)
);
assertThat(
exception.getMessage(),
containsString(
"can not change the compatibility mode when all the nodes in cluster are not of the same version. Present versions: ["
)
);

// changing compatibility mode when all nodes are of the same version
discoveryNode2 = new DiscoveryNode(
UUIDs.base64UUID(),
buildNewFakeTransportAddress(),
toStrictMode ? remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO) : Collections.emptyMap(),
DiscoveryNodeRole.BUILT_IN_ROLES,
version1 // same as discoveryNode1
);
discoveryNodes = DiscoveryNodes.builder()
.add(discoveryNode1)
.localNodeId(discoveryNode1.getId())
.add(discoveryNode2)
.localNodeId(discoveryNode2.getId())
.build();

ClusterState sameVersionClusterState = ClusterState.builder(differentVersionClusterState).nodes(discoveryNodes).build();
TransportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameVersionClusterState);
}

}

0 comments on commit 238ed34

Please sign in to comment.