From 18e58160a70595ed450c663b78445abb795499d3 Mon Sep 17 00:00:00 2001 From: Lakshya Taragi <157457166+ltaragi@users.noreply.github.com> Date: Fri, 15 Mar 2024 16:06:11 +0530 Subject: [PATCH] Add allocation decider for mixed cluster during remote store migration (#12505) Signed-off-by: Lakshya Taragi --- .../org/opensearch/cluster/ClusterModule.java | 5 + ...RemoteStoreMigrationAllocationDecider.java | 174 +++++ .../cluster/ClusterModuleTests.java | 5 + ...eStoreMigrationAllocationDeciderTests.java | 681 ++++++++++++++++++ 4 files changed, 865 insertions(+) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreMigrationAllocationDecider.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index d2f4888ae8971..b846d382db89d 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -69,6 +69,7 @@ import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; @@ -83,6 +84,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.core.ParseField; @@ -373,6 +375,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new NodeLoadAwareAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new TargetPoolAllocationDecider()); + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING)) { + addAllocationDecider(deciders, new RemoteStoreMigrationAllocationDecider(settings, clusterSettings)); + } clusterPlugins.stream() .flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream()) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreMigrationAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreMigrationAllocationDecider.java new file mode 100644 index 0000000000000..27ebe5390ea6d --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/RemoteStoreMigrationAllocationDecider.java @@ -0,0 +1,174 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeService; +import org.opensearch.node.remotestore.RemoteStoreNodeService.CompatibilityMode; +import org.opensearch.node.remotestore.RemoteStoreNodeService.Direction; + +import java.util.Locale; + +/** + * A new allocation decider for migration of document replication clusters to remote store backed clusters: + * - For STRICT compatibility mode, the decision is always YES + * - For remote store backed indices, relocation or allocation/relocation can only be towards a remote node + * - For "REMOTE_STORE" migration direction: + * - New primary shards can only be allocated to a remote node + * - New replica shards can be allocated to a remote node iff the primary has been migrated/allocated to a remote node + * - For other directions ("DOCREP", "NONE"), the decision is always YES + * + * @opensearch.internal + */ +public class RemoteStoreMigrationAllocationDecider extends AllocationDecider { + + public static final String NAME = "remote_store_migration"; + + private Direction migrationDirection; + private CompatibilityMode compatibilityMode; + private boolean remoteStoreBackedIndex; + + public RemoteStoreMigrationAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + this.migrationDirection = RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING.get(settings); + this.compatibilityMode = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, this::setMigrationDirection); + clusterSettings.addSettingsUpdateConsumer( + RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING, + this::setCompatibilityMode + ); + } + + private void setMigrationDirection(Direction migrationDirection) { + this.migrationDirection = migrationDirection; + } + + private void setCompatibilityMode(CompatibilityMode compatibilityMode) { + this.compatibilityMode = compatibilityMode; + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + DiscoveryNode targetNode = node.node(); + + if (compatibilityMode.equals(CompatibilityMode.STRICT)) { + // assuming all nodes are of the same type (all remote or all non-remote) + return allocation.decision( + Decision.YES, + NAME, + getDecisionDetails(true, shardRouting, targetNode, " for strict compatibility mode") + ); + } + + if (migrationDirection.equals(Direction.REMOTE_STORE) == false) { + // docrep migration direction is currently not supported + return allocation.decision( + Decision.YES, + NAME, + getDecisionDetails(true, shardRouting, targetNode, " for non remote_store direction") + ); + } + + // check for remote store backed indices + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index()); + if (IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.exists(indexMetadata.getSettings())) { + remoteStoreBackedIndex = IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings()); + } + if (remoteStoreBackedIndex && targetNode.isRemoteStoreNode() == false) { + // allocations and relocations must be to a remote node + String reason = String.format( + Locale.ROOT, + " because a remote store backed index's shard copy can only be %s to a remote node", + ((shardRouting.assignedToNode() == false) ? "allocated" : "relocated") + ); + return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, shardRouting, targetNode, reason)); + } + + if (shardRouting.primary()) { + return primaryShardDecision(shardRouting, targetNode, allocation); + } + return replicaShardDecision(shardRouting, targetNode, allocation); + } + + // handle scenarios for allocation of a new shard's primary copy + private Decision primaryShardDecision(ShardRouting primaryShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) { + if (targetNode.isRemoteStoreNode() == false) { + return allocation.decision(Decision.NO, NAME, getDecisionDetails(false, primaryShardRouting, targetNode, "")); + } + return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, primaryShardRouting, targetNode, "")); + } + + private Decision replicaShardDecision(ShardRouting replicaShardRouting, DiscoveryNode targetNode, RoutingAllocation allocation) { + if (targetNode.isRemoteStoreNode()) { + ShardRouting primaryShardRouting = allocation.routingNodes().activePrimary(replicaShardRouting.shardId()); + boolean primaryHasMigratedToRemote = false; + if (primaryShardRouting != null) { + DiscoveryNode primaryShardNode = allocation.nodes().getNodes().get(primaryShardRouting.currentNodeId()); + primaryHasMigratedToRemote = primaryShardNode.isRemoteStoreNode(); + } + if (primaryHasMigratedToRemote == false) { + return allocation.decision( + Decision.NO, + NAME, + getDecisionDetails(false, replicaShardRouting, targetNode, " since primary shard copy is not yet migrated to remote") + ); + } + return allocation.decision( + Decision.YES, + NAME, + getDecisionDetails(true, replicaShardRouting, targetNode, " since primary shard copy has been migrated to remote") + ); + } + return allocation.decision(Decision.YES, NAME, getDecisionDetails(true, replicaShardRouting, targetNode, "")); + } + + // get detailed reason for the decision + private String getDecisionDetails(boolean isYes, ShardRouting shardRouting, DiscoveryNode targetNode, String reason) { + return String.format( + Locale.ROOT, + "[%s migration_direction]: %s shard copy %s be %s to a %s node%s", + migrationDirection.direction, + (shardRouting.primary() ? "primary" : "replica"), + (isYes ? "can" : "can not"), + ((shardRouting.assignedToNode() == false) ? "allocated" : "relocated"), + (targetNode.isRemoteStoreNode() ? "remote" : "non-remote"), + reason + ); + } + +} diff --git a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java index 535444cd866b8..b30ebaf183084 100644 --- a/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java +++ b/server/src/test/java/org/opensearch/cluster/ClusterModuleTests.java @@ -51,6 +51,7 @@ import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RebalanceOnlyWhenActiveAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; @@ -67,6 +68,7 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsModule; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.plugins.ClusterPlugin; @@ -242,6 +244,9 @@ public void testAllocationDeciderOrder() { NodeLoadAwareAllocationDecider.class, TargetPoolAllocationDecider.class ); + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING)) { + expectedDeciders.add(RemoteStoreMigrationAllocationDecider.class); + } Collection deciders = ClusterModule.createAllocationDeciders( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java new file mode 100644 index 0000000000000..43363407d9249 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteStoreMigrationAllocationDeciderTests.java @@ -0,0 +1,681 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; +import org.opensearch.cluster.routing.allocation.decider.RemoteStoreMigrationAllocationDecider; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.node.remotestore.RemoteStoreNodeService; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_STORE_ENABLED; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; +import static org.hamcrest.core.Is.is; + +public class RemoteStoreMigrationAllocationDeciderTests extends OpenSearchAllocationTestCase { + + private final static String TEST_INDEX = "test_index"; + private final static String TEST_REPO = "test_repo"; + + private final Settings directionEnabledNodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + + private final Settings strictModeCompatibilitySettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.STRICT) + .build(); + private final Settings mixedModeCompatibilitySettings = Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) + .build(); + + private final Settings remoteStoreDirectionSettings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .build(); + private final Settings docrepDirectionSettings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.DOCREP) + .build(); + + private Boolean isRemoteStoreBackedIndex = null, isMixedMode; + private int shardCount, replicaCount; + private IndexMetadata.Builder indexMetadataBuilder; + private Settings customSettings; + private DiscoveryNodes discoveryNodes; + private ClusterState clusterState; + private RemoteStoreMigrationAllocationDecider remoteStoreMigrationAllocationDecider; + private RoutingAllocation routingAllocation; + private Metadata metadata; + private RoutingTable routingTable = null; + + private void beforeAllocation() { + FeatureFlags.initializeFeatureFlags(directionEnabledNodeSettings); + if (isRemoteStoreBackedIndex == null) { + isRemoteStoreBackedIndex = randomBoolean(); + } + indexMetadataBuilder = getIndexMetadataBuilder(isRemoteStoreBackedIndex, shardCount, replicaCount); + + String compatibilityMode = isMixedMode + ? RemoteStoreNodeService.CompatibilityMode.MIXED.mode + : RemoteStoreNodeService.CompatibilityMode.STRICT.mode; + customSettings = getCustomSettings( + RemoteStoreNodeService.Direction.REMOTE_STORE.direction, + compatibilityMode, + indexMetadataBuilder + ); + + if (routingTable != null) { + metadata = Metadata.builder().put(indexMetadataBuilder).build(); + clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metadata(metadata) + .routingTable(routingTable) + .nodes(discoveryNodes) + .build(); + } else { + clusterState = getInitialClusterState(customSettings, indexMetadataBuilder, discoveryNodes); + } + + remoteStoreMigrationAllocationDecider = new RemoteStoreMigrationAllocationDecider( + customSettings, + getClusterSettings(customSettings) + ); + + routingAllocation = new RoutingAllocation( + new AllocationDeciders(Collections.singleton(remoteStoreMigrationAllocationDecider)), + clusterState.getRoutingNodes(), + clusterState, + null, + null, + 0L + ); + routingAllocation.debugDecision(true); + } + + // tests for primary shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testDontAllocateNewPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() { + shardCount = 1; + replicaCount = 0; + isMixedMode = true; + + DiscoveryNode remoteNode = getRemoteNode(); + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + beforeAllocation(); + + ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(primaryShardRouting, nonRemoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.NO)); + String reason = "[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node"; + if (isRemoteStoreBackedIndex) { + reason = + "[remote_store migration_direction]: primary shard copy can not be allocated to a non-remote node because a remote store backed index's shard copy can only be allocated to a remote node"; + } + assertThat(decision.getExplanation().toLowerCase(Locale.ROOT), is(reason)); + } + + public void testAllocateNewPrimaryShardOnRemoteNodeForMixedModeAndRemoteStoreDirection() { + shardCount = 1; + replicaCount = 0; + isMixedMode = true; + + DiscoveryNode remoteNode = getRemoteNode(); + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + beforeAllocation(); + + ShardRouting primaryShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(primaryShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat( + decision.getExplanation().toLowerCase(Locale.ROOT), + is("[remote_store migration_direction]: primary shard copy can be allocated to a remote node") + ); + } + + // tests for replica shard copy allocation with MIXED mode and REMOTE_STORE direction + + public void testDontAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() { + shardCount = 1; + replicaCount = 1; + isMixedMode = true; + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + DiscoveryNode remoteNode = getRemoteNode(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + // primary on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + nonRemoteNode.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.NO)); + assertThat( + decision.getExplanation().toLowerCase(Locale.ROOT), + is( + "[remote_store migration_direction]: replica shard copy can not be allocated to a remote node since primary shard copy is not yet migrated to remote" + ) + ); + } + + public void testAllocateNewReplicaShardOnRemoteNodeIfPrimaryShardOnRemoteNodeForMixedModeAndRemoteStoreDirection() { + shardCount = 1; + replicaCount = 1; + isMixedMode = true; + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode remoteNode1 = getRemoteNode(); + DiscoveryNode remoteNode2 = getRemoteNode(); + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + // primary on remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + remoteNode1.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode2.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(replicaShardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + assertThat( + decision.getExplanation().toLowerCase(Locale.ROOT), + is( + "[remote_store migration_direction]: replica shard copy can be allocated to a remote node since primary shard copy has been migrated to remote" + ) + ); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnNonRemoteNodeForMixedModeAndRemoteStoreDirection() { + shardCount = 1; + replicaCount = 1; + isMixedMode = true; + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode remoteNode = getRemoteNode(); + DiscoveryNode nonRemoteNode1 = getNonRemoteNode(); + DiscoveryNode nonRemoteNode2 = getNonRemoteNode(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + // primary shard on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + nonRemoteNode1.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .add(nonRemoteNode1) + .localNodeId(nonRemoteNode1.getId()) + .add(nonRemoteNode2) + .localNodeId(nonRemoteNode2.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode2.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); + Decision.Type type = Decision.Type.YES; + String reason = "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node"; + if (isRemoteStoreBackedIndex) { + type = Decision.Type.NO; + reason = + "[remote_store migration_direction]: replica shard copy can not be allocated to a non-remote node because a remote store backed index's shard copy can only be allocated to a remote node"; + } + assertThat(decision.type(), is(type)); + assertThat(decision.getExplanation().toLowerCase(Locale.ROOT), is(reason)); + } + + public void testAllocateNewReplicaShardOnNonRemoteNodeIfPrimaryShardOnRemoteNodeForRemoteStoreDirection() { + shardCount = 1; + replicaCount = 1; + isMixedMode = true; + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode = getNonRemoteNode(); + DiscoveryNode remoteNode = getRemoteNode(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + // primary shard on non-remote node + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + remoteNode.getId(), + true, + ShardRoutingState.STARTED + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode) + .localNodeId(nonRemoteNode.getId()) + .add(remoteNode) + .localNodeId(remoteNode.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + ShardRouting replicaShardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(replicaShardRouting, nonRemoteRoutingNode, routingAllocation); + Decision.Type type = Decision.Type.YES; + String reason = "[remote_store migration_direction]: replica shard copy can be allocated to a non-remote node"; + if (isRemoteStoreBackedIndex) { + type = Decision.Type.NO; + reason = + "[remote_store migration_direction]: replica shard copy can not be allocated to a non-remote node because a remote store backed index's shard copy can only be allocated to a remote node"; + } + assertThat(decision.type(), is(type)); + assertThat(decision.getExplanation().toLowerCase(Locale.ROOT), is(reason)); + } + + // test for STRICT mode + + public void testAlwaysAllocateNewShardForStrictMode() { + shardCount = 1; + replicaCount = 1; + isMixedMode = false; + isRemoteStoreBackedIndex = false; + + ShardId shardId = new ShardId(TEST_INDEX, "_na_", 0); + + DiscoveryNode nonRemoteNode1 = getNonRemoteNode(); + DiscoveryNode nonRemoteNode2 = getNonRemoteNode(); + + boolean isReplicaAllocation = randomBoolean(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + (isReplicaAllocation ? nonRemoteNode1.getId() : null), + true, + (isReplicaAllocation ? ShardRoutingState.STARTED : ShardRoutingState.UNASSIGNED) + ) + ) + .addShard( + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(nonRemoteNode1) + .localNodeId(nonRemoteNode1.getId()) + .add(nonRemoteNode2) + .localNodeId(nonRemoteNode2.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + + ShardRouting shardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + if (isReplicaAllocation) { + shardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + } + RoutingNode nonRemoteRoutingNode = clusterState.getRoutingNodes().node(nonRemoteNode2.getId()); + + Decision decision = remoteStoreMigrationAllocationDecider.canAllocate(shardRouting, nonRemoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + String reason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can be allocated to a non-remote node for strict compatibility mode", + (isReplicaAllocation ? "replica" : "primary") + ); + assertThat(decision.getExplanation().toLowerCase(Locale.ROOT), is(reason)); + + isRemoteStoreBackedIndex = true; + + DiscoveryNode remoteNode1 = getRemoteNode(); + DiscoveryNode remoteNode2 = getRemoteNode(); + + routingTable = RoutingTable.builder() + .add( + IndexRoutingTable.builder(shardId.getIndex()) + .addIndexShard( + new IndexShardRoutingTable.Builder(shardId).addShard( + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + (isReplicaAllocation ? remoteNode1.getId() : null), + true, + (isReplicaAllocation ? ShardRoutingState.STARTED : ShardRoutingState.UNASSIGNED) + ) + ) + .addShard( + // new replica's allocation + TestShardRouting.newShardRouting( + shardId.getIndexName(), + shardId.getId(), + null, + false, + ShardRoutingState.UNASSIGNED + ) + ) + .build() + ) + ) + .build(); + + discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + + beforeAllocation(); + + assertEquals(2, clusterState.getRoutingTable().allShards().size()); + + shardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).primaryShard(); + if (isReplicaAllocation) { + shardRouting = clusterState.getRoutingTable().shardRoutingTable(TEST_INDEX, 0).replicaShards().get(0); + } + RoutingNode remoteRoutingNode = clusterState.getRoutingNodes().node(remoteNode2.getId()); + + decision = remoteStoreMigrationAllocationDecider.canAllocate(shardRouting, remoteRoutingNode, routingAllocation); + assertThat(decision.type(), is(Decision.Type.YES)); + reason = String.format( + Locale.ROOT, + "[remote_store migration_direction]: %s shard copy can be allocated to a remote node for strict compatibility mode", + (isReplicaAllocation ? "replica" : "primary") + ); + assertThat(decision.getExplanation().toLowerCase(Locale.ROOT), is(reason)); + } + + // prepare index metadata for test-index + private IndexMetadata.Builder getIndexMetadataBuilder(boolean isRemoteStoreBackedIndex, int shardCount, int replicaCount) { + Settings.Builder builder = settings(Version.CURRENT); + if (isRemoteStoreBackedIndex) { + builder.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, TEST_REPO) + .put(SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, TEST_REPO) + .put(SETTING_REMOTE_STORE_ENABLED, true); + } + return IndexMetadata.builder(TEST_INDEX).settings(builder).numberOfShards(shardCount).numberOfReplicas(replicaCount); + } + + // get node-level settings + private Settings getCustomSettings(String direction, String compatibilityMode, IndexMetadata.Builder indexMetadataBuilder) { + Settings.Builder builder = Settings.builder(); + // direction settings + if (direction.toLowerCase(Locale.ROOT).equals(RemoteStoreNodeService.Direction.REMOTE_STORE.direction)) { + builder.put(remoteStoreDirectionSettings); + } else if (direction.toLowerCase(Locale.ROOT).equals(RemoteStoreNodeService.Direction.DOCREP.direction)) { + builder.put(docrepDirectionSettings); + } + + // compatibility mode settings + if (compatibilityMode.toLowerCase(Locale.ROOT).equals(RemoteStoreNodeService.CompatibilityMode.STRICT.mode)) { + builder.put(strictModeCompatibilitySettings); + } else if (compatibilityMode.toLowerCase(Locale.ROOT).equals(RemoteStoreNodeService.CompatibilityMode.MIXED.mode)) { + builder.put(mixedModeCompatibilitySettings); + } + + // index metadata settings + builder.put(indexMetadataBuilder.build().getSettings()); + + builder.put(directionEnabledNodeSettings); + + return builder.build(); + } + + private String getRandomCompatibilityMode() { + return randomFrom(RemoteStoreNodeService.CompatibilityMode.STRICT.mode, RemoteStoreNodeService.CompatibilityMode.MIXED.mode); + } + + private ClusterSettings getClusterSettings(Settings settings) { + return new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + private ClusterState getInitialClusterState( + Settings settings, + IndexMetadata.Builder indexMetadataBuilder, + DiscoveryNodes discoveryNodes + ) { + Metadata metadata = Metadata.builder().persistentSettings(settings).put(indexMetadataBuilder).build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(indexMetadataBuilder.build()) + .addAsNew(metadata.index(TEST_INDEX)) + .build(); + + return ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).routingTable(routingTable).nodes(discoveryNodes).build(); + } + + // get a dummy non-remote node + private DiscoveryNode getNonRemoteNode() { + return new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + } + + // get a dummy remote node + public DiscoveryNode getRemoteNode() { + Map attributes = new HashMap<>(); + attributes.put( + REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY, + "REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_VALUE" + ); + return new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + attributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + } +}