From 3f7e036d79a401d806dd142b4b1d5bb1a284d904 Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Tue, 19 Mar 2024 09:57:12 -0700 Subject: [PATCH] Perform buildAggregation concurrently and support Composite Aggregations Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../aggregations/bucket/CompositeAggIT.java | 29 ++++---- .../AggregationCollectorManager.java | 14 +--- .../search/aggregations/Aggregator.java | 17 +++++ .../BucketCollectorProcessor.java | 67 +++++++++++++++++++ .../MultiBucketConsumerService.java | 19 +++--- .../CompositeAggregationFactory.java | 5 +- .../GlobalOrdinalsStringTermsAggregator.java | 17 +++-- .../search/internal/SearchContext.java | 6 ++ .../opensearch/test/OpenSearchTestCase.java | 7 +- 10 files changed, 140 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fb141e189bd3..421414f1b482e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625)) - [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709)) - [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) +- [Concurrent Segment Search] Perform buildAggregation concurrently and support Composite Aggregations ([#12697](https://github.com/opensearch-project/OpenSearch/pull/12697)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java index 5a38ba670f1dc..a743f22a2ff77 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/aggregations/bucket/CompositeAggIT.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.List; +import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING; import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; @@ -50,23 +51,25 @@ public void setupSuiteScopeCluster() throws Exception { assertAcked( prepareCreate( "idx", - Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false) ).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer") ); waitForRelocation(ClusterHealthStatus.GREEN); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get(); - refresh("idx"); - client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get(); - client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get(); - refresh("idx"); + indexRandom( + true, + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"), + client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"), + client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100") + ); waitForRelocation(ClusterHealthStatus.GREEN); refresh(); diff --git a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java index 0bb2d1d7ca933..da2bc3ba7a259 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java +++ b/server/src/main/java/org/opensearch/search/aggregations/AggregationCollectorManager.java @@ -15,9 +15,9 @@ import org.opensearch.search.query.ReduceableSearchResult; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; /** * Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global @@ -56,17 +56,9 @@ public String getCollectorReason() { @Override public ReduceableSearchResult reduce(Collection collectors) throws IOException { - final List aggregators = context.bucketCollectorProcessor().toAggregators(collectors); - final List internals = new ArrayList<>(aggregators.size()); + List internals = context.bucketCollectorProcessor().toInternalAggregations(collectors); + assert internals.stream().noneMatch(Objects::isNull); context.aggregations().resetBucketMultiConsumer(); - for (Aggregator aggregator : aggregators) { - try { - // post collection is called in ContextIndexSearcher after search on leaves are completed - internals.add(aggregator.buildTopLevel()); - } catch (IOException e) { - throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); - } - } final InternalAggregations internalAggregations = InternalAggregations.from(internals); return buildAggregationResult(internalAggregations); diff --git a/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java b/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java index 8744d1f6a07d3..e41770d5ea2d5 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/Aggregator.java @@ -33,6 +33,7 @@ package org.opensearch.search.aggregations; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.SetOnce; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lease.Releasable; import org.opensearch.core.ParseField; @@ -61,6 +62,8 @@ @PublicApi(since = "1.0.0") public abstract class Aggregator extends BucketCollector implements Releasable { + final SetOnce internalAggregation = new SetOnce<>(); + /** * Parses the aggregation request and creates the appropriate aggregator factory for it. * @@ -83,6 +86,20 @@ public interface Parser { AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException; } + /** + * Stores an InternalAggregation + */ + public void setInternalAggregation(InternalAggregation internalAggregation) { + this.internalAggregation.set(internalAggregation); + } + + /** + * Returns the stored InternalAggregation + */ + public InternalAggregation getInternalAggregation() { + return internalAggregation.get(); + } + /** * Return the name of this aggregator. */ diff --git a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java index 135fda71a757a..bb6af459c0b1f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java +++ b/server/src/main/java/org/opensearch/search/aggregations/BucketCollectorProcessor.java @@ -13,6 +13,7 @@ import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.lucene.MinimumScoreCollector; import org.opensearch.search.internal.SearchContext; +import org.opensearch.search.profile.aggregation.ProfilingAggregator; import org.opensearch.search.profile.query.InternalProfileCollector; import java.io.IOException; @@ -22,6 +23,7 @@ import java.util.Deque; import java.util.LinkedList; import java.util.List; +import java.util.Objects; import java.util.Queue; /** @@ -72,6 +74,15 @@ public void processPostCollection(Collector collectorTree) throws IOException { } } else if (currentCollector instanceof BucketCollector) { ((BucketCollector) currentCollector).postCollection(); + + // Perform build aggregation during post collection + if (currentCollector instanceof Aggregator) { + ((Aggregator) currentCollector).setInternalAggregation(((Aggregator) currentCollector).buildTopLevel()); + } else if (currentCollector instanceof MultiBucketCollector) { + for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) { + collectors.offer(innerCollector); + } + } } } } @@ -106,4 +117,60 @@ public List toAggregators(Collection collectors) { } return aggregators; } + + /** + * Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The + * input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager} + * during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before + * returning response to coordinator + * @param collectors collection of aggregation collectors to reduce + * @return list of unwrapped {@link InternalAggregation} + */ + public List toInternalAggregations(Collection collectors) throws IOException { + List internalAggregations = new ArrayList<>(); + + final Deque allCollectors = new LinkedList<>(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof AggregatorBase) { + internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation()); + } else if (currentCollector instanceof ProfilingAggregator) { + internalAggregations.add(((ProfilingAggregator) currentCollector).getInternalAggregation()); + } else if (currentCollector instanceof InternalProfileCollector) { + if (((InternalProfileCollector) currentCollector).getCollector() instanceof AggregatorBase) { + internalAggregations.add( + ((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation() + ); + } else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) { + allCollectors.addAll( + Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors()) + ); + } + } else if (currentCollector instanceof MultiBucketCollector) { + allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors())); + } + } + + // Check that internalAggregations does not contain any null objects as that means postCollection was not called for a given + // collector. This can happen as collect will not get called whenever there are no leaves on a shard. Since we build the + // InternalAggregation in postCollection that will not get called in such cases either. Therefore we need to manually call it again + // here to build empty InternalAggregation objects for this collector tree. + if (internalAggregations.stream().anyMatch(Objects::isNull)) { + allCollectors.addAll(collectors); + while (!allCollectors.isEmpty()) { + final Collector currentCollector = allCollectors.pop(); + if (currentCollector instanceof Aggregator) { + ((Aggregator) currentCollector).setInternalAggregation(((Aggregator) currentCollector).buildTopLevel()); + } else if (currentCollector instanceof MultiBucketCollector) { + for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) { + allCollectors.offer(innerCollector); + } + } + } + // Iterate through collector tree again to get InternalAggregations object + return toInternalAggregations(collectors); + } else { + return internalAggregations; + } + } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java index 1461dd3009b44..f7104896eb0ac 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java +++ b/server/src/main/java/org/opensearch/search/aggregations/MultiBucketConsumerService.java @@ -131,12 +131,8 @@ public static class MultiBucketConsumer implements IntConsumer { private final int limit; private final CircuitBreaker breaker; - // aggregations execute in a single thread for both sequential - // and concurrent search, so no atomic here - private int count; - - // will be updated by multiple threads in concurrent search - // hence making it as LongAdder + // count and callCount will both be updated by multiple threads in concurrent segment search hence using LongAdder + private final LongAdder count; private final LongAdder callCount; private volatile boolean circuitBreakerTripped; private final int availProcessors; @@ -145,6 +141,7 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) { this.limit = limit; this.breaker = breaker; callCount = new LongAdder(); + count = new LongAdder(); availProcessors = Runtime.getRuntime().availableProcessors(); } @@ -158,6 +155,7 @@ protected MultiBucketConsumer( ) { this.limit = limit; this.breaker = breaker; + this.count = new LongAdder(); this.callCount = callCount; this.circuitBreakerTripped = circuitBreakerTripped; this.availProcessors = availProcessors; @@ -166,8 +164,9 @@ protected MultiBucketConsumer( @Override public void accept(int value) { if (value != 0) { - count += value; - if (count > limit) { + count.add(value); + ; + if (count.intValue() > limit) { throw new TooManyBucketsException( "Trying to create too many buckets. Must be less than or equal to: [" + limit @@ -205,11 +204,11 @@ public void accept(int value) { } public void reset() { - this.count = 0; + this.count.reset(); } public int getCount() { - return count; + return count.intValue(); } public int getLimit() { diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java index 4af14ab014db5..6c5619a843fae 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/composite/CompositeAggregationFactory.java @@ -40,6 +40,7 @@ import org.opensearch.search.internal.SearchContext; import java.io.IOException; +import java.util.Arrays; import java.util.Map; /** @@ -80,7 +81,7 @@ protected Aggregator createInternal( @Override protected boolean supportsConcurrentSegmentSearch() { - // See https://github.com/opensearch-project/OpenSearch/issues/12331 for details - return false; + // Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details + return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript); } } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 69fda2f3f6133..eb09aac46186f 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr private final long valueCount; private final String fieldName; private Weight weight; - private final GlobalOrdLookupFunction lookupGlobalOrd; protected final CollectionStrategy collectionStrategy; protected int segmentsWithSingleValuedOrds = 0; protected int segmentsWithMultiValuedOrds = 0; @@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator( this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. this.valuesSource = valuesSource; final IndexReader reader = context.searcher().getIndexReader(); - final SortedSetDocValues values = reader.leaves().size() > 0 + final SortedSetDocValues values = !reader.leaves().isEmpty() ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) : DocValues.emptySortedSet(); this.valueCount = values.getValueCount(); - this.lookupGlobalOrd = values::lookupOrd; this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get; if (remapGlobalOrds) { this.collectionStrategy = new RemapGlobalOrds(cardinality); @@ -885,7 +883,12 @@ PriorityQueue buildPriorityQueue(int size) { } StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException { - BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); + // Recreate DocValues as needed for concurrent segment search + SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty() + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd)); + StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); result.bucketOrd = temp.bucketOrd; result.docCountError = 0; @@ -1001,7 +1004,11 @@ BucketUpdater bucketUpdater(long owningBucketOrd) long subsetSize = subsetSize(owningBucketOrd); return (spare, globalOrd, bucketOrd, docCount) -> { spare.bucketOrd = bucketOrd; - oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes); + // Recreate DocValues as needed for concurrent segment search + SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty() + ? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0)) + : DocValues.emptySortedSet(); + oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes); spare.subsetDf = docCount; spare.subsetSize = subsetSize; spare.supersetDf = backgroundFrequencies.freq(spare.termBytes); diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index 3d13378e58e5d..8f6001dc06755 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -113,6 +113,12 @@ public List toAggregators(Collection collectors) { // should not be called when there is no aggregation collector throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); } + + @Override + public List toInternalAggregations(Collection collectors) { + // should not be called when there is no aggregation collector + throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR"); + } }; private final List releasables = new CopyOnWriteArrayList<>(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java index aac3fca9e1e16..f381ebdb64fc2 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchTestCase.java @@ -211,7 +211,12 @@ "LuceneFixedGap", "LuceneVarGapFixedInterval", "LuceneVarGapDocFreqInterval", - "Lucene50" }) + "Lucene50", + "Lucene90", + "Lucene94", + "Lucene90", + "Lucene95", + "Lucene99" }) @LuceneTestCase.SuppressReproduceLine public abstract class OpenSearchTestCase extends LuceneTestCase {