From 691c4edc0bc44158e93beb34969ff2721066dd58 Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Thu, 24 Aug 2023 23:41:03 -0700 Subject: [PATCH] For sort request on timeseries field use non concurrent search path Signed-off-by: Sorabh Hamirwasia --- CHANGELOG.md | 1 + .../search/DefaultSearchContext.java | 17 +- .../search/internal/ContextIndexSearcher.java | 17 +- .../internal/FilteredSearchContext.java | 5 + .../search/internal/SearchContext.java | 5 + .../query/QueryPhaseSearcherWrapper.java | 6 +- .../search/DefaultSearchContextTests.java | 160 ++++++++++++++++++ .../opensearch/test/TestSearchContext.java | 11 ++ 8 files changed, 203 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96af8741a0de9..b7133d9e0abf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -158,6 +158,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Removing the vec file extension from INDEX_STORE_HYBRID_NIO_EXTENSIONS, to ensure the no performance degradation for vector search via Lucene Engine.([#9528](https://github.com/opensearch-project/OpenSearch/pull/9528))) - Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469)) - [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/)) +- Use non-concurrent path for sort request on timeseries index and field() ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ef8a6c9f36b0c..93eda7009f43f 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -43,6 +43,7 @@ import org.opensearch.Version; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.metadata.DataStream; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.SetOnce; @@ -890,11 +891,11 @@ public boolean shouldUseConcurrentSearch() { * Evaluate if parsed request supports concurrent segment search */ public void evaluateRequestShouldUseConcurrentSearch() { + boolean useConcurrentSearch = !isSortOnTimeSeriesField(); if (aggregations() != null && aggregations().factories() != null) { - requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch()); - } else { - requestShouldUseConcurrentSearch.set(true); + useConcurrentSearch = useConcurrentSearch && aggregations().factories().allFactoriesSupportConcurrentSearch(); } + requestShouldUseConcurrentSearch.set(useConcurrentSearch); } public void setProfilers(Profilers profilers) { @@ -965,4 +966,14 @@ public int getTargetMaxSliceCount() { } return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); } + + @Override + public boolean isSortOnTimeSeriesField() { + return sort != null + && sort.sort != null + && sort.sort.getSort() != null + && sort.sort.getSort().length > 0 + && sort.sort.getSort()[0].getField() != null + && sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 7974561d6187c..ee54abf7e44cd 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -64,7 +64,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.CombinedBitSet; import org.apache.lucene.util.SparseFixedBitSet; -import org.opensearch.cluster.metadata.DataStream; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.search.DocValueFormat; @@ -522,19 +521,9 @@ private boolean shouldReverseLeafReaderContexts() { // This is actually beneficial for search queries to start search on latest segments first for time series workload. // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf // reader order here. - if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) { - // Only reverse order for asc order sort queries - if (searchContext.sort() != null - && searchContext.sort().sort != null - && searchContext.sort().sort.getSort() != null - && searchContext.sort().sort.getSort().length > 0 - && searchContext.sort().sort.getSort()[0].getReverse() == false - && searchContext.sort().sort.getSort()[0].getField() != null - && searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) { - return true; - } - } - return false; + return searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled() + && searchContext.isSortOnTimeSeriesField() + && searchContext.sort().sort.getSort()[0].getReverse() == false; } // package-private for testing diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 32de5fc9864ce..4f9d04d08766f 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -569,4 +569,9 @@ public boolean shouldUseConcurrentSearch() { public int getTargetMaxSliceCount() { return in.getTargetMaxSliceCount(); } + + @Override + public boolean isSortOnTimeSeriesField() { + return in.isSortOnTimeSeriesField(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 590ce4b077cbc..4d2546ef2d494 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -487,4 +487,9 @@ public String toString() { public abstract BucketCollectorProcessor bucketCollectorProcessor(); public abstract int getTargetMaxSliceCount(); + + /** + * @return true: if sort is on timestamp field, false: otherwise + */ + public abstract boolean isSortOnTimeSeriesField(); } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java index 115f7503631c1..af6ad5b03881c 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java @@ -58,9 +58,10 @@ public boolean searchWith( boolean hasTimeout ) throws IOException { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug("Using concurrent search over segments (experimental)"); return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } else { + LOGGER.debug("Using non-concurrent search over segments"); return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } } @@ -73,9 +74,10 @@ public boolean searchWith( @Override public AggregationProcessor aggregationProcessor(SearchContext searchContext) { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug("Using concurrent search over segments (experimental)"); return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext); } else { + LOGGER.debug("Using non-concurrent search over segments"); return defaultQueryPhaseSearcher.aggregationProcessor(searchContext); } } diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index f569fe3b63af0..0ca003f87331a 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -40,16 +40,21 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchType; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -75,6 +80,7 @@ import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.slice.SliceBuilder; import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -82,6 +88,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -547,6 +554,159 @@ protected Engine.Searcher acquireSearcherInternal(String source) { } } + public void testSearchPathEvaluationUsingSortField() throws Exception { + // enable the concurrent set FeatureFlag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); + when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); + ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); + when(shardSearchRequest.shardId()).thenReturn(shardId); + + ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); + IndexShard indexShard = mock(IndexShard.class); + QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class); + when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(indexShard.getThreadPool()).thenReturn(threadPool); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .build(); + + IndexService indexService = mock(IndexService.class); + QueryShardContext queryShardContext = mock(QueryShardContext.class); + when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn( + queryShardContext + ); + + IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + when(indexService.getIndexSettings()).thenReturn(indexSettings); + + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + + final Supplier searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) { + @Override + protected void doClose() {} + + @Override + protected Engine.Searcher acquireSearcherInternal(String source) { + try { + IndexReader reader = w.getReader(); + return new Engine.Searcher( + "test", + reader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + reader + ); + } catch (IOException exc) { + throw new AssertionError(exc); + } + } + }; + + SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); + ReaderContext readerContext = new ReaderContext( + newContextId(), + indexService, + indexShard, + searcherSupplier.get(), + randomNonNegativeLong(), + false + ); + + final ClusterService clusterService = mock(ClusterService.class); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.EMPTY_SET); + clusterSettings.registerSetting(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING); + clusterSettings.applySettings( + Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + DefaultSearchContext context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + + // Case1: if sort is on timestamp field, non-concurrent path is used + context.sort( + new SortAndFormats(new Sort(new SortField("@timestamp", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + assertFalse(context.shouldUseConcurrentSearch()); + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case2: if sort is on other field, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.sort( + new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case 3: With no sort, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // shutdown the threadpool + threadPool.shutdown(); + } + } + private ShardSearchContextId newContextId() { return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()); } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index dd4a05b67271c..bfb80f2dbfb9e 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -38,6 +38,7 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchShardTask; import org.opensearch.action.search.SearchType; +import org.opensearch.cluster.metadata.DataStream; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; import org.opensearch.core.index.shard.ShardId; @@ -692,6 +693,16 @@ public int getTargetMaxSliceCount() { return maxSliceCount; } + @Override + public boolean isSortOnTimeSeriesField() { + return sort != null + && sort.sort != null + && sort.sort.getSort() != null + && sort.sort.getSort().length > 0 + && sort.sort.getSort()[0].getField() != null + && sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME); + } + /** * Clean the query results by consuming all of it */