Skip to content

Commit

Permalink
wire in context
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelandis committed Feb 12, 2024
1 parent 0aa53b8 commit 6fe215e
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ public Set<String> sourcePath(String fullName) {
return Set.of(fullName);
}

@Override
public boolean excludeDeletedDocs() {
return false;
}

@Override
public void close() {
List<Releasable> releaseMe = new ArrayList<>(this.releaseMe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1310,7 +1310,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
context::isCancelled,
context::buildFilteredQuery,
enableRewriteAggsToFilterByFilter,
source.aggregations().isInSortOrderExecutionRequired()
source.aggregations().isInSortOrderExecutionRequired(),
true //TODO: FIXME !
);
context.addQuerySearchResultReleasable(aggContext);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ public GlobalOrdinalsStringTermsAggregator(
this.valueCount = valuesSupplier.get().getValueCount();
this.acceptedGlobalOrdinals = acceptedOrds;
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(cardinality);
this.collectionStrategy = new RemapGlobalOrds(cardinality, context.excludeDeletedDocs());
} else {
this.collectionStrategy = cardinality.map(estimate -> {
if (estimate > 1) {
// This is a 500 class error, because we should never be able to reach it.
throw new AggregationExecutionException("Dense ords don't know how to collect from many buckets");
}
return new DenseGlobalOrds();
return new DenseGlobalOrds(context.excludeDeletedDocs());
});
}
}
Expand Down Expand Up @@ -450,6 +450,13 @@ interface BucketInfoConsumer {
* bucket ordinal.
*/
class DenseGlobalOrds extends CollectionStrategy {

private final boolean excludeDeletedDocs;

DenseGlobalOrds(boolean excludeDeletedDocs) {
this.excludeDeletedDocs = excludeDeletedDocs;
}

@Override
String describe() {
return "dense";
Expand Down Expand Up @@ -477,12 +484,11 @@ long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) {
return globalOrd;
}

boolean EXCLUDE_DELETE_DOCS = true; // TODO: model this as part of the agg itself

@Override
void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
assert owningBucketOrd == 0;
if (EXCLUDE_DELETE_DOCS) {
if (excludeDeletedDocs) {
forEachExcludeDeletedDocs(consumer);
} else {
forEachIgnoreDeletedDocs(consumer);
Expand Down Expand Up @@ -565,9 +571,11 @@ public void close() {}
*/
private class RemapGlobalOrds extends CollectionStrategy {
private final LongKeyedBucketOrds bucketOrds;
private final boolean excludeDeletedDocs;

private RemapGlobalOrds(CardinalityUpperBound cardinality) {
private RemapGlobalOrds(CardinalityUpperBound cardinality, boolean excludeDeletedDocs) {
bucketOrds = LongKeyedBucketOrds.buildForValueRange(bigArrays(), cardinality, 0, valueCount - 1);
this.excludeDeletedDocs = excludeDeletedDocs;
}

@Override
Expand Down Expand Up @@ -599,21 +607,19 @@ long globalOrdToBucketOrd(long owningBucketOrd, long globalOrd) {
return bucketOrds.find(owningBucketOrd, globalOrd);
}

boolean EXCLUDE_DELETE_DOCS = true; // TODO: model this as part of the agg itself

@Override
void forEach(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
if (EXCLUDE_DELETE_DOCS && bucketCountThresholds.getMinDocCount() == 0) {
if (excludeDeletedDocs && bucketCountThresholds.getMinDocCount() == 0) {
forEachExcludeDeletedDocs(owningBucketOrd, consumer);
} else {
forEachIgnoreDeletedDocs(owningBucketOrd, consumer);
forEachInner(owningBucketOrd, consumer);
}
}

/**
* Allows deleted docs in the results by ignoring the associated liveDocs. More performant than excluding them.
* Allows deleted docs in the results by ignoring the associated liveDocs.
*/
void forEachIgnoreDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
void forEachInner(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {
if (bucketCountThresholds.getMinDocCount() == 0) {
for (long globalOrd = 0; globalOrd < valueCount; globalOrd++) {
if (false == acceptedGlobalOrdinals.test(globalOrd)) {
Expand Down Expand Up @@ -647,13 +653,12 @@ void forEachIgnoreDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer)
}
}

// FIXME : CHECK THIS LOGIC...ADDED AT END OF DAY IN HASTE

/**
* Excludes deleted docs in the results by cross-checking with liveDocs. Less performant than ignoring liveDocs.
* Excludes deleted docs in the results by cross-checking with liveDocs.
*/
void forEachExcludeDeletedDocs(long owningBucketOrd, BucketInfoConsumer consumer) throws IOException {

//TODO: double check this logic
assert bucketCountThresholds.getMinDocCount() == 0;
LongHash accepted = null;
boolean acceptedAllGlobalOrdinals = false;
Expand Down Expand Up @@ -796,6 +801,8 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(globalOrd));
System.out.println("*** global ord -> " + term.utf8ToString() + " <- ***");
}


collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() {
TB spare = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
*/
class StandardTermsResults extends ResultStrategy<StringTerms, StringTerms.Bucket> {
private final ValuesSource valuesSource;
private final boolean excludeDeletedDocs;

StandardTermsResults(ValuesSource valuesSource) {
StandardTermsResults(ValuesSource valuesSource, boolean excludeDeletedDocs) {
this.valuesSource = valuesSource;
this.excludeDeletedDocs = excludeDeletedDocs;
}

@Override
Expand All @@ -370,8 +372,6 @@ LeafBucketCollector wrapCollector(LeafBucketCollector primary) {
return primary;
}

boolean EXCLUDE_DELETE_DOCS = true; // TODO: model this as part of the agg itself

@Override
void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {
if (bucketCountThresholds.getMinDocCount() != 0) {
Expand All @@ -385,7 +385,7 @@ void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {
SortedBinaryDocValues values = valuesSource.bytesValues(ctx);
// brute force
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if(EXCLUDE_DELETE_DOCS){
if(excludeDeletedDocs){
if(ctx.reader().getLiveDocs() != null && ctx.reader().getLiveDocs().get(docId) == false){ //deleted doc
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
abstract class StandardTermsResultStrategy<R extends InternalMappedTerms<R, B>, B extends InternalTerms.Bucket<B>> extends
ResultStrategy<R, B> {
protected final boolean showTermDocCountError;
private final boolean excludeDeletedDocs;

StandardTermsResultStrategy(boolean showTermDocCountError) {
StandardTermsResultStrategy(boolean showTermDocCountError, boolean excludeDeletedDocs) {
this.showTermDocCountError = showTermDocCountError;
this.excludeDeletedDocs = excludeDeletedDocs;
}

@Override
Expand All @@ -284,8 +286,6 @@ Supplier<B> emptyBucketBuilder(long owningBucketOrd) {

abstract B buildEmptyBucket();

boolean EXCLUDE_DELETE_DOCS = true; // TODO: model this as part of the agg itself

@Override
final void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOException {
if (bucketCountThresholds.getMinDocCount() != 0) {
Expand All @@ -299,7 +299,7 @@ final void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOExceptio
for (LeafReaderContext ctx : searcher().getTopReaderContext().leaves()) {
SortedNumericDocValues values = getValues(ctx);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
if(EXCLUDE_DELETE_DOCS){
if(excludeDeletedDocs){
if(ctx.reader().getLiveDocs() != null && ctx.reader().getLiveDocs().get(docId) == false){ //deleted doc
continue;
}
Expand All @@ -322,8 +322,8 @@ public final void close() {}
}

class LongTermsResults extends StandardTermsResultStrategy<LongTerms, LongTerms.Bucket> {
LongTermsResults(boolean showTermDocCountError) {
super(showTermDocCountError);
LongTermsResults(boolean showTermDocCountError, boolean excludeDeletedDocs) {
super(showTermDocCountError, excludeDeletedDocs);
}

@Override
Expand Down Expand Up @@ -404,8 +404,8 @@ LongTerms buildEmptyResult() {

class DoubleTermsResults extends StandardTermsResultStrategy<DoubleTerms, DoubleTerms.Bucket> {

DoubleTermsResults(boolean showTermDocCountError) {
super(showTermDocCountError);
DoubleTermsResults(boolean showTermDocCountError, boolean excludeDeletedDocs) {
super(showTermDocCountError, excludeDeletedDocs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ static void registerAggregators(ValuesSourceRegistry.Builder builder) {
*/
static final long MAX_ORDS_TO_TRY_FILTERS = 1000;

static final boolean EXCLUDE_DELETE_DOCS = true; //TODO: put this in the context

/**
* This supplier is used for all the field types that should be aggregated as bytes/strings,
* including those that need global ordinals
Expand Down Expand Up @@ -193,12 +191,12 @@ private static TermsAggregatorSupplier numericSupplier() {
if (includeExclude != null) {
longFilter = includeExclude.convertToDoubleFilter();
}
resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError);
resultStrategy = agg -> agg.new DoubleTermsResults(showTermDocCountError, context.excludeDeletedDocs());
} else {
if (includeExclude != null) {
longFilter = includeExclude.convertToLongFilter(valuesSourceConfig.format());
}
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError);
resultStrategy = agg -> agg.new LongTermsResults(showTermDocCountError, context.excludeDeletedDocs());
}
return new NumericTermsAggregator(
name,
Expand Down Expand Up @@ -395,7 +393,7 @@ Aggregator create(
name,
factories,
new MapStringTermsAggregator.ValuesSourceCollectorSource(valuesSourceConfig),
a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource()),
a -> a.new StandardTermsResults(valuesSourceConfig.getValuesSource(), context.excludeDeletedDocs()),
order,
valuesSourceConfig.format(),
bucketCountThresholds,
Expand Down Expand Up @@ -436,7 +434,7 @@ Aggregator create(
&& maxOrd <= MAX_ORDS_TO_TRY_FILTERS
&& context.enableRewriteToFilterByFilter()
&& false == context.isInSortOrderExecutionRequired()
&& false == EXCLUDE_DELETE_DOCS) {
&& false == context.excludeDeletedDocs()) {
StringTermsAggregatorFromFilters adapted = StringTermsAggregatorFromFilters.adaptIntoFiltersOrNull(
name,
factories,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ public final AggregationUsageService getUsageService() {

public abstract Set<String> sourcePath(String fullName);

/**
* return {@code true} if delete documents are required to be omitted from the response. This is only applicable for some aggregations
* where the min_doc_count = 0 and may result in lower performance.
*/
public abstract boolean excludeDeletedDocs();

/**
* Returns the MappingLookup for the index, if one is initialized.
*/
Expand Down Expand Up @@ -360,6 +366,7 @@ public static class ProductionAggregationContext extends AggregationContext {
private final Function<Query, Query> filterQuery;
private final boolean enableRewriteToFilterByFilter;
private final boolean inSortOrderExecutionRequired;
private final boolean excludeDeletedDocs;
private final AnalysisRegistry analysisRegistry;

private final List<Aggregator> releaseMe = new ArrayList<>();
Expand All @@ -380,7 +387,8 @@ public ProductionAggregationContext(
Supplier<Boolean> isCancelled,
Function<Query, Query> filterQuery,
boolean enableRewriteToFilterByFilter,
boolean inSortOrderExecutionRequired
boolean inSortOrderExecutionRequired,
boolean excludeDeleteDocs
) {
this.analysisRegistry = analysisRegistry;
this.context = context;
Expand Down Expand Up @@ -415,6 +423,7 @@ public ProductionAggregationContext(
this.filterQuery = filterQuery;
this.enableRewriteToFilterByFilter = enableRewriteToFilterByFilter;
this.inSortOrderExecutionRequired = inSortOrderExecutionRequired;
this.excludeDeletedDocs = excludeDeleteDocs;
}

@Override
Expand Down Expand Up @@ -625,6 +634,11 @@ public MappingLookup getMappingLookup() {
return context.getMappingLookup();
}

@Override
public boolean excludeDeletedDocs() {
return excludeDeletedDocs;
}

@Override
public void close() {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,11 @@ public Set<String> sourcePath(String fullName) {
return Set.of(fullName);
}

@Override
public boolean excludeDeletedDocs() {
return false;
}

@Override
public void close() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ public Iterable<MappedFieldType> dimensionFields() {
() -> false,
q -> q,
true,
isInSortOrderExecutionRequired
isInSortOrderExecutionRequired,
false
);
return context;
}
Expand Down

0 comments on commit 6fe215e

Please sign in to comment.