Skip to content

Commit

Permalink
Adds more tests around tiered spillover cache
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Alfonsi <[email protected]>
  • Loading branch information
Peter Alfonsi committed May 13, 2024
1 parent d47ccb6 commit 727f1a9
Show file tree
Hide file tree
Showing 5 changed files with 362 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) {
static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
Expand All @@ -88,7 +88,7 @@ private Settings defaultSettings(String onHeapCacheSizeInBytesOrPecentage) {
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSizeInBytesOrPecentage
onHeapCacheSizeInBytesOrPercentage
)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,322 @@
package org.opensearch.cache.common.tier;public class TieredSpilloverCacheStatsIT {
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.service.NodeCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStats;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_NAME;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_DISK;
import static org.opensearch.cache.common.tier.TieredSpilloverCacheStatsHolder.TIER_DIMENSION_VALUE_ON_HEAP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

// Use a single data node to simplify accessing cache stats across different shards.
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class TieredSpilloverCacheStatsIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

    // Heap tier capacity for every test node. Kept small so a modest number of searches spills entries to the disk tier.
    private static final String HEAP_CACHE_SIZE_STRING = "10000B";
    private static final int HEAP_CACHE_SIZE = 10_000;

    public TieredSpilloverCacheStatsIT(Settings settings) {
        super(settings);
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Arrays.asList(TieredSpilloverCachePlugin.class, TieredSpilloverCacheIT.MockDiskCachePlugin.class);
    }

    @ParametersFactory
    public static Collection<Object[]> parameters() {
        // The tiered spillover cache is gated behind the pluggable-cache feature flag.
        return Arrays.<Object[]>asList(new Object[] { Settings.builder().put(FeatureFlags.PLUGGABLE_CACHE, "true").build() });
    }

    /**
     * Checks that tiered-cache stats aggregated at different dimension levels (index, tier, index+tier)
     * are mutually consistent: per-index totals match whether or not the tier dimension is requested,
     * and per-tier totals equal the sum of the per-index, per-tier values.
     */
    public void testMultiLevelAggregation() throws Exception {
        internalCluster().startNodes(
            1,
            Settings.builder()
                .put(TieredSpilloverCacheIT.defaultSettings(HEAP_CACHE_SIZE_STRING))
                // Zero took-time threshold so every search result is eligible for the cache.
                .put(
                    TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
                    new TimeValue(0, TimeUnit.SECONDS)
                )
                .build()
        );
        Client client = client();
        String index1Name = "index1";
        String index2Name = "index2";
        startIndex(client, index1Name);
        startIndex(client, index2Name);

        // First search one time to see how big a single value will be
        searchIndex(client, index1Name, 0);
        // get total stats
        long singleSearchSize = getTotalStats(client).getSizeInBytes();
        int itemsOnHeap = HEAP_CACHE_SIZE / (int) singleSearchSize;
        int itemsOnDisk = 1 + randomInt(30); // The first one we search (to get the size) always goes to disk

        // Put some values on heap and disk for each index
        int itemsOnHeapIndex1 = randomInt(itemsOnHeap);
        int itemsOnHeapIndex2 = itemsOnHeap - itemsOnHeapIndex1;
        int itemsOnDiskIndex1 = 1 + randomInt(itemsOnDisk - 1); // The first one we search (to get the size) always goes to disk
        int itemsOnDiskIndex2 = itemsOnDisk - itemsOnDiskIndex1;

        // The earliest items (0 - itemsOnDisk) are the ones which get evicted to disk
        for (int i = 1; i < itemsOnDiskIndex1; i++) { // Start at 1 as 0 has already been searched
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDiskIndex1; i < itemsOnDiskIndex1 + itemsOnDiskIndex2; i++) {
            searchIndex(client, index2Name, i);
        }
        // The remaining items stay on heap
        for (int i = itemsOnDisk; i < itemsOnDisk + itemsOnHeapIndex1; i++) {
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDisk + itemsOnHeapIndex1; i < itemsOnDisk + itemsOnHeap; i++) {
            searchIndex(client, index2Name, i);
        }

        // Get some hits on all combinations of indices and tiers
        int hitsOnHeapIndex1 = randomInt(itemsOnHeapIndex1);
        int hitsOnDiskIndex1 = randomInt(itemsOnDiskIndex1);
        int hitsOnHeapIndex2 = randomInt(itemsOnHeapIndex2);
        int hitsOnDiskIndex2 = randomInt(itemsOnDiskIndex2);

        for (int i = itemsOnDisk; i < itemsOnDisk + hitsOnHeapIndex1; i++) {
            // heap hits for index 1
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDisk + itemsOnHeapIndex1; i < itemsOnDisk + itemsOnHeapIndex1 + hitsOnHeapIndex2; i++) {
            // heap hits for index 2
            searchIndex(client, index2Name, i);
        }
        for (int i = 0; i < hitsOnDiskIndex1; i++) {
            // disk hits for index 1
            searchIndex(client, index1Name, i);
        }
        for (int i = itemsOnDiskIndex1; i < itemsOnDiskIndex1 + hitsOnDiskIndex2; i++) {
            // disk hits for index 2
            searchIndex(client, index2Name, i);
        }

        ImmutableCacheStatsHolder allLevelsStatsHolder = getNodeCacheStatsResult(
            client,
            List.of(IndicesRequestCache.INDEX_DIMENSION_NAME, TIER_DIMENSION_NAME)
        );
        ImmutableCacheStatsHolder indicesOnlyStatsHolder = getNodeCacheStatsResult(
            client,
            List.of(IndicesRequestCache.INDEX_DIMENSION_NAME)
        );

        // Get values for indices alone, assert these match for statsHolders that have additional dimensions vs. a statsHolder that
        // only has the indices dimension
        ImmutableCacheStats index1ExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1 + hitsOnDiskIndex1,
                itemsOnDiskIndex1 + itemsOnHeapIndex1,
                0,
                (itemsOnDiskIndex1 + itemsOnHeapIndex1) * singleSearchSize,
                itemsOnDiskIndex1 + itemsOnHeapIndex1
            )
        );
        ImmutableCacheStats index2ExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex2 + hitsOnDiskIndex2,
                itemsOnDiskIndex2 + itemsOnHeapIndex2,
                0,
                (itemsOnDiskIndex2 + itemsOnHeapIndex2) * singleSearchSize,
                itemsOnDiskIndex2 + itemsOnHeapIndex2
            )
        );

        for (ImmutableCacheStatsHolder statsHolder : List.of(allLevelsStatsHolder, indicesOnlyStatsHolder)) {
            assertEquals(index1ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index1Name)));
            assertEquals(index2ExpectedStats, statsHolder.getStatsForDimensionValues(List.of(index2Name)));
        }

        // Get values broken down by indices+tiers
        ImmutableCacheStats index1HeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1,
                itemsOnHeapIndex1 + itemsOnDiskIndex1 + hitsOnDiskIndex1,
                itemsOnDiskIndex1,
                itemsOnHeapIndex1 * singleSearchSize,
                itemsOnHeapIndex1
            )
        );
        assertEquals(
            index1HeapExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_ON_HEAP))
        );

        ImmutableCacheStats index2HeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex2,
                itemsOnHeapIndex2 + itemsOnDiskIndex2 + hitsOnDiskIndex2,
                itemsOnDiskIndex2,
                itemsOnHeapIndex2 * singleSearchSize,
                itemsOnHeapIndex2
            )
        );
        assertEquals(
            index2HeapExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_ON_HEAP))
        );

        ImmutableCacheStats index1DiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex1,
                itemsOnHeapIndex1 + itemsOnDiskIndex1,
                0,
                itemsOnDiskIndex1 * singleSearchSize,
                itemsOnDiskIndex1
            )
        );
        assertEquals(
            index1DiskExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index1Name, TIER_DIMENSION_VALUE_DISK))
        );

        ImmutableCacheStats index2DiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex2,
                itemsOnHeapIndex2 + itemsOnDiskIndex2,
                0,
                itemsOnDiskIndex2 * singleSearchSize,
                itemsOnDiskIndex2
            )
        );
        assertEquals(
            index2DiskExpectedStats,
            allLevelsStatsHolder.getStatsForDimensionValues(List.of(index2Name, TIER_DIMENSION_VALUE_DISK))
        );

        // Get values for tiers alone and check they add correctly across indices
        ImmutableCacheStatsHolder tiersOnlyStatsHolder = getNodeCacheStatsResult(client, List.of(TIER_DIMENSION_NAME));
        ImmutableCacheStats totalHeapExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnHeapIndex1 + hitsOnHeapIndex2,
                itemsOnHeap + itemsOnDisk + hitsOnDiskIndex1 + hitsOnDiskIndex2,
                itemsOnDisk,
                itemsOnHeap * singleSearchSize,
                itemsOnHeap
            )
        );
        ImmutableCacheStats heapStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_ON_HEAP));
        assertEquals(totalHeapExpectedStats, heapStats);
        ImmutableCacheStats totalDiskExpectedStats = returnNullIfAllZero(
            new ImmutableCacheStats(
                hitsOnDiskIndex1 + hitsOnDiskIndex2,
                itemsOnHeap + itemsOnDisk,
                0,
                itemsOnDisk * singleSearchSize,
                itemsOnDisk
            )
        );
        ImmutableCacheStats diskStats = tiersOnlyStatsHolder.getStatsForDimensionValues(List.of(TIER_DIMENSION_VALUE_DISK));
        assertEquals(totalDiskExpectedStats, diskStats);
    }

    /**
     * Creates a single-shard, zero-replica index with the request cache enabled,
     * indexes one document, and waits until it is searchable.
     */
    private void startIndex(Client client, String indexName) throws InterruptedException {
        assertAcked(
            client.admin()
                .indices()
                .prepareCreate(indexName)
                .setMapping("k", "type=keyword")
                .setSettings(
                    Settings.builder()
                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
                        .build()
                )
                .get()
        );
        indexRandom(true, client.prepareIndex(indexName).setSource("k", "hello"));
        ensureSearchable(indexName);
    }

    private ImmutableCacheStats returnNullIfAllZero(ImmutableCacheStats expectedStats) {
        // If the randomly chosen numbers are such that the expected stats would be 0, we actually have not interacted with the cache
        // for this index.
        // In this case, we expect the stats holder to have no stats for this node, and therefore we should get null from
        // statsHolder.getStatsForDimensionValues().
        // We will not see it in the XContent response.
        if (expectedStats.equals(new ImmutableCacheStats(0, 0, 0, 0, 0))) {
            return null;
        }
        return expectedStats;
    }

    // Duplicated from CacheStatsAPIIndicesRequestCacheIT.java, as we can't add a dependency on server.internalClusterTest

    /**
     * Runs a cacheable search against {@code index} whose query is unique per {@code searchSuffix},
     * so each distinct suffix produces a distinct request-cache entry.
     */
    private SearchResponse searchIndex(Client client, String index, int searchSuffix) {
        SearchResponse resp = client.prepareSearch(index)
            .setRequestCache(true)
            // pad with zeros so request 0 and request 10 have the same size ("0000" and "0010" instead of "0" and "10")
            .setQuery(QueryBuilders.termQuery("k", "hello" + String.format("%04d", searchSuffix)))
            .get();
        assertSearchResponse(resp);
        OpenSearchAssertions.assertAllSuccessful(resp);
        return resp;
    }

    // Returns the request-cache stats aggregated across all dimensions (the grand total).
    private ImmutableCacheStats getTotalStats(Client client) throws IOException {
        ImmutableCacheStatsHolder statsHolder = getNodeCacheStatsResult(client, List.of());
        return statsHolder.getStatsForDimensionValues(List.of());
    }

    /**
     * Fetches request-cache stats from the single data node, aggregated at the given dimension levels
     * (null means the node-stats default levels).
     */
    private static ImmutableCacheStatsHolder getNodeCacheStatsResult(Client client, List<String> aggregationLevels) throws IOException {
        CommonStatsFlags statsFlags = new CommonStatsFlags();
        statsFlags.includeAllCacheTypes();
        String[] flagsLevels;
        if (aggregationLevels == null) {
            flagsLevels = null;
        } else {
            flagsLevels = aggregationLevels.toArray(new String[0]);
        }
        statsFlags.setLevels(flagsLevels);

        NodesStatsResponse nodeStatsResponse = client.admin()
            .cluster()
            .prepareNodesStats("data:true")
            .addMetric(NodesStatsRequest.Metric.CACHE_STATS.metricName())
            .setIndices(statsFlags)
            .get();
        // Can always get the first data node as there's only one in this test suite
        assertEquals(1, nodeStatsResponse.getNodes().size());
        NodeCacheStats ncs = nodeStatsResponse.getNodes().get(0).getNodeCacheStats();
        return ncs.getStatsByCache(CacheType.INDICES_REQUEST_CACHE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ private Function<ICacheKey<K>, Tuple<V, String>> getValueFromTieredCache(boolean
void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification) {
ICacheKey<K> key = notification.getKey();
boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
boolean countEvictionTowardsTotal = false; // Don't count this eviction towards the cache's total if it ends up in the disk tier
if (caches.get(diskCache).isEnabled() && wasEvicted && evaluatePolicies(notification.getValue())) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(key, notification.getValue()); // spill over to the disk tier and increment its stats
Expand All @@ -334,21 +335,28 @@ void handleRemovalFromHeapTier(RemovalNotification<ICacheKey<K>, V> notification
// If the value is not going to the disk cache, send this notification to the TSC's removal listener
// as the value is leaving the TSC entirely
removalListener.onRemoval(notification);
countEvictionTowardsTotal = true;
}
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue());
updateStatsOnRemoval(TIER_DIMENSION_VALUE_ON_HEAP, wasEvicted, key, notification.getValue(), countEvictionTowardsTotal);
}

/**
 * Handles a removal notification from the disk tier. There is no lower tier, so the value
 * always leaves the TSC, and any eviction here counts towards the cache's total evictions.
 */
void handleRemovalFromDiskTier(RemovalNotification<ICacheKey<K>, V> notification) {
    // Values removed from the disk tier leave the TSC entirely
    removalListener.onRemoval(notification);
    boolean wasEvicted = SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason());
    updateStatsOnRemoval(TIER_DIMENSION_VALUE_DISK, wasEvicted, notification.getKey(), notification.getValue(), true);
}

void updateStatsOnRemoval(String removedFromTierValue, boolean wasEvicted, ICacheKey<K> key, V value) {
void updateStatsOnRemoval(
String removedFromTierValue,
boolean wasEvicted,
ICacheKey<K> key,
V value,
boolean countEvictionTowardsTotal
) {
List<String> dimensionValues = statsHolder.getDimensionsWithTierValue(key.dimensions, removedFromTierValue);
if (wasEvicted) {
statsHolder.incrementEvictions(dimensionValues);
statsHolder.incrementEvictions(dimensionValues, countEvictionTowardsTotal);
}
statsHolder.decrementItems(dimensionValues);
statsHolder.decrementSizeInBytes(dimensionValues, weigher.applyAsLong(key, value));
Expand Down
Loading

0 comments on commit 727f1a9

Please sign in to comment.