Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Tiered Caching] Stats rework (4/4): Adds stats implementation for TieredSpilloverCache #13520

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Tiered Caching] Add dimension-based stats to TieredSpilloverCache ([#13236](https://github.com/opensearch-project/OpenSearch/pull/13236))
- [Tiered Caching] Expose new cache stats API ([#13237](https://github.com/opensearch-project/OpenSearch/pull/13237))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
}

@Override
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
* A tier-aware version of DefaultCacheStatsHolder. Overrides the incrementer functions, as we can't just add the on-heap
* and disk stats to get a total for the cache as a whole. If the disk tier is present, the total hits, size, and entries
* should be the sum of both tiers' values, but the total misses and evictions should be the disk tier's values.
* When the disk tier isn't present, on-heap misses and evictions should contribute to the total.
*
* For example, if the heap tier has 5 misses and the disk tier has 4, the total cache has had 4 misses, not 9.
* The same goes for evictions. Other stats values add normally.
*
* This means for misses and evictions, if we are incrementing for the on-heap tier and the disk tier is present,
* we have to increment only the leaf nodes corresponding to the on-heap tier itself, and not its ancestors,
* which correspond to totals including both tiers. If the disk tier is not present, we do increment the ancestor nodes.
*/
public class TieredSpilloverCacheStatsHolder extends DefaultCacheStatsHolder {

/** Whether the disk cache is currently enabled. */
private boolean diskCacheEnabled;

// Common values used for tier dimension

/** The name for the tier dimension. */
public static final String TIER_DIMENSION_NAME = "tier";

/** Dimension value for on-heap cache, like OpenSearchOnHeapCache.*/
public static final String TIER_DIMENSION_VALUE_ON_HEAP = "on_heap";

/** Dimension value for on-disk cache, like EhcacheDiskCache. */
public static final String TIER_DIMENSION_VALUE_DISK = "disk";

/**
* Constructor for the stats holder.
* @param originalDimensionNames the original dimension names, not including TIER_DIMENSION_NAME
* @param diskCacheEnabled whether the disk tier starts out enabled
*/
public TieredSpilloverCacheStatsHolder(List<String> originalDimensionNames, boolean diskCacheEnabled) {
super(
getDimensionNamesWithTier(originalDimensionNames),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
);
this.diskCacheEnabled = diskCacheEnabled;
}

private static List<String> getDimensionNamesWithTier(List<String> dimensionNames) {
List<String> dimensionNamesWithTier = new ArrayList<>(dimensionNames);
dimensionNamesWithTier.add(TIER_DIMENSION_NAME);
return dimensionNamesWithTier;
}

/**
* Add tierValue to the end of a copy of the initial dimension values, so they can appropriately be used in this stats holder.
*/
List<String> getDimensionsWithTierValue(List<String> initialDimensions, String tierValue) {
List<String> result = new ArrayList<>(initialDimensions);
result.add(tierValue);
return result;
}

private String validateTierDimensionValue(List<String> dimensionValues) {
String tierDimensionValue = dimensionValues.get(dimensionValues.size() - 1);
assert tierDimensionValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) || tierDimensionValue.equals(TIER_DIMENSION_VALUE_DISK)
: "Invalid tier dimension value";
return tierDimensionValue;
}

@Override
public void incrementHits(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Hits from either tier should be included in the total values.
super.incrementHits(dimensionValues);
}

@Override
public void incrementMisses(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only misses from the disk tier should be included in total values.
Consumer<Node> missIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementMisses();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementMisses();
}
};
internalIncrement(dimensionValues, missIncrementer, true);
}

@Override
public void incrementEvictions(List<String> dimensionValues) {
final String tierValue = validateTierDimensionValue(dimensionValues);

// If the disk tier is present, only evictions from the disk tier should be included in total values.
Consumer<DefaultCacheStatsHolder.Node> evictionsIncrementer = (node) -> {
if (tierValue.equals(TIER_DIMENSION_VALUE_ON_HEAP) && diskCacheEnabled) {
// If on-heap tier, increment only the leaf node corresponding to the on heap values; not the total values in its parent
// nodes
if (node.isAtLowestLevel()) {
node.incrementEvictions();
}
} else {
// If disk tier, or on-heap tier with a disabled disk tier, increment the leaf node and its parents
node.incrementEvictions();
}
};
internalIncrement(dimensionValues, evictionsIncrementer, true);
}

@Override
public void incrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.incrementSizeInBytes(dimensionValues, amountBytes);
}

