Skip to content

Commit

Permalink
HotToWarmTieringService changes and changes in shard balancer to tier
Browse files Browse the repository at this point in the history
shards

Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal committed Jul 16, 2024
1 parent 5569b43 commit 044ea5b
Show file tree
Hide file tree
Showing 11 changed files with 1,330 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore.tiering;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.tiering.HotToWarmTieringAction;
import org.opensearch.action.tiering.HotToWarmTieringResponse;
import org.opensearch.action.tiering.TieringIndexRequest;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.junit.Test;

import java.util.Collection;
import java.util.Collections;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
 * Integration test for the hot-to-warm tiering action.
 * <p>
 * The test creates indices with full data locality, ingests documents, adds a node via
 * {@code ensureAtLeastNumSearchAndDataNodes}, submits a tiering request that waits for
 * completion, and then checks that every document can still be searched.
 */
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from tiering package", value =
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE")
public class HotToWarmTieringServiceIT extends RemoteStoreBaseIntegTestCase {

    protected static final String TEST_IDX_1 = "test-idx-1";
    protected static final String TEST_IDX_2 = "test-idx-2";
    protected static final String TARGET_TIER = "warm";
    protected static final int NUM_DOCS_IN_BULK = 10;
    private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes();

    /*
     * The MockFSIndexStore plugin is disabled because MockFSDirectoryFactory wraps the FSDirectory in an
     * OpenSearchMockDirectoryWrapper, which extends FilterDirectory (whereas FSDirectory extends BaseDirectory).
     * With that wrapping in place the local directory of the Composite Directory no longer satisfies the
     * assertion that the local directory must be of type FSDirectory.
     */
    @Override
    protected boolean addMockIndexStorePlugin() {
        return false;
    }

    /** Installs the mock cluster-info service plugin so the test can control reported disk usage. */
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
    }

    /** Turns on the tiered remote index feature flag for every node started by this test. */
    @Override
    protected Settings featureFlagSettings() {
        return Settings.builder().put(FeatureFlags.TIERED_REMOTE_INDEX, true).build();
    }

    /** Starts a cluster-manager-only node before each test. */
    @Before
    public void setup() {
        internalCluster().startClusterManagerOnlyNode();
    }

    /**
     * Tiers two single-shard, zero-replica indices to the warm tier and verifies that the request is
     * acknowledged, reports no failed indices, and leaves all ingested documents searchable.
     */
    @Test
    public void testTieringBasic() {
        final int replicaCount = 0;
        internalCluster().ensureAtLeastNumDataNodes(1);

        final Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1);
        settingsBuilder.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCount);
        settingsBuilder.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name());
        final Settings hotIndexSettings = settingsBuilder.build();

        final String[] indices = { TEST_IDX_1, TEST_IDX_2 };
        for (final String indexName : indices) {
            assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(hotIndexSettings).get());
            ensureGreen(indexName);
            // Ingest a batch of documents and make them durable and visible to search
            indexBulk(indexName, NUM_DOCS_IN_BULK);
            flushAndRefresh(indexName);
            // The cluster must be green again after the flush and refresh
            ensureGreen();
            assertAllDocsSearchable(indexName);
        }

        // Spin up node having search role
        internalCluster().ensureAtLeastNumSearchAndDataNodes(1);

        // Report the same value for total and free space on every path
        final MockInternalClusterInfoService mockInfoService = getMockInternalClusterInfoService();
        mockInfoService.setDiskUsageFunctionAndRefresh((node, path) -> setDiskUsage(path, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES));

        final TieringIndexRequest tieringRequest = new TieringIndexRequest(TARGET_TIER, indices);
        tieringRequest.waitForCompletion(true);
        final HotToWarmTieringResponse tieringResponse = client().admin()
            .indices()
            .execute(HotToWarmTieringAction.INSTANCE, tieringRequest)
            .actionGet();
        assertAcked(tieringResponse);
        assertTrue(tieringResponse.getFailedIndices().isEmpty());
        assertTrue(tieringResponse.isAcknowledged());

        for (final String indexName : indices) {
            assertAllDocsSearchable(indexName);
            assertAcked(client().admin().indices().prepareDelete(indexName).get());
        }
    }

    /**
     * Runs a match-all search against the index and asserts that it returns exactly
     * {@link #NUM_DOCS_IN_BULK} hits, i.e. the same number of documents as were ingested.
     *
     * @param index name of the index to search
     */
    private void assertAllDocsSearchable(String index) {
        final SearchResponse matchAll = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
        assertHitCount(matchAll, NUM_DOCS_IN_BULK);
    }

    /**
     * Fetches the {@link ClusterInfoService} instance of the current cluster-manager node and casts it
     * to the mock implementation installed through {@link #nodePlugins()}.
     */
    private MockInternalClusterInfoService getMockInternalClusterInfoService() {
        final ClusterInfoService infoService = internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
        return (MockInternalClusterInfoService) infoService;
    }

    /**
     * Builds a copy of {@code original} that keeps its path and mount but reports the given sizes.
     *
     * @param original   path entry whose path and mount are reused
     * @param totalBytes value passed as the first size argument
     * @param freeBytes  value passed for both remaining size arguments (presumably free and available bytes)
     * @return the new path entry
     */
    private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
        final String path = original.getPath();
        final String mount = original.getMount();
        return new FsInfo.Path(path, mount, totalBytes, freeBytes, freeBytes);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.tiering.HotToWarmTieringService;
