Skip to content

Commit

Permalink
Add canRemain method to TargetPoolAllocationDecider to move shards fr…
Browse files Browse the repository at this point in the history
…om (#15010)

local to remote pool for hot to warm tiering

Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal authored Sep 3, 2024
1 parent a60b668 commit c19cf88
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))
- Optimise snapshot deletion to speed up snapshot deletion and creation ([#15568](https://github.com/opensearch-project/OpenSearch/pull/15568))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.index.IndexModule;

/**
* Utility class for tiering operations
*
* @opensearch.internal
*/
public class TieringUtils {

/**
* Checks if the specified shard is a partial shard by
* checking the INDEX_STORE_LOCALITY_SETTING for its index.
* see {@link #isPartialIndex(IndexMetadata)}
* @param shard ShardRouting object representing the shard
* @param allocation RoutingAllocation object representing the allocation
* @return true if the shard is a partial shard, false otherwise
*/
public static boolean isPartialShard(ShardRouting shard, RoutingAllocation allocation) {
IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
return isPartialIndex(indexMetadata);
}

/**
* Checks if the specified index is a partial index by
* checking the INDEX_STORE_LOCALITY_SETTING for the index.
*
* @param indexMetadata the metadata of the index
* @return true if the index is a partial index, false otherwise
*/
public static boolean isPartialIndex(final IndexMetadata indexMetadata) {
return IndexModule.DataLocalityType.PARTIAL.name()
.equals(indexMetadata.getSettings().get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.util.FeatureFlags;

import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialIndex;

/**
* {@link RoutingPool} defines the different node types based on the assigned capabilities. The methods
Expand Down Expand Up @@ -58,6 +61,7 @@ public static RoutingPool getShardPool(ShardRouting shard, RoutingAllocation all
* @return {@link RoutingPool} for the given index.
*/
public static RoutingPool getIndexPool(IndexMetadata indexMetadata) {
return indexMetadata.isRemoteSnapshot() ? REMOTE_CAPABLE : LOCAL_ONLY;
return indexMetadata.isRemoteSnapshot()
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialIndex(indexMetadata)) ? REMOTE_CAPABLE : LOCAL_ONLY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.gateway.PriorityComparator;

import java.util.ArrayList;
Expand All @@ -45,6 +46,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.opensearch.action.admin.indices.tiering.TieringUtils.isPartialShard;
import static org.opensearch.cluster.routing.ShardRoutingState.RELOCATING;

/**
Expand Down Expand Up @@ -552,6 +554,16 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
}
}

/**
* Checks if the shard can be skipped from the local shard balancer operations
* @param shardRouting the shard to be checked
* @return true if the shard can be skipped, false otherwise
*/
private boolean canShardBeSkipped(ShardRouting shardRouting) {
return (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))
&& !(FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shardRouting, allocation)));
}

/**
* Move started shards that can not be allocated to a node anymore
* <p>
Expand Down Expand Up @@ -603,7 +615,7 @@ void moveShards() {

ShardRouting shardRouting = it.next();

if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
if (canShardBeSkipped(shardRouting)) {
continue;
}

Expand Down Expand Up @@ -669,7 +681,7 @@ void moveShards() {
*/
@Override
MoveDecision decideMove(final ShardRouting shardRouting) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) {
if (canShardBeSkipped(shardRouting)) {
return MoveDecision.NOT_TAKEN;
}

Expand Down Expand Up @@ -758,7 +770,9 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
for (ShardRouting shard : rn) {
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
if ((RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation))
|| (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX) && isPartialShard(shard, allocation)))
&& shard.state() != RELOCATING) {
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,36 @@ public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode n
return canAllocate(shardRouting, node, allocation);
}

@Override
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
RoutingPool targetPool = RoutingPool.getShardPool(shardRouting, allocation);
RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shardRouting.currentNodeId()));
if (RoutingPool.REMOTE_CAPABLE.equals(targetPool) && targetPool != currentNodePool) {
logger.debug(
"Shard: [{}] has current pool: [{}], target pool: [{}]. Cannot remain on node: [{}]",
shardRouting.shortSummary(),
currentNodePool.name(),
RoutingPool.REMOTE_CAPABLE.name(),
node.node()
);
return allocation.decision(
Decision.NO,
NAME,
"Shard %s is allocated on a different pool %s than the target pool %s",
shardRouting.shortSummary(),
currentNodePool,
targetPool
);
}
return allocation.decision(
Decision.YES,
NAME,
"Routing pools are compatible. Shard pool: [%s], node pool: [%s]",
currentNodePool,
targetPool
);
}

