Skip to content

Commit

Permalink
Reconcile remote index settings on switch to strict mode
Browse files Browse the repository at this point in the history
Signed-off-by: Shourya Dutta Biswas <[email protected]>
  • Loading branch information
shourya035 committed Jul 17, 2024
1 parent 8ae728c commit a16a1de
Show file tree
Hide file tree
Showing 5 changed files with 352 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING;
Expand Down Expand Up @@ -277,4 +278,30 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe
IndexService indexService = indicesService.indexService(new Index(indexName, uuid));
return indexService.getShard(0);
}

public void changeReplicaCountAndEnsureGreen(int replicaCount, String indexName) {
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(indexName)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
);
ensureGreen(indexName);
}

public void completeDocRepToRemoteMigration() {
assertTrue(
internalCluster().client()
.admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(
Settings.builder()
.put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict")
.put(MIGRATION_DIRECTION_SETTING.getKey(), "none")
)
.get()
.isAcknowledged()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,74 @@ public void testRemoteIndexPathFileExistsAfterMigration() throws Exception {
assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix)));
}

/**
* Scenario:
* Creates an index with 1 pri 1 rep setup with 3 docrep nodes (1 cluster manager + 2 data nodes),
* initiate migration and create 3 remote nodes (1 cluster manager + 2 data nodes) and moves over
* only primary shard copy of the index
* After the primary shard copy is relocated, decrease replica count to 0, stop all docrep nodes
* and conclude migration. Remote store index settings should be applied to the index at this point.
*/
public void testIndexSettingsUpdateDuringReplicaCountDecrement() throws Exception {
String indexName = "migration-index-replica-decrement";
String docrepClusterManager = internalCluster().startClusterManagerOnlyNode();

logger.info("---> Starting 2 docrep nodes");
List<String> docrepNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();

logger.info("---> Creating index with 1 primary and 1 replica");
Settings oneReplica = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
createIndexAndAssertDocrepProperties(indexName, oneReplica);

int docsToIndex = randomIntBetween(10, 100);
logger.info("---> Indexing {} on both indices", docsToIndex);
indexBulk(indexName, docsToIndex);

logger.info(
"---> Stopping shard rebalancing to ensure shards do not automatically move over to newer nodes after they are launched"
);
stopShardRebalancing();

logger.info("---> Starting 3 remote store enabled nodes");
initDocRepToRemoteMigration();
setAddRemote(true);
internalCluster().startClusterManagerOnlyNode();
List<String> remoteNodeNames = internalCluster().startDataOnlyNodes(2);
internalCluster().validateClusterFormed();

String primaryNode = primaryNodeName(indexName);

logger.info("---> Moving over primary to remote store enabled nodes");
assertAcked(
client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(indexName, 0, primaryNode, remoteNodeNames.get(0)))
.execute()
.actionGet()
);
waitForRelocation();
waitNoPendingTasksOnAll();

logger.info("---> Reducing replica count to 0 for the index");
changeReplicaCountAndEnsureGreen(0, indexName);
assertDocrepProperties(indexName);

logger.info("---> Stopping all docrep nodes");
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager));
for (String node : docrepNodeNames) {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node));
}
internalCluster().validateClusterFormed();
completeDocRepToRemoteMigration();
waitNoPendingTasksOnAll();
assertRemoteProperties(indexName);
}

