Skip to content

Commit

Permalink
Adds cluster setting to allow caching requests with size>0 in request…
Browse files Browse the repository at this point in the history
… cache (opensearch-project#16484)

* Add cluster setting to allow size>0 in request cache

Signed-off-by: Peter Alfonsi <[email protected]>

* Add to changelog

Signed-off-by: Peter Alfonsi <[email protected]>

* addressed dbwiddis's comments

Signed-off-by: Peter Alfonsi <[email protected]>

* make canCacheSizeNonzeroRequests volatile

Signed-off-by: Peter Alfonsi <[email protected]>

* fix changelog merge

Signed-off-by: Peter Alfonsi <[email protected]>

* Changed setting name

Signed-off-by: Peter Alfonsi <[email protected]>

* more renaming

Signed-off-by: Peter Alfonsi <[email protected]>

* fix spotless check

Signed-off-by: Peter Alfonsi <[email protected]>

* rerun gradle check

Signed-off-by: Peter Alfonsi <[email protected]>

---------

Signed-off-by: Peter Alfonsi <[email protected]>
Signed-off-by: Peter Alfonsi <[email protected]>
Co-authored-by: Peter Alfonsi <[email protected]>
  • Loading branch information
peteralfonsi and Peter Alfonsi authored Nov 1, 2024
1 parent 80ca32f commit 0363aa7
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))
- Switch from `buildSrc/version.properties` to Gradle version catalog (`gradle/libs.versions.toml`) to enable dependabot to perform automated upgrades on common libs ([#16284](https://github.com/opensearch-project/OpenSearch/pull/16284))
- Add dynamic setting allowing size > 0 requests to be cached in the request cache ([#16483](https://github.com/opensearch-project/OpenSearch/pull/16483/files))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.25.1 to 12.27.1 ([#16521](https://github.com/opensearch-project/OpenSearch/pull/16521))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse;
Expand Down Expand Up @@ -89,6 +90,7 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.search.aggregations.AggregationBuilders.dateRange;
Expand Down Expand Up @@ -579,6 +581,22 @@ public void testCanCache() throws Exception {
OpenSearchAssertions.assertAllSuccessful(r4);
assertThat(r4.getHits().getTotalHits().value, equalTo(7L));
assertCacheState(client, index, 0, 4);

// If size > 0 we should cache if this is enabled via cluster setting
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING.getKey(), true)
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

final SearchResponse r7 = client.prepareSearch(index)
.setSearchType(SearchType.QUERY_THEN_FETCH)
.setSize(1)
.setQuery(QueryBuilders.rangeQuery("s").gte("2016-03-22").lte("2016-03-26"))
.get();
OpenSearchAssertions.assertAllSuccessful(r7);
assertThat(r7.getHits().getTotalHits().value, equalTo(5L));
assertCacheState(client, index, 0, 6);
}

public void testCacheWithFilteredAlias() throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesRequestCache.INDICES_CACHE_QUERY_EXPIRE,
IndicesRequestCache.INDICES_REQUEST_CACHE_CLEANUP_INTERVAL_SETTING,
IndicesRequestCache.INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING,
IndicesRequestCache.INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING,
HunspellService.HUNSPELL_LAZY_LOAD,
HunspellService.HUNSPELL_IGNORE_CASE,
HunspellService.HUNSPELL_DICTIONARY_OPTIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ public final class IndicesRequestCache implements RemovalListener<ICacheKey<Indi
Property.NodeScope
);

/**
* If enabled, allows caching all cacheable queries. For now, this means also allowing size > 0 queries.
* If enabled, fundamentally non-cacheable queries like DFS queries, queries using the `now` keyword, and
* scroll requests are still not cached.
*/
public static final Setting<Boolean> INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING = Setting.boolSetting(
"indices.requests.cache.enable_for_all_requests",
false,
Property.NodeScope,
Property.Dynamic
);

private final static long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Key.class);

private final ConcurrentMap<CleanupKey, Boolean> registeredClosedListeners = ConcurrentCollections.newConcurrentMap();
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@
import static org.opensearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.opensearch.index.IndexService.IndexCreationContext.METADATA_VERIFICATION;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.opensearch.indices.IndicesRequestCache.INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteDataAttributePresent;
import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;

Expand Down Expand Up @@ -360,6 +361,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private volatile boolean requestCachingEnabledForAllQueries;

@Override
protected void doStart() {
Expand Down Expand Up @@ -507,6 +509,9 @@ protected void closeInternal() {
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
this.requestCachingEnabledForAllQueries = INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(INDICES_REQUEST_CACHE_ENABLE_FOR_ALL_REQUESTS_SETTING, this::setRequestCachingEnabledForAllQueries);
}

public IndicesService(
Expand Down Expand Up @@ -1746,11 +1751,11 @@ public boolean canCache(ShardSearchRequest request, SearchContext context) {
IndexSettings settings = context.indexShard().indexSettings();
// if not explicitly set in the request, use the index setting, if not, use the request
if (request.requestCache() == null) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false) {
return false;
} else if (context.size() != 0) {
if (settings.getValue(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING) == false
|| (context.size() > 0 && !requestCachingEnabledForAllQueries)) {
// If no request cache query parameter and shard request cache
// is enabled in settings don't cache for requests with size > 0
// unless this is enabled via cluster setting
return false;
}
} else if (request.requestCache() == false) {
Expand Down Expand Up @@ -2118,4 +2123,9 @@ public RemoteStoreSettings getRemoteStoreSettings() {
public CompositeIndexSettings getCompositeIndexSettings() {
return this.compositeIndexSettings;
}

// Package-private for testing
void setRequestCachingEnabledForAllQueries(Boolean requestCachingEnabledForAllQueries) {
this.requestCachingEnabledForAllQueries = requestCachingEnabledForAllQueries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,25 +641,68 @@ public void testDirectoryReaderWithoutDelegatingCacheHelperNotCacheable() throws
ShardSearchRequest request = mock(ShardSearchRequest.class);
when(request.requestCache()).thenReturn(true);

TestSearchContext context = new TestSearchContext(indexService.getBigArrays(), indexService) {
@Override
public SearchType searchType() {
return SearchType.QUERY_THEN_FETCH;
}
};
TestSearchContext context = getTestContext(indexService, 0);
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);
for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
setupMocksForCanCache(context, cacheHelper);
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
}
}

public void testCanCacheSizeNonzero() {
// Size == 0 requests should always be cacheable (if they pass the other checks).
// Size > 0 requests should only be cacheable if ALLOW_SIZE_NONZERO_SETTING is true.

final IndexService indexService = createIndex("test");
ShardSearchRequest request = mock(ShardSearchRequest.class);
when(request.requestCache()).thenReturn(null);

TestSearchContext sizeZeroContext = getTestContext(indexService, 0);
TestSearchContext sizeNonzeroContext = getTestContext(indexService, 10);

// Test for an IndicesService with the default setting value of false
IndicesService indicesService = getIndicesService();
DelegatingCacheHelper cacheHelper = mock(DelegatingCacheHelper.class);
Map<TestSearchContext, Boolean> expectedResultMap = Map.of(sizeZeroContext, true, sizeNonzeroContext, false);

for (Map.Entry<TestSearchContext, Boolean> entry : expectedResultMap.entrySet()) {
TestSearchContext context = entry.getKey();
setupMocksForCanCache(context, cacheHelper);
assertEquals(entry.getValue(), indicesService.canCache(request, context));
}
// Simulate the cluster setting update by manually calling setCanCacheSizeNonzeroRequests
indicesService.setRequestCachingEnabledForAllQueries(true);
expectedResultMap = Map.of(sizeZeroContext, true, sizeNonzeroContext, true);

for (Map.Entry<TestSearchContext, Boolean> entry : expectedResultMap.entrySet()) {
TestSearchContext context = entry.getKey();
setupMocksForCanCache(context, cacheHelper);
assertEquals(entry.getValue(), indicesService.canCache(request, context));
}
}

private void setupMocksForCanCache(TestSearchContext context, IndexReader.CacheHelper cacheHelper) {
ContextIndexSearcher searcher = mock(ContextIndexSearcher.class);
context.setSearcher(searcher);
DirectoryReader reader = mock(DirectoryReader.class);
when(searcher.getDirectoryReader()).thenReturn(reader);
when(searcher.getIndexReader()).thenReturn(reader);
IndexReader.CacheHelper notDelegatingCacheHelper = mock(IndexReader.CacheHelper.class);
DelegatingCacheHelper delegatingCacheHelper = mock(DelegatingCacheHelper.class);
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
}

for (boolean useDelegatingCacheHelper : new boolean[] { true, false }) {
IndexReader.CacheHelper cacheHelper = useDelegatingCacheHelper ? delegatingCacheHelper : notDelegatingCacheHelper;
when(reader.getReaderCacheHelper()).thenReturn(cacheHelper);
assertEquals(useDelegatingCacheHelper, indicesService.canCache(request, context));
}
private TestSearchContext getTestContext(IndexService indexService, int size) {
return new TestSearchContext(indexService.getBigArrays(), indexService) {
@Override
public SearchType searchType() {
return SearchType.QUERY_THEN_FETCH;
}

@Override
public int size() {
return size;
}
};
}
}

0 comments on commit 0363aa7

Please sign in to comment.