import org.opensearch.transport.TransportService;

import java.io.IOException;
Expand All @@ -45,6 +46,8 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode
private final ClusterInfoService clusterInfoService;
private final DiskThresholdSettings diskThresholdSettings;

private final HotToWarmTieringService hotToWarmTieringService;

@Inject
public TransportHotToWarmTieringAction(
TransportService transportService,
Expand All @@ -53,7 +56,8 @@ public TransportHotToWarmTieringAction(
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterInfoService clusterInfoService,
Settings settings
Settings settings,
HotToWarmTieringService hotToWarmTieringService
) {
super(
HotToWarmTieringAction.NAME,
Expand All @@ -65,6 +69,7 @@ public TransportHotToWarmTieringAction(
indexNameExpressionResolver
);
this.clusterInfoService = clusterInfoService;
this.hotToWarmTieringService = hotToWarmTieringService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings());;
}

Expand Down Expand Up @@ -124,5 +129,6 @@ protected void clusterManagerOperation(
listener.onResponse(hotToWarmTieringRequestContext.constructResponse());
return;
}
hotToWarmTieringService.tier(hotToWarmTieringRequestContext, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException {
static final String KEY_SYSTEM = "system";
public static final String KEY_PRIMARY_TERMS = "primary_terms";
public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store";
// NOTE(review): presumably the key under which tiering-related custom data is stored for an index,
// mirroring REMOTE_STORE_CUSTOM_KEY; no usage is visible in this hunk, so confirm against callers.
public static final String TIERING_CUSTOM_KEY = "tiering";
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";

public static final String INDEX_STATE_FILE_PREFIX = "state-";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public void allocate(RoutingAllocation allocation) {
preferPrimaryShardBalance,
preferPrimaryShardRebalance
);
localShardsBalancer.tierShards();
localShardsBalancer.allocateUnassigned();
localShardsBalancer.moveShards();
localShardsBalancer.balance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.common.Randomness;
import org.opensearch.common.collect.Tuple;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.index.IndexModule;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -39,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -527,6 +531,128 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
}
}

/**
 * Triggers shard relocation for the shards that are submitted
 * for tiering to go from local node pool to remote capable node
 * pool.
 * <p>
 * The method works in three steps:
 * <ol>
 *   <li>collect every routing node that belongs to the remote capable pool;</li>
 *   <li>collect every started shard whose index has tiering state {@code HOT_TO_WARM} and whose
 *       target pool differs from the pool of the node it currently sits on;</li>
 *   <li>for each such shard, cycle through the remote capable nodes (in random order) and start a
 *       relocation to the first node for which the allocation deciders answer {@code YES}.</li>
 * </ol>
 * Nodes that cannot accept any shard at all are dropped from the candidate queue for the rest of
 * this call. If no remote capable node exists, a warning is logged and nothing is relocated.
 */
public void tierShards() {
    List<ShardRouting> shardsPendingTiering = new ArrayList<>();
    List<RoutingNode> remoteRoutingNodes = new ArrayList<>();
    Iterator<RoutingNode> nodes = routingNodes.mutableIterator();
    while (nodes.hasNext()) {
        RoutingNode node = nodes.next();
        RoutingPool pool = RoutingPool.getNodePool(node);
        if (pool == RoutingPool.REMOTE_CAPABLE) {
            remoteRoutingNodes.add(node);
        }
    }

    /* Routing Table is created after reroute and is immutable. It does not hold
     * updated shard states with changes applied during the reroute cycle. Throughout the
     * allocation (reroute) cycle, we need to work on RoutingNodes shard objects.
     *
     * NOTE(review): the loop below still iterates the routing table's shard objects and only
     * consults RoutingNodes to check that the shard id has assigned copies - confirm this is intended.
     */
    List<ShardRouting> indexShards = allocation.routingTable().allShards();
    for (ShardRouting shard : indexShards) {
        IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index());
        if (IndexModule.TieringState.HOT_TO_WARM.name()
            .equals(indexMetadata.getSettings().get(IndexModule.INDEX_TIERING_STATE.getKey()))) {
            if (allocation.routingNodes().assignedShards(shard.shardId()).isEmpty()) {
                logger.debug(
                    "Tiering in progress found for shards in [{}] state. Can only tier assigned shards",
                    shard.state().toString()
                );
                continue;
            }
            if (shard.started()) {
                RoutingPool targetPool = RoutingPool.getShardPool(shard, allocation);
                RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shard.currentNodeId()));
                if (targetPool != currentNodePool) {
                    logger.trace(
                        "Found shard [{}] with target pool [{}] on node pool [{}]. Adding to tiering list.",
                        shard.toString(),
                        targetPool.toString(),
                        currentNodePool.toString()
                    );
                    shardsPendingTiering.add(shard);
                }
            }
        }
    }

    if (!shardsPendingTiering.isEmpty() && remoteRoutingNodes.isEmpty()) {
        // The placeholder needs an argument, otherwise the literal "[{}]" ends up in the log
        logger.warn("No nodes available in the remote capable pool: [{}]", RoutingPool.REMOTE_CAPABLE.name());
        return;
    }

    // Randomise the starting order of candidate nodes
    Randomness.shuffle(remoteRoutingNodes);
    Queue<RoutingNode> nodeQueue = new ArrayDeque<>(remoteRoutingNodes);

    // Relocate shards pending tiering
    for (ShardRouting shard : shardsPendingTiering) {
        if (nodeQueue.isEmpty()) {
            logger.error("[Tier Shards] No nodes available. Cannot tier to target pool: [{}]", RoutingPool.REMOTE_CAPABLE.name());
            break;
        }

        logger.info(
            "Processing tiering, Target Pool: [{}]. Pending Shards: [{}], Available Nodes: [{}]",
            RoutingPool.REMOTE_CAPABLE.name(),
            shardsPendingTiering,
            nodeQueue.size()
        );

        // Find a tiering target node for shard and initiate relocation
        // Counts the nodes that rejected this shard but were put back in the queue
        int nodesCheckedForShard = 0;
        while (!nodeQueue.isEmpty()) {
            RoutingNode targetNode = nodeQueue.poll();
            Decision tieringDecision = allocation.deciders().canAllocate(shard, targetNode, allocation);

            if (tieringDecision.type() == Decision.Type.YES) {
                logger.debug(
                    "[Tier Shards] Relocating shard: [{}] from node: [{}] to node: [{}].",
                    shard.toString(),
                    shard.currentNodeId(),
                    targetNode.nodeId()
                );
                routingNodes.relocateShard(
                    shard,
                    targetNode.nodeId(),
                    allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
                    allocation.changes()
                );
                // Re-queue at the tail so the next shard tries other nodes first
                nodeQueue.offer(targetNode);
                break;
            } else {
                logger.trace(
                    "[Tier Shards] Cannot relocate shard: [{}] to node: [{}]. Decisions: [{}]",
                    shard.toString(),
                    targetNode.nodeId(),
                    tieringDecision.getDecisions()
                );

                Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation);
                if (nodeLevelDecision.type() == Decision.Type.YES) {
                    logger.debug("[Tier Shards] Node: [{}] can still accept shards. Adding it back to the queue.", targetNode.nodeId());
                    nodeQueue.offer(targetNode);
                    nodesCheckedForShard++;
                } else {
                    // Not re-queued: the node stays out of the queue for the remaining shards as well
                    logger.debug(
                        "[Tier Shards] Node: [{}] cannot accept any more shards. Removing it from queue.",
                        targetNode.nodeId()
                    );
                }

                // Break out if all nodes in the queue have been checked for this shard
                if (nodeQueue.size() == nodesCheckedForShard) {
                    break;
                }
            }
        }
    }
}

