From 044ea5b435982d94925be57428ed3a38259fd5f7 Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 11 Jul 2024 12:57:20 -0700 Subject: [PATCH] HotToWarmTieringService changes and changes in shard balancer to tier shards Signed-off-by: Neetika Singhal --- .../tiering/HotToWarmTieringServiceIT.java | 132 +++ .../TransportHotToWarmTieringAction.java | 8 +- .../cluster/metadata/IndexMetadata.java | 1 + .../allocator/BalancedShardsAllocator.java | 1 + .../allocator/LocalShardsBalancer.java | 127 +++ .../allocator/RemoteShardsBalancer.java | 3 + .../allocation/allocator/ShardsBalancer.java | 2 + .../common/settings/IndexScopedSettings.java | 1 + .../main/java/org/opensearch/node/Node.java | 9 + .../tiering/HotToWarmTieringService.java | 874 ++++++++++++++++++ .../tiering/HotToWarmTieringServiceTests.java | 173 ++++ 11 files changed, 1330 insertions(+), 1 deletion(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java create mode 100644 server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java create mode 100644 server/src/test/java/org/opensearch/tiering/HotToWarmTieringServiceTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java new file mode 100644 index 0000000000000..f05715fa540ef --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/tiering/HotToWarmTieringServiceIT.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.remotestore.tiering; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.tiering.HotToWarmTieringAction; +import org.opensearch.action.tiering.HotToWarmTieringResponse; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.plugins.Plugin; +import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from tiering package", value = +// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE") +public class HotToWarmTieringServiceIT extends RemoteStoreBaseIntegTestCase { + + protected static final String TEST_IDX_1 = "test-idx-1"; + protected static final String TEST_IDX_2 = "test-idx-2"; + protected static final String TARGET_TIER = "warm"; + protected static final int NUM_DOCS_IN_BULK = 10; + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); + + /* + Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + @Test + public void testTieringBasic() { + final int numReplicasIndex = 0; + internalCluster().ensureAtLeastNumDataNodes(1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()) + 
.build(); + + String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; + for (String index : indices) { + assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); + ensureGreen(index); + // Ingesting some docs + indexBulk(index, NUM_DOCS_IN_BULK); + flushAndRefresh(index); + // ensuring cluster is green after flush and refresh + ensureGreen(); + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + } + + // Spin up a node with the search role + internalCluster().ensureAtLeastNumSearchAndDataNodes(1); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); + + TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, indices); + request.waitForCompletion(true); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertAcked(response); + assertTrue(response.getFailedIndices().isEmpty()); + assertTrue(response.isAcknowledged()); + for (String index : indices) { + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + assertAcked(client().admin().indices().prepareDelete(index).get()); + } + } + + private MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); + } + + private static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } +} diff --git a/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java index 431e984c283fc..f3870cc480dc8 100644 --- a/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java +++ b/server/src/main/java/org/opensearch/action/tiering/TransportHotToWarmTieringAction.java @@ -26,6 +26,7 @@ import org.opensearch.core.index.Index; import org.opensearch.index.IndexNotFoundException; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; import org.opensearch.transport.TransportService; import java.io.IOException; @@ -45,6 +46,8 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode private final ClusterInfoService clusterInfoService; private final DiskThresholdSettings diskThresholdSettings; + private final HotToWarmTieringService hotToWarmTieringService; + @Inject public TransportHotToWarmTieringAction( TransportService transportService, @@ -53,7 +56,8 @@ public TransportHotToWarmTieringAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterInfoService clusterInfoService, - Settings settings + Settings settings, + HotToWarmTieringService hotToWarmTieringService ) { super( HotToWarmTieringAction.NAME, @@ -65,6 +69,7 @@ public TransportHotToWarmTieringAction( 
indexNameExpressionResolver ); this.clusterInfoService = clusterInfoService; + this.hotToWarmTieringService = hotToWarmTieringService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings());; } @@ -124,5 +129,6 @@ protected void clusterManagerOperation( listener.onResponse(hotToWarmTieringRequestContext.constructResponse()); return; } + hotToWarmTieringService.tier(hotToWarmTieringRequestContext, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 9e7fe23f29872..195021252135f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; + public static final String TIERING_CUSTOM_KEY = "tiering"; public static final String TRANSLOG_METADATA_KEY = "translog_metadata"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..e00fda62a1c2e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -284,6 +284,7 @@ public void allocate(RoutingAllocation allocation) { preferPrimaryShardBalance, preferPrimaryShardRebalance ); + localShardsBalancer.tierShards(); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); localShardsBalancer.balance(); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 00eb79add9f1d..74585f6356af3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -28,9 +28,12 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; import org.opensearch.common.collect.Tuple; import org.opensearch.gateway.PriorityComparator; +import org.opensearch.index.IndexModule; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -39,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -527,6 +531,128 @@ private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) { } } + /** + * Triggers shard relocation for the shards that are submitted + * for tiering to go from local node pool to remote capable node + * pool. 
*/ + public void tierShards() { + List shardsPendingTiering = new ArrayList<>(); + List remoteRoutingNodes = new ArrayList<>(); + Iterator nodes = routingNodes.mutableIterator(); + while (nodes.hasNext()) { + RoutingNode node = nodes.next(); + RoutingPool pool = RoutingPool.getNodePool(node); + if (pool == RoutingPool.REMOTE_CAPABLE) { + remoteRoutingNodes.add(node); + } + } + + /* Routing Table is created after reroute and is immutable. It does not hold + * updated shard states with changes applied during the reroute cycle. Throughout the + * allocation (reroute) cycle, we need to work on RoutingNodes shard objects. + */ + List indexShards = allocation.routingTable().allShards(); + for (ShardRouting shard : indexShards) { + IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shard.index()); + if (IndexModule.TieringState.HOT_TO_WARM.name() + .equals(indexMetadata.getSettings().get(IndexModule.INDEX_TIERING_STATE.getKey()))) { + List shardRoutings = allocation.routingNodes().assignedShards(shard.shardId()); + if (shardRoutings.isEmpty()) { + logger.debug( + "Tiering in progress found for shards in [{}] state. Can only tier assigned shards", + shard.state().toString() + ); + continue; + } + if (shard.started()) { + RoutingPool targetPool = RoutingPool.getShardPool(shard, allocation); + RoutingPool currentNodePool = RoutingPool.getNodePool(allocation.routingNodes().node(shard.currentNodeId())); + if (targetPool != currentNodePool) { + logger.trace( + "Found shard [{}] with target pool [{}] on node pool [{}]. Adding to tiering list.", + shard.toString(), + targetPool.toString(), + currentNodePool.toString() + ); + shardsPendingTiering.add(shard); + } + } + } + } + + if (!shardsPendingTiering.isEmpty() && remoteRoutingNodes.isEmpty()) { + logger.warn("No nodes available in the remote capable pool: [{}]", RoutingPool.REMOTE_CAPABLE.name()); + return; + } + + Randomness.shuffle(remoteRoutingNodes); + Queue nodeQueue = new ArrayDeque<>(remoteRoutingNodes); + + // Relocate shards pending tiering + for (ShardRouting shard : shardsPendingTiering) { + if (nodeQueue.isEmpty()) { + logger.error("[Tier Shards] No nodes available. Cannot tier to target pool: [{}]", RoutingPool.REMOTE_CAPABLE.name()); + break; + } + + logger.info( + "Processing tiering, Target Pool: [{}]. Pending Shards: [{}], Available Nodes: [{}]", + RoutingPool.REMOTE_CAPABLE.name(), + shardsPendingTiering, + nodeQueue.size() + ); + + // Find a tiering target node for shard and initiate relocation + int nodesCheckedForShard = 0; + while (!nodeQueue.isEmpty()) { + RoutingNode targetNode = nodeQueue.poll(); + Decision tieringDecision = allocation.deciders().canAllocate(shard, targetNode, allocation); + + if (tieringDecision.type() == Decision.Type.YES) { + logger.debug( + "[Tier Shards] Relocating shard: [{}] from node: [{}] to node: [{}].", + shard.toString(), + shard.currentNodeId(), + targetNode.nodeId() + ); + routingNodes.relocateShard( + shard, + targetNode.nodeId(), + allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE), + allocation.changes() + ); + nodeQueue.offer(targetNode); + break; + } else { + logger.trace( + "[Tier Shards] Cannot relocate shard: [{}] to node: [{}]. Decisions: [{}]", + shard.toString(), + targetNode.nodeId(), + tieringDecision.getDecisions() + ); + + Decision nodeLevelDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation); + if (nodeLevelDecision.type() == Decision.Type.YES) { + logger.debug("[Tier Shards] Node: [{}] can still accept shards. 
Adding it back to the queue.", targetNode.nodeId()); + nodeQueue.offer(targetNode); + nodesCheckedForShard++; + } else { + logger.debug( + "[Tier Shards] Node: [{}] cannot accept any more shards. Removing it from queue.", + targetNode.nodeId() + ); + } + + // Break out if all nodes in the queue have been checked for this shard + if (nodeQueue.size() == nodesCheckedForShard) { + break; + } + } + } + } + + /** + * Move started shards that can not be allocated to a node anymore *