// For decrements, we should not create nodes if they are absent. This protects us from erroneously decrementing values for keys
// which have been entirely deleted, for example in an async removal listener.
@Override
public void decrementSizeInBytes(List<String> dimensionValues, long amountBytes) {
validateTierDimensionValue(dimensionValues);
// Size from either tier should be included in the total values.
super.decrementSizeInBytes(dimensionValues, amountBytes);
}

@Override
public void incrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.incrementItems(dimensionValues);
}

@Override
public void decrementItems(List<String> dimensionValues) {
validateTierDimensionValue(dimensionValues);
// Entries from either tier should be included in the total values.
super.decrementItems(dimensionValues);
}

void setDiskCacheEnabled(boolean diskCacheEnabled) {
this.diskCacheEnabled = diskCacheEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.RemovalReason;
import org.opensearch.common.cache.serializer.Serializer;
import org.opensearch.common.cache.stats.CacheStatsHolder;
import org.opensearch.common.cache.stats.DefaultCacheStatsHolder;
import org.opensearch.common.cache.stats.ImmutableCacheStatsHolder;
import org.opensearch.common.cache.stats.NoopCacheStatsHolder;
import org.opensearch.common.cache.store.builders.ICacheBuilder;
import org.opensearch.common.cache.store.config.CacheConfig;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -32,12 +36,19 @@ public class MockDiskCache<K, V> implements ICache<K, V> {
long delay;

private final RemovalListener<ICacheKey<K>, V> removalListener;
private final CacheStatsHolder statsHolder; // Only update for number of entries; this is only used to test statsTrackingEnabled logic
// in TSC

public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener) {
public MockDiskCache(int maxSize, long delay, RemovalListener<ICacheKey<K>, V> removalListener, boolean statsTrackingEnabled) {
this.maxSize = maxSize;
this.delay = delay;
this.removalListener = removalListener;
this.cache = new ConcurrentHashMap<ICacheKey<K>, V>();
if (statsTrackingEnabled) {
this.statsHolder = new DefaultCacheStatsHolder(List.of(), "mock_disk_cache");
} else {
this.statsHolder = NoopCacheStatsHolder.getInstance();
}
}

@Override
Expand All @@ -50,13 +61,15 @@ public V get(ICacheKey<K> key) {
public void put(ICacheKey<K> key, V value) {
if (this.cache.size() >= maxSize) { // For simplification
this.removalListener.onRemoval(new RemovalNotification<>(key, value, RemovalReason.EVICTED));
this.statsHolder.decrementItems(List.of());
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.cache.put(key, value);
this.statsHolder.incrementItems(List.of());
}

@Override
Expand All @@ -73,6 +86,7 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>

@Override
public void invalidate(ICacheKey<K> key) {
removalListener.onRemoval(new RemovalNotification<>(key, cache.get(key), RemovalReason.INVALIDATED));
this.cache.remove(key);
}

Expand All @@ -96,7 +110,9 @@ public void refresh() {}

@Override
public ImmutableCacheStatsHolder stats() {
return null;
// To allow testing of statsTrackingEnabled logic in TSC, return a dummy ImmutableCacheStatsHolder with the
// right number of entries, unless statsTrackingEnabled is false
return statsHolder.getImmutableCacheStatsHolder(null);
}

@Override
Expand All @@ -114,10 +130,12 @@ public static class MockDiskCacheFactory implements Factory {
public static final String NAME = "mockDiskCache";
final long delay;
final int maxSize;
final boolean statsTrackingEnabled;

public MockDiskCacheFactory(long delay, int maxSize) {
public MockDiskCacheFactory(long delay, int maxSize, boolean statsTrackingEnabled) {
this.delay = delay;
this.maxSize = maxSize;
this.statsTrackingEnabled = statsTrackingEnabled;
}

@Override
Expand All @@ -128,6 +146,7 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
.setMaxSize(maxSize)
.setDeliberateDelay(delay)
.setRemovalListener(config.getRemovalListener())
.setStatsTrackingEnabled(config.getStatsTrackingEnabled())
.build();
}

Expand All @@ -146,7 +165,7 @@ public static class Builder<K, V> extends ICacheBuilder<K, V> {

@Override
public ICache<K, V> build() {
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener());
return new MockDiskCache<K, V>(this.maxSize, this.delay, this.getRemovalListener(), getStatsTrackingEnabled());
}

public Builder<K, V> setMaxSize(int maxSize) {
Expand Down
Loading
Loading