/**
* Move started shards that can not be allocated to a node anymore
* <p>
Expand Down Expand Up @@ -725,6 +851,7 @@ private Map<String, BalancedShardsAllocator.ModelNode> buildModelFromAssigned()
assert rn.nodeId().equals(shard.currentNodeId());
/* we skip relocating shards here since we expect an initializing shard with the same id coming in */
if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) {
// reverted
node.addShard(shard);
++totalShardCount;
if (logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ void balance() {
}
}

/**
 * Intentionally empty: this balancer performs no work when asked to tier shards.
 */
@Override
void tierShards() {}

/**
* Calculates the total number of primary shards per node.
* @param remoteRoutingNodes routing nodes for which the aggregation needs to be performed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public abstract class ShardsBalancer {
*/
abstract void balance();

/**
 * Triggers relocation of shards that are pending tiering between node pools.
 * Implementations that have nothing to tier may provide an empty body.
 */
abstract void tierShards();

/**
* Make a decision for allocating an unassigned shard.
* @param shardRouting the shard for which the decision has to be made
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING,
IndexModule.INDEX_STORE_TYPE_SETTING,
IndexModule.INDEX_STORE_PRE_LOAD_SETTING,
IndexModule.INDEX_TIERING_STATE,
IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS,
IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS,
IndexModule.INDEX_RECOVERY_TYPE_SETTING,
Expand Down
Loading

0 comments on commit 044ea5b

Please sign in to comment.