Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Jan 3, 2024
1 parent debfa27 commit 444cc81
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,21 +175,21 @@ protected String toString(int dimension, byte[] value) {
}

/**
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param computeBounds get the lower and upper bound of the field in a shard search
* @param roundingFunction produce Rounding that contains interval of date range.
* Rounding is computed dynamically using the bounds in AutoDateHistogram
* @param preparedRoundingSupplier produce PreparedRounding to round values at call-time
*/
public static void buildFastFilter(
SearchContext context,
CheckedFunction<FastFilterContext, long[], IOException> computeBounds,
Function<long[], Rounding> roundingFunction,
Supplier<Rounding.Prepared> preparedRoundingSupplier,
FastFilterContext fastFilterContext,
CheckedFunction<FastFilterContext, long[], IOException> computeBounds
FastFilterContext fastFilterContext
) throws IOException {
assert fastFilterContext.fieldType instanceof DateFieldMapper.DateFieldType;
DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
final long[] bounds = computeBounds.apply(fastFilterContext);
final long[] bounds = computeBounds.apply(fastFilterContext); // TODO b do we need to pass in the context? or specific things
if (bounds != null) {
final Rounding rounding = roundingFunction.apply(bounds);
final OptionalLong intervalOpt = Rounding.getInterval(rounding);
Expand Down Expand Up @@ -221,21 +221,26 @@ public static void buildFastFilter(
* Encapsulates metadata about a value source needed to rewrite
*/
public static class FastFilterContext {
private boolean missing = false;
private boolean missing = false; // TODO b confirm UT that can catch this
private boolean hasScript = false;
private boolean showOtherBucket = false;

private final MappedFieldType fieldType;

private long afterKey = -1L;
private int size = Integer.MAX_VALUE; // only used by composite aggregation for pagination
private Weight[] filters = null;

private Type type = Type.UNKEYED;
private final Type type;

/**
* @param fieldType null if the field doesn't exist
*/
public FastFilterContext(MappedFieldType fieldType) {
this.fieldType = fieldType;
this.type = Type.DATE_HISTO;
}

public FastFilterContext(Type type) {
this.fieldType = null;
this.type = type;
}

public DateFieldMapper.DateFieldType getFieldType() {
Expand All @@ -255,32 +260,31 @@ public void setAfterKey(long afterKey) {
this.afterKey = afterKey;
}

public void setMissingAndHasScript(boolean missing, boolean hasScript) {
public void setMissing(boolean missing) {
this.missing = missing;
}

public void setHasScript(boolean hasScript) {
this.hasScript = hasScript;
}

/**
* The pre-conditions to initiate fast filter optimization on aggregations are:
* <ul>
* <li>No parent/sub aggregations</li>
* <li>No missing value/bucket or script</li>
* <li>Field type is date</li>
* </ul>
*/
public void setShowOtherBucket(boolean showOtherBucket) {
this.showOtherBucket = showOtherBucket;
}

public boolean isRewriteable(Object parent, int subAggLength) {
if (parent == null && subAggLength == 0 && !missing && !hasScript) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
if (type == Type.FILTERS) {
return !showOtherBucket;
} else if (type == Type.DATE_HISTO) {
return fieldType != null && fieldType instanceof DateFieldMapper.DateFieldType;
}
}
return false;
}

public void setType(Type type) {
this.type = type;
}

public enum Type {
KEYED, UNKEYED
FILTERS, DATE_HISTO
}
}

Expand All @@ -301,7 +305,6 @@ public static boolean tryFastFilterAggregation(
final LeafReaderContext ctx,
FastFilterContext fastFilterContext,
final BiConsumer<Long, Integer> incrementDocCount
// TODO b can I have a function that calculates the bucket ord, so
) throws IOException {
if (fastFilterContext == null) return false;
if (fastFilterContext.filters == null) return false;
Expand All @@ -322,18 +325,14 @@ public static boolean tryFastFilterAggregation(
int s = 0;
for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long key = i;
if (fastFilterContext.type == FastFilterContext.Type.UNKEYED) {
long bucketKey = i; // the index of filters is the key for filters aggregation
if (fastFilterContext.type == FastFilterContext.Type.DATE_HISTO) {
final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fastFilterContext.fieldType;
key = fieldType.convertNanosToMillis(
bucketKey = fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
);
}
incrementDocCount.accept(
// TODO b this is what should be the bucket key showing out
key,
counts[i]
);
incrementDocCount.accept(bucketKey, counts[i]);
s++;
if (s > fastFilterContext.size) return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,8 @@ final class CompositeAggregator extends BucketsAggregator {
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(
sourceConfigs[0].fieldType()
);
fastFilterContext.setMissingAndHasScript(
sourceConfigs[0].missingBucket(),
sourceConfigs[0].hasScript()
);
fastFilterContext.setMissing(sourceConfigs[0].missingBucket());
fastFilterContext.setHasScript(sourceConfigs[0].hasScript());
if (rawAfterKey != null) {
assert rawAfterKey.size() == 1 && formats.size() == 1;
long afterValue = formats.get(0).parseLong(rawAfterKey.get(0).toString(), false, () -> {
Expand All @@ -186,10 +184,9 @@ final class CompositeAggregator extends BucketsAggregator {
fastFilterContext.setSize(size);
FastFilterRewriteHelper.buildFastFilter(
context,
x -> dateHistogramSource.getRounding(),
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), x -> dateHistogramSource.getRounding(),
() -> preparedRounding,
fastFilterContext,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
fastFilterContext
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,20 +172,23 @@ public FiltersAggregator(
this.totalNumKeys = keys.length;
}

fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(null);
fastFilterContext.setType(FastFilterRewriteHelper.FastFilterContext.Type.KEYED);
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(FastFilterRewriteHelper.FastFilterContext.Type.FILTERS);
fastFilterContext.setShowOtherBucket(showOtherBucket);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
fastFilterContext.setFilters(filters.get());
}
}

@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
// no need to provide deleted docs to the filter
Weight[] filters = this.filters.get();
fastFilterContext.setFilters(filters);
boolean optimized = FastFilterRewriteHelper.tryFastFilterAggregation(ctx, fastFilterContext, (key, count) -> {
incrementBucketDocCount(bucketOrd(0, key.intValue()), count); // TODO b this key should be the index of filter
});
if (optimized) throw new CollectionTerminatedException();

// no need to provide deleted docs to the filter
Weight[] filters = this.filters.get();

final Bits[] bits = new Bits[filters.length];
for (int i = 0; i < filters.length; ++i) {
bits[i] = Lucene.asSequentialAccessBits(ctx.reader().maxDoc(), filters[i].scorerSupplier(ctx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,16 @@ private AutoDateHistogramAggregator(
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(
valuesSourceConfig.fieldType()
);
fastFilterContext.setMissingAndHasScript(
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null
);
fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
FastFilterRewriteHelper.buildFastFilter(
context,
b -> getMinimumRounding(b[0], b[1]),
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name()), b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
() -> preparedRounding,
fastFilterContext,
fc -> FastFilterRewriteHelper.getAggregationBounds(context, fc.getFieldType().name())
fastFilterContext
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,14 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
fastFilterContext = new FastFilterRewriteHelper.FastFilterContext(
valuesSourceConfig.fieldType()
);
fastFilterContext.setMissingAndHasScript(
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null
);
fastFilterContext.setMissing(valuesSourceConfig.missing() != null);
fastFilterContext.setHasScript(valuesSourceConfig.script() != null);
if (fastFilterContext.isRewriteable(parent, subAggregators.length)) {
FastFilterRewriteHelper.buildFastFilter(
context,
x -> rounding,
this::computeBounds, x -> rounding,
() -> preparedRounding,
fastFilterContext,
this::computeBounds
fastFilterContext
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected Aggregator doCreateInternal(
Map<String, Object> metadata
) throws IOException {

// TODO b may need to pass in the value source config
return queryShardContext.getValuesSourceRegistry()
.getAggregator(registryKey, config)
.build(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ public RangeAggregator(
maxTo[i] = Math.max(this.ranges[i].to, maxTo[i - 1]);
}

// TODO b if valuesSource is ValuesSource.Numeric.FieldData.
}

@Override
Expand Down

0 comments on commit 444cc81

Please sign in to comment.