diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff048fdbf1a3..db57363ab605f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -160,6 +160,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Separate request-based and settings-based concurrent segment search controls and introduce AggregatorFactory method to determine concurrent search support ([#9469](https://github.com/opensearch-project/OpenSearch/pull/9469)) - [Remote Store] Rate limiter integration for remote store uploads and downloads([#9448](https://github.com/opensearch-project/OpenSearch/pull/9448/)) - [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264)) +- Use non-concurrent path for sort request on timeseries index and field([#9562](https://github.com/opensearch-project/OpenSearch/pull/9562)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ef8a6c9f36b0c..28931bb5a860f 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -890,11 +890,15 @@ public boolean shouldUseConcurrentSearch() { * Evaluate if parsed request supports concurrent segment search */ public void evaluateRequestShouldUseConcurrentSearch() { - if (aggregations() != null && aggregations().factories() != null) { - requestShouldUseConcurrentSearch.set(aggregations().factories().allFactoriesSupportConcurrentSearch()); - } else { - requestShouldUseConcurrentSearch.set(true); - } + if (sort != null && sort.isSortOnTimeSeriesField()) { + requestShouldUseConcurrentSearch.set(false); + } else if (aggregations() != null + && aggregations().factories() != null + && !aggregations().factories().allFactoriesSupportConcurrentSearch()) { + requestShouldUseConcurrentSearch.set(false); + } else { + requestShouldUseConcurrentSearch.set(true); + } } public void setProfilers(Profilers profilers) { @@ -965,4 +969,12 @@ public int getTargetMaxSliceCount() { } return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return indexShard.isTimeSeriesDescSortOptimizationEnabled() + && sort != null + && sort.isSortOnTimeSeriesField() + && sort.sort.getSort()[0].getReverse() == false; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 7974561d6187c..6c3d2bb278bd0 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -64,7 +64,6 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.CombinedBitSet; import org.apache.lucene.util.SparseFixedBitSet; -import org.opensearch.cluster.metadata.DataStream; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.search.DocValueFormat; @@ -268,10 +267,11 @@ public void search( @Override protected void search(List leaves, Weight weight, Collector collector) throws IOException { - if (shouldReverseLeafReaderContexts()) { - // reverse the segment search order if this flag is true. - // Certain queries can benefit if we reverse the segment read order, - // for example time series based queries if searched for desc sort order. + // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. + // This is actually beneficial for search queries to start search on latest segments first for time series workload. + // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf + // reader order here. + if (searchContext.shouldUseTimeSeriesDescSortOptimization()) { for (int i = leaves.size() - 1; i >= 0; i--) { searchLeaf(leaves.get(i), weight, collector); } @@ -517,26 +517,6 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException { return true; } - private boolean shouldReverseLeafReaderContexts() { - // Time series based workload by default traverses segments in desc order i.e. latest to the oldest order. - // This is actually beneficial for search queries to start search on latest segments first for time series workload. - // That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf - // reader order here. - if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) { - // Only reverse order for asc order sort queries - if (searchContext.sort() != null - && searchContext.sort().sort != null - && searchContext.sort().sort.getSort() != null - && searchContext.sort().sort.getSort().length > 0 - && searchContext.sort().sort.getSort()[0].getReverse() == false - && searchContext.sort().sort.getSort()[0].getField() != null - && searchContext.sort().sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME)) { - return true; - } - } - return false; - } - // package-private for testing LeafSlice[] slicesInternal(List leaves, int targetMaxSlice) { LeafSlice[] leafSlices; diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 32de5fc9864ce..151ef97a2a141 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -569,4 +569,9 @@ public boolean shouldUseConcurrentSearch() { public int getTargetMaxSliceCount() { return in.getTargetMaxSliceCount(); } + + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return in.shouldUseTimeSeriesDescSortOptimization(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 590ce4b077cbc..dce6da897a74b 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -487,4 +487,6 @@ public String toString() { public abstract BucketCollectorProcessor bucketCollectorProcessor(); public abstract int getTargetMaxSliceCount(); + + public abstract boolean shouldUseTimeSeriesDescSortOptimization(); } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java index 115f7503631c1..631ace41090d7 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhaseSearcherWrapper.java @@ -58,9 +58,10 @@ public boolean searchWith( boolean hasTimeout ) throws IOException { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug("Using concurrent search over segments (experimental) for request with context id {}", searchContext.id()); return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } else { + LOGGER.debug("Using non-concurrent search over segments for request with context id {}", searchContext.id()); return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); } } @@ -73,9 +74,13 @@ public boolean searchWith( @Override public AggregationProcessor aggregationProcessor(SearchContext searchContext) { if (searchContext.shouldUseConcurrentSearch()) { - LOGGER.info("Using concurrent search over segments (experimental)"); + LOGGER.debug( + "Using concurrent aggregation processor over segments (experimental) for request with context id {}", + searchContext.id() + ); return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext); } else { + LOGGER.debug("Using non-concurrent aggregation processor over segments for request with context id {}", searchContext.id()); return defaultQueryPhaseSearcher.aggregationProcessor(searchContext); } } diff --git a/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java b/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java index 272b1e9c1dc8d..e65187e558aef 100644 --- a/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java +++ b/server/src/main/java/org/opensearch/search/sort/SortAndFormats.java @@ -32,6 +32,7 @@ package org.opensearch.search.sort; import org.apache.lucene.search.Sort; +import org.opensearch.cluster.metadata.DataStream; import org.opensearch.search.DocValueFormat; /** @@ -52,4 +53,13 @@ public SortAndFormats(Sort sort, DocValueFormat[] formats) { this.formats = formats; } + /** + * @return true: if sort is on timestamp field, false: otherwise + */ + public boolean isSortOnTimeSeriesField() { + return sort.getSort().length > 0 + && sort.getSort()[0].getField() != null + && sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME); + } + } diff --git a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java index f569fe3b63af0..347011af98c6d 100644 --- a/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/opensearch/search/DefaultSearchContextTests.java @@ -40,16 +40,21 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.opensearch.Version; import org.opensearch.action.OriginalIndices; import org.opensearch.action.search.SearchType; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.MockBigArrays; import org.opensearch.common.util.MockPageCacheRecycler; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; @@ -75,6 +80,7 @@ import org.opensearch.search.rescore.RescoreContext; import org.opensearch.search.slice.SliceBuilder; import org.opensearch.search.sort.SortAndFormats; +import org.opensearch.test.FeatureFlagSetter; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -547,6 +553,159 @@ protected Engine.Searcher acquireSearcherInternal(String source) { } } + public void testSearchPathEvaluationUsingSortField() throws Exception { + // enable the concurrent set FeatureFlag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + ShardSearchRequest shardSearchRequest = mock(ShardSearchRequest.class); + when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); + ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); + when(shardSearchRequest.shardId()).thenReturn(shardId); + + ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); + IndexShard indexShard = mock(IndexShard.class); + QueryCachingPolicy queryCachingPolicy = mock(QueryCachingPolicy.class); + when(indexShard.getQueryCachingPolicy()).thenReturn(queryCachingPolicy); + when(indexShard.getThreadPool()).thenReturn(threadPool); + + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2) + .build(); + + IndexService indexService = mock(IndexService.class); + QueryShardContext queryShardContext = mock(QueryShardContext.class); + when(indexService.newQueryShardContext(eq(shardId.id()), any(), any(), nullable(String.class), anyBoolean())).thenReturn( + queryShardContext + ); + + IndexMetadata indexMetadata = IndexMetadata.builder("index").settings(settings).build(); + IndexSettings indexSettings = new IndexSettings(indexMetadata, Settings.EMPTY); + when(indexService.getIndexSettings()).thenReturn(indexSettings); + + BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + + try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + + final Supplier searcherSupplier = () -> new Engine.SearcherSupplier(Function.identity()) { + @Override + protected void doClose() {} + + @Override + protected Engine.Searcher acquireSearcherInternal(String source) { + try { + IndexReader reader = w.getReader(); + return new Engine.Searcher( + "test", + reader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + reader + ); + } catch (IOException exc) { + throw new AssertionError(exc); + } + } + }; + + SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); + ReaderContext readerContext = new ReaderContext( + newContextId(), + indexService, + indexShard, + searcherSupplier.get(), + randomNonNegativeLong(), + false + ); + + final ClusterService clusterService = mock(ClusterService.class); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + clusterSettings.registerSetting(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING); + clusterSettings.applySettings( + Settings.builder().put(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + DefaultSearchContext context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + null, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + + // Case1: if sort is on timestamp field, non-concurrent path is used + context.sort( + new SortAndFormats(new Sort(new SortField("@timestamp", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + assertFalse(context.shouldUseConcurrentSearch()); + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case2: if sort is on other field, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.sort( + new SortAndFormats(new Sort(new SortField("test2", SortField.Type.INT)), new DocValueFormat[] { DocValueFormat.RAW }) + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // Case 3: With no sort, concurrent path is used + context = new DefaultSearchContext( + readerContext, + shardSearchRequest, + target, + clusterService, + bigArrays, + null, + null, + null, + false, + Version.CURRENT, + false, + executor, + null + ); + context.evaluateRequestShouldUseConcurrentSearch(); + if (executor == null) { + assertFalse(context.shouldUseConcurrentSearch()); + } else { + assertTrue(context.shouldUseConcurrentSearch()); + } + assertThrows(SetOnce.AlreadySetException.class, context::evaluateRequestShouldUseConcurrentSearch); + + // shutdown the threadpool + threadPool.shutdown(); + } + } + private ShardSearchContextId newContextId() { return new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong()); } diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index dd4a05b67271c..2fb345f73fb06 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -692,6 +692,15 @@ public int getTargetMaxSliceCount() { return maxSliceCount; } + @Override + public boolean shouldUseTimeSeriesDescSortOptimization() { + return indexShard != null + && indexShard.isTimeSeriesDescSortOptimizationEnabled() + && sort != null + && sort.isSortOnTimeSeriesField() + && sort.sort.getSort()[0].getReverse() == false; + } + /** * Clean the query results by consuming all of it */