
Commit: add debug info
Signed-off-by: bowenlan-amzn <[email protected]>
bowenlan-amzn committed Apr 19, 2024
1 parent eaa10d3 commit d91edf0
Showing 2 changed files with 110 additions and 60 deletions.
@@ -38,20 +38,15 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
- import java.text.SimpleDateFormat;
- import java.time.Instant;
- import java.util.Arrays;
- import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
- import java.util.stream.Collectors;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

@@ -253,7 +248,13 @@ public static class FastFilterContext {

private long[][] ranges;

+ public int leaf;
+ public int inner;
+ public int segments;
+ public int optimizedSegments;

private String fieldName;

public void setFieldName(String fieldName) {
this.fieldName = fieldName;
}
Expand Down Expand Up @@ -300,6 +301,11 @@ public Weight[] buildFastFilter(LeafReaderContext leaf) throws IOException {
public void buildRanges() throws IOException {
this.ranges = this.aggregationType.buildRanges(context);
}

+ private void consumeDebugInfo(DebugCollector debug) {
+ leaf += debug.leaf;
+ inner += debug.inner;
+ }
}

/**
@@ -479,6 +485,7 @@ public static boolean tryFastFilterAggregation(
FastFilterContext fastFilterContext,
final BiConsumer<Long, Integer> incrementDocCount
) throws IOException {
if (fastFilterContext == null) return false;
+ fastFilterContext.segments++;
if (!fastFilterContext.rewriteable) {
return false;
@@ -500,60 +507,70 @@ public static boolean tryFastFilterAggregation(
// // if no filters built at shard level (see getDateHistoAggBounds method for possible reasons)
// // check if the query is functionally match-all at segment level
// if (!fastFilterContext.filtersBuiltAtShardLevel && !segmentMatchAll(fastFilterContext.context, ctx)) {
// return false;
// }
// Weight[] filters = fastFilterContext.filters;
// if (filters == null) {
// logger.debug(
// "Shard {} segment {} functionally match all documents. Build the fast filter",
// fastFilterContext.context.indexShard().shardId(),
// ctx.ord
// );
// filters = fastFilterContext.buildFastFilter(ctx);
// if (filters == null) {
// return false;
// }
// }

// plug in the multi-range traversal logic
// TODO revisit the segment match all logic
if (fastFilterContext.ranges == null) return false;
- final DateFieldMapper.DateFieldType fieldType =
- ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType();
- multiRangesTraverse(ctx.reader(), fastFilterContext.fieldName, fastFilterContext.ranges, incrementDocCount, fieldType);
+ final DateFieldMapper.DateFieldType fieldType = ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType)
+ .getFieldType();
+ DebugCollector debug = multiRangesTraverse(
+ ctx.reader(),
+ fastFilterContext.fieldName,
+ fastFilterContext.ranges,
+ incrementDocCount,
+ fieldType
+ );
+ // get the debug info from this segment
+ // and save it in the fastFilterContext
+ fastFilterContext.consumeDebugInfo(debug);

// final int[] counts = new int[filters.length];
// int i;
// for (i = 0; i < filters.length; i++) {
// counts[i] = filters[i].count(ctx);
// if (counts[i] == -1) {
// // Cannot use the optimization if any of the counts
// // is -1 indicating the segment might have deleted documents
// return false;
// }
// }
//
// int s = 0;
// int size = fastFilterContext.aggregationType.getSize();
// for (i = 0; i < filters.length; i++) {
// if (counts[i] > 0) {
// long bucketKey = i; // the index of filters is the key for filters aggregation
// if (fastFilterContext.aggregationType instanceof AbstractDateHistogramAggregationType) {
// final DateFieldMapper.DateFieldType fieldType =
// ((AbstractDateHistogramAggregationType) fastFilterContext.aggregationType).getFieldType();
// bucketKey = fieldType.convertNanosToMillis(
// NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
// );
// }
// incrementDocCount.accept(bucketKey, counts[i]);
// s++;
// if (s > size) {
// break;
// }
// }
// }

logger.debug("Fast filter optimization applied to shard {} segment {}", fastFilterContext.context.indexShard().shardId(), ctx.ord);
+ fastFilterContext.optimizedSegments++;
return true;
}

@@ -595,11 +612,11 @@ private static long[][] createRangesFromAgg(
roundedLow = preparedRounding.round(roundedLow + interval);
// final byte[] upper = new byte[8];
// NumericUtils.longToSortableBytes(i + 1 == bucketCount ? high :
// // Subtract -1 if the minimum is roundedLow as roundedLow itself
// // is included in the next bucket
// fieldType.convertRoundedMillisToNanos(roundedLow) - 1, upper, 0);

long upper = i + 1 == bucketCount ? high : fieldType.convertRoundedMillisToNanos(roundedLow) - 1;

ranges[i][0] = lower;
ranges[i][1] = upper;
@@ -610,7 +627,10 @@
return ranges;
}

- private static void multiRangesTraverse(
+ /**
+ * @return the collected debug info
+ */
+ private static DebugCollector multiRangesTraverse(
final LeafReader reader,
final String field,
final long[][] ranges,
@@ -635,29 +655,49 @@ private static void multiRangesTraverse(
break;
}
}
+ DebugCollector debugCollector = new DebugCollector();
if (noRangeMatches) {
- return;
+ return debugCollector;
}

TreeCollector collector = new TreeCollector(incrementDocCount, fieldType, iterator);
collector.setActiveRange(activeRange);

VisitorWithRanges visitor = getIntersectVisitor(collector);
- intersectWithRanges(visitor, values.getPointTree());
+ intersectWithRanges(visitor, values.getPointTree(), debugCollector);
collector.finalizePreviousRange();

+ return debugCollector;
}

+ /**
+ * collects debug info: the number of inner and leaf nodes visited
+ */
+ private static class DebugCollector {
+ private int leaf = 0;
+ private int inner = 0;
+
+ private void visitLeaf() {
+ leaf++;
+ }
+
+ private void visitInner() {
+ inner++;
+ }
+ }

private static class TreeCollector {
private final BiConsumer<Long, Integer> incrementDocCount;
private final DateFieldMapper.DateFieldType fieldType;
private int counter = 0;
private long[] activeRange;
- private Iterator<long[]> rangeIter;
+ private final Iterator<long[]> rangeIter;

public TreeCollector(
BiConsumer<Long, Integer> incrementDocCount,
DateFieldMapper.DateFieldType fieldType,
Iterator<long[]> rangeIter
) {
this.incrementDocCount = incrementDocCount;
this.fieldType = fieldType;
@@ -696,7 +736,8 @@ private boolean goToNextRange(long value) {
}
}

- private static void intersectWithRanges(VisitorWithRanges visitor, PointValues.PointTree pointTree) throws IOException {
+ private static void intersectWithRanges(VisitorWithRanges visitor, PointValues.PointTree pointTree, DebugCollector debug)
+ throws IOException {
// long min = NumericUtils.sortableBytesToLong(pointTree.getMinPackedValue(), 0);
// long max = NumericUtils.sortableBytesToLong(pointTree.getMaxPackedValue(), 0);
// maxPackedValue seems to be the max value + 1
@@ -712,16 +753,18 @@ private static void intersectWithRanges(VisitorWithRanges visitor, PointValues.P
case CELL_INSIDE_QUERY:
// visit would count
pointTree.visitDocIDs(visitor);
+ debug.visitInner();
break;
case CELL_CROSSES_QUERY:
if (pointTree.moveToChild()) {
- intersectWithRanges(visitor, pointTree);
+ intersectWithRanges(visitor, pointTree, debug);
pointTree.moveToSibling();
- intersectWithRanges(visitor, pointTree);
+ intersectWithRanges(visitor, pointTree, debug);
pointTree.moveToParent();
} else {
// visitDocValues may move to next range
pointTree.visitDocValues(visitor);
+ debug.visitLeaf();
}
break;
case CELL_OUTSIDE_QUERY:
@@ -762,10 +805,10 @@ public void visit(DocIdSetIterator iterator) throws IOException {
logger.debug("visit iterator");
// int count = 0;
// for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
// count++;
// }
// for (long[] range : ranges) {
// incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
// incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
// }
}

Expand All @@ -792,14 +835,14 @@ public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOExcept
logger.debug("visit iterator with packedValue");
// int count = 0;
// for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
// count++;
// }
// long value = NumericUtils.sortableBytesToLong(packedValue, 0);
// // System.out.println("value" + value + " count=" + count);
// for (long[] range : ranges) {
// if (value >= range[0] && value <= range[1]) {
// incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
// }
// }
}

@@ -841,8 +884,7 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
};
}

interface VisitorWithRanges extends PointValues.IntersectVisitor {}

private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException {
Weight weight = ctx.searcher().createWeight(ctx.query(), ScoreMode.COMPLETE_NO_SCORES, 1f);
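For context on what the leaf and inner counters above measure: during the point-tree traversal, a subtree that falls wholly inside the current range is bulk-counted as one inner visit, while a leaf whose cell crosses the range boundary must check individual values and is counted as one leaf visit. Below is a minimal, runnable sketch of that pattern; Node, intersect, and TraversalDebugSketch are illustrative stand-ins, not the Lucene or OpenSearch types used in the diff.

import java.util.ArrayList;
import java.util.List;

public class TraversalDebugSketch {

    static class Node {
        long min, max;
        List<Node> children = new ArrayList<>();
        Node(long min, long max) { this.min = min; this.max = max; }
    }

    static class DebugCollector {
        int leaf, inner;
    }

    // Mirrors intersectWithRanges: bulk-count subtrees fully inside the
    // range, recurse into crossing nodes, inspect values only at leaves.
    static void intersect(Node node, long lo, long hi, DebugCollector debug) {
        if (node.max < lo || node.min > hi) {
            return; // CELL_OUTSIDE_QUERY: skip the subtree
        }
        if (lo <= node.min && node.max <= hi) {
            debug.inner++; // CELL_INSIDE_QUERY: whole subtree counts at once
            return;
        }
        if (node.children.isEmpty()) {
            debug.leaf++; // crossing leaf: individual values are checked
            return;
        }
        for (Node child : node.children) { // CELL_CROSSES_QUERY: descend
            intersect(child, lo, hi, debug);
        }
    }

    public static void main(String[] args) {
        Node root = new Node(0, 100);
        root.children.add(new Node(0, 49));
        root.children.add(new Node(50, 100));

        DebugCollector debug = new DebugCollector();
        intersect(root, 0, 60, debug);
        // [0, 49] sits fully inside [0, 60] -> one inner visit;
        // [50, 100] crosses the range and has no children -> one leaf visit
        System.out.println("inner=" + debug.inner + " leaf=" + debug.leaf);
    }
}

Relatively more inner visits than leaf visits means more buckets were counted in bulk rather than value by value, which is the point of the optimization.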
(second changed file)
@@ -171,6 +171,8 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCol
)
);
if (optimized) throw new CollectionTerminatedException();
+ // we will return the debug info for each segment
+ // or we should just cache it in the fast filter context

SortedNumericDocValues values = valuesSource.longValues(ctx);
return new LeafBucketCollectorBase(sub, values) {
@@ -256,6 +258,12 @@ public void doClose() {
@Override
public void collectDebugInfo(BiConsumer<String, Object> add) {
add.accept("total_buckets", bucketOrds.size());
+ if (fastFilterContext.optimizedSegments > 0) {
+ add.accept("optimized_segments", fastFilterContext.optimizedSegments);
+ add.accept("unoptimized_segments", fastFilterContext.segments - fastFilterContext.optimizedSegments);
+ add.accept("leaf_visited", fastFilterContext.leaf);
+ add.accept("inner_visited", fastFilterContext.inner);
+ }
}

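To see how these counters surface, here is a minimal, runnable sketch of the collectDebugInfo flow above, using a plain Map in place of the profiling machinery; the FastFilterContext here is a simplified stand-in for the real class, keeping only the four counters from the diff.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

public class DebugInfoSketch {

    static class FastFilterContext {
        int leaf;              // leaf nodes visited across optimized segments
        int inner;             // inner nodes visited across optimized segments
        int segments;          // segments the rewrite was attempted on
        int optimizedSegments; // segments where the rewrite applied
    }

    // Mirrors the aggregator's collectDebugInfo(BiConsumer<String, Object>) override.
    static void collectDebugInfo(FastFilterContext ctx, BiConsumer<String, Object> add) {
        if (ctx.optimizedSegments > 0) {
            add.accept("optimized_segments", ctx.optimizedSegments);
            add.accept("unoptimized_segments", ctx.segments - ctx.optimizedSegments);
            add.accept("leaf_visited", ctx.leaf);
            add.accept("inner_visited", ctx.inner);
        }
    }

    public static void main(String[] args) {
        FastFilterContext ctx = new FastFilterContext();
        // Suppose two of three segments were optimized and their traversals
        // touched 5 inner and 14 leaf nodes in total.
        ctx.segments = 3;
        ctx.optimizedSegments = 2;
        ctx.inner = 5;
        ctx.leaf = 14;

        Map<String, Object> debug = new LinkedHashMap<>();
        collectDebugInfo(ctx, debug::put);
        System.out.println(debug);
        // {optimized_segments=2, unoptimized_segments=1, leaf_visited=14, inner_visited=5}
    }
}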
