From 41f986ce9eb445e5fc52ebebb6005ffc3a6f13ec Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 11 Jul 2024 12:57:20 -0700 Subject: [PATCH] HotToWarmTieringService changes to tier shards Signed-off-by: Neetika Singhal --- .../tiering/HotToWarmTieringServiceIT.java | 114 +++++ .../tiering/TieringBaseIntegTestCase.java | 102 +++++ .../tiering/TieringRequestContext.java | 249 +++++++++++ .../TieringUpdateClusterStateRequest.java | 31 ++ .../admin/indices/tiering/TieringUtils.java | 53 +++ .../tiering/TieringValidationResult.java | 10 - .../TransportHotToWarmTieringAction.java | 43 +- .../cluster/metadata/IndexMetadata.java | 1 + .../tiering/HotToWarmTieringService.java | 413 ++++++++++++++++++ .../tiering/TieringRequestValidator.java | 9 +- .../main/java/org/opensearch/node/Node.java | 12 + .../tiering/HotToWarmTieringServiceTests.java | 86 ++++ 12 files changed, 1088 insertions(+), 35 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java create mode 100644 server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java new file mode 100644 index 0000000000000..4ad8d87803bae --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringIndexRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from tiering package", value = +// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE") +public class HotToWarmTieringServiceIT extends TieringBaseIntegTestCase { + + protected static final String TEST_IDX_1 = "test-idx-1"; + protected static final String TEST_IDX_2 = "test-idx-2"; + protected static final int NUM_DOCS_IN_BULK = 10; + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + // waiting for the recovery pr to be merged in + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13647") + public void testTieringBasic() { + final int numReplicasIndex = 0; + internalCluster().ensureAtLeastNumDataNodes(1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()) + .build(); + + String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; + for (String index : indices) { + assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); + ensureGreen(index); + // Ingesting some docs + indexBulk(index, NUM_DOCS_IN_BULK); + flushAndRefresh(index); + ensureGreen(); + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + } + + // Spin up node having search role + internalCluster().ensureAtLeastNumSearchAndDataNodes(1); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); + + TieringIndexRequest request = new TieringIndexRequest(TARGET_WARM_TIER, indices); + request.waitForCompletion(true); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertAcked(response); + assertTrue(response.getFailedIndices().isEmpty()); + assertTrue(response.isAcknowledged()); + ensureGreen(); + for (String index : indices) { + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get(); + assertWarmSettings(getIndexResponse, index); + assertAcked(client().admin().indices().prepareDelete(index).get()); + } + } + + private void assertWarmSettings(GetIndexResponse response, String indexName) { + final Map settings = response.settings(); + assertThat(settings, notNullValue()); + assertThat(settings.size(), equalTo(1)); + Settings indexSettings = settings.get(indexName); + assertThat(indexSettings, notNullValue()); + assertThat( + indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()), + equalTo(IndexModule.DataLocalityType.PARTIAL.name()) + ); + assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name())); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java new file mode 100644 index 0000000000000..0b60e71480315 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +public class TieringBaseIntegTestCase extends OpenSearchIntegTestCase { + + protected Path segmentRepoPath; + protected Path translogRepoPath; + Settings extraSettings = Settings.EMPTY; + private final List documentKeys = List.of( + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5) + ); + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + protected static final String TARGET_WARM_TIER = "warm"; + + /** + * Disable MockFSIndexStore plugin as it wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + * As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + * + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(extraSettings) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + } + + protected BulkResponse indexBulk(String indexName, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + final IndexRequest request = client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) + .request(); + bulkRequest.add(request); + } + return client().bulk(bulkRequest).actionGet(); + } + + protected MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); + } + + protected static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java new file mode 100644 index 0000000000000..fd9e482248365 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java @@ -0,0 +1,249 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Context class to hold indices to be tiered per request. It also holds + * the listener per request to mark the request as complete once all + * tiering operations are completed. + * + * @opensearch.experimental + */ + +@ExperimentalApi +public class TieringRequestContext { + private final ActionListener actionListener; + private final Map indexTieringStatusMap; + + public TieringRequestContext(ActionListener actionListener) { + this.actionListener = actionListener; + indexTieringStatusMap = new ConcurrentHashMap<>(); + } + + public void setIndexTieringStatusMap(Set acceptedIndices, Map failedIndices) { + for (Index index : acceptedIndices) { + indexTieringStatusMap.put(index, new IndexTieringInfo(index)); + } + for (Map.Entry entry : failedIndices.entrySet()) { + indexTieringStatusMap.put(entry.getKey(), new IndexTieringInfo(entry.getKey(), IndexTieringState.FAILED, entry.getValue())); + } + } + + public ActionListener getListener() { + return actionListener; + } + + public IndexTieringInfo getIndexTieringInfo(Index index) { + return indexTieringStatusMap.get(index); + } + + public boolean hasIndex(Index index) { + return indexTieringStatusMap.containsKey(index); + } + + public Map getFailedIndices() { + Map failedIndicesMap = new HashMap<>(); + for (IndexTieringInfo indexTieringInfo : filterIndicesByState(IndexTieringState.FAILED)) { + failedIndicesMap.put(indexTieringInfo.getIndex(), indexTieringInfo.getReason()); + } + return failedIndicesMap; + } + + public boolean isRequestProcessingComplete() { + return filterIndicesByState(IndexTieringState.SUCCESSFUL).size() + filterIndicesByState(IndexTieringState.FAILED) + .size() == indexTieringStatusMap.size(); + } + + public List filterIndicesByState(IndexTieringState state) { + return indexTieringStatusMap.values() + .stream() + .filter(indexTieringInfo -> indexTieringInfo.getState() == state) + .collect(Collectors.toList()); + } + + public List getIndicesWithInitState() { + return indexTieringStatusMap.values() + .stream() + .filter(indexTieringInfo -> indexTieringInfo.getState() == IndexTieringState.INIT) + .collect(Collectors.toList()); + } + + public List getIndicesPendingTiering() { + return indexTieringStatusMap.values() + .stream() + .filter( + indexTieringInfo -> indexTieringInfo.getState() == IndexTieringState.INIT + || indexTieringInfo.getState() == IndexTieringState.IN_PROGRESS + ) + .collect(Collectors.toList()); + } + + @Override + public String toString() { + return "TieringRequestContext{" + "actionListener=" + actionListener + ", indexTieringStatusMap=" + indexTieringStatusMap + '}'; + } + + /** + * Represents info of a tiering index + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class IndexTieringInfo { + private final Index index; + private IndexTieringState state; + private String reason; + + public IndexTieringInfo(Index index) { + this.index = index; + this.state = IndexTieringState.INIT; + } + + public IndexTieringInfo(Index index, IndexTieringState state, String reason) { + this.index = index; + this.state = state; + this.reason = reason; + } + + public Index getIndex() { + return index; + } + + public IndexTieringState getState() { + return state; + } + + public void markTiered() { + this.state = IndexTieringState.TIERED; + } + + public void markInProgress() { + this.state = IndexTieringState.IN_PROGRESS; + } + + public void markSuccessful() { + this.state = IndexTieringState.SUCCESSFUL; + } + + public void markFailed(String reason) { + this.state = IndexTieringState.FAILED; + this.reason = reason; + } + + public String getReason() { + return reason; + } + + @Override + public String toString() { + return "IndexTieringInfo{" + "index=" + index + ", state=" + state + ", reason='" + reason + '\'' + '}'; + } + } + + /** + * Index Tiering status + * + * @opensearch.experimental + */ + @ExperimentalApi + public enum IndexTieringState { + /** + * Tiering started + */ + INIT((byte) 0), + + /** + * Tiering in progress (running shard relocation) + */ + IN_PROGRESS((byte) 1), + + /** + * Index tiered (shard relocation completed) + */ + TIERED((byte) 2), + + /** + * Tiering finished successfully + */ + SUCCESSFUL((byte) 3), + + /** + * Tiering failed + */ + FAILED((byte) 4); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + IndexTieringState(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * @return true if tiering is successful + */ + public boolean successful() { + return this == SUCCESSFUL; + } + + /** + * @return true if tiering is failed + */ + public boolean failed() { + return this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value stat code + * @return state + */ + public static IndexTieringState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 1: + return IN_PROGRESS; + case 2: + return TIERED; + case 3: + return SUCCESSFUL; + case 4: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java new file mode 100644 index 0000000000000..7a861d9fb545a --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Cluster state update request that allows tiering for indices + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringUpdateClusterStateRequest extends IndicesClusterStateUpdateRequest { + + private final boolean waitForCompletion; + + public TieringUpdateClusterStateRequest(final boolean waitForCompletion) { + this.waitForCompletion = waitForCompletion; + } + + public boolean waitForCompletion() { + return waitForCompletion; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java new file mode 100644 index 0000000000000..3c9863a8f3c13 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Utility class for tiering operations + * + * @opensearch.internal + */ +@ExperimentalApi +public class TieringUtils { + + /** + * Constructs a HotToWarmTieringResponse from the rejected indices map + * + * @param rejectedIndices the rejected indices map + * @return the HotToWarmTieringResponse object + */ + public static HotToWarmTieringResponse constructToHotToWarmTieringResponse(final Map rejectedIndices) { + final List indicesResult = new LinkedList<>(); + for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue())); + } + return new HotToWarmTieringResponse(true, indicesResult); + } + + /** + * Checks if the specified index is in the "hot" tiering state. + * + * @param indexMetadata the metadata of the index + * @return true if the index is in the "hot" tiering state, false otherwise + */ + public static boolean isHotIndex(final IndexMetadata indexMetadata) { + return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(indexMetadata.getSettings())); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java index ccd60daf027ce..f656d7dd28357 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java @@ -12,8 +12,6 @@ import org.opensearch.core.index.Index; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -50,14 +48,6 @@ public void addToRejected(Index index, String reason) { rejectedIndices.put(index, reason); } - public HotToWarmTieringResponse constructResponse() { - final List indicesResult = new LinkedList<>(); - for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { - indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue())); - } - return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java index 8d1ab0bb37cdd..bdeac2646e9a0 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java @@ -12,26 +12,25 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; -import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.inject.Inject; -import org.opensearch.common.settings.Settings; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.Index; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Set; +import java.util.ArrayList; +import java.util.List; -import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotToWarm; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isHotIndex; /** * Transport Tiering action to move indices from hot to warm @@ -42,8 +41,7 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNodeAction { private static final Logger logger = LogManager.getLogger(TransportHotToWarmTieringAction.class); - private final ClusterInfoService clusterInfoService; - private final DiskThresholdSettings diskThresholdSettings; + private final HotToWarmTieringService hotToWarmTieringService; @Inject public TransportHotToWarmTieringAction( @@ -52,8 +50,7 @@ public TransportHotToWarmTieringAction( ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, - ClusterInfoService clusterInfoService, - Settings settings + HotToWarmTieringService hotToWarmTieringService ) { super( HotToWarmTieringAction.NAME, @@ -64,8 +61,7 @@ public TransportHotToWarmTieringAction( TieringIndexRequest::new, indexNameExpressionResolver ); - this.clusterInfoService = clusterInfoService; - this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + this.hotToWarmTieringService = hotToWarmTieringService; } @Override @@ -92,19 +88,26 @@ protected void clusterManagerOperation( ) throws Exception { Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); if (concreteIndices == null || concreteIndices.length == 0) { + logger.info("[HotToWarmTiering] No concrete indices resolved for the indices {} in the request", List.of(request.indices())); listener.onResponse(new HotToWarmTieringResponse(true)); return; } - final TieringValidationResult tieringValidationResult = validateHotToWarm( - state, - Set.of(concreteIndices), - clusterInfoService.getClusterInfo(), - diskThresholdSettings - ); - - if (tieringValidationResult.getAcceptedIndices().isEmpty()) { - listener.onResponse(tieringValidationResult.constructResponse()); + final List hotIndices = new ArrayList<>(); + for (Index index : concreteIndices) { + if (isHotIndex(clusterService.state().metadata().getIndexSafe(index))) { + hotIndices.add(index); + } + } + if (hotIndices.isEmpty()) { + logger.info("[HotToWarmTiering] No hot indices found out of the resolved concrete indices {}", List.of(concreteIndices)); + listener.onResponse(new HotToWarmTieringResponse(true)); return; } + final TieringUpdateClusterStateRequest updateClusterStateRequest = new TieringUpdateClusterStateRequest(request.waitForCompletion()) + .ackTimeout(request.timeout()) + .masterNodeTimeout(request.clusterManagerNodeTimeout()) + .indices(hotIndices.toArray(Index.EMPTY_ARRAY)); + + hotToWarmTieringService.tier(updateClusterStateRequest, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 9e7fe23f29872..195021252135f 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -637,6 +637,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; + public static final String TIERING_CUSTOM_KEY = "tiering"; public static final String TRANSLOG_METADATA_KEY = "translog_metadata"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; diff --git a/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..a644b6231a7e2 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java @@ -0,0 +1,413 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.action.admin.indices.tiering.TieringUpdateClusterStateRequest; +import org.opensearch.action.admin.indices.tiering.TieringValidationResult; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.index.IndexNotFoundException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.opensearch.action.admin.indices.tiering.TieringUtils.constructToHotToWarmTieringResponse; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.TIERING_CUSTOM_KEY; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; +import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotToWarm; + +/** + * Service responsible for tiering indices from hot to warm + * @opensearch.experimental + */ +public class HotToWarmTieringService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + private final ClusterService clusterService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final AllocationService allocationService; + private final ClusterInfoService clusterInfoService; + private final DiskThresholdSettings diskThresholdSettings; + private final Set tieringRequestContexts = ConcurrentHashMap.newKeySet(); + static final String HOT_TO_WARM_START_TIME = "hot_to_warm_start_time"; + static final String HOT_TO_WARM_END_TIME = "hot_to_warm_end_time"; + + @Inject + public HotToWarmTieringService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService, + ClusterInfoService clusterInfoService + ) { + super(); + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.allocationService = allocationService; + this.clusterInfoService = clusterInfoService; + this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + + if (DiscoveryNode.isClusterManagerNode(settings) && FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + clusterService.addListener(this); + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO: https://github.com/opensearch-project/OpenSearch/issues/14981 + if (event.routingTableChanged()) { + if (!tieringRequestContexts.isEmpty()) { + processTieringRequestContexts(event.state()); + } + } + } + + void processTieringRequestContexts(final ClusterState clusterState) { + final Set tieredIndices = new HashSet<>(); + for (TieringRequestContext tieringRequestContext : tieringRequestContexts) { + if (tieringRequestContext.isRequestProcessingComplete()) { + logger.info("[HotToWarmTiering] Tiering is completed for the request [{}]", tieringRequestContext); + completeRequestLevelTiering(tieringRequestContext); + continue; + } + List shardRoutings; + for (TieringRequestContext.IndexTieringInfo indexTieringInfo : tieringRequestContext.getIndicesPendingTiering()) { + Index index = indexTieringInfo.getIndex(); + if (clusterState.routingTable().hasIndex(index)) { + // Ensure index is not deleted + shardRoutings = clusterState.routingTable().allShards(index.getName()); + } else { + // Index already deleted nothing to do + logger.warn("[HotToWarmTiering] Index [{}] deleted before shard relocation finished", index.getName()); + indexTieringInfo.markFailed("index not found"); + continue; + } + + boolean relocationCompleted = true; + for (ShardRouting shard : shardRoutings) { + if (!isRelocationCompleted(shard, clusterState)) { + relocationCompleted = false; + break; + } + } + if (relocationCompleted) { + logger.info("[HotToWarmTiering] Shard relocation completed for index [{}]", index.getName()); + indexTieringInfo.markTiered(); + tieredIndices.add(indexTieringInfo); + } else { + indexTieringInfo.markInProgress(); + } + } + } + if (!tieredIndices.isEmpty()) { + updateClusterStateForTieredIndices(tieredIndices); + } + } + + /** + * Checks if the relocation of the shard is completed. + * @param shard shard routing + * @param clusterState current cluster state + * @return true if relocation is completed, false otherwise + */ + boolean isRelocationCompleted(final ShardRouting shard, final ClusterState clusterState) { + if (shard.unassigned()) { + return false; + } + final boolean isShardFoundOnSearchNode = clusterState.getNodes().get(shard.currentNodeId()).isSearchNode(); + return shard.started() && isShardFoundOnSearchNode; + } + + /** + * Completes the request level tiering for requestContext. + * @param requestContext tiering request context + */ + void completeRequestLevelTiering(TieringRequestContext requestContext) { + tieringRequestContexts.remove(requestContext); + if (requestContext.getListener() != null) { + requestContext.getListener().onResponse(constructToHotToWarmTieringResponse(requestContext.getFailedIndices())); + } + } + + TieringRequestContext findRequestContextForIndex(final Index index) { + return tieringRequestContexts.stream() + .filter(tieringRequestContext -> tieringRequestContext.hasIndex(index)) + .findFirst() + .orElse(null); + } + + /** + * Updates the request context for tiered indices, + * Moves tiered indices to successful state, + * Checks and completes the request level tiering + * @param indexTieringInfos set of tiered indices + */ + void updateRequestContextForTieredIndices(final Set indexTieringInfos) { + for (TieringRequestContext.IndexTieringInfo indexTieringInfo : indexTieringInfos) { + Index tieredIndex = indexTieringInfo.getIndex(); + TieringRequestContext tieringRequestContext = findRequestContextForIndex(tieredIndex); + assert tieringRequestContext != null; + assert tieringRequestContext.getIndexTieringInfo(tieredIndex) == indexTieringInfo; + indexTieringInfo.markSuccessful(); + if (tieringRequestContext.isRequestProcessingComplete()) { + logger.info("[HotToWarmTiering] Tiering is completed for the request [{}]", tieringRequestContext); + completeRequestLevelTiering(tieringRequestContext); + } + } + } + + IndexMetadata findIndexMetadata(final Metadata.Builder metadataBuilder, final Index index) { + IndexMetadata indexMetadata; + try { + indexMetadata = metadataBuilder.getSafe(index); + } catch (IndexNotFoundException e) { + return null; + } + return indexMetadata; + } + + /** + * Updates the index metadata with the tiering settings/metadata for an accepted index. + * Accepted index is an index to be tiered from hot to warm. + * @param metadataBuilder metadata builder + * @param routingTableBuilder routing builder + * @param indexTieringInfo index tiering info + * @return updated index metadata builder + */ + IndexMetadata.Builder updateIndexMetadataForAcceptedIndex( + final Metadata.Builder metadataBuilder, + final RoutingTable.Builder routingTableBuilder, + final TieringRequestContext.IndexTieringInfo indexTieringInfo + ) { + final Index index = indexTieringInfo.getIndex(); + final IndexMetadata indexMetadata = findIndexMetadata(metadataBuilder, index); + if (indexMetadata == null) { + indexTieringInfo.markFailed("index not found"); + return null; + } + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update index settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + + // Update number of replicas to 1 in case the number of replicas is greater than 1 + if (Integer.parseInt(indexMetadata.getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) > 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(1, indices); + metadataBuilder.updateNumberOfReplicas(1, indices); + } + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + final Map tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + tieringCustomData.put(HOT_TO_WARM_END_TIME, "-1"); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + } + + /** + * Updates the cluster state by updating the index metadata for tiered indices. + * @param tieredIndices set of tiered indices + */ + void updateClusterStateForTieredIndices(final Set tieredIndices) { + clusterService.submitStateUpdateTask( + "complete hot to warm tiering for tiered indices: " + tieredIndices, + new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (TieringRequestContext.IndexTieringInfo indexTieringInfo : tieredIndices) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForTieredIndex( + metadataBuilder, + indexTieringInfo + ); + if (indexMetadataBuilder != null) { + metadataBuilder.put(indexMetadataBuilder); + } + } + return ClusterState.builder(currentState).metadata(metadataBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage( + "[HotToWarmTiering] failed to complete tiering for tiered indices " + "[{}]", + tieredIndices + ), + e + ); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("[HotToWarmTiering] Cluster state updated for source " + source); + updateRequestContextForTieredIndices(tieredIndices); + } + } + ); + } + + /** + * Updates the index metadata with the tiering settings/metadata for a tiered index. + * @param metadataBuilder metadata builder + * @param indexTieringInfo index tiering info + * @return updated index metadata builder + */ + IndexMetadata.Builder updateIndexMetadataForTieredIndex( + final Metadata.Builder metadataBuilder, + final TieringRequestContext.IndexTieringInfo indexTieringInfo + ) { + final Index index = indexTieringInfo.getIndex(); + final IndexMetadata indexMetadata = findIndexMetadata(metadataBuilder, index); + if (indexMetadata == null) { + indexTieringInfo.markFailed("index not found"); + return null; + } + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update tiering settings here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + Map tieringCustomData = new HashMap<>(); + tieringCustomData.put(HOT_TO_WARM_START_TIME, indexMetadata.getCustomData(TIERING_CUSTOM_KEY).get(HOT_TO_WARM_START_TIME)); + tieringCustomData.put(HOT_TO_WARM_END_TIME, String.valueOf(System.currentTimeMillis())); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + return indexMetadataBuilder; + } + + /** + * Tier indices from hot to warm + * @param tieringUpdateClusterStateRequest - tiering update cluster state request + * @param listener - call back listener + */ + public void tier( + final TieringUpdateClusterStateRequest tieringUpdateClusterStateRequest, + final ActionListener listener + ) { + + final Set indices = Set.of(tieringUpdateClusterStateRequest.indices()); + logger.info("[HotToWarmTiering] Starting hot to warm tiering for indices {}", indices); + final TieringRequestContext tieringRequestContext = new TieringRequestContext( + tieringUpdateClusterStateRequest.waitForCompletion() ? listener : null + ); + clusterService.submitStateUpdateTask("start hot to warm tiering: " + indices, new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final TieringValidationResult tieringValidationResult = validateHotToWarm( + currentState, + indices, + clusterInfoService.getClusterInfo(), + diskThresholdSettings + ); + + tieringRequestContext.setIndexTieringStatusMap( + tieringValidationResult.getAcceptedIndices(), + tieringValidationResult.getRejectedIndices() + ); + + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (TieringRequestContext.IndexTieringInfo indexTieringInfo : tieringRequestContext.getIndicesWithInitState()) { + final IndexMetadata.Builder indexMetadataBuilder = updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + indexTieringInfo + ); + if (indexMetadataBuilder != null) { + metadataBuilder.put(indexMetadataBuilder); + } + } + + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn( + (Supplier) () -> new ParameterizedMessage("[HotToWarmTiering] failed tiering for indices " + "[{}]", indices), + e + ); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("[HotToWarmTiering] Cluster state updated for source " + source); + tieringRequestContexts.add(tieringRequestContext); + if (!tieringUpdateClusterStateRequest.waitForCompletion()) { + listener.onResponse(constructToHotToWarmTieringResponse(tieringRequestContext.getFailedIndices())); + } + } + + @Override + public TimeValue timeout() { + return tieringUpdateClusterStateRequest.clusterManagerNodeTimeout(); + } + }); + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException {} + +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java index 2de50f4d4295d..a3297d2c46a18 100644 --- a/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java +++ b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java @@ -22,7 +22,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.core.index.Index; -import org.opensearch.index.IndexModule; import java.util.HashMap; import java.util.List; @@ -30,7 +29,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.isHotIndex; /** * Validator class to validate the tiering requests of the index @@ -82,7 +81,7 @@ public static TieringValidationResult validateHotToWarm( validateEligibleNodesCapacity(clusterInfo, currentState, tieringValidationResult); logger.info( - "Successfully accepted indices for tiering are [{}], rejected indices are [{}]", + "[HotToWarmTiering] Successfully accepted indices for tiering are [{}], rejected indices are [{}]", tieringValidationResult.getAcceptedIndices(), tieringValidationResult.getRejectedIndices() ); @@ -127,7 +126,7 @@ static boolean validateRemoteStoreIndex(final ClusterState state, final Index in * @return true if the index is in the "hot" tiering state, false otherwise */ static boolean validateHotIndex(final ClusterState state, final Index index) { - return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); + return isHotIndex(state.metadata().getIndexSafe(index)); } /** @@ -172,7 +171,7 @@ static void validateDiskThresholdWaterMarkNotBreached( ) { final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); if (usages == null) { - logger.trace("skipping monitor as no disk usage information is available"); + logger.trace("[Tiering] skipping monitor as no disk usage information is available"); return; } final Set nodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 448cb3627651c..96c56818562d8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -179,6 +179,7 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.monitor.fs.FsHealthService; @@ -1199,6 +1200,14 @@ protected Node( remoteClusterStateService ); + final HotToWarmTieringService hotToWarmTieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService(), + clusterInfoService + ); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1395,6 +1404,9 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + b.bind(HotToWarmTieringService.class).toInstance(hotToWarmTieringService); + } b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java new file mode 100644 index 0000000000000..5d5d85958adf2 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +public class HotToWarmTieringServiceTests extends OpenSearchSingleNodeTestCase { + + private ClusterService clusterService; + private HotToWarmTieringService hotToWarmTieringService; + + @Before + public void beforeTest() { + clusterService = this.getInstanceFromNode(ClusterService.class); + hotToWarmTieringService = this.getInstanceFromNode(HotToWarmTieringService.class); + } + + public void testUpdateIndexMetadataForAcceptedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterService.state().routingTable()); + final TieringRequestContext.IndexTieringInfo indexTieringInfo = new TieringRequestContext.IndexTieringInfo(index); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForAcceptedIndex( + metadataBuilder, + routingTableBuilder, + indexTieringInfo + ); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals( + IndexModule.DataLocalityType.PARTIAL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT_TO_WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_START_TIME)); + assertNotNull(customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME)); + } + + public void testUpdateIndexMetadataForSuccessfulIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + Map customData = new HashMap<>(); + customData.put(HotToWarmTieringService.HOT_TO_WARM_START_TIME, String.valueOf(System.currentTimeMillis())); + metadataBuilder.put(IndexMetadata.builder(metadataBuilder.getSafe(index)).putCustom(IndexMetadata.TIERING_CUSTOM_KEY, customData)); + final TieringRequestContext.IndexTieringInfo indexTieringInfo = new TieringRequestContext.IndexTieringInfo( + index, + TieringRequestContext.IndexTieringState.SUCCESSFUL, + null + ); + IndexMetadata.Builder builder = hotToWarmTieringService.updateIndexMetadataForTieredIndex(metadataBuilder, indexTieringInfo); + assertNotNull(builder); + IndexMetadata indexMetadata = builder.index(indexName).build(); + assertEquals(IndexModule.TieringState.WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + String endTime = customData.get(HotToWarmTieringService.HOT_TO_WARM_END_TIME); + assertNotNull(endTime); + assertNotEquals("-1", endTime); + } +}