@@ -725,6 +851,7 @@ private Map buildModelFromAssigned() assert rn.nodeId().equals(shard.currentNodeId()); /* we skip relocating shards here since we expect an initializing shard with the same id coming in */ if (RoutingPool.LOCAL_ONLY.equals(RoutingPool.getShardPool(shard, allocation)) && shard.state() != RELOCATING) { + // reverted node.addShard(shard); ++totalShardCount; if (logger.isTraceEnabled()) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index a05938c176678..4473944218d33 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -266,6 +266,9 @@ void balance() { } } + @Override + void tierShards() {} + /** * Calculates the total number of primary shards per node. * @param remoteRoutingNodes routing nodes for which the aggregation needs to be performed diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index ef2dbd34644a7..17b0ed0234e53 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -37,6 +37,8 @@ public abstract class ShardsBalancer { */ abstract void balance(); + abstract void tierShards(); + /** * Make a decision for allocating an unassigned shard. * @param shardRouting the shard for which the decision has to be made diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index ca2c4dab6102b..ee26b537ce888 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -191,6 +191,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { BitsetFilterCache.INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING, IndexModule.INDEX_STORE_TYPE_SETTING, IndexModule.INDEX_STORE_PRE_LOAD_SETTING, + IndexModule.INDEX_TIERING_STATE, IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS, IndexModule.INDEX_STORE_HYBRID_NIO_EXTENSIONS, IndexModule.INDEX_RECOVERY_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index d91b2a45a48c6..b269a47af5aea 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -256,6 +256,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tiering.HotToWarmTieringService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -1195,6 +1196,13 @@ protected Node( remoteClusterStateService ); + final HotToWarmTieringService tieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService() + ); + final DiskThresholdMonitor 
diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1391,6 +1399,7 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + b.bind(HotToWarmTieringService.class).toInstance(tieringService); b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..cb10d65be9607 --- /dev/null +++ b/server/src/main/java/org/opensearch/tiering/HotToWarmTieringService.java @@ -0,0 +1,874 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.tiering.HotToWarmTieringRequestContext; +import org.opensearch.action.tiering.HotToWarmTieringResponse; +import org.opensearch.action.tiering.TieringIndexRequest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.NotClusterManagerException; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexNotFoundException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.TIERING_CUSTOM_KEY; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Service responsible for tiering indices from hot to warm + * 
@opensearch.experimental + */ +public class HotToWarmTieringService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + protected final ClusterService clusterService; + protected final IndexNameExpressionResolver indexNameExpressionResolver; + protected final AllocationService allocationService; + private final Map requestUuidToRequestContext = new ConcurrentHashMap<>(); + private final Map indexTieringInfoMap = new ConcurrentHashMap<>(); + public static final String HOT_TO_WARM_START_TIME = "hot_to_warm_start_time"; + public static final String HOT_TO_WARM_END_TIME = "hot_to_warm_end_time"; + + @Inject + public HotToWarmTieringService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService + ) { + super(); + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.allocationService = allocationService; + + if (DiscoveryNode.isClusterManagerNode(settings) && FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + clusterService.addListener(this); + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO add handling for master switch, dangling indices, master reload + if (event.routingTableChanged()) { + if (!indexTieringInfoMap.isEmpty()) { + updateIndexShardStatus(event.state()); + } + if (!requestUuidToRequestContext.isEmpty()) { + completeRequestLevelTiering(); + } + } + } + + private void completeRequestLevelTiering() { + for (String requestUuid : requestUuidToRequestContext.keySet()) { + final HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get(requestUuid); + if (requestContext.isRequestProcessingComplete()) { + logger.info("Tiering is completed for the request [{}]", requestContext); + for (Index index : indexTieringInfoMap.keySet()) { + indexTieringInfoMap.remove(index); + } + requestUuidToRequestContext.remove(requestUuid); + if (requestContext.getRequest().waitForCompletion()) { + requestContext.getListener().onResponse(requestContext.constructResponse()); + } + } + } + } + + private void processNotFoundIndex(IndexTieringInfo indexTieringInfo) { + Index index = indexTieringInfo.getIndex(); + String reqId = indexTieringInfo.getRequestUuid(); + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get(reqId); + requestContext.addToNotFound(index.getName()); + requestContext.removeFromInProgress(index); + requestUuidToRequestContext.put(reqId, requestContext); + indexTieringInfoMap.remove(index); + } + + protected boolean isSearchNode(DiscoveryNode node) { + return node != null && node.isSearchNode(); + } + + protected ShardTieringStatus updateShardTieringStatus(final ShardRouting shard, ClusterState clusterState) { + final boolean isShardFoundOnSearchNode = isSearchNode(clusterState.getNodes().get(shard.currentNodeId())); + final boolean isShardRelocatingToSearchNode = isSearchNode(clusterState.getNodes().get(shard.relocatingNodeId())); + ShardTieringState tieringState; + String reason = null; + if (shard.started() && isShardFoundOnSearchNode) { + tieringState = ShardTieringState.SUCCESSFUL; + } else if (shard.unassigned()) { + tieringState = ShardTieringState.INIT; + reason = "Shard is unassigned due to " + shard.unassignedInfo().getReason(); + } else if (((shard.initializing() || shard.started()) && !isShardFoundOnSearchNode) + || 
(shard.relocating() && !isShardRelocatingToSearchNode)) { + tieringState = ShardTieringState.FAILED; + reason = "Shard with current state: " + + shard.state().toString() + + " is neither allocated nor relocating to the search node, " + + "current node: " + + shard.currentNodeId() + + ", relocating node: " + + shard.relocatingNodeId(); + } else { + tieringState = ShardTieringState.PROCESSING; + } + return new ShardTieringStatus( + shard.relocating() ? shard.relocatingNodeId() : shard.currentNodeId(), + shard.state(), + tieringState, + reason + ); + } + + private void updateIndexShardStatus(ClusterState clusterState) { + List shardRoutings; + // to include successful/failed indices + final Set completedIndices = new HashSet<>(); + for (Index index : indexTieringInfoMap.keySet()) { + IndexTieringInfo indexTieringInfo = indexTieringInfoMap.get(index); + if (indexTieringInfo.isCompleted()) { + continue; + } + try { + // Ensure index is not deleted + shardRoutings = clusterState.routingTable().allShards(index.getName()); + } catch (IndexNotFoundException ex) { + // Index already deleted, nothing to do + logger.warn("Index [{}] deleted before hot to warm relocation finished", index.getName()); + processNotFoundIndex(indexTieringInfo); + continue; + } + final Map shardTieringStatusMap = indexTieringInfo.getShardTieringStatus(); + boolean relocationCompleted = true; + for (ShardRouting shard : shardRoutings) { + ShardInfo shardInfo = new ShardInfo(shard.shardId(), shard.primary()); + ShardTieringStatus currentShardTieringStatus = shardTieringStatusMap.get(shardInfo); + if (currentShardTieringStatus != null && (currentShardTieringStatus.isShardCompleted())) { + // shard is either already successful or failed + continue; + } + logger.debug("Shard tiering status for the shard {} : {}", shard.toString(), currentShardTieringStatus); + ShardTieringStatus updatedShardTieringStatus = updateShardTieringStatus(shard, clusterState); + logger.debug("Updated shard tiering status for the shard {} : {}", shard.toString(), updatedShardTieringStatus.toString()); + if (updatedShardTieringStatus.isShardNotSuccessful()) { + relocationCompleted = false; + } + shardTieringStatusMap.put(shardInfo, updatedShardTieringStatus); + if (!completedIndices.contains(indexTieringInfo) && updatedShardTieringStatus.hasShardFailed()) { + indexTieringInfo.setState(IndexTieringStatus.FAILED); + completedIndices.add(indexTieringInfo); + } + } + indexTieringInfo.setShardTieringStatus(shardTieringStatusMap); + if (relocationCompleted) { + logger.info("Hot to warm relocation completed for index [{}]", index.getName()); + indexTieringInfo.setState(IndexTieringStatus.SUCCESSFUL); + completedIndices.add(indexTieringInfo); + } + indexTieringInfoMap.put(index, indexTieringInfo); + } + if (!completedIndices.isEmpty()) { + updateRequestContextForCompletedIndices(completedIndices); + updateClusterStateForCompletedIndices(completedIndices); + } + } + + protected IndexMetadata.Builder updateIndexMetadataForCompletedIndex( + final Metadata.Builder metadataBuilder, + final RoutingTable.Builder routingTableBuilder, + final IndexTieringInfo indexTieringInfo + ) { + final IndexMetadata indexMetadata = metadataBuilder.get(indexTieringInfo.getIndex().getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update tiering settings here + if (indexTieringInfo.hasFailed()) { + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL); + 
indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT); + + } else { // successful case here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + } + + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + Map tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_END_TIME, String.valueOf(System.currentTimeMillis())); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + } + + protected boolean hasFailedIndices(final Set completedIndices) { + for (IndexTieringInfo indexTieringInfo : completedIndices) { + if (indexTieringInfo.hasFailed()) { + return true; + } + } + return false; + } + + private void updateRequestContextForCompletedIndices(final Set completedIndices) { + for (IndexTieringInfo indexTieringInfo : completedIndices) { + String reqId = indexTieringInfo.getRequestUuid(); + HotToWarmTieringRequestContext requestContext = requestUuidToRequestContext.get(reqId); + if (indexTieringInfo.hasFailed()) { + requestContext.addToFailed(indexTieringInfo.getIndex().getName(), indexTieringInfo.getFailureReason()); + } else { + requestContext.addToSuccessful(indexTieringInfo.getIndex().getName()); + } + requestContext.removeFromInProgress(indexTieringInfo.getIndex()); + requestUuidToRequestContext.put(reqId, requestContext); + } + } + + private void updateClusterStateForCompletedIndices(final Set completedIndices) { + clusterService.submitStateUpdateTask( + "process hot to warm tiering for completed indices", + new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + for (IndexTieringInfo indexTieringInfo : completedIndices) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForCompletedIndex( + metadataBuilder, + routingTableBuilder, + indexTieringInfo + ); + metadataBuilder.put(indexMetadataBuilder); + } + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + if (hasFailedIndices(completedIndices)) { + // only reroute in case of failed indices to trigger shard relocation for shards to go back to hot nodes + updatedState = allocationService.reroute(updatedState, "hot to warm revert tiering"); + } + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "failed to process hot to warm tiering for completed indices " + "[{}]", + completedIndices + ), + e + ); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. 
source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + } + } + ); + } + + protected IndexMetadata.Builder updateIndexMetadataForAcceptedIndex( + final Metadata.Builder metadataBuilder, + final RoutingTable.Builder routingTableBuilder, + final Index index + ) { + final IndexMetadata indexMetadata = metadataBuilder.get(index.getName()); + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update index settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + + // Update number of replicas to 1 in case the number of replicas is greater than 1 + if (Integer.parseInt(metadataBuilder.getSafe(index).getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) > 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(1, indices); + metadataBuilder.updateNumberOfReplicas(1, indices); + } + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + final Map tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + tieringCustomData.put(HOT_TO_WARM_END_TIME, "-1"); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + } + + /** + * Tier indices from hot to warm + * @param hotToWarmTieringRequestContext - request context for the tiering request + * @param listener - call back listener + */ + public void tier( + final HotToWarmTieringRequestContext hotToWarmTieringRequestContext, + final ActionListener listener + ) { + final TieringIndexRequest request = hotToWarmTieringRequestContext.getRequest(); + logger.info("Starting hot to warm tiering for indices {}", hotToWarmTieringRequestContext); + + clusterService.submitStateUpdateTask("start hot to warm tiering", new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (Index index : hotToWarmTieringRequestContext.getAcceptedIndices()) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + index + ); + metadataBuilder.put(indexMetadataBuilder); + final Map shardTieringStatus = new HashMap<>(); + currentState.routingTable().allShards(index.getName()).forEach(shardRouting -> { + shardTieringStatus.put( + new ShardInfo(shardRouting.shardId(), shardRouting.primary()), + new ShardTieringStatus(currentState.nodes().getLocalNodeId(), shardRouting.state()) + ); + }); + + final IndexTieringInfo indexTieringInfo = new IndexTieringInfo( + hotToWarmTieringRequestContext.getRequestUuid(), + index, + shardTieringStatus + ); + indexTieringInfoMap.put(index, indexTieringInfo); + } + requestUuidToRequestContext.put(hotToWarmTieringRequestContext.getRequestUuid(), 
hotToWarmTieringRequestContext); + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "failed to start hot to warm tiering for indices " + "[{}]", + (Object) request.indices() + ), + e + ); + listener.onFailure(e); + } + + @Override + public void onNoLongerClusterManager(String source) { + this.onFailure(source, new NotClusterManagerException("no longer cluster manager. source: [" + source + "]")); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("Cluster state updated for source " + source); + if (!request.waitForCompletion()) { + listener.onResponse(hotToWarmTieringRequestContext.constructResponse()); + } + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }); + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException {} + + /** + * Represents info of a tiering index + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class IndexTieringInfo { + private String requestUuid; + private Index index; + private IndexTieringStatus state; + private Map shardTieringStatus; + + public IndexTieringInfo( + String requestUuid, + Index index, + IndexTieringStatus state, + Map shardTieringStatus + ) { + this.requestUuid = requestUuid; + this.index = index; + this.state = state; + this.shardTieringStatus = shardTieringStatus; + } + + public IndexTieringInfo(String requestUuid, Index index, Map shardTieringStatus) { + this.requestUuid = requestUuid; + this.index = index; + this.state = IndexTieringStatus.INIT; + this.shardTieringStatus = shardTieringStatus; + } + + public Index getIndex() { + return index; + } + + public void setIndex(Index index) { + this.index = index; + } + + public IndexTieringStatus getState() { + return state; + } + + public void setState(IndexTieringStatus state) { + this.state = state; + } + + public boolean isSuccessful() { + return state.successful(); + } + + public boolean hasFailed() { + return state.failed(); + } + + public boolean isCompleted() { + return state.successful() || state.failed(); + } + + public String getRequestUuid() { + return requestUuid; + } + + public void setRequestUuid(String requestUuid) { + this.requestUuid = requestUuid; + } + + public Map getShardTieringStatus() { + return shardTieringStatus; + } + + public void setShardTieringStatus(Map shardTieringStatus) { + this.shardTieringStatus = shardTieringStatus; + } + + public String getFailureReason() { + StringBuilder reason = new StringBuilder(); + reason.append("Failure for index: ").append(index.getName()).append("["); + for (ShardInfo shardInfo : shardTieringStatus.keySet()) { + ShardTieringStatus shardStatus = shardTieringStatus.get(shardInfo); + if (shardStatus.hasShardFailed()) { + reason.append("{Shard: ").append(shardInfo.toString()).append(", reason: ").append(shardStatus.reason()).append("}"); + } + } + reason.append("]"); + return reason.toString(); + } + + @Override + public String toString() { + return "IndexTieringInfo{" + + "requestUuid='" + + 
requestUuid + + '\'' + + ", index=" + + index + + ", state=" + + state + + ", shardTieringStatus=" + + shardTieringStatus + + '}'; + } + } + + /** + * Represents info of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardInfo { + private ShardId shardId; + private boolean primary; + + public ShardInfo(ShardId shardId) { + this.shardId = shardId; + this.primary = true; + } + + public ShardInfo(ShardId shardId, boolean primary) { + this.shardId = shardId; + this.primary = primary; + } + + public ShardId getShardId() { + return shardId; + } + + public void setShardId(ShardId shardId) { + this.shardId = shardId; + } + + public boolean isPrimary() { + return primary; + } + + public void setPrimary(boolean primary) { + this.primary = primary; + } + + @Override + public String toString() { + return "ShardInfo{" + "shardId=" + shardId + ", primary=" + primary + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ShardInfo shardInfo = (ShardInfo) o; + + if (primary != shardInfo.primary) return false; + return Objects.equals(shardId, shardInfo.shardId); + } + + @Override + public int hashCode() { + int result = shardId != null ? shardId.hashCode() : 0; + result = 31 * result + (primary ? 1 : 0); + return result; + } + } + + /** + * Represents status of a tiering shard + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class ShardTieringStatus { + private ShardTieringState state; + private ShardRoutingState shardRoutingState; + private String nodeId; + private String reason; + + /** + * Constructs a new shard tiering status in initializing state on the given node + * + * @param nodeId node id + */ + public ShardTieringStatus(String nodeId) { + this(nodeId, null); + } + + /** + * Constructs a new shard tiering status with the specified state on the given node + * + * @param nodeId node id + * @param shardRoutingState shard state + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState) { + this(nodeId, shardRoutingState, ShardTieringState.INIT, null); + } + + /** + * Constructs a new shard tiering status with the specified state on the given node and the specified failure reason + * + * @param nodeId node id + * @param shardRoutingState shard routing state + * @param state shard tiering state + * @param reason failure reason + */ + public ShardTieringStatus(String nodeId, ShardRoutingState shardRoutingState, ShardTieringState state, String reason) { + this.nodeId = nodeId; + this.state = state; + this.reason = reason; + this.shardRoutingState = shardRoutingState; + } + + /** + * Returns current state + * + * @return current state + */ + public ShardTieringState state() { + return state; + } + + /** + * Returns the node id of the node where the shard is being tiered + * + * @return node id + */ + public String nodeId() { + return nodeId; + } + + /** + * Returns failure reason + * + * @return failure reason + */ + public String reason() { + return reason; + } + + public boolean isShardCompleted() { + return this.state == ShardTieringState.SUCCESSFUL || this.state == ShardTieringState.FAILED; + } + + public boolean hasShardFailed() { + return this.state == ShardTieringState.FAILED; + } + + public boolean isShardNotSuccessful() { + return this.state != ShardTieringState.SUCCESSFUL; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) 
{ + return false; + } + + ShardTieringStatus status = (ShardTieringStatus) o; + return state == status.state && Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason); + } + + @Override + public int hashCode() { + return Objects.hash(state, nodeId, reason); + } + + @Override + public String toString() { + return "ShardTieringStatus{" + + "state=" + + state + + ", shardRoutingState=" + + shardRoutingState + + ", nodeId='" + + nodeId + + '\'' + + ", reason='" + + reason + + '\'' + + '}'; + } + } + + /** + * Index Tiering status + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum IndexTieringStatus { + /** + * Initializing state + */ + INIT((byte) 0), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 1), + /** + * Tiering failed + */ + FAILED((byte) 2); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + IndexTieringStatus(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * @return true if tiering is successful + */ + public boolean successful() { + return this == SUCCESSFUL; + } + + /** + * @return true if tiering failed + */ + public boolean failed() { + return this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value state code + * @return state + */ + public static IndexTieringStatus fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return SUCCESSFUL; + case 2: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } + + /** + * Shard Tiering state + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum ShardTieringState { + /** + * Initializing state + */ + INIT((byte) 0), + /** + * Processing state + */ + PROCESSING((byte) 1), + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 2), + /** + * Tiering failed + */ + FAILED((byte) 3); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + ShardTieringState(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * Returns true if tiering completed (either successfully or with failure) + * + * @return true if tiering completed + */ + public boolean completed() { + return this == SUCCESSFUL || this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value state code + * @return state + */ + public static ShardTieringState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return PROCESSING; + case 2: + return SUCCESSFUL; + case 3: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } +} diff --git a/server/src/test/java/org/opensearch/tiering/HotToWarmTieringServiceTests.java b/server/src/test/java/org/opensearch/tiering/HotToWarmTieringServiceTests.java new file mode 100644 index 0000000000000..e8c7c8870d195 --- /dev/null +++ b/server/src/test/java/org/opensearch/tiering/HotToWarmTieringServiceTests.java @@ -0,0 +1,173 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.IndexModule; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +public class HotToWarmTieringServiceTests extends OpenSearchSingleNodeTestCase { + + private ClusterService clusterService; + private HotToWarmTieringService hotToWarmTieringService; + + @Before + public void beforeTest() { + clusterService = this.getInstanceFromNode(ClusterService.class); + hotToWarmTieringService = this.getInstanceFromNode(HotToWarmTieringService.class); + } + + @Test + public void testUpdateIndexMetadataForAcceptedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterService.state().routingTable()); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + index + ); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals( + IndexModule.DataLocalityType.PARTIAL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT_TO_WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_START_TIME)); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME)); + } + + @Test + public void testHasFailedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Map shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.UNASSIGNED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.FAILED, + shardTieringStatus + ); + boolean hasFailedIndices = hotToWarmTieringService.hasFailedIndices(Set.of(indexTieringInfo)); + assertTrue(hasFailedIndices); + } + + @Test + public void testNoFailedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Map shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.STARTED) + ); + final 
HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.SUCCESSFUL, + shardTieringStatus + ); + boolean hasFailedIndices = hotToWarmTieringService.hasFailedIndices(Set.of(indexTieringInfo)); + assertFalse(hasFailedIndices); + } + + @Test + public void testUpdateIndexMetadataForSuccessfulIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterService.state().routingTable()); + final Map shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.STARTED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.SUCCESSFUL, + shardTieringStatus + ); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForCompletedIndex( + metadataBuilder, + routingTableBuilder, + indexTieringInfo + ); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals(IndexModule.TieringState.WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME)); + } + + @Test + public void testUpdateIndexMetadataForFailedIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterService.state().routingTable()); + final Map shardTieringStatus = new HashMap<>(); + shardTieringStatus.put( + new HotToWarmTieringService.ShardInfo(new ShardId(index, 0), false), + new HotToWarmTieringService.ShardTieringStatus("nodeId", ShardRoutingState.UNASSIGNED) + ); + final HotToWarmTieringService.IndexTieringInfo indexTieringInfo = new HotToWarmTieringService.IndexTieringInfo( + UUID.randomUUID().toString(), + index, + HotToWarmTieringService.IndexTieringStatus.FAILED, + shardTieringStatus + ); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForCompletedIndex( + metadataBuilder, + routingTableBuilder, + indexTieringInfo + ); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals( + IndexModule.DataLocalityType.FULL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME)); + } + +}