public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
logger.debug("Evaluating node: {} for autoExpandReplica eligibility of index: {}", node, indexMetadata.getIndex());
return canAllocateInTargetPool(indexMetadata, node, allocation);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.Version;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.RoutingPool;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.test.FeatureFlagSetter;
import org.junit.Before;

import static org.opensearch.cluster.routing.RoutingPool.LOCAL_ONLY;
import static org.opensearch.cluster.routing.RoutingPool.REMOTE_CAPABLE;
import static org.opensearch.cluster.routing.RoutingPool.getIndexPool;
import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;

public class ShardsTieringAllocationTests extends TieringAllocationBaseTestCase {

@Before
public void setup() {
FeatureFlagSetter.set(FeatureFlags.TIERED_REMOTE_INDEX);
}

public void testShardsInLocalPool() {
int localOnlyNodes = 5;
int remoteCapableNodes = 3;
int localIndices = 5;
int remoteIndices = 0;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
// assign shards to respective nodes
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
assertEquals(0, routingNodes.unassigned().size());

for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
assertFalse(shard.unassigned());
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
assertEquals(LOCAL_ONLY, shardPool);
}
}

public void testShardsInRemotePool() {
int localOnlyNodes = 7;
int remoteCapableNodes = 3;
int localIndices = 0;
int remoteIndices = 13;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
// assign shards to respective nodes
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
assertEquals(0, routingNodes.unassigned().size());

for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
assertFalse(shard.unassigned());
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
assertEquals(REMOTE_CAPABLE, shardPool);
}
}

public void testShardsWithTiering() {
int localOnlyNodes = 15;
int remoteCapableNodes = 13;
int localIndices = 10;
int remoteIndices = 0;
ClusterState clusterState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices);
AllocationService service = this.createRemoteCapableAllocationService();
// assign shards to respective nodes
clusterState = allocateShardsAndBalance(clusterState, service);
// put indices in the hot to warm tiering state
clusterState = updateIndexMetadataForTiering(
clusterState,
localIndices,
IndexModule.TieringState.HOT_TO_WARM.name(),
IndexModule.DataLocalityType.PARTIAL.name()
);
// trigger shard relocation
clusterState = allocateShardsAndBalance(clusterState, service);
RoutingNodes routingNodes = clusterState.getRoutingNodes();
RoutingAllocation allocation = getRoutingAllocation(clusterState, routingNodes);
assertEquals(0, routingNodes.unassigned().size());

for (ShardRouting shard : clusterState.getRoutingTable().allShards()) {
assertFalse(shard.unassigned());
RoutingNode node = routingNodes.node(shard.currentNodeId());
RoutingPool nodePool = RoutingPool.getNodePool(node);
RoutingPool shardPool = RoutingPool.getShardPool(shard, allocation);
assertEquals(RoutingPool.REMOTE_CAPABLE, shardPool);
assertEquals(nodePool, shardPool);
}
}

public void testShardPoolForPartialIndices() {
String index = "test-index";
IndexMetadata indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name()))
.numberOfShards(PRIMARIES)
.numberOfReplicas(REPLICAS)
.build();
RoutingPool indexPool = getIndexPool(indexMetadata);
assertEquals(REMOTE_CAPABLE, indexPool);
}

public void testShardPoolForFullIndices() {
String index = "test-index";
IndexMetadata indexMetadata = IndexMetadata.builder(index)
.settings(settings(Version.CURRENT).put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()))
.numberOfShards(PRIMARIES)
.numberOfReplicas(REPLICAS)
.build();
RoutingPool indexPool = getIndexPool(indexMetadata);
assertEquals(LOCAL_ONLY, indexPool);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.allocation;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.settings.Settings;

import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING;
import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE;

@SuppressForbidden(reason = "feature flag overrides")
public abstract class TieringAllocationBaseTestCase extends RemoteShardsBalancerBaseTestCase {

public ClusterState updateIndexMetadataForTiering(
ClusterState clusterState,
int localIndices,
String tieringState,
String dataLocality
) {
Metadata.Builder mb = Metadata.builder(clusterState.metadata());
for (int i = 0; i < localIndices; i++) {
IndexMetadata indexMetadata = clusterState.metadata().index(getIndexName(i, false));
Settings settings = indexMetadata.getSettings();
mb.put(
IndexMetadata.builder(indexMetadata)
.settings(
Settings.builder()
.put(settings)
.put(settings)
.put(INDEX_TIERING_STATE.getKey(), tieringState)
.put(INDEX_STORE_LOCALITY_SETTING.getKey(), dataLocality)
)
);
}
Metadata metadata = mb.build();
return ClusterState.builder(clusterState).metadata(metadata).build();
}
}

0 comments on commit c19cf88

Please sign in to comment.