Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tiered Caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic #12975

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow setting KEYSTORE_PASSWORD through env variable ([#12865](https://github.com/opensearch-project/OpenSearch/pull/12865))
- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697))
- [Concurrent Segment Search] Disable concurrent segment search for system indices and throttled requests ([#12954](https://github.com/opensearch-project/OpenSearch/pull/12954))
- [Tiered Caching] Make Indices Request Cache Stale Key Mgmt Threshold setting dynamic ([#12975](https://github.com/opensearch-project/OpenSearch/pull/12975))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -42,16 +43,20 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.opensearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.opensearch.search.aggregations.bucket.histogram.Histogram;
import org.opensearch.search.aggregations.bucket.histogram.Histogram.Bucket;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;
import org.opensearch.test.hamcrest.OpenSearchAssertions;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
Expand All @@ -60,6 +65,9 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.search.aggregations.AggregationBuilders.dateRange;
Expand All @@ -69,6 +77,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
public class IndicesRequestCacheIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {
public IndicesRequestCacheIT(Settings settings) {
super(settings);
Expand Down Expand Up @@ -97,7 +106,12 @@ public void testCacheAggs() throws Exception {
.indices()
.prepareCreate("index")
.setMapping("f", "type=date")
.setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true))
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
)
.get()
);
indexRandom(
Expand Down Expand Up @@ -677,14 +691,89 @@ public void testCacheWithInvalidation() throws Exception {
assertCacheState(client, "index", 1, 2);
}

// Verifies the staleness threshold setting is dynamic: with a high threshold (90%) the
// cache cleaner skips cleanup; after lowering it at runtime (10%) stale keys are evicted.
public void testStaleKeysCleanup_ThresholdUpdates() throws Exception {
    Instant start = Instant.now();
    long thresholdInMillis = 1_500;
    // Start a node whose cache cleaner runs every 1.5s but only cleans when >= 90% of keys are stale.
    String node = internalCluster().startNode(
        Settings.builder()
            .put(IndicesRequestCache.SETTING_INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING, 0.90)
            .put(IndicesRequestCache.SETTING_INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING, TimeValue.timeValueMillis(thresholdInMillis))
    );
    String index = "index";
    Client client = client(node);
    setupIndex(client, index);

    // create first cache entry
    createCacheEntry(client, index, "hello");
    assertCacheState(client, index, 0, 1);
    long expectedFirstCachedItemEntrySize = getRequestCacheStats(client, index).getMemorySizeInBytes();
    assertTrue(expectedFirstCachedItemEntrySize > 0);

    // create second cache entry
    createCacheEntry(client, index, "there");
    assertCacheState(client, index, 0, 2);
    assertEquals(expectedFirstCachedItemEntrySize * 2, getRequestCacheStats(client, index).getMemorySizeInBytes());

    // force refresh so that it creates 2 stale keys in the cache for the cache cleaner to pick up.
    flushAndRefresh(index);
    // BUG FIX: the original built this request but never executed it — .get() actually indexes the doc.
    client().prepareIndex(index).setId("1").setSource("k", "good bye").get();
    ensureSearchable(index);

    // create another entry
    createCacheEntry(client, index, "hello1");
    assertCacheState(client, index, 0, 3);
    long cacheSizeBeforeCleanup = getRequestCacheStats(client, index).getMemorySizeInBytes();
    assertTrue(cacheSizeBeforeCleanup > expectedFirstCachedItemEntrySize * 2);
    assertEquals(cacheSizeBeforeCleanup, expectedFirstCachedItemEntrySize * 3, 2);

    // Sleep past the first clean interval; 2 of 3 keys are stale (~66% < 90%),
    // so the cleaner must skip this cycle.
    Instant end = Instant.now();
    long elapsedTimeMillis = Duration.between(start, end).toMillis();
    // if this test is flaky, increase the sleep time.
    long sleepTime = (thresholdInMillis - elapsedTimeMillis) + 2_000;
    Thread.sleep(sleepTime);

    // cache cleaner should have skipped the cleanup
    long cacheSizeAfterCleanup = getRequestCacheStats(client, index).getMemorySizeInBytes();
    assertEquals(cacheSizeBeforeCleanup, cacheSizeAfterCleanup);

    // Dynamically lower indices.requests.cache.cleanup.staleness_threshold to 10%
    ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
    updateSettingsRequest.persistentSettings(Settings.builder().put(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING.getKey(), 0.10));
    assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

    // Wait for the next cleanup cycle; ~66% staleness now exceeds the 10% threshold,
    // so stale keys must be evicted and the cache must shrink.
    Thread.sleep(1_500);
    cacheSizeAfterCleanup = getRequestCacheStats(client, index).getMemorySizeInBytes();
    assertTrue(cacheSizeBeforeCleanup > cacheSizeAfterCleanup);
}

/**
 * Creates a single-shard, zero-replica index named {@code index} with the request
 * cache enabled, seeds it with two keyword documents, and waits until it is searchable.
 */
private void setupIndex(Client client, String index) throws Exception {
    // One primary and no replicas keeps request-cache stats deterministic per node.
    Settings indexSettings = Settings.builder()
        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
        .build();
    assertAcked(
        client.admin().indices().prepareCreate(index).setMapping("k", "type=keyword").setSettings(indexSettings).get()
    );
    // Seed two documents (indexRandom forces a refresh so they are visible).
    indexRandom(true, client.prepareIndex(index).setSource("k", "hello"));
    indexRandom(true, client.prepareIndex(index).setSource("k", "there"));
    ensureSearchable(index);
}

/**
 * Runs a request-cacheable term query {@code k == value} against {@code index},
 * producing a request-cache miss (and entry) on first execution.
 */
private void createCacheEntry(Client client, String index, String value) {
    SearchResponse response = client.prepareSearch(index)
        .setRequestCache(true)
        .setQuery(QueryBuilders.termQuery("k", value))
        .get();
    assertSearchResponse(response);
    OpenSearchAssertions.assertAllSuccessful(response);
}

private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) {
RequestCacheStats requestCacheStats = client.admin()
.indices()
.prepareStats(index)
.setRequestCache(true)
.get()
.getTotal()
.getRequestCache();
RequestCacheStats requestCacheStats = getRequestCacheStats(client, index);
// Check the hit count and miss count together so if they are not
// correct we can see both values
assertEquals(
Expand All @@ -694,4 +783,7 @@ private static void assertCacheState(Client client, String index, long expectedH

}

/** Returns the request-cache stats aggregated over all shards of {@code index}. */
private static RequestCacheStats getRequestCacheStats(Client client, String index) {
    return client.admin()
        .indices()
        .prepareStats(index)
        .setRequestCache(true)
        .get()
        .getTotal()
        .getRequestCache();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesFieldDataCache.INDICES_FIELDDATA_CACHE_SIZE_KEY,
IndicesRequestCache.INDICES_CACHE_QUERY_SIZE,
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING,
IndicesRequestCache.INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING,
HunspellService.HUNSPELL_LAZY_LOAD,
HunspellService.HUNSPELL_IGNORE_CASE,
HunspellService.HUNSPELL_DICTIONARY_OPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.OpenSearchParseException;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedSupplier;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
Expand Down Expand Up @@ -107,6 +108,10 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
* A setting to enable or disable request caching on an index level. Its dynamic by default
* since we are checking on the cluster state IndexMetadata always.
*/
public static final String SETTING_INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING =
"indices.requests.cache.cleanup.staleness_threshold";
public static final String SETTING_INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = "indices.requests.cache.cleanup.interval";

public static final Setting<Boolean> INDEX_CACHE_REQUEST_ENABLED_SETTING = Setting.boolSetting(
"index.requests.cache.enable",
true,
Expand All @@ -124,14 +129,15 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
Property.NodeScope
);
public static final Setting<TimeValue> INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING = Setting.positiveTimeSetting(
"indices.requests.cache.cleanup.interval",
SETTING_INDICES_REQUEST_CACHE_CLEAN_INTERVAL_SETTING,
INDICES_CACHE_CLEAN_INTERVAL_SETTING,
Property.NodeScope
);
public static final Setting<String> INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING = new Setting<>(
"indices.requests.cache.cleanup.staleness_threshold",
SETTING_INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING,
"0%",
IndicesRequestCache::validateStalenessSetting,
Property.Dynamic,
Property.NodeScope
);

Expand All @@ -141,6 +147,7 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
private final ByteSizeValue size;
private final TimeValue expire;
private final ICache<Key, BytesReference> cache;
private final ClusterService clusterService;
private final Function<ShardId, Optional<CacheEntity>> cacheEntityLookup;
// pkg-private for testing
final IndicesRequestCacheCleanupManager cacheCleanupManager;
Expand All @@ -149,7 +156,8 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
Settings settings,
Function<ShardId, Optional<CacheEntity>> cacheEntityFunction,
CacheService cacheService,
ThreadPool threadPool
ThreadPool threadPool,
ClusterService clusterService
) {
this.size = INDICES_CACHE_QUERY_SIZE.get(settings);
this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null;
Expand All @@ -161,6 +169,9 @@ public final class IndicesRequestCache implements RemovalListener<IndicesRequest
getStalenessThreshold(settings)
);
this.cacheEntityLookup = cacheEntityFunction;
this.clusterService = clusterService;
this.clusterService.getClusterSettings()
.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING, this::setStalenessThreshold);
this.cache = cacheService.createCache(
new CacheConfig.Builder<Key, BytesReference>().setSettings(settings)
.setWeigher(weigher)
Expand Down Expand Up @@ -196,6 +207,11 @@ private double getStalenessThreshold(Settings settings) {
return RatioValue.parseRatioValue(threshold).getAsRatio();
}

// pkg-private for testing
// Parses a threshold string (e.g. "10%" or "0.1") and forwards the ratio to the cleanup manager.
// Registered as the dynamic-settings update consumer for the staleness threshold setting.
void setStalenessThreshold(String threshold) {
    double ratio = RatioValue.parseRatioValue(threshold).getAsRatio();
    this.cacheCleanupManager.updateStalenessThreshold(ratio);
}

void clear(CacheEntity entity) {
cacheCleanupManager.enqueueCleanupKey(new CleanupKey(entity, null));
cacheCleanupManager.forceCleanCache();
Expand Down Expand Up @@ -440,7 +456,7 @@ class IndicesRequestCacheCleanupManager implements Closeable {
private final Set<CleanupKey> keysToClean;
private final ConcurrentMap<ShardId, HashMap<String, Integer>> cleanupKeyToCountMap;
private final AtomicInteger staleKeysCount;
private final double stalenessThreshold;
private volatile double stalenessThreshold;
private final IndicesRequestCacheCleaner cacheCleaner;

IndicesRequestCacheCleanupManager(ThreadPool threadpool, TimeValue cleanInterval, double stalenessThreshold) {
Expand All @@ -452,6 +468,12 @@ class IndicesRequestCacheCleanupManager implements Closeable {
threadpool.schedule(cacheCleaner, cleanInterval, ThreadPool.Names.SAME);
}

/**
 * Updates the staleness threshold used to decide whether a cleanup cycle runs.
 * The field is volatile, so the new value is picked up by the next cleanup pass.
 *
 * @param stalenessThreshold the new threshold, as a ratio
 */
void updateStalenessThreshold(double stalenessThreshold) {
    double oldStalenessThreshold = this.stalenessThreshold;
    this.stalenessThreshold = stalenessThreshold;
    // Parameterized logging: no string concatenation cost when debug is disabled,
    // and drops the stray leading space of the original message.
    logger.debug("Staleness threshold updated from {} to {}", oldStalenessThreshold, stalenessThreshold);
}

/**
* Enqueue cleanup key.
*
Expand All @@ -475,7 +497,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
* @param cleanupKey the CleanupKey to be updated in the map
*/
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
if (cleanupKey.entity == null) {
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand All @@ -491,7 +513,7 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
}

private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
if (cleanupKey.entity == null) {
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand Down Expand Up @@ -524,7 +546,7 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
* @param cleanupKey the CleanupKey that has been marked for cleanup
*/
private void incrementStaleKeysCount(CleanupKey cleanupKey) {
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
if (cleanupKey.entity == null) {
return;
}
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public IndicesService(
return Optional.empty();
}
return Optional.of(new IndexShardCacheEntity(indexService.getShard(shardId.id())));
}), cacheService, threadPool);
}), cacheService, threadPool, clusterService);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
Expand Down
Loading
Loading