Skip to content

Commit

Permalink
HotToWarmTieringService changes to tier shards
Browse files Browse the repository at this point in the history
Signed-off-by: Neetika Singhal <[email protected]>
  • Loading branch information
neetikasinghal committed Aug 15, 2024
1 parent 2311195 commit 9c42d55
Show file tree
Hide file tree
Showing 25 changed files with 1,459 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
- Add slice execution listeners to SearchOperationListener interface ([#15153](https://github.com/opensearch-project/OpenSearch/pull/15153))
- HotToWarmTieringService changes to tier shards ([#14891](https://github.com/opensearch-project/OpenSearch/pull/14891))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tiering;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction;
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse;
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.util.Map;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from tiering package", value =
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE")
public class HotToWarmTieringServiceIT extends TieringBaseIntegTestCase {

protected static final String TEST_IDX_1 = "test-idx-1";
protected static final String TEST_IDX_2 = "test-idx-2";
protected static final int NUM_DOCS_IN_BULK = 10;
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes();

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
}

// waiting for the recovery pr to be merged in
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13647")
public void testTieringBasic() {
final int numReplicasIndex = 0;
internalCluster().ensureAtLeastNumDataNodes(1);
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name())
.build();

String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 };
for (String index : indices) {
assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get());
ensureGreen(index);
// Ingesting some docs
indexBulk(index, NUM_DOCS_IN_BULK);
flushAndRefresh(index);
ensureGreen();
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
}

// Spin up node having search role
internalCluster().ensureAtLeastNumSearchAndDataNodes(1);

final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService();
clusterInfoService.setDiskUsageFunctionAndRefresh(
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES)
);

TieringIndexRequest request = new TieringIndexRequest(TARGET_WARM_TIER, indices);
request.waitForCompletion(true);
HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet();
assertAcked(response);
assertTrue(response.getFailedIndices().isEmpty());
assertTrue(response.isAcknowledged());
ensureGreen();
for (String index : indices) {
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get();
assertWarmSettings(getIndexResponse, index);
assertAcked(client().admin().indices().prepareDelete(index).get());
}
}

private void assertWarmSettings(GetIndexResponse response, String indexName) {
final Map<String, Settings> settings = response.settings();
assertThat(settings, notNullValue());
assertThat(settings.size(), equalTo(1));
Settings indexSettings = settings.get(indexName);
assertThat(indexSettings, notNullValue());
assertThat(
indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()),
equalTo(IndexModule.DataLocalityType.PARTIAL.name())
);
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tiering;

import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.cluster.ClusterInfoService;
import org.opensearch.cluster.MockInternalClusterInfoService;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;

public class TieringBaseIntegTestCase extends OpenSearchIntegTestCase {

protected Path segmentRepoPath;
protected Path translogRepoPath;
Settings extraSettings = Settings.EMPTY;
private final List<String> documentKeys = List.of(
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5),
randomAlphaOfLength(5)
);

protected static final String REPOSITORY_NAME = "test-remote-store-repo";
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2";
protected static final String TARGET_WARM_TIER = "warm";

/**
* Disable MockFSIndexStore plugin as it wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
* As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
*
*/
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
return featureSettings.build();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (segmentRepoPath == null || translogRepoPath == null) {
segmentRepoPath = randomRepoPath().toAbsolutePath();
translogRepoPath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(extraSettings)
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath))
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true)
.build();
}

protected BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

protected MockInternalClusterInfoService getMockInternalClusterInfoService() {
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class);
}

protected static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) {
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.tiering;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.indices.tiering.IndexTieringState;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Context class to hold indices to be tiered per request. It also holds
* the listener per request to mark the request as complete once all
* tiering operations are completed.
*
* @opensearch.experimental
*/

@ExperimentalApi
public class TieringRequestContext {
private final ActionListener<HotToWarmTieringResponse> actionListener;
private final Map<Index, IndexTieringInfo> indexTieringStatusMap;

public TieringRequestContext(
ActionListener<HotToWarmTieringResponse> actionListener,
Set<Index> acceptedIndices,
Map<Index, String> failedIndices
) {
this.actionListener = actionListener;
indexTieringStatusMap = new ConcurrentHashMap<>();
for (Index index : acceptedIndices) {
indexTieringStatusMap.put(index, new IndexTieringInfo());
}
for (Map.Entry<Index, String> entry : failedIndices.entrySet()) {
indexTieringStatusMap.put(entry.getKey(), new IndexTieringInfo(IndexTieringState.FAILED, entry.getValue()));
}
}

public ActionListener<HotToWarmTieringResponse> getListener() {
return actionListener;
}

public boolean hasIndex(Index index) {
return indexTieringStatusMap.containsKey(index);
}

public Map<Index, String> getFailedIndices() {
Map<Index, String> failedIndicesMap = new HashMap<>();
for (Index index : filterIndicesByState(IndexTieringState.FAILED)) {
failedIndicesMap.put(index, indexTieringStatusMap.get(index).getReason());
}
return failedIndicesMap;
}

public boolean isRequestProcessingComplete() {
return filterIndicesByState(IndexTieringState.COMPLETED).size() + filterIndicesByState(IndexTieringState.FAILED)
.size() == indexTieringStatusMap.size();
}

public List<Index> filterIndicesByState(IndexTieringState state) {
return indexTieringStatusMap.keySet()
.stream()
.filter(indexTieringInfo -> indexTieringStatusMap.get(indexTieringInfo).getState() == state)
.collect(Collectors.toList());
}

public List<Index> getIndicesPendingTiering() {
return indexTieringStatusMap.keySet()
.stream()
.filter(indexTieringInfo -> indexTieringStatusMap.get(indexTieringInfo).getState() == IndexTieringState.PENDING_START)
.collect(Collectors.toList());
}

public void markIndexFailed(Index index, String reason) {
indexTieringStatusMap.get(index).markFailed(reason);
}

public void markIndexInProgress(Index index) {
indexTieringStatusMap.get(index).markInProgress();
}

public void markIndexAsPendingComplete(Index index) {
indexTieringStatusMap.get(index).markAsPendingComplete();
}

public void markIndexAsCompleted(Index index) {
indexTieringStatusMap.get(index).markCompleted();
}

@Override
public String toString() {
return "TieringRequestContext{" + "actionListener=" + actionListener + ", indexTieringStatusMap=" + indexTieringStatusMap + '}';
}

/**
* Represents info of a tiering index
*
* @opensearch.experimental
*/
@ExperimentalApi
public static class IndexTieringInfo {
private IndexTieringState state;
private String reason;

public IndexTieringInfo() {
this.state = IndexTieringState.PENDING_START;
}

public IndexTieringInfo(IndexTieringState state, String reason) {
this.state = state;
this.reason = reason;
}

public IndexTieringState getState() {
return state;
}
public void markInProgress() {
this.state = IndexTieringState.IN_PROGRESS;
}

public void markAsPendingComplete() {
this.state = IndexTieringState.PENDING_COMPLETION;
}

public void markCompleted() {
this.state = IndexTieringState.COMPLETED;
}

public void markFailed(String reason) {
this.state = IndexTieringState.FAILED;
this.reason = reason;
}

public String getReason() {
return reason;
}

@Override
public String toString() {
return "IndexTieringInfo{" +
"state=" + state +
", reason='" + reason + '\'' +
'}';
}
}
}
Loading

0 comments on commit 9c42d55

Please sign in to comment.