Skip to content

Commit

Permalink
cleaning up
Browse files Browse the repository at this point in the history
Signed-off-by: bowenlan-amzn <[email protected]>
  • Loading branch information
bowenlan-amzn committed Dec 7, 2023
1 parent d8f337b commit e7d8af9
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ private static Weight[] createFilterForAggregations(
final long interval = intervalOpt.getAsLong();
// afterKey is the last bucket key in previous response, while the bucket key
// is the start of the bucket values, so add the interval
if (afterKey != 0) {
if (afterKey != -1) {
low = afterKey + interval;
}
// Calculate the number of buckets using range and interval
Expand Down Expand Up @@ -248,7 +248,7 @@ public static class ValueSourceContext {
* @param missing whether missing value/bucket is set
* @param hasScript whether script is used
* @param fieldType null if the field doesn't exist
* @param afterKey for composite aggregation, the key of the last bucket in the previous response
* @param afterKey used to paginate for composite aggregation, pass in -1 if not used
*/
public ValueSourceContext(boolean missing, boolean hasScript, MappedFieldType fieldType, long afterKey) {
this.missing = missing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ final class CompositeAggregator extends BucketsAggregator {
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;

// Try fast filter optimization
// Try fast filter optimization when the only source is date histogram
if (sourceConfigs.length == 1 && sourceConfigs[0].valuesSource() instanceof RoundingValuesSource) {
RoundingValuesSource dateHistogramSource = (RoundingValuesSource) sourceConfigs[0].valuesSource();
bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), CardinalityUpperBound.ONE);
Expand Down Expand Up @@ -258,7 +258,7 @@ public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws I
);
}

// Fast filters optimization
// Build results from fast filters optimization
if (bucketOrds != null) {
Map<CompositeKey, InternalComposite.InternalBucket> bucketMap = new HashMap<>();
for (InternalComposite.InternalBucket internalBucket : buckets) {
Expand Down Expand Up @@ -386,9 +386,7 @@ private Sort buildIndexSortPrefix(LeafReaderContext context) throws IOException
}
break;
}

sortFields.add(indexSortField);

if (sourceConfig.valuesSource() instanceof RoundingValuesSource) {
// the rounding "squashes" many values together, that breaks the ordering of sub-values,
// so we ignore the subsequent sources even if they match the index sort.
Expand Down Expand Up @@ -516,15 +514,13 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
.build();
Weight weight = context.searcher().createWeight(context.searcher().rewrite(newQuery), ScoreMode.COMPLETE_NO_SCORES, 1f);
Scorer scorer = weight.scorer(ctx);

if (scorer != null) {
DocIdSetIterator docIt = scorer.iterator();
final LeafBucketCollector inner = queue.getLeafCollector(
ctx,
getFirstPassCollector(docIdSetBuilder, indexSortPrefix.getSort().length)
);
inner.setScorer(scorer);

final Bits liveDocs = ctx.reader().getLiveDocs();
while (docIt.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
if (liveDocs == null || liveDocs.get(docIt.docID())) {
Expand All @@ -548,7 +544,6 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
Sort indexSortPrefix = buildIndexSortPrefix(ctx);
int sortPrefixLen = computeSortPrefixLen(indexSortPrefix);

// are there index sort enabled? sortPrefixLen
SortedDocsProducer sortedDocsProducer = sortPrefixLen == 0
? sources[0].createSortedDocsProducerOrNull(ctx.reader(), context.query())
: null;
Expand Down Expand Up @@ -602,6 +597,8 @@ public void collect(int doc, long bucket) throws IOException {
try {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(indexSortPrefix, docCount)) {
// one doc may contain multiple values, we iterate over and collect one by one
// so the same doc can appear multiple times here
if (builder != null && lastDoc != doc) {
builder.add(doc);
lastDoc = doc;
Expand All @@ -626,18 +623,14 @@ private void runDeferredCollections() throws IOException {
Query query = context.query();
weight = context.searcher().createWeight(context.searcher().rewrite(query), ScoreMode.COMPLETE, 1f);
}

deferredCollectors.preCollection();

for (Entry entry : entries) {
DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
if (docIdSetIterator == null) {
continue;
}

final LeafBucketCollector subCollector = deferredCollectors.getLeafCollector(entry.context);
final LeafBucketCollector collector = queue.getLeafCollector(entry.context, getSecondPassCollector(subCollector));

DocIdSetIterator scorerIt = null;
if (needsScores) {
Scorer scorer = weight.scorer(entry.context);
Expand All @@ -646,7 +639,6 @@ private void runDeferredCollections() throws IOException {
subCollector.setScorer(scorer);
}
}

int docID;
while ((docID = docIdSetIterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
if (needsScores) {
Expand All @@ -658,7 +650,6 @@ private void runDeferredCollections() throws IOException {
collector.collect(docID);
}
}

deferredCollectors.postCollection();
}

Expand All @@ -670,7 +661,7 @@ private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollec
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0;
Integer slot = queue.compareCurrent();
Integer slot = queue.getCurrentSlot();
if (slot != null) {
// The candidate key is a top bucket.
// We can defer the collection of this document/bucket to the sub collector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@

/**
* A specialized {@link PriorityQueue} implementation for composite buckets.
* Can think of this as a max heap that holds the top small buckets slots in order.
* Each slot holds the values of the composite bucket key it represents.
*
* @opensearch.internal
*/
Expand Down Expand Up @@ -77,7 +79,7 @@ public int hashCode() {

private final BigArrays bigArrays;
private final int maxSize;
private final Map<Slot, Integer> map;
private final Map<Slot, Integer> map; // to quickly find the slot for a value
private final SingleDimensionValuesSource<?>[] arrays;

private LongArray docCounts;
Expand Down Expand Up @@ -119,10 +121,10 @@ boolean isFull() {
}

/**
* Compares the current candidate with the values in the queue and returns
* Try to get the slot of the current/candidate values in the queue and returns
* the slot if the candidate is already in the queue or null if the candidate is not present.
*/
Integer compareCurrent() {
Integer getCurrentSlot() {
return map.get(new Slot(CANDIDATE_SLOT));
}

Expand Down Expand Up @@ -165,13 +167,11 @@ int compare(int slot1, int slot2) {
assert slot2 != CANDIDATE_SLOT;
for (int i = 0; i < arrays.length; i++) {
final int cmp;

if (slot1 == CANDIDATE_SLOT) {
cmp = arrays[i].compareCurrent(slot2);
} else {
cmp = arrays[i].compare(slot1, slot2);
}

if (cmp != 0) {
return cmp > 0 ? i + 1 : -(i + 1);
}
Expand Down Expand Up @@ -255,13 +255,11 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue, LeafReader
while (last > 0) {
collector = arrays[last--].getLeafCollector(context, collector);
}

if (forceLeadSourceValue != null) {
collector = arrays[last].getLeafCollector(forceLeadSourceValue, context, collector);
} else {
collector = arrays[last].getLeafCollector(context, collector);
}

return collector;
}

Expand All @@ -283,9 +281,9 @@ boolean addIfCompetitive(long inc) {
*
* @throws CollectionTerminatedException if the current collection can be terminated early due to index sorting.
*/
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading indexSortSourcePrefix can only be -1
boolean addIfCompetitive(int indexSortSourcePrefix, long inc) {
// checks if the candidate key is competitive
Integer curSlot = compareCurrent();
Integer curSlot = getCurrentSlot();
if (curSlot != null) {
// this key is already in the top N, skip it
docCounts.increment(curSlot, inc);
Expand All @@ -296,25 +294,23 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading
int cmp = compareCurrentWithAfter();
if (cmp <= 0) {
if (indexSortSourcePrefix < 0 && cmp == indexSortSourcePrefix) {
// the leading index sort is in the reverse order of the leading source
// the leading index sort is and the leading source order are both reversed,
// so we can early terminate when we reach a document that is smaller
// than the after key (collected on a previous page).
throw new CollectionTerminatedException();
}
// key was collected on a previous page, skip it (>= afterKey).
// the key was collected on a previous page, skip it.
return false;
}
}

if (size() >= maxSize) { // TODO reading when queue is full, can check competitiveness
// the tree map is full, check if the candidate key should be kept // TODO reading queue contain topN largest composite
// key/bucket/slot
// the heap is full, check if the candidate key larger than max heap top
if (size() >= maxSize) {
int cmp = compare(CANDIDATE_SLOT, top());
if (cmp > 0) { // TODO reading current large than queue
if (cmp <= indexSortSourcePrefix) { // TODO reading the way of comparing current and queue uses sorted fields
// index sort guarantees that there is no key greater or equal than the
// current one in the subsequent documents so we can early terminate. // TODO reading how to get the topN smallest items
// using heap?
if (cmp > 0) {
if (cmp <= indexSortSourcePrefix) {
// index sort guarantees the following documents will have a key larger than the current candidate,
// so we can early terminate.
throw new CollectionTerminatedException();
}
// the candidate key is not competitive, skip it.
Expand All @@ -330,9 +326,9 @@ boolean addIfCompetitive(int indexSortSourcePrefix, long inc) { // TODO reading
// and we recycle the deleted slot
newSlot = slot;
} else {
newSlot = size(); // TODO reading seems we don't care the number of slot here?
newSlot = size();
}
// move the candidate key to its new slot
// move the candidate key to its new slot by copy its values to the new slot
copyCurrent(newSlot, inc);
map.put(new Slot(newSlot), newSlot);
add(newSlot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue, LeafReade
}
lowerBucket = (Long) lowerValue;
}

long upperBucket = Long.MAX_VALUE;
Comparable upperValue = queue.getUpperValueLeadSource();
if (upperValue != null) {
Expand Down Expand Up @@ -148,7 +147,8 @@ public void visit(int docID, byte[] packedValue) throws IOException {
}

long bucket = bucketFunction.applyAsLong(packedValue);
if (first == false && bucket != lastBucket) { // process previous bucket when new bucket appears
// process previous bucket when new bucket appears
if (first == false && bucket != lastBucket) {
final DocIdSet docIdSet = bucketDocsBuilder.build();
if (processBucket(queue, context, docIdSet.iterator(), lastBucket, builder) &&
// lower bucket is inclusive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ protected boolean processBucket(
) throws IOException {
final int[] topCompositeCollected = new int[1];
final boolean[] hasCollected = new boolean[1];

final DocCountProvider docCountProvider = new DocCountProvider();
docCountProvider.setLeafReaderContext(context);

final LeafBucketCollector queueCollector = new LeafBucketCollector() {
int lastDoc = -1;

Expand All @@ -94,7 +92,7 @@ public void collect(int doc, long bucket) throws IOException {
long docCount = docCountProvider.getDocCount(doc);
if (queue.addIfCompetitive(docCount)) {
topCompositeCollected[0]++;
if (adder != null && doc != lastDoc) { // TODO reading why doc can be == lastDoc?
if (adder != null && doc != lastDoc) {
if (remainingBits == 0) {
// the cost approximation was lower than the real size, we need to grow the adder
// by some numbers (128) to ensure that we can add the extra documents
Expand All @@ -108,15 +106,13 @@ public void collect(int doc, long bucket) throws IOException {
}
}
};

final Bits liveDocs = context.reader().getLiveDocs();
final LeafBucketCollector collector = queue.getLeafCollector(leadSourceBucket, context, queueCollector);
while (iterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
if (liveDocs == null || liveDocs.get(iterator.docID())) {
collector.collect(iterator.docID());
}
}

if (queue.isFull() && hasCollected[0] && topCompositeCollected[0] == 0) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private AutoDateHistogramAggregator(
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null,
valuesSourceConfig.fieldType(),
0
-1
);
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
valuesSourceConfig.missing() != null,
valuesSourceConfig.script() != null,
valuesSourceConfig.fieldType(),
0
-1
);
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2241,31 +2241,29 @@ private <T extends Comparable<T>, V extends Comparable<T>> void testRandomTerms(
Function<Object, V> transformKey
) throws IOException {
int numTerms = randomIntBetween(10, 500);
List<T> terms = new ArrayList<>();
List<T> terms = new ArrayList<>(); // possible values for the terms
for (int i = 0; i < numTerms; i++) {
terms.add(randomSupplier.get());
}
int numDocs = randomIntBetween(100, 200);
List<Map<String, List<Object>>> dataset = new ArrayList<>();

Set<T> valuesSet = new HashSet<>();
Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<>();
Set<T> valuesSet = new HashSet<>(); // how many different values
Map<Comparable<?>, AtomicLong> expectedDocCounts = new HashMap<>(); // how many docs for each value
for (int i = 0; i < numDocs; i++) {
int numValues = randomIntBetween(1, 5);
Set<Object> values = new HashSet<>();
for (int j = 0; j < numValues; j++) {
int rand = randomIntBetween(0, terms.size() - 1);
if (values.add(terms.get(rand))) {
if (values.add(terms.get(rand))) { // values are unique for one doc
AtomicLong count = expectedDocCounts.computeIfAbsent(terms.get(rand), (k) -> new AtomicLong(0));
count.incrementAndGet();
valuesSet.add(terms.get(rand));
}
}
dataset.add(Collections.singletonMap(field, new ArrayList<>(values)));
}
List<T> expected = new ArrayList<>(valuesSet);
List<T> expected = new ArrayList<>(valuesSet); // how many buckets expected
Collections.sort(expected);

List<Comparable<T>> seen = new ArrayList<>();
AtomicBoolean finish = new AtomicBoolean(false);
int size = randomIntBetween(1, expected.size());
Expand Down

0 comments on commit e7d8af9

Please sign in to comment.