From 06fab37aae76bcacfa30ebaa6539df8962559a5d Mon Sep 17 00:00:00 2001 From: Neetika Singhal Date: Thu, 11 Jul 2024 12:57:20 -0700 Subject: [PATCH] HotToWarmTieringService changes to tier shards Signed-off-by: Neetika Singhal --- CHANGELOG.md | 1 + .../tiering/HotToWarmTieringServiceIT.java | 114 ++++++++++ .../tiering/TieringBaseIntegTestCase.java | 102 +++++++++ .../tiering/TieringRequestContext.java | 167 +++++++++++++++ .../indices/tiering/TieringRequests.java | 88 ++++++++ .../TieringUpdateClusterStateRequest.java | 40 ++++ .../admin/indices/tiering/TieringUtils.java | 39 ++++ .../tiering/TieringValidationResult.java | 10 - .../TransportHotToWarmTieringAction.java | 30 ++- .../cluster/metadata/IndexMetadata.java | 13 ++ .../metadata/IndexNameExpressionResolver.java | 31 +++ .../AbstractTieringStateProcessor.java | 44 ++++ .../CompletedTieringStateProcessor.java | 39 ++++ .../tiering/HotToWarmTieringService.java | 196 ++++++++++++++++++ .../indices/tiering/IndexTieringState.java | 99 +++++++++ .../PendingCompleteTieringStateProcessor.java | 106 ++++++++++ .../PendingStartTieringStateProcessor.java | 132 ++++++++++++ .../tiering/RunningTieringStateProcessor.java | 79 +++++++ .../tiering/TieringRequestValidator.java | 22 +- .../tiering/TieringStateProcessor.java | 25 +++ .../main/java/org/opensearch/node/Node.java | 11 + .../TransportHotToWarmTieringActionTests.java | 2 +- .../IndexNameExpressionResolverTests.java | 40 ++++ .../tiering/HotToWarmTieringServiceTests.java | 71 +++++++ .../tiering/TieringRequestValidatorTests.java | 22 -- 25 files changed, 1465 insertions(+), 58 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java create mode 100644 server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequests.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java create mode 100644 server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/AbstractTieringStateProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/CompletedTieringStateProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/IndexTieringState.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/PendingCompleteTieringStateProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/PendingStartTieringStateProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/RunningTieringStateProcessor.java create mode 100644 server/src/main/java/org/opensearch/indices/tiering/TieringStateProcessor.java create mode 100644 server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 663ae586a46a9..5123e0fbcaf02 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709)) - [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897)) - Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153)) +- HotToWarmTieringService changes to tier shards ([#14891](https://github.com/opensearch-project/OpenSearch/pull/14891)) ### Dependencies - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081)) diff --git a/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java b/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java new file mode 100644 index 0000000000000..4ad8d87803bae --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/tiering/HotToWarmTieringServiceIT.java @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + +import org.opensearch.action.admin.indices.get.GetIndexResponse; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringIndexRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.IndexModule; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.Map; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) +// Uncomment the below line to enable trace level logs for this test for better debugging +// @TestLogging(reason = "Getting trace logs from tiering package", value = +// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE") +public class HotToWarmTieringServiceIT extends TieringBaseIntegTestCase { + + protected static final String TEST_IDX_1 = "test-idx-1"; + protected static final String TEST_IDX_2 = "test-idx-2"; + protected static final int NUM_DOCS_IN_BULK = 10; + private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + } + + // waiting for the recovery pr to be merged in + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13647") + public void testTieringBasic() { + final int numReplicasIndex = 0; + internalCluster().ensureAtLeastNumDataNodes(1); + final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) + .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()) + .build(); + + String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; + for (String index : indices) { + assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); + ensureGreen(index); + // Ingesting some docs + indexBulk(index, NUM_DOCS_IN_BULK); + flushAndRefresh(index); + ensureGreen(); + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + } + + // Spin up node having search role + internalCluster().ensureAtLeastNumSearchAndDataNodes(1); + + final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); + clusterInfoService.setDiskUsageFunctionAndRefresh( + (discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) + ); + + TieringIndexRequest request = new TieringIndexRequest(TARGET_WARM_TIER, indices); + request.waitForCompletion(true); + HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); + assertAcked(response); + assertTrue(response.getFailedIndices().isEmpty()); + assertTrue(response.isAcknowledged()); + ensureGreen(); + for (String index : indices) { + SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); + // Asserting that search returns same number of docs as ingested + assertHitCount(searchResponse, NUM_DOCS_IN_BULK); + GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get(); + assertWarmSettings(getIndexResponse, index); + assertAcked(client().admin().indices().prepareDelete(index).get()); + } + } + + private void assertWarmSettings(GetIndexResponse response, String indexName) { + final Map settings = response.settings(); + assertThat(settings, notNullValue()); + assertThat(settings.size(), equalTo(1)); + Settings indexSettings = settings.get(indexName); + assertThat(indexSettings, notNullValue()); + assertThat( + indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()), + equalTo(IndexModule.DataLocalityType.PARTIAL.name()) + ); + assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name())); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java new file mode 100644 index 0000000000000..0b60e71480315 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/tiering/TieringBaseIntegTestCase.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tiering; + +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.cluster.ClusterInfoService; +import org.opensearch.cluster.MockInternalClusterInfoService; +import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; + +public class TieringBaseIntegTestCase extends OpenSearchIntegTestCase { + + protected Path segmentRepoPath; + protected Path translogRepoPath; + Settings extraSettings = Settings.EMPTY; + private final List documentKeys = List.of( + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5), + randomAlphaOfLength(5) + ); + + protected static final String REPOSITORY_NAME = "test-remote-store-repo"; + protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; + protected static final String TARGET_WARM_TIER = "warm"; + + /** + * Disable MockFSIndexStore plugin as it wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) + * As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory + * + */ + @Override + protected boolean addMockIndexStorePlugin() { + return false; + } + + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); + } + + @Override + protected Settings featureFlagSettings() { + Settings.Builder featureSettings = Settings.builder(); + featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); + return featureSettings.build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + if (segmentRepoPath == null || translogRepoPath == null) { + segmentRepoPath = randomRepoPath().toAbsolutePath(); + translogRepoPath = randomRepoPath().toAbsolutePath(); + } + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(extraSettings) + .put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) + .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) + .build(); + } + + protected BulkResponse indexBulk(String indexName, int numDocs) { + BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < numDocs; i++) { + final IndexRequest request = client().prepareIndex(indexName) + .setId(UUIDs.randomBase64UUID()) + .setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) + .request(); + bulkRequest.add(request); + } + return client().bulk(bulkRequest).actionGet(); + } + + protected MockInternalClusterInfoService getMockInternalClusterInfoService() { + return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); + } + + protected static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { + return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java new file mode 100644 index 0000000000000..bf8565aa5b21d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java @@ -0,0 +1,167 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.indices.tiering.IndexTieringState; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Context class to hold indices to be tiered per request. It also holds + * the listener per request to mark the request as complete once all + * tiering operations are completed. + * + * @opensearch.experimental + */ + +@ExperimentalApi +public class TieringRequestContext { + private final ActionListener actionListener; + private final Map indexTieringStatusMap; + + public TieringRequestContext( + ActionListener actionListener, + Set acceptedIndices, + Map failedIndices + ) { + this.actionListener = actionListener; + indexTieringStatusMap = new ConcurrentHashMap<>(); + for (Index index : acceptedIndices) { + indexTieringStatusMap.put(index, new IndexTieringInfo()); + } + for (Map.Entry entry : failedIndices.entrySet()) { + indexTieringStatusMap.put(entry.getKey(), new IndexTieringInfo(IndexTieringState.FAILED, entry.getValue())); + } + } + + public ActionListener getListener() { + return actionListener; + } + + public boolean hasIndex(Index index) { + return indexTieringStatusMap.containsKey(index); + } + + public Map getFailedIndices() { + Map failedIndicesMap = new HashMap<>(); + for (Index index : filterIndicesByState(IndexTieringState.FAILED)) { + failedIndicesMap.put(index, indexTieringStatusMap.get(index).getReason()); + } + return failedIndicesMap; + } + + public boolean isRequestProcessingComplete() { + return filterIndicesByState(IndexTieringState.COMPLETED).size() + filterIndicesByState(IndexTieringState.FAILED) + .size() == indexTieringStatusMap.size(); + } + + public List filterIndicesByState(IndexTieringState state) { + return indexTieringStatusMap.keySet() + .stream() + .filter(indexTieringInfo -> indexTieringStatusMap.get(indexTieringInfo).getState() == state) + .collect(Collectors.toList()); + } + + public List getIndicesPendingTiering() { + return indexTieringStatusMap.keySet() + .stream() + .filter(indexTieringInfo -> indexTieringStatusMap.get(indexTieringInfo).getState() == IndexTieringState.PENDING_START) + .collect(Collectors.toList()); + } + + public void markIndexFailed(Index index, String reason) { + indexTieringStatusMap.get(index).markFailed(reason); + } + + public void markIndexInProgress(Index index) { + indexTieringStatusMap.get(index).markInProgress(); + } + + public void markIndexAsPendingComplete(Index index) { + indexTieringStatusMap.get(index).markAsPendingComplete(); + } + + public void markIndexAsCompleted(Index index) { + indexTieringStatusMap.get(index).markCompleted(); + } + + public boolean hasIndexFailed(Index index) { + return indexTieringStatusMap.get(index).hasFailed(); + } + + @Override + public String toString() { + return "TieringRequestContext{" + "actionListener=" + actionListener + ", indexTieringStatusMap=" + indexTieringStatusMap + '}'; + } + + /** + * Represents info of a tiering index + * + * @opensearch.experimental + */ + @ExperimentalApi + public static class IndexTieringInfo { + private IndexTieringState state; + private String reason; + + public IndexTieringInfo() { + this.state = IndexTieringState.PENDING_START; + this.reason = null; + } + + public IndexTieringInfo(IndexTieringState state, String reason) { + this.state = state; + this.reason = reason; + } + + public IndexTieringState getState() { + return state; + } + public void markInProgress() { + this.state = IndexTieringState.IN_PROGRESS; + } + + public void markAsPendingComplete() { + this.state = IndexTieringState.PENDING_COMPLETION; + } + + public void markCompleted() { + this.state = IndexTieringState.COMPLETED; + } + + public void markFailed(String reason) { + this.state = IndexTieringState.FAILED; + this.reason = reason; + } + + public String getReason() { + return reason; + } + + public boolean hasFailed() { + return state.failed(); + } + + @Override + public String toString() { + return "IndexTieringInfo{" + + "state=" + state + + ", reason='" + reason + '\'' + + '}'; + } + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequests.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequests.java new file mode 100644 index 0000000000000..a23518c83f5a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequests.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.index.Index; +import org.opensearch.indices.tiering.IndexTieringState; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.Map; +import java.util.Queue; +import java.util.Set; + +/** + * Tiering requests + */ +@ExperimentalApi +public class TieringRequests { + private final Set acceptedTieringRequestContexts; + private final Queue queuedTieringContexts; + private final int queueSize; + + public TieringRequests(int queueSize) { + this.queueSize = queueSize; + acceptedTieringRequestContexts = new HashSet<>(); + queuedTieringContexts = new LinkedList<>(); + } + + public Set getAcceptedTieringRequestContexts() { + return acceptedTieringRequestContexts; + } + + + public Queue getQueuedTieringContexts() { + return queuedTieringContexts; + } + + public void addToPending(TieringRequestContext tieringRequestContext) { + queuedTieringContexts.add(tieringRequestContext); + } + + public void addToPending(Set tieringRequestContexts) { + for (TieringRequestContext tieringRequestContext : tieringRequestContexts) { + addToPending(tieringRequestContext); + } + } + + public void addToAccepted(TieringRequestContext tieringRequestContext) { + acceptedTieringRequestContexts.add(tieringRequestContext); + } + + public void addToAccepted(Set tieringRequestContexts) { + for (TieringRequestContext tieringRequestContext : tieringRequestContexts) { + addToAccepted(tieringRequestContext); + } + } + + public Set dequePendingTieringContexts() { + final Set tieringRequestContexts = new HashSet<>(); + int i = queueSize - acceptedTieringRequestContexts.size(); + while (!queuedTieringContexts.isEmpty() && i-- > 0) { + tieringRequestContexts.add(queuedTieringContexts.poll()); + } + return tieringRequestContexts; + } + + public Map getIndices(Set tieringRequestContexts, IndexTieringState state) { + final Map indexTieringRequestContextMap = new HashMap<>(); + for (TieringRequestContext tieringRequestContext : tieringRequestContexts) { + for (Index index : tieringRequestContext.filterIndicesByState(state)) { + indexTieringRequestContextMap.put(index, tieringRequestContext); + } + } + return indexTieringRequestContextMap; + } + + public boolean isEmpty() { + return queuedTieringContexts.isEmpty() && acceptedTieringRequestContexts.isEmpty(); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java new file mode 100644 index 0000000000000..c22a760679495 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.cluster.ack.IndicesClusterStateUpdateRequest; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.index.Index; + +import java.util.Map; + +/** + * Cluster state update request that allows tiering for indices + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringUpdateClusterStateRequest extends IndicesClusterStateUpdateRequest { + + private final Map rejectedIndices; + private final boolean waitForCompletion; + + public TieringUpdateClusterStateRequest(Map rejectedIndices, boolean waitForCompletion) { + this.rejectedIndices = rejectedIndices; + this.waitForCompletion = waitForCompletion; + } + + public boolean waitForCompletion() { + return waitForCompletion; + } + + public Map getRejectedIndices() { + return rejectedIndices; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java new file mode 100644 index 0000000000000..3c4e1d5b2f7d6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUtils.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.core.index.Index; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Utility class for tiering operations + * + * @opensearch.experimental + */ +@ExperimentalApi +public class TieringUtils { + + /** + * Constructs a HotToWarmTieringResponse from the rejected indices map + * + * @param rejectedIndices the rejected indices map + * @return the HotToWarmTieringResponse object + */ + public static HotToWarmTieringResponse constructToHotToWarmTieringResponse(final Map rejectedIndices) { + final List indicesResult = new LinkedList<>(); + for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { + indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue())); + } + return new HotToWarmTieringResponse(true, indicesResult); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java index ccd60daf027ce..f656d7dd28357 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringValidationResult.java @@ -12,8 +12,6 @@ import org.opensearch.core.index.Index; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -50,14 +48,6 @@ public void addToRejected(Index index, String reason) { rejectedIndices.put(index, reason); } - public HotToWarmTieringResponse constructResponse() { - final List indicesResult = new LinkedList<>(); - for (Map.Entry rejectedIndex : rejectedIndices.entrySet()) { - indicesResult.add(new HotToWarmTieringResponse.IndexResult(rejectedIndex.getKey().getName(), rejectedIndex.getValue())); - } - return new HotToWarmTieringResponse(acceptedIndices.size() > 0, indicesResult); - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java index 8d1ab0bb37cdd..257507f100ccf 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringAction.java @@ -25,12 +25,16 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Set; +import java.util.List; +import static org.opensearch.action.admin.indices.tiering.TieringUtils.constructToHotToWarmTieringResponse; +import static org.opensearch.common.util.set.Sets.newHashSet; import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotToWarm; /** @@ -44,6 +48,7 @@ public class TransportHotToWarmTieringAction extends TransportClusterManagerNode private static final Logger logger = LogManager.getLogger(TransportHotToWarmTieringAction.class); private final ClusterInfoService clusterInfoService; private final DiskThresholdSettings diskThresholdSettings; + private final HotToWarmTieringService hotToWarmTieringService; @Inject public TransportHotToWarmTieringAction( @@ -53,7 +58,8 @@ public TransportHotToWarmTieringAction( ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, ClusterInfoService clusterInfoService, - Settings settings + Settings settings, + HotToWarmTieringService hotToWarmTieringService ) { super( HotToWarmTieringAction.NAME, @@ -66,6 +72,7 @@ public TransportHotToWarmTieringAction( ); this.clusterInfoService = clusterInfoService; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterService.getClusterSettings()); + this.hotToWarmTieringService = hotToWarmTieringService; } @Override @@ -90,21 +97,34 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + Index[] concreteIndices = indexNameExpressionResolver.concreteIndicesInTier(state, request, IndexModule.TieringState.HOT); if (concreteIndices == null || concreteIndices.length == 0) { + logger.info( + "[HotToWarmTiering] No hot concrete indices resolved for the indices {} in the request", + List.of(request.indices()) + ); listener.onResponse(new HotToWarmTieringResponse(true)); return; } final TieringValidationResult tieringValidationResult = validateHotToWarm( state, - Set.of(concreteIndices), + newHashSet(concreteIndices), clusterInfoService.getClusterInfo(), diskThresholdSettings ); if (tieringValidationResult.getAcceptedIndices().isEmpty()) { - listener.onResponse(tieringValidationResult.constructResponse()); + listener.onResponse(constructToHotToWarmTieringResponse(tieringValidationResult.getRejectedIndices())); return; } + + final TieringUpdateClusterStateRequest updateClusterStateRequest = new TieringUpdateClusterStateRequest( + tieringValidationResult.getRejectedIndices(), + request.waitForCompletion() + ).ackTimeout(request.timeout()) + .masterNodeTimeout(request.clusterManagerNodeTimeout()) + .indices(tieringValidationResult.getAcceptedIndices().toArray(Index.EMPTY_ARRAY)); + + hotToWarmTieringService.tier(updateClusterStateRequest, listener); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index df0d2609ad83d..3199aaefbf5d2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -95,6 +95,7 @@ import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR; import static org.opensearch.common.settings.Settings.readSettingsFromStream; import static org.opensearch.common.settings.Settings.writeSettingsToStream; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; /** * Index metadata information @@ -638,6 +639,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; + public static final String TIERING_CUSTOM_KEY = "tiering"; public static final String TRANSLOG_METADATA_KEY = "translog_metadata"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; @@ -687,6 +689,8 @@ public static APIBlock readFrom(StreamInput input) throws IOException { private final boolean isSystem; private final boolean isRemoteSnapshot; + private final IndexModule.TieringState tieringState; + private final int indexTotalShardsPerNodeLimit; private IndexMetadata( @@ -750,6 +754,7 @@ private IndexMetadata( this.rolloverInfos = Collections.unmodifiableMap(rolloverInfos); this.isSystem = isSystem; this.isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings); + this.tieringState = IndexModule.TieringState.valueOf(INDEX_TIERING_STATE.get(settings)); this.indexTotalShardsPerNodeLimit = indexTotalShardsPerNodeLimit; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -1220,6 +1225,14 @@ public boolean isRemoteSnapshot() { return isRemoteSnapshot; } + public boolean isHotIndex() { + return IndexModule.TieringState.HOT.equals(tieringState); + } + + public boolean isIndexInTier(IndexModule.TieringState tieringState) { + return this.tieringState == tieringState; + } + public static Builder builder(String index) { return new Builder(index); } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java index 24ff83d638d4b..75b21983ec1a5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexNameExpressionResolver.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.common.Booleans; import org.opensearch.common.Nullable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.collect.Tuple; import org.opensearch.common.logging.DeprecationLogger; @@ -50,6 +51,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.indices.IndexClosedException; import org.opensearch.indices.InvalidIndexNameException; @@ -120,6 +122,35 @@ public String[] concreteIndexNamesWithSystemIndexAccess(ClusterState state, Indi return concreteIndexNames(context, request.indices()); } + /** + * Returns the concrete indices that match the provided tiering state. + * + * @param state cluster state + * @param request indices request + * @param tieringState the tiering state of indices + * @return array of concrete indices resolved having the provided tiering state + */ + @ExperimentalApi + public Index[] concreteIndicesInTier(ClusterState state, IndicesRequest request, IndexModule.TieringState tieringState) { + Context context = new Context( + state, + request.indicesOptions(), + false, + false, + request.includeDataStreams(), + isSystemIndexAccessAllowed() + ); + final Index[] concreteIndices = concreteIndices(context, request.indices()); + final Set indicesWithTargetTier = new HashSet<>(); + for (Index index : concreteIndices) { + IndexMetadata indexMetadata = state.metadata().getIndexSafe(index); + if (indexMetadata.isIndexInTier(tieringState)) { + indicesWithTargetTier.add(index); + } + } + return indicesWithTargetTier.toArray(Index.EMPTY_ARRAY); + } + /** * Same as {@link #concreteIndices(ClusterState, IndicesOptions, String...)}, but the index expressions and options * are encapsulated in the specified request and resolves data streams. diff --git a/server/src/main/java/org/opensearch/indices/tiering/AbstractTieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/AbstractTieringStateProcessor.java new file mode 100644 index 0000000000000..2dc9ebeeed7f6 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/AbstractTieringStateProcessor.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.cluster.routing.allocation.AllocationService; + +public abstract class AbstractTieringStateProcessor implements TieringStateProcessor { + + protected final IndexTieringState nextState; + protected final AbstractTieringStateProcessor nextStateProcessor; + protected final AllocationService allocationService; + + public AbstractTieringStateProcessor(IndexTieringState nextState, AllocationService allocationService) { + this.allocationService = allocationService; + this.nextState = nextState; + if (nextState == null) { + this.nextStateProcessor = null; + return; + } + + switch (nextState) { + case PENDING_START: + this.nextStateProcessor = new PendingStartTieringStateProcessor(allocationService); + break; + case IN_PROGRESS: + this.nextStateProcessor = new RunningTieringStateProcessor(allocationService); + break; + case PENDING_COMPLETION: + this.nextStateProcessor = new PendingCompleteTieringStateProcessor(allocationService); + break; + case COMPLETED: + this.nextStateProcessor = new CompletedTieringStateProcessor(allocationService); + break; + default: + this.nextStateProcessor = null; + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/CompletedTieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/CompletedTieringStateProcessor.java new file mode 100644 index 0000000000000..ef28646b4c4cb --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/CompletedTieringStateProcessor.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; + +import static org.opensearch.action.admin.indices.tiering.TieringUtils.constructToHotToWarmTieringResponse; + +public class CompletedTieringStateProcessor extends AbstractTieringStateProcessor { + private static final Logger logger = LogManager.getLogger(CompletedTieringStateProcessor.class); + public CompletedTieringStateProcessor(AllocationService allocationService) { + super(null, allocationService); + } + + @Override + public void process(ClusterState clusterState, ClusterService clusterService, TieringRequests tieringRequests) { + for (TieringRequestContext tieringRequestContext : tieringRequests.getAcceptedTieringRequestContexts()) { + if (tieringRequestContext.isRequestProcessingComplete()) { + logger.info("[HotToWarmTiering] Tiering is completed for the request [{}]", tieringRequestContext); + tieringRequests.getAcceptedTieringRequestContexts().remove(tieringRequestContext); + if (tieringRequestContext.getListener() != null) { + tieringRequestContext.getListener().onResponse(constructToHotToWarmTieringResponse(tieringRequestContext.getFailedIndices())); + } + } + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java new file mode 100644 index 0000000000000..7aacb83ee910c --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/HotToWarmTieringService.java @@ -0,0 +1,196 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.action.admin.indices.tiering.TieringUpdateClusterStateRequest; +import org.opensearch.cluster.ClusterChangedEvent; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateListener; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.action.admin.indices.tiering.TieringUtils.constructToHotToWarmTieringResponse; +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.TIERING_CUSTOM_KEY; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +/** + * Service responsible for tiering indices from hot to warm + * @opensearch.experimental + */ +@ExperimentalApi +public class HotToWarmTieringService extends AbstractLifecycleComponent implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(HotToWarmTieringService.class); + private final ClusterService clusterService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final AllocationService allocationService; + static final String TIERING_START_TIME = "start_time"; + private TieringRequests tieringRequests; + private final PendingStartTieringStateProcessor pendingStartTieringStateProcessor; + private final RunningTieringStateProcessor runningTieringStateProcessor; + + @Inject + public HotToWarmTieringService( + Settings settings, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + AllocationService allocationService + ) { + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.allocationService = allocationService; + // TODO: initialize the queue size via the cluster based setting + this.pendingStartTieringStateProcessor = new PendingStartTieringStateProcessor(allocationService); + this.runningTieringStateProcessor = new RunningTieringStateProcessor(allocationService); + if (DiscoveryNode.isClusterManagerNode(settings) && FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + clusterService.addListener(this); + this.tieringRequests = new TieringRequests(100); + } + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO: https://github.com/opensearch-project/OpenSearch/issues/14981 + if (event.localNodeClusterManager()) { + if (!tieringRequests.getQueuedTieringContexts().isEmpty()) { + pendingStartTieringStateProcessor.process(event.state(), clusterService, tieringRequests); + } + if (event.routingTableChanged()) { + if (!tieringRequests.getAcceptedTieringRequestContexts().isEmpty()) { + runningTieringStateProcessor.process(event.state(), clusterService, tieringRequests); + } + } + } + } + + /** + * Updates the index metadata with the tiering settings/metadata for an accepted index. + * Accepted index is an index to be tiered from hot to warm. + * @param metadataBuilder metadata builder + * @param indexMetadata index metadata + */ + void updateIndexMetadataForAcceptedIndex( + final Metadata.Builder metadataBuilder, + final IndexMetadata indexMetadata + ) { + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update index settings here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT_TO_WARM); + + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + final Map tieringCustomData = new HashMap<>(); + tieringCustomData.put(TIERING_START_TIME, String.valueOf(System.currentTimeMillis())); + indexMetadataBuilder.putCustom(TIERING_CUSTOM_KEY, tieringCustomData); + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + metadataBuilder.put(indexMetadataBuilder); + } + + /** + * Tier indices from hot to warm + * @param request - tiering update cluster state request + * @param listener - call back listener + */ + public void tier(final TieringUpdateClusterStateRequest request, final ActionListener listener) { + final Set indices = Set.of(request.indices()); + final TieringRequestContext tieringRequestContext = new TieringRequestContext( + request.waitForCompletion() ? listener : null, + indices, + request.getRejectedIndices() + ); + + logger.info("[HotToWarmTiering] Starting hot to warm tiering for indices {}", indices); + clusterService.submitStateUpdateTask("start hot to warm tiering: " + indices, new ClusterStateUpdateTask(Priority.URGENT) { + + @Override + public ClusterState execute(ClusterState currentState) { + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (Index index : tieringRequestContext.getIndicesPendingTiering()) { + final IndexMetadata indexMetadata = currentState.metadata().index(index); + if (indexMetadata == null) { + tieringRequestContext.markIndexFailed(index, "index not found"); + continue; + } else if (!indexMetadata.isHotIndex()) { + tieringRequestContext.markIndexFailed(index, "index is not in the HOT tier"); + continue; + } + updateIndexMetadataForAcceptedIndex(metadataBuilder, indexMetadata); + } + return ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error( + (Supplier) () -> new ParameterizedMessage("[HotToWarmTiering] failed tiering for indices " + "[{}]", indices), + e + ); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("[HotToWarmTiering] Cluster state updated for source " + source); + tieringRequests.addToPending(tieringRequestContext); + if (!request.waitForCompletion()) { + listener.onResponse(constructToHotToWarmTieringResponse(tieringRequestContext.getFailedIndices())); + } + } + + @Override + public TimeValue timeout() { + return request.clusterManagerNodeTimeout(); + } + }); + } + + @Override + protected void doStart() {} + + @Override + protected void doStop() {} + + @Override + protected void doClose() throws IOException {} + +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/IndexTieringState.java b/server/src/main/java/org/opensearch/indices/tiering/IndexTieringState.java new file mode 100644 index 0000000000000..d4b45282cae10 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/IndexTieringState.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Index Tiering status + * + * @opensearch.experimental + */ +@ExperimentalApi +public enum IndexTieringState { + + PENDING_START((byte) 0), + + /** + * Tiering in progress (running shard relocation) + */ + IN_PROGRESS((byte) 1), + + /** + * Index tiered (shard relocation completed) + */ + PENDING_COMPLETION((byte) 2), + + /** + * Tiering finished successfully + */ + COMPLETED((byte) 3), + + /** + * Tiering failed + */ + FAILED((byte) 4); + + private final byte value; + + /** + * Constructs new state + * + * @param value state code + */ + IndexTieringState(byte value) { + this.value = value; + } + + /** + * Returns state code + * + * @return state code + */ + public byte value() { + return value; + } + + /** + * @return true if tiering is successful + */ + public boolean successful() { + return this == COMPLETED; + } + + /** + * @return true if tiering is failed + */ + public boolean failed() { + return this == FAILED; + } + + /** + * Returns state corresponding to state code + * + * @param value stat code + * @return state + */ + public static IndexTieringState fromValue(byte value) { + switch (value) { + case 0: + return PENDING_START; + case 1: + return IN_PROGRESS; + case 2: + return PENDING_COMPLETION; + case 3: + return COMPLETED; + case 4: + return FAILED; + default: + throw new IllegalArgumentException("No tiering state for value [" + value + "]"); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/PendingCompleteTieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/PendingCompleteTieringStateProcessor.java new file mode 100644 index 0000000000000..4621519680339 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/PendingCompleteTieringStateProcessor.java @@ -0,0 +1,106 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.Map; + +import static org.opensearch.cluster.metadata.IndexMetadata.TIERING_CUSTOM_KEY; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +public class PendingCompleteTieringStateProcessor extends AbstractTieringStateProcessor { + + private static final Logger logger = LogManager.getLogger(PendingCompleteTieringStateProcessor.class); + + public PendingCompleteTieringStateProcessor(AllocationService allocationService) { + super(IndexTieringState.COMPLETED, allocationService); + } + + /** + * Updates the index metadata with the tiering settings/metadata for a tiered index. + * @param metadataBuilder metadata builder + * @param indexMetadata index metadata + */ + void updateIndexMetadataForTieredIndex(final Metadata.Builder metadataBuilder, final IndexMetadata indexMetadata) { + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update tiering settings here + indexSettingsBuilder.put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM); + // trying to put transient index metadata in the custom index metadata + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + indexMetadataBuilder.removeCustom(TIERING_CUSTOM_KEY); + + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + metadataBuilder.put(indexMetadataBuilder); + } + + @Override + public void process(ClusterState clusterState, ClusterService clusterService, TieringRequests tieringRequests) { + clusterService.submitStateUpdateTask( + "complete hot to warm tiering for tiered indices: ", + new ClusterStateUpdateTask(Priority.NORMAL) { + final Map indices = tieringRequests.getIndices(tieringRequests.getAcceptedTieringRequestContexts(), IndexTieringState.PENDING_COMPLETION); + + @Override + public ClusterState execute(ClusterState currentState) { + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (Map.Entry entry : indices.entrySet()) { + Index index = entry.getKey(); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + if (indexMetadata == null) { + entry.getValue().markIndexFailed(index, "index not found"); + continue; + } + updateIndexMetadataForTieredIndex(metadataBuilder, indexMetadata); + } + return ClusterState.builder(currentState).metadata(metadataBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error( + (Supplier) () -> new ParameterizedMessage( + "[HotToWarmTiering] failed to complete tiering for tiered indices " + "[{}]", + indices + ), + e + ); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("[HotToWarmTiering] Cluster state updated for source " + source); + for (Map.Entry entry : indices.entrySet()) { + if (!entry.getValue().hasIndexFailed(entry.getKey())) { + entry.getValue().markIndexAsCompleted(entry.getKey()); + } + } + assert nextStateProcessor != null; + nextStateProcessor.process(clusterState, clusterService, tieringRequests); + } + } + ); + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/PendingStartTieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/PendingStartTieringStateProcessor.java new file mode 100644 index 0000000000000..31662f466ad98 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/PendingStartTieringStateProcessor.java @@ -0,0 +1,132 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Supplier; +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING; +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; + +public class PendingStartTieringStateProcessor extends AbstractTieringStateProcessor { + + private static final Logger logger = LogManager.getLogger(PendingStartTieringStateProcessor.class); + + public PendingStartTieringStateProcessor(AllocationService allocationService) { + super(IndexTieringState.IN_PROGRESS, allocationService); + } + + /** + * Updates the index metadata with the tiering settings/metadata for an accepted index. + * Accepted index is an index to be tiered from hot to warm. + * @param metadataBuilder metadata builder + * @param routingTableBuilder routing builder + * @param indexMetadata index metadata + * @param index index + */ + void updateIndexMetadataForAcceptedIndex( + final Metadata.Builder metadataBuilder, + final RoutingTable.Builder routingTableBuilder, + final IndexMetadata indexMetadata, + final Index index + ) { + Settings.Builder indexSettingsBuilder = Settings.builder().put(indexMetadata.getSettings()); + // update index settings here + indexSettingsBuilder.put(INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL); + + // Update number of replicas to 1 in case the number of replicas is greater than 1 + if (Integer.parseInt(indexMetadata.getSettings().get(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey())) > 1) { + final String[] indices = new String[] { index.getName() }; + routingTableBuilder.updateNumberOfReplicas(1, indices); + metadataBuilder.updateNumberOfReplicas(1, indices); + } + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata).settings(indexSettingsBuilder); + // Update index settings version + indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); + metadataBuilder.put(indexMetadataBuilder); + } + + + @Override + public void process(ClusterState clusterState, ClusterService clusterService, TieringRequests tieringRequests) { + + clusterService.submitStateUpdateTask("start hot to warm tiering: ", new ClusterStateUpdateTask(Priority.URGENT) { + final Set tieringRequestContexts = tieringRequests.dequePendingTieringContexts(); + final Map indices = tieringRequests.getIndices(tieringRequestContexts, IndexTieringState.PENDING_START); + + @Override + public ClusterState execute(ClusterState currentState) { + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(currentState.routingTable()); + final Metadata.Builder metadataBuilder = Metadata.builder(currentState.metadata()); + for (Map.Entry entry : indices.entrySet()) { + Index index = entry.getKey(); + final IndexMetadata indexMetadata = currentState.metadata().index(index); + if (indexMetadata == null) { + indices.get(index).markIndexFailed(index, "index not found"); + continue; + } else if (!indexMetadata.isIndexInTier(IndexModule.TieringState.HOT_TO_WARM)) { + indices.get(index).markIndexFailed(index, "index is not in the HOT tier"); + continue; + } + updateIndexMetadataForAcceptedIndex(metadataBuilder, routingTableBuilder, indexMetadata, index); + } + ClusterState updatedState = ClusterState.builder(currentState) + .metadata(metadataBuilder) + .routingTable(routingTableBuilder.build()) + .build(); + + // now, reroute to trigger shard relocation for the dedicated case + updatedState = allocationService.reroute(updatedState, "hot to warm tiering"); + + return updatedState; + } + + @Override + public void onFailure(String source, Exception e) { + // process failed request here and add the retry logic + logger.error( + (Supplier) () -> new ParameterizedMessage("[HotToWarmTiering] failed tiering for indices " + "[{}]", indices), + e + ); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.info("[HotToWarmTiering] Cluster state updated for source " + source); + for (Map.Entry entry : indices.entrySet()) { + if (!entry.getValue().hasIndexFailed(entry.getKey())) { + entry.getValue().markIndexInProgress(entry.getKey()); + } + } + tieringRequests.addToAccepted(tieringRequestContexts); + assert nextStateProcessor != null; + nextStateProcessor.process(clusterState, clusterService, tieringRequests); + } + }); + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/RunningTieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/RunningTieringStateProcessor.java new file mode 100644 index 0000000000000..725907ef690d7 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/RunningTieringStateProcessor.java @@ -0,0 +1,79 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.action.admin.indices.tiering.TieringRequestContext; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.index.Index; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RunningTieringStateProcessor extends AbstractTieringStateProcessor { + private static final Logger logger = LogManager.getLogger(RunningTieringStateProcessor.class); + + public RunningTieringStateProcessor(AllocationService allocationService) { + super(IndexTieringState.PENDING_COMPLETION, allocationService); + } + + /** + * Checks if the shard is in the warm tier. + * @param shard shard routing + * @param clusterState current cluster state + * @return true if shard is started on the search node, false otherwise + */ + boolean isShardInWarmTier(final ShardRouting shard, final ClusterState clusterState) { + if (shard.unassigned()) { + return false; + } + final boolean isShardFoundOnSearchNode = clusterState.getNodes().get(shard.currentNodeId()).isSearchNode(); + return shard.started() && isShardFoundOnSearchNode; + } + + @Override + public void process(ClusterState clusterState, ClusterService clusterService, TieringRequests tieringRequests) { + final Map tieredIndices = new HashMap<>(); + for (TieringRequestContext tieringRequestContext : tieringRequests.getAcceptedTieringRequestContexts()) { + List shardRoutings; + for (Index index : tieringRequestContext.filterIndicesByState(IndexTieringState.IN_PROGRESS)) { + if (clusterState.routingTable().hasIndex(index)) { + // Ensure index is not deleted + shardRoutings = clusterState.routingTable().allShards(index.getName()); + } else { + // Index already deleted nothing to do + logger.warn("[HotToWarmTiering] Index [{}] deleted before shard relocation finished", index.getName()); + tieringRequestContext.markIndexFailed(index, "index not found"); + continue; + } + + boolean relocationCompleted = true; + for (ShardRouting shard : shardRoutings) { + if (!isShardInWarmTier(shard, clusterState)) { + relocationCompleted = false; + break; + } + } + if (relocationCompleted) { + logger.debug("[HotToWarmTiering] Shard relocation completed for index [{}]", index.getName()); + tieringRequestContext.markIndexAsPendingComplete(index); + tieredIndices.put(index, tieringRequestContext); + } + } + } + assert nextStateProcessor != null; + nextStateProcessor.process(clusterState, clusterService, tieringRequests); + } +} diff --git a/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java index 2de50f4d4295d..0b81393d21bd4 100644 --- a/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java +++ b/server/src/main/java/org/opensearch/indices/tiering/TieringRequestValidator.java @@ -22,7 +22,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.DiskThresholdSettings; import org.opensearch.core.index.Index; -import org.opensearch.index.IndexModule; import java.util.HashMap; import java.util.List; @@ -30,8 +29,6 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; - /** * Validator class to validate the tiering requests of the index * @opensearch.experimental @@ -62,10 +59,6 @@ public static TieringValidationResult validateHotToWarm( final TieringValidationResult tieringValidationResult = new TieringValidationResult(concreteIndices); for (Index index : concreteIndices) { - if (!validateHotIndex(currentState, index)) { - tieringValidationResult.addToRejected(index, "index is not in the HOT tier"); - continue; - } if (!validateRemoteStoreIndex(currentState, index)) { tieringValidationResult.addToRejected(index, "index is not backed up by the remote store"); continue; @@ -82,7 +75,7 @@ public static TieringValidationResult validateHotToWarm( validateEligibleNodesCapacity(clusterInfo, currentState, tieringValidationResult); logger.info( - "Successfully accepted indices for tiering are [{}], rejected indices are [{}]", + "[HotToWarmTiering] Successfully accepted indices for tiering are [{}], rejected indices are [{}]", tieringValidationResult.getAcceptedIndices(), tieringValidationResult.getRejectedIndices() ); @@ -119,17 +112,6 @@ static boolean validateRemoteStoreIndex(final ClusterState state, final Index in return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(state.metadata().getIndexSafe(index).getSettings()); } - /** - * Validates that the specified index is in the "hot" tiering state. - * - * @param state the current cluster state - * @param index the index to be validated - * @return true if the index is in the "hot" tiering state, false otherwise - */ - static boolean validateHotIndex(final ClusterState state, final Index index) { - return IndexModule.TieringState.HOT.name().equals(INDEX_TIERING_STATE.get(state.metadata().getIndexSafe(index).getSettings())); - } - /** * Validates the health of the specified index in the current cluster state. * @@ -172,7 +154,7 @@ static void validateDiskThresholdWaterMarkNotBreached( ) { final Map usages = clusterInfo.getNodeLeastAvailableDiskUsages(); if (usages == null) { - logger.trace("skipping monitor as no disk usage information is available"); + logger.trace("[Tiering] skipping monitor as no disk usage information is available"); return; } final Set nodeIds = getEligibleNodes(currentState).stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); diff --git a/server/src/main/java/org/opensearch/indices/tiering/TieringStateProcessor.java b/server/src/main/java/org/opensearch/indices/tiering/TieringStateProcessor.java new file mode 100644 index 0000000000000..f4648e07bc219 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/tiering/TieringStateProcessor.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.action.admin.indices.tiering.TieringRequests; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.annotation.ExperimentalApi; + +/** + * Interface for processing tiering state. + * + * @opensearch.experimental + */ +@ExperimentalApi +public interface TieringStateProcessor { + + void process(final ClusterState clusterState, final ClusterService clusterService, final TieringRequests tieringRequests); +} diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 1a9b233b387b2..a163ce92da669 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -179,6 +179,7 @@ import org.opensearch.indices.replication.SegmentReplicationSourceService; import org.opensearch.indices.replication.SegmentReplicationTargetService; import org.opensearch.indices.store.IndicesStore; +import org.opensearch.indices.tiering.HotToWarmTieringService; import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.monitor.fs.FsHealthService; @@ -1224,6 +1225,13 @@ protected Node( remoteClusterStateService ); + final HotToWarmTieringService hotToWarmTieringService = new HotToWarmTieringService( + settings, + clusterService, + clusterModule.getIndexNameExpressionResolver(), + clusterModule.getAllocationService() + ); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor( settings, clusterService::state, @@ -1427,6 +1435,9 @@ protected Node( b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus); b.bind(RestoreService.class).toInstance(restoreService); b.bind(RemoteStoreRestoreService.class).toInstance(remoteStoreRestoreService); + if (FeatureFlags.isEnabled(FeatureFlags.TIERED_REMOTE_INDEX)) { + b.bind(HotToWarmTieringService.class).toInstance(hotToWarmTieringService); + } b.bind(RerouteService.class).toInstance(rerouteService); b.bind(ShardLimitValidator.class).toInstance(shardLimitValidator); b.bind(FsHealthService.class).toInstance(fsHealthService); diff --git a/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java index 10273366af804..c8880112f8c8e 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/tiering/TransportHotToWarmTieringActionTests.java @@ -109,7 +109,7 @@ public void testNoConcreteIndices() { public void testNoAcceptedIndices() { TieringIndexRequest request = new TieringIndexRequest(TARGET_TIER, "test-idx-*", "idx-*"); HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); - assertFalse(response.isAcknowledged()); + assertTrue(response.isAcknowledged()); assertEquals(2, response.getFailedIndices().size()); for (HotToWarmTieringResponse.IndexResult result : response.getFailedIndices()) { assertEquals("index is not backed up by the remote store", result.getFailureReason()); diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IndexNameExpressionResolverTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IndexNameExpressionResolverTests.java index fda2f411b1994..316c0bee1e614 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IndexNameExpressionResolverTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IndexNameExpressionResolverTests.java @@ -49,6 +49,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.common.Strings; import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexSettings; import org.opensearch.indices.IndexClosedException; @@ -70,6 +71,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING; import static org.opensearch.cluster.metadata.IndexNameExpressionResolver.SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY; import static org.opensearch.common.util.set.Sets.newHashSet; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.hamcrest.Matchers.arrayWithSize; @@ -891,6 +893,44 @@ public void testConcreteIndicesNoIndicesErrorMessageNoExpand() { assertThat(infe.getMessage(), is("no such index [_all] and no indices exist")); } + public void testConcreteIndicesWithHotTier() { + Metadata.Builder mdBuilder = Metadata.builder() + .put( + indexBuilder("test-hot", Settings.builder().put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT.name()).build()) + .state(State.OPEN) + ) + .put( + indexBuilder( + "test-warm", + Settings.builder().put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM.name()).build() + ).state(State.OPEN) + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); + SearchRequest request = new SearchRequest("test*"); + Index[] indices = indexNameExpressionResolver.concreteIndicesInTier(state, request, IndexModule.TieringState.HOT); + assertEquals(1, indices.length); + assertEquals("test-hot", indices[0].getName()); + } + + public void testConcreteIndicesWithWarmTier() { + Metadata.Builder mdBuilder = Metadata.builder() + .put( + indexBuilder("test-hot", Settings.builder().put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.HOT.name()).build()) + .state(State.OPEN) + ) + .put( + indexBuilder( + "test-warm", + Settings.builder().put(INDEX_TIERING_STATE.getKey(), IndexModule.TieringState.WARM.name()).build() + ).state(State.OPEN) + ); + ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build(); + SearchRequest request = new SearchRequest("test*"); + Index[] indices = indexNameExpressionResolver.concreteIndicesInTier(state, request, IndexModule.TieringState.WARM); + assertEquals(1, indices.length); + assertEquals("test-warm", indices[0].getName()); + } + public void testConcreteIndicesWildcardExpansion() { Metadata.Builder mdBuilder = Metadata.builder() .put(indexBuilder("testXXX").state(State.OPEN)) diff --git a/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java new file mode 100644 index 0000000000000..a4b6905fd7de9 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/tiering/HotToWarmTieringServiceTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.tiering; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexModule; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.index.IndexModule.INDEX_STORE_LOCALITY_SETTING; +import static org.opensearch.index.IndexModule.INDEX_TIERING_STATE; + +public class HotToWarmTieringServiceTests extends OpenSearchSingleNodeTestCase { + + private ClusterService clusterService; + private HotToWarmTieringService hotToWarmTieringService; + + @Before + public void beforeTest() { + clusterService = this.getInstanceFromNode(ClusterService.class); + hotToWarmTieringService = this.getInstanceFromNode(HotToWarmTieringService.class); + } + + public void testUpdateIndexMetadataForAcceptedIndices() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + hotToWarmTieringService.updateIndexMetadataForAcceptedIndex( + metadataBuilder, + clusterService.state().metadata().index(index) + ); + IndexMetadata indexMetadata = metadataBuilder.build().index(indexName); + assertEquals( + IndexModule.DataLocalityType.PARTIAL, + IndexModule.DataLocalityType.getValueOf(indexMetadata.getSettings().get(INDEX_STORE_LOCALITY_SETTING.getKey())) + ); + assertEquals(IndexModule.TieringState.HOT_TO_WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + Map customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNotNull(customData); + assertNotNull(customData.get(HotToWarmTieringService.TIERING_START_TIME)); + } + + public void testUpdateIndexMetadataForSuccessfulIndex() { + String indexName = "test_index"; + createIndex(indexName); + Index index = resolveIndex(indexName); + final Metadata.Builder metadataBuilder = Metadata.builder(clusterService.state().metadata()); + Map customData = new HashMap<>(); + customData.put(HotToWarmTieringService.TIERING_START_TIME, String.valueOf(System.currentTimeMillis())); + metadataBuilder.put(IndexMetadata.builder(metadataBuilder.getSafe(index)).putCustom(IndexMetadata.TIERING_CUSTOM_KEY, customData)); +// hotToWarmTieringService.updateIndexMetadataForTieredIndex(metadataBuilder, clusterService.state().metadata().index(index)); + IndexMetadata indexMetadata = metadataBuilder.build().index(indexName); + assertEquals(IndexModule.TieringState.WARM.name(), indexMetadata.getSettings().get(INDEX_TIERING_STATE.getKey())); + customData = indexMetadata.getCustomData(IndexMetadata.TIERING_CUSTOM_KEY); + assertNull(customData); + } +} diff --git a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java index 6b6f74353812b..d4436a2f66a34 100644 --- a/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java +++ b/server/src/test/java/org/opensearch/indices/tiering/TieringRequestValidatorTests.java @@ -24,7 +24,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; -import org.opensearch.index.IndexModule; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchTestCase; @@ -42,7 +41,6 @@ import static org.opensearch.indices.tiering.TieringRequestValidator.getTotalAvailableBytesInWarmTier; import static org.opensearch.indices.tiering.TieringRequestValidator.validateDiskThresholdWaterMarkNotBreached; import static org.opensearch.indices.tiering.TieringRequestValidator.validateEligibleNodesCapacity; -import static org.opensearch.indices.tiering.TieringRequestValidator.validateHotIndex; import static org.opensearch.indices.tiering.TieringRequestValidator.validateIndexHealth; import static org.opensearch.indices.tiering.TieringRequestValidator.validateOpenIndex; import static org.opensearch.indices.tiering.TieringRequestValidator.validateRemoteStoreIndex; @@ -92,26 +90,6 @@ public void testDocRepIndex() { assertFalse(validateRemoteStoreIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); } - public void testValidHotIndex() { - String indexUuid = UUID.randomUUID().toString(); - String indexName = "test_index"; - assertTrue(validateHotIndex(buildClusterState(indexName, indexUuid, Settings.EMPTY), new Index(indexName, indexUuid))); - } - - public void testIndexWithOngoingOrCompletedTiering() { - String indexUuid = UUID.randomUUID().toString(); - String indexName = "test_index"; - - IndexModule.TieringState tieringState = randomBoolean() ? IndexModule.TieringState.HOT_TO_WARM : IndexModule.TieringState.WARM; - - ClusterState clusterState = buildClusterState( - indexName, - indexUuid, - Settings.builder().put(IndexModule.INDEX_TIERING_STATE.getKey(), tieringState).build() - ); - assertFalse(validateHotIndex(clusterState, new Index(indexName, indexUuid))); - } - public void testValidateIndexHealth() { String indexUuid = UUID.randomUUID().toString(); String indexName = "test_index";