Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Sorabh Hamirwasia <[email protected]>
  • Loading branch information
sohami committed Aug 28, 2023
1 parent 56aba56 commit 7929ae9
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.opensearch.Version;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
Expand Down Expand Up @@ -891,11 +890,15 @@ public boolean shouldUseConcurrentSearch() {
* Evaluate if parsed request supports concurrent segment search
*/
public void evaluateRequestShouldUseConcurrentSearch() {
boolean useConcurrentSearch = !isSortOnTimeSeriesField();
if (aggregations() != null && aggregations().factories() != null) {
useConcurrentSearch = useConcurrentSearch && aggregations().factories().allFactoriesSupportConcurrentSearch();
}
requestShouldUseConcurrentSearch.set(useConcurrentSearch);
if (sort != null && sort.isSortOnTimeSeriesField()) {
requestShouldUseConcurrentSearch.set(false);
} else if (aggregations() != null
&& aggregations().factories() != null
&& !aggregations().factories().allFactoriesSupportConcurrentSearch()) {
requestShouldUseConcurrentSearch.set(false);
} else {
requestShouldUseConcurrentSearch.set(true);
}
}

public void setProfilers(Profilers profilers) {
Expand Down Expand Up @@ -968,12 +971,10 @@ public int getTargetMaxSliceCount() {
}

@Override
public boolean isSortOnTimeSeriesField() {
return sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
public boolean shouldUseTimeSeriesDescSortOptimization() {
return indexShard.isTimeSeriesDescSortOptimizationEnabled()
&& sort != null
&& sort.isSortOnTimeSeriesField()
&& sort.sort.getSort()[0].getReverse() == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,11 @@ public void search(

@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
if (shouldReverseLeafReaderContexts()) {
// reverse the segment search order if this flag is true.
// Certain queries can benefit if we reverse the segment read order,
// for example time series based queries if searched for desc sort order.
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext.shouldUseTimeSeriesDescSortOptimization()) {
for (int i = leaves.size() - 1; i >= 0; i--) {
searchLeaf(leaves.get(i), weight, collector);
}
Expand Down Expand Up @@ -516,16 +517,6 @@ private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
return true;
}

private boolean shouldReverseLeafReaderContexts() {
// Time series based workload by default traverses segments in desc order i.e. latest to the oldest order.
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
return searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()
&& searchContext.isSortOnTimeSeriesField()
&& searchContext.sort().sort.getSort()[0].getReverse() == false;
}

// package-private for testing
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] leafSlices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public int getTargetMaxSliceCount() {
}

@Override
public boolean isSortOnTimeSeriesField() {
return in.isSortOnTimeSeriesField();
public boolean shouldUseTimeSeriesDescSortOptimization() {
return in.shouldUseTimeSeriesDescSortOptimization();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,5 @@ public String toString() {

public abstract int getTargetMaxSliceCount();

/**
* @return true: if sort is on timestamp field, false: otherwise
*/
public abstract boolean isSortOnTimeSeriesField();
public abstract boolean shouldUseTimeSeriesDescSortOptimization();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ public boolean searchWith(
boolean hasTimeout
) throws IOException {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.debug("Using concurrent search over segments (experimental)");
LOGGER.debug("Using concurrent search over segments (experimental) for request with context id {}", searchContext.id());
return concurrentQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
} else {
LOGGER.debug("Using non-concurrent search over segments");
LOGGER.debug("Using non-concurrent search over segments for request with context id {}", searchContext.id());
return defaultQueryPhaseSearcher.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
}
}
Expand All @@ -74,10 +74,13 @@ public boolean searchWith(
@Override
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
if (searchContext.shouldUseConcurrentSearch()) {
LOGGER.debug("Using concurrent search over segments (experimental)");
LOGGER.debug(
"Using concurrent aggregation processor over segments (experimental) for request with context id {}",
searchContext.id()
);
return concurrentQueryPhaseSearcher.aggregationProcessor(searchContext);
} else {
LOGGER.debug("Using non-concurrent search over segments");
LOGGER.debug("Using non-concurrent aggregation processor over segments for request with context id {}", searchContext.id());
return defaultQueryPhaseSearcher.aggregationProcessor(searchContext);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.search.sort;

import org.apache.lucene.search.Sort;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.search.DocValueFormat;

/**
Expand All @@ -52,4 +53,13 @@ public SortAndFormats(Sort sort, DocValueFormat[] formats) {
this.formats = formats;
}

/**
* @return true: if sort is on timestamp field, false: otherwise
*/
public boolean isSortOnTimeSeriesField() {
return sort.getSort().length > 0
&& sort.getSort()[0].getField() != null
&& sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.index.shard.ShardId;
Expand Down Expand Up @@ -694,13 +693,12 @@ public int getTargetMaxSliceCount() {
}

@Override
public boolean isSortOnTimeSeriesField() {
return sort != null
&& sort.sort != null
&& sort.sort.getSort() != null
&& sort.sort.getSort().length > 0
&& sort.sort.getSort()[0].getField() != null
&& sort.sort.getSort()[0].getField().equals(DataStream.TIMESERIES_FIELDNAME);
public boolean shouldUseTimeSeriesDescSortOptimization() {
return indexShard != null
&& indexShard.isTimeSeriesDescSortOptimizationEnabled()
&& sort != null
&& sort.isSortOnTimeSeriesField()
&& sort.sort.getSort()[0].getReverse() == false;
}

/**
Expand Down

0 comments on commit 7929ae9

Please sign in to comment.