Skip to content

Commit

Permalink
nearly finished
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Apr 19, 2024
1 parent f56e0c2 commit eaa10d3
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
Expand Down Expand Up @@ -619,143 +623,211 @@ private static void multiRangesTraverse(

// System.out.println("intersect with ranges");
// System.out.println(Arrays.deepToString(ranges));
intersectWithRanges(getIntersectVisitor(incrementDocCount, fieldType), values.getPointTree(), ranges);
Iterator<long[]> iterator = Arrays.stream(ranges).iterator();
long[] activeRange = iterator.next();
PointValues.PointTree tree = values.getPointTree();
boolean noRangeMatches = false;
while (activeRange[1] < NumericUtils.sortableBytesToLong(tree.getMinPackedValue(), 0)) {
if (iterator.hasNext()) {
activeRange = iterator.next();
} else {
noRangeMatches = true;
break;
}
}
if (noRangeMatches) {
return;
}

TreeCollector collector = new TreeCollector(incrementDocCount, fieldType, iterator);
collector.setActiveRange(activeRange);

VisitorWithRanges visitor = getIntersectVisitor(collector);
intersectWithRanges(visitor, values.getPointTree());
collector.finalizePreviousRange();
}

private static void intersectWithRanges(VisitorWithRanges visitor, PointValues.PointTree pointTree, long[][] ranges) throws IOException {
Set<long[]> cross = new HashSet<>();
Set<long[]> inside = new HashSet<>();
Set<long[]> outside = new HashSet<>();
private static class TreeCollector {
private final BiConsumer<Long, Integer> incrementDocCount;
private final DateFieldMapper.DateFieldType fieldType;
private int counter = 0;
private long[] activeRange;
private Iterator<long[]> rangeIter;

public TreeCollector(
BiConsumer<Long, Integer> incrementDocCount,
DateFieldMapper.DateFieldType fieldType,
Iterator<long[]> rangeIter
) {
this.incrementDocCount = incrementDocCount;
this.fieldType = fieldType;
this.rangeIter = rangeIter;
}

long min = NumericUtils.sortableBytesToLong(pointTree.getMinPackedValue(), 0);
long max = NumericUtils.sortableBytesToLong(pointTree.getMaxPackedValue(), 0);
// maxPackedValue seems to be the max value + 1
// System.out.println("===============");
// System.out.println("current node range min=" + min + " max=" + max);
private void count() {
counter++;
}

for (long[] range : ranges) {
visitor.setCompareRange(range);
PointValues.Relation r = visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue());
// System.out.println("relation=" + r);

switch (r) {
case CELL_INSIDE_QUERY:
inside.add(range);
break;
case CELL_CROSSES_QUERY:
cross.add(range);
break;
case CELL_OUTSIDE_QUERY:
outside.add(range);
private void finalizePreviousRange() {
if (counter > 0) {
incrementDocCount.accept(fieldType.convertNanosToMillis(activeRange[0]), counter);
counter = 0;
}
}

// based on the relation of each range, we decide
// 1. what to do for this range at this node
// 1.1. inside: record all values
// 1.2. outside: nothing
// 1.3. cross: if inner node, continue traverse
// if leaf node, visit this node and record matches
// 2. how to traverse next
// 2.1. inside and outside: no need to pass to child
// 2.2. cross: need to pass to child
// so if any range has cross relation, we need to move to child and also sibling
if (!inside.isEmpty()) {
visitor.setRanges(inside);
// System.out.println("Collect full node for ranges: " +
// inside.stream().map(Arrays::toString).collect(Collectors.joining(", ")));
pointTree.visitDocIDs(visitor);
}

if (!cross.isEmpty()) {
if (pointTree.moveToChild()) {
// System.out.println("move left child");
intersectWithRanges(visitor, pointTree, cross.toArray(new long[cross.size()][]));

pointTree.moveToSibling();
// System.out.println("move right child");
// although the position on the tree is at child, the ranges here still comes from parent
intersectWithRanges(visitor, pointTree, cross.toArray(new long[cross.size()][]));

pointTree.moveToParent();
// System.out.println("move to parent");
} else {
visitor.setRanges(cross);
// System.out.println("Collect leaf node for ranges: " +
// cross.stream().map(Arrays::toString).collect(Collectors.joining(", ")));
pointTree.visitDocValues(visitor);
private void setActiveRange(long[] activeRange) {
this.activeRange = activeRange;
}

/**
* @return false when iterator exhausted
*/
private boolean goToNextRange(long value) {
// the new value we see may not be continuous with previous visited value
// 2 2 2 100
// so it may bypass some ranges
while (activeRange[1] < value) {
if (!rangeIter.hasNext()) {
return false;
}
activeRange = rangeIter.next();
}
return true;
}
}

private static VisitorWithRanges getIntersectVisitor(final BiConsumer<Long, Integer> incrementDocCount, final DateFieldMapper.DateFieldType fieldType) {
private static void intersectWithRanges(VisitorWithRanges visitor, PointValues.PointTree pointTree) throws IOException {
// long min = NumericUtils.sortableBytesToLong(pointTree.getMinPackedValue(), 0);
// long max = NumericUtils.sortableBytesToLong(pointTree.getMaxPackedValue(), 0);
// maxPackedValue seems to be the max value + 1
// System.out.println("===============");
// System.out.println("current node range min=" + min + " max=" + max);

// when the range iterator is empty, we can stop the visit
// compare may move to next range
PointValues.Relation r = visitor.compare(pointTree.getMinPackedValue(), pointTree.getMaxPackedValue());
// System.out.println("relation=" + r);

switch (r) {
case CELL_INSIDE_QUERY:
// visit would count
pointTree.visitDocIDs(visitor);
break;
case CELL_CROSSES_QUERY:
if (pointTree.moveToChild()) {
intersectWithRanges(visitor, pointTree);
pointTree.moveToSibling();
intersectWithRanges(visitor, pointTree);
pointTree.moveToParent();
} else {
// visitDocValues may move to next range
pointTree.visitDocValues(visitor);
}
break;
case CELL_OUTSIDE_QUERY:
}
}

private static VisitorWithRanges getIntersectVisitor(TreeCollector collector) {
return new VisitorWithRanges() {

long[] compareRange;
Set<long[]> ranges = new HashSet<>();
public void setCompareRange(long[] compareRange) {
this.compareRange = compareRange;
}
public void setRanges(Set<long[]> ranges) {
this.ranges.clear();
this.ranges = ranges;
}
/**
* The doc visited is ever-increasing in terms of its value
* The node range visited is every-increasing at any level
* possible next node is sibling or parent sibling, this is the proof of ever-increasing
* <p>
* the first node is either inside inner or leaf node, or cross leaf
* inside node won't change activeRange
* cross leaf probably will
* after the first node, the next node could change activeRange
* Compare min max of next node with next range, when its outside activeRange
* ranges are always connected, but the values may not, so should iterate ranges until range[0] >= min
* <p>
* if node cross activeRange, we need to visit children recursively, we will always be able to stop at leaf or when found the inner
* <p>
* Before starting we need to choose activeRange that range[1] >= root.min
* Before went to the first node, can we meet node that outside activeRange but match following range?
* not possible
*/

@Override
public void visit(int docID) throws IOException {
// System.out.println("visit docID=" + docID);
for (long[] range : ranges) {
incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), 1);
}
// called when cell is inside query
collector.count();
}

@Override
public void visit(DocIdSetIterator iterator) throws IOException {
int count = 0;
for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
count++;
}
for (long[] range : ranges) {
incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
}
logger.debug("visit iterator");
// int count = 0;
// for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
// count++;
// }
// for (long[] range : ranges) {
// incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
// }
}

@Override
public void visit(int docID, byte[] packedValue) throws IOException {
long value = NumericUtils.sortableBytesToLong(packedValue, 0);
// System.out.println("value" + value + " count=" + 1);
for (long[] range : ranges) {
if (value >= range[0] && value <= range[1]) {
incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), 1);
Instant instant = Instant.ofEpochMilli(value);
if (value > collector.activeRange[1]) {
// need to move to next range
collector.finalizePreviousRange();

if (!collector.goToNextRange(value)) {
return;
}
}
if (value >= collector.activeRange[0] && value <= collector.activeRange[1]) {
collector.count();
}
}

@Override
public void visit(DocIdSetIterator iterator, byte[] packedValue) throws IOException {
int count = 0;
for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
count++;
}
long value = NumericUtils.sortableBytesToLong(packedValue, 0);
// System.out.println("value" + value + " count=" + count);
for (long[] range : ranges) {
if (value >= range[0] && value <= range[1]) {
incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
}
}
logger.debug("visit iterator with packedValue");
// int count = 0;
// for (int doc = iterator.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = iterator.nextDoc()) {
// count++;
// }
// long value = NumericUtils.sortableBytesToLong(packedValue, 0);
// // System.out.println("value" + value + " count=" + count);
// for (long[] range : ranges) {
// if (value >= range[0] && value <= range[1]) {
// incrementDocCount.accept(fieldType.convertNanosToMillis(range[0]), count);
// }
// }
}

@Override
public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue) {
long min = NumericUtils.sortableBytesToLong(minPackedValue, 0);
long max = NumericUtils.sortableBytesToLong(maxPackedValue, 0);
long queryMin = compareRange[0];
long queryMax = compareRange[1];
long queryMin = collector.activeRange[0];
long queryMax = collector.activeRange[1];

boolean crosses = false;
if (queryMin > max || queryMax < min) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
if (min > collector.activeRange[1]) {
// need to move to next range

// finalize the results for the previous range
collector.finalizePreviousRange();

// go to next range
if (!collector.goToNextRange(min)) {
return PointValues.Relation.CELL_OUTSIDE_QUERY;
}

// compare this range with this node's min max again
// since ranges are connected to previous range
// it cannot be outside again, can only be cross or inside
if (collector.activeRange[1] < max) {
crosses = true;
}
} else if (queryMin > min || queryMax < max) {
crosses = true;
}
Expand All @@ -770,8 +842,6 @@ public PointValues.Relation compare(byte[] minPackedValue, byte[] maxPackedValue
}

interface VisitorWithRanges extends PointValues.IntersectVisitor {
void setCompareRange(long[] compareRange);
void setRanges(Set<long[]> ranges);
}

private static boolean segmentMatchAll(SearchContext ctx, LeafReaderContext leafCtx) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,54 @@ public void testFilterRewriteOptimizationWithRangeQuery() throws IOException {
);
}

public void testMultiRangeDebug() throws IOException {
testSearchCase(
new MatchAllDocsQuery(),
Arrays.asList(
"2017-02-01T09:02:00.000Z",
"2017-02-01T09:35:00.000Z",
"2017-02-01T10:15:00.000Z",
"2017-02-01T13:06:00.000Z",
"2017-02-01T14:04:00.000Z",
"2017-02-01T14:05:00.000Z",
"2017-02-01T15:59:00.000Z",
"2017-02-01T16:06:00.000Z",
"2017-02-01T16:48:00.000Z",
"2017-02-01T16:59:00.000Z"
),
aggregation -> aggregation.fixedInterval(new DateHistogramInterval("60m")).field(AGGREGABLE_DATE).minDocCount(1L),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(6, buckets.size());

Histogram.Bucket bucket = buckets.get(0);
assertEquals("2017-02-01T09:00:00.000Z", bucket.getKeyAsString());
assertEquals(2, bucket.getDocCount());

bucket = buckets.get(1);
assertEquals("2017-02-01T10:00:00.000Z", bucket.getKeyAsString());
assertEquals(1, bucket.getDocCount());

bucket = buckets.get(2);
assertEquals("2017-02-01T13:00:00.000Z", bucket.getKeyAsString());
assertEquals(1, bucket.getDocCount());

bucket = buckets.get(3);
assertEquals("2017-02-01T14:00:00.000Z", bucket.getKeyAsString());
assertEquals(2, bucket.getDocCount());

bucket = buckets.get(4);
assertEquals("2017-02-01T15:00:00.000Z", bucket.getKeyAsString());
assertEquals(1, bucket.getDocCount());

bucket = buckets.get(5);
assertEquals("2017-02-01T16:00:00.000Z", bucket.getKeyAsString());
assertEquals(3, bucket.getDocCount());
},
false
);
}

public void testDocCountField() throws IOException {
testSearchCase(
new MatchAllDocsQuery(),
Expand Down

0 comments on commit eaa10d3

Please sign in to comment.