-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Commit
Signed-off-by: Neetika Singhal <[email protected]>
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.tiering; | ||
|
||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
|
||
import org.opensearch.action.admin.indices.get.GetIndexResponse; | ||
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringAction; | ||
import org.opensearch.action.admin.indices.tiering.HotToWarmTieringResponse; | ||
import org.opensearch.action.admin.indices.tiering.TieringIndexRequest; | ||
import org.opensearch.action.search.SearchResponse; | ||
import org.opensearch.cluster.MockInternalClusterInfoService; | ||
import org.opensearch.cluster.metadata.IndexMetadata; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.core.common.unit.ByteSizeUnit; | ||
import org.opensearch.core.common.unit.ByteSizeValue; | ||
import org.opensearch.index.IndexModule; | ||
import org.opensearch.index.query.QueryBuilders; | ||
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
import org.junit.Before; | ||
|
||
import java.util.Map; | ||
|
||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; | ||
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; | ||
import static org.hamcrest.Matchers.equalTo; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
|
||
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class) | ||
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false) | ||
// Uncomment the below line to enable trace level logs for this test for better debugging | ||
// @TestLogging(reason = "Getting trace logs from tiering package", value = | ||
// "org.opensearch.tiering:TRACE,org.opensearch.cluster.routing.allocation.decider:TRACE") | ||
public class HotToWarmTieringServiceIT extends TieringBaseIntegTestCase { | ||
|
||
protected static final String TEST_IDX_1 = "test-idx-1"; | ||
protected static final String TEST_IDX_2 = "test-idx-2"; | ||
protected static final int NUM_DOCS_IN_BULK = 10; | ||
private static final long TOTAL_SPACE_BYTES = new ByteSizeValue(1000, ByteSizeUnit.KB).getBytes(); | ||
|
||
@Before | ||
public void setup() { | ||
internalCluster().startClusterManagerOnlyNode(); | ||
} | ||
|
||
// waiting for the recovery pr to be merged in | ||
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/13647") | ||
public void testTieringBasic() { | ||
final int numReplicasIndex = 0; | ||
internalCluster().ensureAtLeastNumDataNodes(1); | ||
final Settings settings = Settings.builder() | ||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicasIndex) | ||
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.FULL.name()) | ||
.build(); | ||
|
||
String[] indices = new String[] { TEST_IDX_1, TEST_IDX_2 }; | ||
for (String index : indices) { | ||
assertAcked(client().admin().indices().prepareCreate(index).setSettings(settings).get()); | ||
ensureGreen(index); | ||
// Ingesting some docs | ||
indexBulk(index, NUM_DOCS_IN_BULK); | ||
flushAndRefresh(index); | ||
ensureGreen(); | ||
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); | ||
// Asserting that search returns same number of docs as ingested | ||
assertHitCount(searchResponse, NUM_DOCS_IN_BULK); | ||
} | ||
|
||
// Spin up node having search role | ||
internalCluster().ensureAtLeastNumSearchAndDataNodes(1); | ||
|
||
final MockInternalClusterInfoService clusterInfoService = getMockInternalClusterInfoService(); | ||
clusterInfoService.setDiskUsageFunctionAndRefresh( | ||
(discoveryNode, fsInfoPath) -> setDiskUsage(fsInfoPath, TOTAL_SPACE_BYTES, TOTAL_SPACE_BYTES) | ||
); | ||
|
||
TieringIndexRequest request = new TieringIndexRequest(TARGET_WARM_TIER, indices); | ||
request.waitForCompletion(true); | ||
HotToWarmTieringResponse response = client().admin().indices().execute(HotToWarmTieringAction.INSTANCE, request).actionGet(); | ||
assertAcked(response); | ||
assertTrue(response.getFailedIndices().isEmpty()); | ||
assertTrue(response.isAcknowledged()); | ||
ensureGreen(); | ||
for (String index : indices) { | ||
SearchResponse searchResponse = client().prepareSearch(index).setQuery(QueryBuilders.matchAllQuery()).get(); | ||
// Asserting that search returns same number of docs as ingested | ||
assertHitCount(searchResponse, NUM_DOCS_IN_BULK); | ||
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices(index).get(); | ||
assertWarmSettings(getIndexResponse, index); | ||
assertAcked(client().admin().indices().prepareDelete(index).get()); | ||
} | ||
} | ||
|
||
private void assertWarmSettings(GetIndexResponse response, String indexName) { | ||
final Map<String, Settings> settings = response.settings(); | ||
assertThat(settings, notNullValue()); | ||
assertThat(settings.size(), equalTo(1)); | ||
Settings indexSettings = settings.get(indexName); | ||
assertThat(indexSettings, notNullValue()); | ||
assertThat( | ||
indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()), | ||
equalTo(IndexModule.DataLocalityType.PARTIAL.name()) | ||
); | ||
assertThat(indexSettings.get(IndexModule.INDEX_TIERING_STATE.getKey()), equalTo(IndexModule.TieringState.WARM.name())); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.tiering; | ||
|
||
import org.opensearch.action.bulk.BulkRequest; | ||
import org.opensearch.action.bulk.BulkResponse; | ||
import org.opensearch.action.index.IndexRequest; | ||
import org.opensearch.cluster.ClusterInfoService; | ||
import org.opensearch.cluster.MockInternalClusterInfoService; | ||
import org.opensearch.common.UUIDs; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.util.FeatureFlags; | ||
import org.opensearch.monitor.fs.FsInfo; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.test.OpenSearchIntegTestCase; | ||
|
||
import java.nio.file.Path; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; | ||
|
||
public class TieringBaseIntegTestCase extends OpenSearchIntegTestCase { | ||
|
||
protected Path segmentRepoPath; | ||
protected Path translogRepoPath; | ||
Settings extraSettings = Settings.EMPTY; | ||
private final List<String> documentKeys = List.of( | ||
randomAlphaOfLength(5), | ||
randomAlphaOfLength(5), | ||
randomAlphaOfLength(5), | ||
randomAlphaOfLength(5), | ||
randomAlphaOfLength(5) | ||
); | ||
|
||
protected static final String REPOSITORY_NAME = "test-remote-store-repo"; | ||
protected static final String REPOSITORY_2_NAME = "test-remote-store-repo-2"; | ||
protected static final String TARGET_WARM_TIER = "warm"; | ||
|
||
/** | ||
* Disable MockFSIndexStore plugin as it wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory) | ||
* As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory | ||
* | ||
*/ | ||
@Override | ||
protected boolean addMockIndexStorePlugin() { | ||
return false; | ||
} | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> nodePlugins() { | ||
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); | ||
} | ||
|
||
@Override | ||
protected Settings featureFlagSettings() { | ||
Settings.Builder featureSettings = Settings.builder(); | ||
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true); | ||
return featureSettings.build(); | ||
} | ||
|
||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal) { | ||
if (segmentRepoPath == null || translogRepoPath == null) { | ||
segmentRepoPath = randomRepoPath().toAbsolutePath(); | ||
translogRepoPath = randomRepoPath().toAbsolutePath(); | ||
} | ||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal)) | ||
.put(extraSettings) | ||
.put(remoteStoreClusterSettings(REPOSITORY_NAME, segmentRepoPath, REPOSITORY_2_NAME, translogRepoPath)) | ||
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) | ||
.build(); | ||
} | ||
|
||
protected BulkResponse indexBulk(String indexName, int numDocs) { | ||
BulkRequest bulkRequest = new BulkRequest(); | ||
for (int i = 0; i < numDocs; i++) { | ||
final IndexRequest request = client().prepareIndex(indexName) | ||
.setId(UUIDs.randomBase64UUID()) | ||
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5)) | ||
.request(); | ||
bulkRequest.add(request); | ||
} | ||
return client().bulk(bulkRequest).actionGet(); | ||
} | ||
|
||
protected MockInternalClusterInfoService getMockInternalClusterInfoService() { | ||
return (MockInternalClusterInfoService) internalCluster().getCurrentClusterManagerNodeInstance(ClusterInfoService.class); | ||
} | ||
|
||
protected static FsInfo.Path setDiskUsage(FsInfo.Path original, long totalBytes, long freeBytes) { | ||
return new FsInfo.Path(original.getPath(), original.getMount(), totalBytes, freeBytes, freeBytes); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.tiering; | ||
|
||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.core.action.ActionListener; | ||
import org.opensearch.core.index.Index; | ||
|
||
import java.util.HashSet; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* Context class to hold indices to be tiered per request. It also holds | ||
* the listener per request to mark the request as complete once all | ||
* tiering operations are completed. | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
|
||
@ExperimentalApi | ||
public class TieringRequestContext { | ||
private final ActionListener<HotToWarmTieringResponse> actionListener; | ||
private final Set<Index> inProgressIndices; | ||
private final Set<Index> tieredIndices; | ||
private final Set<Index> completedIndices; | ||
private final Map<Index, String> failedIndices; | ||
|
||
public TieringRequestContext( | ||
ActionListener<HotToWarmTieringResponse> actionListener, | ||
Set<Index> acceptedIndices, | ||
Map<Index, String> failedIndices | ||
) { | ||
this.actionListener = actionListener; | ||
Check warning on line 41 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L40-L41
|
||
// by default all the accepted indices are added to the in-progress set | ||
this.inProgressIndices = ConcurrentHashMap.newKeySet(); | ||
inProgressIndices.addAll(acceptedIndices); | ||
this.failedIndices = failedIndices; | ||
this.tieredIndices = new HashSet<>(); | ||
this.completedIndices = new HashSet<>(); | ||
} | ||
Check warning on line 48 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L43-L48
|
||
|
||
public ActionListener<HotToWarmTieringResponse> getListener() { | ||
return actionListener; | ||
Check warning on line 51 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L51
|
||
} | ||
|
||
public Map<Index, String> getFailedIndices() { | ||
return failedIndices; | ||
Check warning on line 55 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L55
|
||
} | ||
|
||
public Set<Index> getInProgressIndices() { | ||
return inProgressIndices; | ||
Check warning on line 59 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L59
|
||
} | ||
|
||
public Set<Index> getCompletedIndices() { | ||
return completedIndices; | ||
Check warning on line 63 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L63
|
||
} | ||
|
||
public Set<Index> getTieredIndices() { | ||
return tieredIndices; | ||
Check warning on line 67 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L67
|
||
} | ||
|
||
public boolean isRequestProcessingComplete() { | ||
return inProgressIndices.isEmpty() && tieredIndices.isEmpty(); | ||
} | ||
|
||
public void addToFailed(Index index, String reason) { | ||
inProgressIndices.remove(index); | ||
failedIndices.put(index, reason); | ||
} | ||
Check warning on line 77 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L75-L77
|
||
|
||
public void addToTiered(Index index) { | ||
inProgressIndices.remove(index); | ||
tieredIndices.add(index); | ||
} | ||
Check warning on line 82 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L80-L82
|
||
|
||
public void addToCompleted(Index index) { | ||
tieredIndices.remove(index); | ||
completedIndices.add(index); | ||
} | ||
Check warning on line 87 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L85-L87
|
||
|
||
@Override | ||
public String toString() { | ||
return "TieringRequestContext{" | ||
Check warning on line 91 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringRequestContext.java#L91
|
||
+ "actionListener=" | ||
+ actionListener | ||
+ ", inProgressIndices=" | ||
+ inProgressIndices | ||
+ ", tieredIndices=" | ||
+ tieredIndices | ||
+ ", completedIndices=" | ||
+ completedIndices | ||
+ ", failedIndices=" | ||
+ failedIndices | ||
+ '}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.admin.indices.tiering; | ||
|
||
import org.opensearch.cluster.ack.IndicesClusterStateUpdateRequest; | ||
import org.opensearch.common.annotation.ExperimentalApi; | ||
import org.opensearch.core.index.Index; | ||
|
||
import java.util.Map; | ||
|
||
/** | ||
* Cluster state update request that allows tiering for indices | ||
* | ||
* @opensearch.experimental | ||
*/ | ||
@ExperimentalApi | ||
public class TieringUpdateClusterStateRequest extends IndicesClusterStateUpdateRequest<TieringUpdateClusterStateRequest> { | ||
|
||
private final Map<Index, String> rejectedIndices; | ||
private final boolean waitForCompletion; | ||
|
||
public TieringUpdateClusterStateRequest(Map<Index, String> rejectedIndices, boolean waitForCompletion) { | ||
this.rejectedIndices = rejectedIndices; | ||
this.waitForCompletion = waitForCompletion; | ||
} | ||
Check warning on line 31 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java#L28-L31
|
||
|
||
public boolean waitForCompletion() { | ||
return waitForCompletion; | ||
Check warning on line 34 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java#L34
|
||
} | ||
|
||
public Map<Index, String> getRejectedIndices() { | ||
return rejectedIndices; | ||
Check warning on line 38 in server/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java Codecov / codecov/patchserver/src/main/java/org/opensearch/action/admin/indices/tiering/TieringUpdateClusterStateRequest.java#L38
|
||
} | ||
} |