Commit
Adds more tests around tiered spillover cache
Signed-off-by: Peter Alfonsi <[email protected]>
Peter Alfonsi committed May 13, 2024
1 parent d47ccb6 · commit 5192014
Showing 5 changed files with 357 additions and 18 deletions.
322 changes: 321 additions & 1 deletion
...nternalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheStatsIT.java
@@ -1,2 +1,322 @@
-package org.opensearch.cache.common.tier;public class TieredSpilloverCacheStatsIT {
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.cache.common.tier;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// Use a single data node to simplify accessing cache stats across different shards.
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TieredSpilloverCacheStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(TieredSpilloverCachePlugin.class, TieredSpilloverCacheIT.MockDiskCachePlugin.class);
    }

    public TieredSpilloverCacheStatsIT(Settings settings) {
        super(settings);
    }

    private final String HEAP_CACHE_SIZE_STRING = "10000B";
    private final int HEAP_CACHE_SIZE = 10_000;

    @ParametersFactory
    public static Collection<Object[]> parameters() {
        return Arrays.<Object[]>asList(new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() });
    }

    public void testMultiLevelAggregation() throws Exception {
        internalCluster().startNodes(
            1,
            Settings.builder()
                .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
                .put(
                    TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
                    new TimeValue(0, TimeUnit.SECONDS)
                )
                .build()
        );
        Client client = client();
        String index1Name = "index1";
        String index2Name = "index2";
        startIndex(client, index1Name);
        startIndex(client, index2Name);

        // First search one time to see how big a single value will be
        searchIndex(client, index1Name, 0);
        // get total stats
        long singleSearchSize = getTotalStats(client).getSizeInBytes();
        int itemsOnHeap = HEAP_CACHE_SIZE / (int) singleSearchSize;
        int itemsOnDisk = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk

        // Put some values on heap and disk for each index
        int itemsOnHeapIndex1 = randomInt(itemsOnHeap);
        int itemsOnHeapIndex2 = itemsOnHeap - itemsOnHeapIndex1;
        int itemsOnDiskIndex1 = 1 + randomInt(itemsOnDisk - 1); // The first one we search (to get the size) always goes to disk
        int itemsOnDiskIndex2 = itemsOnDisk - itemsOnDiskIndex1;

        // The earliest items (0 - itemsOnDisk) are the ones which get evicted to disk
        for (int i = 1; i < itemsOnDiskIndex1; i++) { // Start at 1 as 0 has already been searched
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDiskIndex1; i < itemsOnDiskIndex1 + itemsOnDiskIndex2; i++) {
            searchIndex(client, index2Name, i);
        }
        // The remaining items stay on heap
        for (int i = itemsOnDisk; i < itemsOnDisk + itemsOnHeapIndex1; i++) {
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDisk + itemsOnHeapIndex1; i < itemsOnDisk + itemsOnHeap; i++) {
            searchIndex(client, index2Name, i);
        }

        // Get some hits on all combinations of indices and tiers
        int hitsOnHeapIndex1 = randomInt(itemsOnHeapIndex1);
        int hitsOnDiskIndex1 = randomInt(itemsOnDiskIndex1);
        int hitsOnHeapIndex2 = randomInt(itemsOnHeapIndex2);
        int hitsOnDiskIndex2 = randomInt(itemsOnDiskIndex2);

        for (int i = itemsOnDisk; i < itemsOnDisk + hitsOnHeapIndex1; i++) {
            // heap hits for index 1
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDisk + itemsOnHeapIndex1; i < itemsOnDisk + itemsOnHeapIndex1 + hitsOnHeapIndex2; i++) {
            // heap hits for index 2
            searchIndex(client, index2Name, i);
        }
        for (int i = 0; i < hitsOnDiskIndex1; i++) {
            // disk hits for index 1
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDiskIndex1; i < itemsOnDiskIndex1 + hitsOnDiskIndex2; i++) {
            // disk hits for index 2
            searchIndex(client, index2Name, i);
        }

        ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult(
            client,
            List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME)
        );
        ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult(
            client,
            List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)
        );

        // Get values for indices alone, assert these match for statsHolders that have additional dimensions vs. a statsHolder that only has
        // the indices dimension
        ImmutableCacheStats index1ExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1 + hitsOnDiskIndex1,
                itemsOnDiskIndex1 + itemsOnHeapIndex1,
                0,
                (itemsOnDiskIndex1 + itemsOnHeapIndex1) * singleSearchSize,
                itemsOnDiskIndex1 + itemsOnHeapIndex1
            )
        );
        ImmutableCacheStats index2ExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex2 + hitsOnDiskIndex2,
                itemsOnDiskIndex2 + itemsOnHeapIndex2,
                0,
                (itemsOnDiskIndex2 + itemsOnHeapIndex2) * singleSearchSize,
                itemsOnDiskIndex2 + itemsOnHeapIndex2
            )
        );

        for (ImmutableCacheStatsHolder statsHolder : List.of(allLevelsStatsHolder, indicesOnlyStatsHolder)) {
            assertEquals(index1ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index1Name)));
            assertEquals(index2ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index2Name)));
        }

        // Get values broken down by indices+tiers
        ImmutableCacheStats index1HeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1,
                itemsOnHeapIndex1 + itemsOnDiskIndex1 + hitsOnDiskIndex1,
                itemsOnDiskIndex1,
                itemsOnHeapIndex1 * singleSearchSize,
                itemsOnHeapIndex1
            )
        );
        assertEquals(
            index1HeapExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP))
        );

        ImmutableCacheStats index2HeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex2,
                itemsOnHeapIndex2 + itemsOnDiskIndex2 + hitsOnDiskIndex2,
                itemsOnDiskIndex2,
                itemsOnHeapIndex2 * singleSearchSize,
                itemsOnHeapIndex2
            )
        );
        assertEquals(
            index2HeapExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_ON_HEAP))
        );

        ImmutableCacheStats index1DiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex1,
                itemsOnHeapIndex1 + itemsOnDiskIndex1,
                0,
                itemsOnDiskIndex1 * singleSearchSize,
                itemsOnDiskIndex1
            )
        );
        assertEquals(
            index1DiskExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK))
        );

        ImmutableCacheStats index2DiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex2,
                itemsOnHeapIndex2 + itemsOnDiskIndex2,
                0,
                itemsOnDiskIndex2 * singleSearchSize,
                itemsOnDiskIndex2
            )
        );
        assertEquals(
            index2DiskExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_DISK))
        );

        // Get values for tiers alone and check they add correctly across indices
        ImmutableCacheStatsHolder tiersOnlyStatsHolder = getNodeCacheStatsResult(client, List.of(TIER_DIMENSION_NAME));
        ImmutableCacheStats totalHeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1 + hitsOnHeapIndex2,
                itemsOnHeap + itemsOnDisk + hitsOnDiskIndex1 + hitsOnDiskIndex2,
                itemsOnDisk,
                itemsOnHeap * singleSearchSize,
                itemsOnHeap
            )
        );
        ImmutableCacheStats heapStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_ON_HEAP));
        assertEquals(totalHeapExpectedStats, heapStats);
        ImmutableCacheStats totalDiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex1 + hitsOnDiskIndex2,
                itemsOnHeap + itemsOnDisk,
                0,
                itemsOnDisk * singleSearchSize,
                itemsOnDisk
            )
        );
        ImmutableCacheStats diskStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_DISK));
        assertEquals(totalDiskExpectedStats, diskStats);
    }

    private void startIndex(Client client, String indexName) throws InterruptedException {
        assertAcked(
            client.admin()
                .indices()
                .prepareCreate(indexName)
                .setMapping("k", "type=keyword")
                .setSettings(
                    Settings.builder()
                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                        .build()
                )
                .get()
        );
        indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
        ensureSearchable(indexName);
    }

    private ImmutableCacheStats returnNullIfAllZero(ImmutableCacheStats expectedStats) {
        // If the randomly chosen numbers are such that the expected stats would be 0, we actually have not interacted with the cache for
        // this index.
        // In this case, we expect the stats holder to have no stats for this node, and therefore we should get null from
        // statsHolder.getStatsForDimensionValues().
        // We will not see it in the XContent response.
        if (expectedStats.equals(new ImmutableCacheStats(0, 0, 0, 0, 0))) {
            return null;
        }
        return expectedStats;
    }

    // Duplicated from CacheStatsAPIIndicesRequestCacheIT.java, as we can't add a dependency on server.internalClusterTest

    private SearchResponse searchIndex(Client client, String index, int searchSuffix) {
        SearchResponse resp = client.prepareSearch(index)
            .setRequestCache(true)
            .setQuery(QueryBuilders.termQuery("k", "hello" + String.format("%04d", searchSuffix)))
            // pad with zeros so request 0 and request 10 have the same size ("0000" and "0010" instead of "0" and "10")
            .get();
        assertSearchResponse(resp);
        OpenSearchAssertions.assertAllSuccessful(resp);
        return resp;
    }

    private ImmutableCacheStats getTotalStats(Client client) throws IOException {
        ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of());
        return statsHolder.getStatsForDimensionValues(List.of());
    }

    private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List<String> aggregationLevels) throws IOException {
        CommonStatsFlags statsFlags = new CommonStatsFlags();
        statsFlags.includeAllCacheTypes();
        String[] flagsLevels;
        if (aggregationLevels == null) {
            flagsLevels = null;
        } else {
            flagsLevels = aggregationLevels.toArray(new String[0]);
        }
        statsFlags.setLevels(flagsLevels);

        NodesStatsResponse nodeStatsResponse = client.admin()
            .cluster()
            .prepareNodesStats("data:true")
            .addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName())
            .setIndices(statsFlags)
            .get();
        // Can always get the first data node as there's only one in this test suite
        assertEquals(1, nodeStatsResponse.getNodes().size());
        NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
        return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE);
    }
}
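
The least obvious part of the new test is the expected-stats arithmetic: every request checks the heap tier before the disk tier, so a disk hit still registers as a heap miss, and each of the earliest distinct searches that spills over counts as a heap eviction, while every distinct search misses the disk tier exactly once. Below is a minimal, hypothetical sketch of that bookkeeping, with made-up fixed counts standing in for the test's randomized values; the class and variable names are illustrative and are not part of this commit.

// Hypothetical, standalone sketch (not part of this commit) of the per-tier
// accounting that the test's expected ImmutableCacheStats values rely on.
// All counts and the entry size are illustrative example numbers.
public class TierAccountingSketch {
    public static void main(String[] args) {
        long heapCapacityBytes = 10_000;  // mirrors HEAP_CACHE_SIZE in the test
        long entrySizeBytes = 500;        // assumed size of one cached search response
        long itemsOnHeap = heapCapacityBytes / entrySizeBytes; // 20 entries fit in the heap tier
        long itemsOnDisk = 7;             // distinct searches beyond heap capacity spill to disk
        long hitsOnHeap = 4;              // repeat searches served from the heap tier
        long hitsOnDisk = 3;              // repeat searches served from the disk tier

        // Heap tier: every lookup checks the heap first, so a disk hit still counts as a
        // heap miss, and each entry pushed down to the disk tier counts as a heap eviction.
        long heapMisses = itemsOnHeap + itemsOnDisk + hitsOnDisk;
        long heapEvictions = itemsOnDisk;

        // Disk tier: only lookups that miss the heap reach the disk, so every distinct
        // search misses the disk tier exactly once; disk hits add no further misses.
        long diskMisses = itemsOnHeap + itemsOnDisk;

        System.out.printf("heap: hits=%d misses=%d evictions=%d items=%d%n", hitsOnHeap, heapMisses, heapEvictions, itemsOnHeap);
        System.out.printf("disk: hits=%d misses=%d items=%d%n", hitsOnDisk, diskMisses, itemsOnDisk);
    }
}

With the test's randomized values, the same relations are applied per index and then summed across indices, which is why the per-tier totals checked at the end of testMultiLevelAggregation must equal the sums of the per-index, per-tier expectations.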