private void createIndexAndAssertDocrepProperties(String index, Settings settings) {
createIndexAssertHealthAndDocrepProperties(index, settings, this::ensureGreen);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand All @@ -66,10 +67,13 @@

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasAllRemoteStoreRelatedMetadata;
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;

/**
* Transport action for updating cluster settings
Expand Down Expand Up @@ -270,6 +274,13 @@ public ClusterState execute(final ClusterState currentState) {
logger
);
changed = clusterState != currentState;
// Remote Store migration: Checks if the applied cluster settings
// has switched the cluster to STRICT mode. If so, checks and applies
// appropriate index settings depending on the current set of node types
// in the cluster
if (isSwitchToStrictCompatibilityMode(clusterState.metadata().settings())) {
return finalizeMigration(clusterState);
}
return clusterState;
}
}
Expand All @@ -284,11 +295,9 @@ public ClusterState execute(final ClusterState currentState) {
public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) {
Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build();
if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) {
String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode;
validateAllNodesOfSameVersion(clusterState.nodes());
if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) {
if (isSwitchToStrictCompatibilityMode(settings)) {
validateAllNodesOfSameType(clusterState.nodes());
validateIndexSettings(clusterState);
}
}
}
Expand Down Expand Up @@ -323,18 +332,83 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) {
}

/**
* Verifies that while trying to switch to STRICT compatibility mode,
* all indices in the cluster have {@link RemoteMigrationIndexMetadataUpdater#indexHasAllRemoteStoreRelatedMetadata(IndexMetadata)} as <code>true</code>.
* If not, throws {@link SettingsException}
* @param clusterState current cluster state
* Finalizes the docrep to remote-store migration process by applying remote store based index settings
* on indices that are missing them. No-Op if all indices already have the settings applied through
* IndexMetadataUpdater
*
* @param incomingState mutated cluster state after cluster settings were applied
* @return new cluster state with index settings updated
*/
public ClusterState finalizeMigration(ClusterState incomingState) {
Map<String, DiscoveryNode> discoveryNodeMap = incomingState.nodes().getNodes();
if (discoveryNodeMap.isEmpty() == false) {
boolean allNodesRemote = discoveryNodeMap.values().stream().allMatch(discoNode -> discoNode.isRemoteStoreNode() == true);
if (allNodesRemote == true) {
List<IndexMetadata> indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState);
if (indicesWithoutRemoteStoreSettings.isEmpty() == true) {
logger.info("All indices in the cluster has remote store based index settings");
} else {
Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings);
return ClusterState.builder(incomingState).metadata(mutatedMetadata).build();
}
} else {
// TODO: Revert all remote store settings for remote store -> docrep migration
logger.debug("All nodes in the cluster are not remote nodes. Skipping.");
}
}
return incomingState;
}

/**
* Filters out indices which does not have remote store based
* index settings applied even after all shard copies have
* migrated to remote store enabled nodes
*/
private void validateIndexSettings(ClusterState clusterState) {
private List<IndexMetadata> getIndicesWithoutRemoteStoreSettings(ClusterState clusterState) {
Collection<IndexMetadata> allIndicesMetadata = clusterState.metadata().indices().values();
if (allIndicesMetadata.isEmpty() == false
&& allIndicesMetadata.stream().anyMatch(indexMetadata -> indexHasAllRemoteStoreRelatedMetadata(indexMetadata) == false)) {
throw new SettingsException(
"can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings"
if (allIndicesMetadata.isEmpty() == false) {
List<IndexMetadata> indicesWithoutRemoteSettings = allIndicesMetadata.stream()
.filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false)
.collect(Collectors.toList());
logger.debug(
"Attempting to switch to strict mode. Count of indices without remote store settings {}",
indicesWithoutRemoteSettings.size()
);
return indicesWithoutRemoteSettings;
}
return Collections.emptyList();
}

/**
* Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater}
*/
private Metadata applyRemoteStoreSettings(ClusterState clusterState, List<IndexMetadata> indicesWithRemoteStoreSettings) {
Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata());
RoutingTable currentRoutingTable = clusterState.getRoutingTable();
DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes();
for (IndexMetadata indexMetadata : indicesWithRemoteStoreSettings) {
IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata);
RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater(
currentDiscoveryNodes,
currentRoutingTable,
indexMetadata,
null,
logger
);
indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName());
metadataBuilder.put(indexMetadataBuilder);
}
return metadataBuilder.build();
}

/**
* Checks if the incoming cluster settings payload is attempting to switch
* the cluster to `STRICT` compatibility mode
* Visible only for tests
*/
public boolean isSwitchToStrictCompatibilityMode(Settings settings) {
return RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -49,7 +50,7 @@ public RemoteMigrationIndexMetadataUpdater(
DiscoveryNodes discoveryNodes,
RoutingTable routingTable,
IndexMetadata indexMetadata,
Settings clusterSettings,
@Nullable Settings clusterSettings,
Logger logger

) {
Expand Down
Loading

0 comments on commit a16a1de

Please sign in to comment.