From 99d85102d118661946fc1ec113e3c547bfe892c9 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 25 Jul 2024 16:23:43 -0700 Subject: [PATCH] Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering Signed-off-by: Neetika Singhal --- CHANGELOG.md | 2 + .../admin/indices/tiering/TieringUtils.java | 47 +++++++ .../cluster/routing/RoutingPool.java | 6 +- .../allocator/LocalShardsBalancer.java | 20 ++- .../decider/TargetPoolAllocationDecider.java | 30 ++++ .../ShardsTieringAllocationTests.java | 128 ++++++++++++++++++ .../TieringAllocationBaseTestCase.java | 47 +++++++ 7 files changed, 276 insertions(+), 4 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java create mode 100644 server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 93a98de2c5af8..0f77c03f0bd55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add index creation using the context field ([#15290](https://github.com/opensearch-project/OpenSearch/pull/15290)) - Add fieldType to AbstractQueryBuilder and FieldSortBuilder ([#15328](https://github.com/opensearch-project/OpenSearch/pull/15328))) - [Reader Writer Separation] Add searchOnly replica routing configuration ([#15410](https://github.com/opensearch-project/OpenSearch/pull/15410)) +<<<<<<< HEAD - [Range Queries] Add new approximateable query framework to short-circuit range queries ([#13788](https://github.com/opensearch-project/OpenSearch/pull/13788)) - [Workload Management] Add query group level failure tracking ([#15227](https://github.com/opensearch-project/OpenSearch/pull/15527)) - Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426)) @@ -47,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409)) - Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557)) - [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218)) +- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java new file mode 100644 index 0000000000000..46912de17f213 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.index.IndexModule; + +/** + * Utility class for tiering operations + * + * @opensearch.internal + */ +public class TieringUtils { + + /** + * Checks if the specified shard is a partial shard by + * checking the INDEX_STORE_LOCALITY_SETTING for its index. + * see {@link #isPartialIndex(IndexMetadata)} + * @param shard ShardRouting object representing the shard + * @param allocation RoutingAllocation object representing the allocation + * @return true if the shard is a partial shard, false otherwise + */ + public static boolean isPartialShard(ShardRouting shard, RoutingAllocation allocation) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + return isPartialIndex(indexMetadata); + } + + /** + * Checks if the specified index is a partial index by + * checking the INDEX_STORE_LOCALITY_SETTING for the index. + * + * @param indexMetadata the metadata of the index + * @return true if the index is a partial index, false otherwise + */ + public static boolean isPartialIndex(final IndexMetadata indexMetadata) { + return IndexModule.DataLocalityType.PARTIAL.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey())); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java index db10ad61c7d6d..647e993339476 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingPool.java @@ -11,6 +11,9 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.util.FeatureFlags; + +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex; /** * {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods @@ -58,6 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all * @return {@link RoutingPool} for the given index. */ public static RoutingPool getIndexPool(IndexMetadata indexMetadata) { - return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY; + return indexMetadata.isRemoteSnapshot() + || (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialIndex(indexMetadata)) ? REMOTE_CAPABLE : LOCAL_ONLY; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index adb8ee2cf7e85..7f6a7790d1db0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -30,6 +30,7 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.gateway.PriorityComparator; import java.util.ArrayList; @@ -45,6 +46,7 @@ import java.util.stream.Stream; import java.util.stream.StreamSupport; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialShard; import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING; /** @@ -552,6 +554,16 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { } } + /** + * Checks if the shard can be skipped from the local shard balancer operations + * @param shardRouting the shard to be checked + * @return true if the shard can be skipped, false otherwise + */ + private boolean canShardBeSkipped(ShardRouting shardRouting) { + return (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation)) + && !(FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shardRouting, allocation))); + } + /** * Move started shards that can not be allocated to a node anymore *

@@ -603,7 +615,7 @@ void moveShards() { ShardRouting shardRouting = it.next(); - if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + if (canShardBeSkipped(shardRouting)) { continue; } @@ -669,7 +681,7 @@ void moveShards() { */ @Override MoveDecision decideMove(final ShardRouting shardRouting) { - if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + if (canShardBeSkipped(shardRouting)) { return MoveDecision.NOT_TAKEN; } @@ -758,7 +770,9 @@ private Map buildModelFromAssigned() for (ShardRouting shard : rn) { assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ - if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) { + if ((RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) + || (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shard, allocation))) + && shard.state() != RELOCATING) { node.addShard(shard); ++totalShardCount; if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java index 76f9f44077ad8..493d23b57d271 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/TargetPoolAllocationDecider.java @@ -87,6 +87,36 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n return canAllocate(shardRouting, node, allocation); } + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation); + RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId())); + if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) { + logger.debug( + "Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]", + shardRouting.shortSummary(), + currentNodePool.name(), + RoutingPool.REMOTE_CAPABLE.name(), + node.node() + ); + return allocation.decision( + Decision.NO, + NAME, + "Shard %s is allocated on a different pool %s than the target pool %s", + shardRouting.shortSummary(), + currentNodePool, + targetPool + ); + } + return allocation.decision( + Decision.YES, + NAME, + "Routing pools are compatible. Shard pool: [%s], node pool: [%s]", + currentNodePool, + targetPool + ); + } + public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex()); return canAllocateInTargetPool(indexMetadata, node, allocation); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java new file mode 100644 index 0000000000000..8d45ebd2781b1 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ShardsTieringAllocationTests.java @@ -0,0 +1,128 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.Version; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.index.IndexModule; +import org.opensearch.test.FeatureFlagSetter; +import org.junit.Before; + +import static org.opensearch.cluster.routing.RoutingPool.LOCAL_ONLY; +import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE; +import static org.opensearch.cluster.routing.RoutingPool.getIndexPool; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; + +public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase { + + @Before + public void setup() { + FeatureFlagSetter.set(FeatureFlags.TIERED_REMOTE_INDEX); + } + + public void testShardsInLocalPool() { + int localOnlyNodes = 5; + int remoteCapableNodes = 3; + int localIndices = 5; + int remoteIndices = 0; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(LOCAL_ONLY, shardPool); + } + } + + public void testShardsInRemotePool() { + int localOnlyNodes = 7; + int remoteCapableNodes = 3; + int localIndices = 0; + int remoteIndices = 13; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(REMOTE_CAPABLE, shardPool); + } + } + + public void testShardsWithTiering() { + int localOnlyNodes = 15; + int remoteCapableNodes = 13; + int localIndices = 10; + int remoteIndices = 0; + ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + AllocationService service = this.createRemoteCapableAllocationService(); + // assign shards to respective nodes + clusterState = allocateShardsAndBalance(clusterState, service); + // put indices in the hot to warm tiering state + clusterState = updateIndexMetadataForTiering( + clusterState, + localIndices, + IndexModule.TieringState.HOT_TO_WARM.name(), + IndexModule.DataLocalityType.PARTIAL.name() + ); + // trigger shard relocation + clusterState = allocateShardsAndBalance(clusterState, service); + RoutingNodes routingNodes = clusterState.getRoutingNodes(); + RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes); + assertEquals(0, routingNodes.unassigned().size()); + + for (ShardRouting shard : clusterState.getRoutingTable().allShards()) { + assertFalse(shard.unassigned()); + RoutingNode node = routingNodes.node(shard.currentNodeId()); + RoutingPool nodePool = RoutingPool.getNodePool(node); + RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation); + assertEquals(RoutingPool.REMOTE_CAPABLE, shardPool); + assertEquals(nodePool, shardPool); + } + } + + public void testShardPoolForPartialIndices() { + String index = "test-index"; + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + .build(); + RoutingPool indexPool = getIndexPool(indexMetadata); + assertEquals(REMOTE_CAPABLE, indexPool); + } + + public void testShardPoolForFullIndices() { + String index = "test-index"; + IndexMetadata indexMetadata = IndexMetadata.builder(index) + .settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())) + .numberOfShards(PRIMARIES) + .numberOfReplicas(REPLICAS) + .build(); + RoutingPool indexPool = getIndexPool(indexMetadata); + assertEquals(LOCAL_ONLY, indexPool); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java new file mode 100644 index 0000000000000..aba6fe74e0634 --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TieringAllocationBaseTestCase.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.settings.Settings; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +@SuppressForbidden(reason = "feature flag overrides") +public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancerBaseTestCase { + + public ClusterState updateIndexMetadataForTiering( + ClusterState clusterState, + int localIndices, + String tieringState, + String dataLocality + ) { + Metadata.Builder mb = Metadata.builder(clusterState.metadata()); + for (int i = 0; i < localIndices; i++) { + IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, false)); + Settings settings = indexMetadata.getSettings(); + mb.put( + IndexMetadata.builder(indexMetadata) + .settings( + Settings.builder() + .put(settings) + .put(settings) + .put(INDEX_TIERING_STATE.getKey(), tieringState) + .put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality) + ) + ); + } + Metadata metadata = mb.build(); + return ClusterState.builder(clusterState).metadata(metadata).build(); + } +}