
[Draft] [Star Tree] [Search] Resolving Date histogram with metric aggregation using star-tree #16674

Draft · wants to merge 2 commits into base: main

Conversation

@sandeshkr419 (Contributor) commented Nov 18, 2024

Description

This is a draft set of changes that I am using to discuss the solution approach.

The primary challenge in resolving a date histogram with a metric aggregation was figuring out how sub-aggregators get resolved. When resolving a query via the star tree, we no longer need Lucene documents and don't use the collect() calls that are normally used internally to delegate the collection of sub-aggregations to sub-collectors.

To mitigate this, I have introduced a wrapper class, StarTreeBucketCollector, which basically introduces a collectStarEntry(int starTreeEntry, long bucket) method. This method is then overridden in metric aggregators and invoked from parent aggregators (here, DateHistogramAggregator).

The benefit of this strategy is that it is easily extensible to other bucket aggregators in which metric aggregations are nested. Also, the existing bucket-related utilities are reusable as-is, which saves the effort of maintaining a separate set of utilities for star-tree buckets.
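
For context, the wrapper is roughly of the following shape (a minimal sketch based on the diff further down in this conversation; the import path is an assumption):

import java.io.IOException;
import org.opensearch.search.aggregations.LeafBucketCollector;

// Sketch: a collector that is fed star-tree entries instead of Lucene doc ids.
// A parent bucket aggregator resolves its own bucket for an entry and then calls
// collectStarEntry on the sub-aggregator's instance of this class.
public abstract class StarTreeBucketCollector extends LeafBucketCollector {
    public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
}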

Looking for early feedback on this approach.

Note: things are hard-coded for one example query shape right now:

{
    "size": 0,
    "aggs": {
        "by_hour": {
            "date_histogram": {
                "field": "@timestamp",
                "calendar_interval": "month"
            },
            "aggs": {
                "sum_status": {
                    "sum": {
                        "field": "status"
                    }
                }
            }
        }
    }
}

Response:

{
    "took": 15785,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1000,
            "relation": "eq"
        },
        "max_score": null,
        "hits": []
    },
    "aggregations": {
        "by_hour": {
            "buckets": [
                {
                    "key_as_string": "2024-01-01T00:00:00.000Z",
                    "key": 1704067200000,
                    "doc_count": 208,
                    "sum_status": {
                        "value": 43527.0
                    }
                },
                {
                    "key_as_string": "2024-02-01T00:00:00.000Z",
                    "key": 1706745600000,
                    "doc_count": 783,
                    "sum_status": {
                        "value": 164741.0
                    }
                },
                {
                    "key_as_string": "2024-03-01T00:00:00.000Z",
                    "key": 1709251200000,
                    "doc_count": 9,
                    "sum_status": {
                        "value": 1894.0
                    }
                }
            ]
        }
    }
}

Related Issues

Resolves (#16552)

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions bot added the enhancement (Enhancement or improvement to existing feature or request) and Search:Aggregations labels on Nov 18, 2024

❌ Gradle check result for d7fbc39: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@@ -136,6 +140,35 @@ public void collect(int doc, long bucket) throws IOException {
public LeafBucketCollector getStarTreeCollector(LeafReaderContext ctx, LeafBucketCollector sub, CompositeIndexFieldInfo starTree)
throws IOException {
final CompensatedSum kahanSummation = new CompensatedSum(sums.get(0), 0);
if (parent != null && subAggregators.length == 0) {

Any reason why this can't be the default collector, regardless of parent collectors?

When there is only a metric aggregation there will be a single bucket; otherwise, multiple buckets.


@Override
public void collect(int doc, long bucket) throws IOException {
    sub.collect(doc, bucket);
@bharath-techie (Contributor) commented Nov 19, 2024

Can we throw UnsupportedOperationException here? Star-tree collectors can strictly enforce this.

@sandeshkr419 (Contributor, Author)

Makes sense.


@Override
public void setScorer(Scorable s) throws IOException {
    sub.setScorer(s);

Even here we should be able to throw UnsupportedOperationException, since there is no scorer for the star tree.

@sandeshkr419 (Contributor, Author)

Right now I am not using StarTreeLeafBucketCollectorBase at all. I am thinking of getting rid of it if it does not serve much purpose.
Earlier I thought I might need to delegate operations to the original collector if required.
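
If the base class is kept, the suggestions above could look roughly like this (a sketch of the reviewers' proposal, not code from the PR; only the class name is taken from the thread):

import java.io.IOException;
import org.apache.lucene.search.Scorable;
import org.opensearch.search.aggregations.LeafBucketCollector;

// Sketch: a base class that strictly rejects the document-oriented entry points.
public abstract class StarTreeLeafBucketCollectorBase extends LeafBucketCollector {

    @Override
    public void collect(int doc, long bucket) throws IOException {
        // Star-tree collection never visits individual Lucene documents.
        throw new UnsupportedOperationException("star-tree collectors do not collect documents");
    }

    @Override
    public void setScorer(Scorable scorer) throws IOException {
        // There is no scorer when resolving an aggregation from the star tree.
        throw new UnsupportedOperationException("star-tree collectors have no scorer");
    }
}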

long bucketOrd = bucketOrds.add(0, dimensionValue);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectStarTreeBucket((StarTreeBucketCollector) sub, metricValue, bucketOrd, bit);

Do we need to use the collectExistingStarBucket / collectStarBucket paradigm here?

@sandeshkr419 (Contributor, Author)

Can refactor to it if required. The two utilities differ only by grow(bucketOrd + 1), which I used manually for now during the POC but will abstract in a better way.

@@ -164,7 +172,8 @@ private static StarTreeResult traverseStarTree(StarTreeValues starTreeValues, Ma

String childDimension = dimensionNames.get(dimensionId + 1);
StarTreeNode starNode = null;
if (globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension)) {
if (globalRemainingPredicateColumns == null
@bharath-techie (Contributor) commented Nov 19, 2024

if ((globalRemainingPredicateColumns == null || !globalRemainingPredicateColumns.contains(childDimension))
    && !remainingGroupByColumns.contains(childDimension))

Predicate conditions should be grouped together

CompositeDataCubeFieldType compositeIndexFieldInfo,
AggregatorFactory aggregatorFactory
) {
if (aggregatorFactory instanceof DateHistogramAggregatorFactory && aggregatorFactory.getSubFactories().getFactories().length == 1) {

We can probably have a recursive method which checks whether the parent aggregator is star-tree supported, whether its child aggregators are supported, and so on.

And we can make the set of supported aggregators configurable.
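
The recursive check could look something like this (a sketch only; isStarTreeSupported is a hypothetical per-aggregator predicate, while getSubFactories().getFactories() appears in the diff above):

// Sketch: walk the factory tree and require every level to support star-tree resolution.
static boolean supportsStarTree(AggregatorFactory factory) {
    if (!isStarTreeSupported(factory)) { // hypothetical per-aggregator check, e.g. driven by configuration
        return false;
    }
    for (AggregatorFactory subFactory : factory.getSubFactories().getFactories()) {
        if (!supportsStarTree(subFactory)) {
            return false;
        }
    }
    return true;
}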


public abstract class StarTreeBucketCollector extends LeafBucketCollector {

    public abstract void collectStarEntry(int starTreeEntry, long bucket) throws IOException;
@sarthakaggarwal97 (Contributor) commented Nov 20, 2024

It would be good to have a comment explaining what a star-tree entry is, unless that is already documented somewhere.

}

@Override
public void collectStarEntry(int starTreeEntry, long bucket) throws IOException {}

Does this mean the parent aggregator will always have the entire context needed to traverse the star tree?

What if there is a filter sub-aggregation which needs to further reduce the number of applicable dimension values?

@msfroh (Collaborator) left a comment

Overall, I don't like the deepening association with the Collector interface, given that we're not collecting documents from a Lucene query.

Instead, could we try to work at the Aggregator level more?

I think the place where you should hook more of the star tree stuff in would be the getLeafCollector(LeafReaderContext) method in AggregatorBase:

public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx) throws IOException {

I think that's where you could (per-segment) make your choice about whether you are going to delegate to the aggregation logic to return a real LeafBucketCollector, or you're going to throw a CollectionTerminatedException. You can even pass the subAggregators to that logic so the parent has easy access to its children.

Essentially, in the star tree case, you should never need to return a collector. You can just read the values directly from the segment.
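
A rough sketch of that flow, assuming the per-segment decision happens before any collector is created (all names below are illustrative, not OpenSearch APIs):

import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;

// Sketch: decide per segment whether the star tree can answer the aggregation,
// precompute the buckets if so, and signal the searcher that no per-document
// collection is needed by throwing CollectionTerminatedException.
abstract class StarTreePrecomputeSupport {

    /** True if this segment's star-tree index covers the query/aggregation shape. */
    protected abstract boolean canUseStarTree(LeafReaderContext ctx) throws IOException;

    /** Reads dimension/metric values from the star tree and fills the buckets. */
    protected abstract void precomputeFromStarTree(LeafReaderContext ctx) throws IOException;

    /** Called at the start of getLeafCollector(ctx); returns normally only when falling back to doc collection. */
    protected final void maybePrecompute(LeafReaderContext ctx) throws IOException {
        if (canUseStarTree(ctx)) {
            precomputeFromStarTree(ctx);
            // Nothing left to collect for this leaf; the searcher catches this exception.
            throw new CollectionTerminatedException();
        }
    }
}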

Comment on lines 133 to 139
public final void collectStarTreeBucket(StarTreeBucketCollector subCollector, long docCount, long bucketOrd, int entryBit)
    throws IOException {
    if (docCounts.increment(bucketOrd, docCount) == docCount) {
        multiBucketConsumer.accept(0);
    }
    subCollector.collectStarEntry(entryBit, bucketOrd);
}
@msfroh (Collaborator)

I've been thinking through this and it mostly makes sense.

I guess my one objection is the use of the word "collect", since it's not really related to collectors (or rather, it doesn't need to be).

@msfroh (Collaborator)

That said, see my comment above -- I think you could just intercept the whole thing earlier and not have to do a custom collector.

@bharath-techie (Contributor)

> Essentially, in the star tree case, you should never need to return a collector. You can just read the values directly from the segment.

Hi @msfroh @sandeshkr419,
Do we need a different interface/method specific to the star tree, which would be returned by the aggregators in that case?

Currently, let's say we have a nested aggregation as follows:

{
     "date_histogram_aggs" : {
                "terms_aggs" : {
                          "sum_aggs"
                }
      }
}

It totally makes sense to throw CollectionTerminatedException from date_histogram_aggs [if we don't throw, then I noticed that when size > 0 we get into multiCollector.collectDoc() and similar other issues where a traditional doc collector is expected].

It works well if there are no nested collectors.

But I am just wondering how to handle nested aggs; for the above example we need a collectDoc equivalent in terms_aggs and sum_aggs for star-tree entries.

Do we need to return a new interface/class specific to the star tree for aggregation in that case?

For star-tree aggregations, replace

    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException

with

    public StarTreeAggregator getStarAggregator(LeafReaderContext ctx, StarTreeAggregator sub) throws IOException

// All aggregators which support the star tree would then return something like:

return new StarTreeAggregator() {
    @Override
    public void aggregateStarTreeEntry(int starTreeEntryId, int bucket) {
        // ...actual logic...
    }
};


Because we can't throw CollectionTerminatedException in subAggregators, as we need the collection to happen in the aggregation hierarchy order.

And if we don't have a new interface/class, then the subCollector is still of type LeafBucketCollector, where we'd have to implement some sort of collect specific to the star tree.

Please correct me if my understanding is wrong.

Sandesh Kumar and others added 2 commits December 3, 2024 13:52
Signed-off-by: Sandesh Kumar <[email protected]>
Signed-off-by: Sandesh Kumar <[email protected]>
@sandeshkr419 (Contributor, Author)

@msfroh @bharath-techie I refactored the changes to not extend LeafBucketCollector. However, I have introduced a new interface, StarTreeCollector, to expose a preCompute() method.

Introducing this new method can complement Froh's idea of unifying pre-computations; I'll leave comments on that PR itself to make it friendlier for pre-computing sub-aggregations.
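
For reference, the new interface is roughly of this shape (the preCompute signature follows a snippet quoted later in this thread; the import package names are assumptions):

import java.io.IOException;
import org.apache.lucene.index.LeafReaderContext;
import org.opensearch.index.codec.composite.CompositeIndexFieldInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;

// Sketch: an aggregator that can pre-compute its buckets from star-tree values
// implements this instead of relying on per-document collect() calls.
public interface StarTreeCollector {
    void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException;
}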

@github-actions (bot) commented Dec 3, 2024

❌ Gradle check result for e1898f1: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@bharath-techie (Contributor)

@sandeshkr419
My only concern is how nested aggs will get handled without the (starTreeEntry, bucket) pair.

Can we please validate the dateHisto -> NumericTerms -> Sum chain, just to check that the proposal holds good?


import java.io.IOException;

public interface StarTreeCollector {
@msfroh (Collaborator)

Maybe call this StarTreeAggregator, since it's expected that Aggregator subclasses would implement this interface?

Comment on lines +332 to +336
// Resolve via star-tree
// if (queryStarTree(ctx, leafCollector)) {
// return;
// }

@msfroh (Collaborator)

So, in practice, you won't need any more changes in ContextIndexSearcher in this PR, right?

multiBucketConsumer.accept(0);
}
// Only collect own bucket & not sub-aggregator buckets
// subCollector.collectStarEntry(entryBit, bucketOrd);
@msfroh (Collaborator)

This line won't exist, right?

Could we also do this by calling collectExistingBucket with LeafBucketCollector.NO_OP_COLLECTOR?
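
That suggestion would look roughly like the following inside the bucket aggregator (a sketch; bucketOrds, collectBucket and collectExistingBucket come from the surrounding BucketsAggregator, and passing the star-tree entry bit where a doc id normally goes is an assumption of this sketch):

// Sketch: reuse the existing bucket utilities with a no-op sub-collector
// instead of a star-tree-specific collect call.
private void collectStarTreeEntryIntoBucket(long dimensionValue, int entryBit) throws IOException {
    long bucketOrd = bucketOrds.add(0, dimensionValue);
    if (bucketOrd < 0) {
        bucketOrd = -1 - bucketOrd;
        collectExistingBucket(LeafBucketCollector.NO_OP_COLLECTOR, entryBit, bucketOrd);
    } else {
        collectBucket(LeafBucketCollector.NO_OP_COLLECTOR, entryBit, bucketOrd);
    }
}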

@sandeshkr419 (Contributor, Author)

@msfroh @bharath-techie

Comparing the two solutions proposed in the two commits:

  1. (1st commit) Having an extension of LeafCollector (let's call it StarTreeCollector for now) with an additional collectStarTreeEntry method: we filter out the matching star-tree entries into a bitset, each aggregator initializes a StarTreeCollector, and the collector/sub-collector simply keeps iterating over dimensions/metrics to collect the buckets of that aggregator/sub-aggregator.
  2. (Pre-compute commit) Having a pre-compute method outside the LeafCollector code which iterates over the dimensions/metrics of one iterator and then moves to the sub-aggregator. The challenge here is that the sub-aggregator does not know which parent bucket a sub-bucket belongs to unless it iterates again over dimension values that the parent aggregator has already iterated over. For cases like a date histogram with a metric aggregator this is not a problem: you collect doc counts for the parent aggregator (date buckets), then go to the sum/metric aggregator and iterate over the date buckets again to know which buckets to update. The challenge comes when the nesting deepens.
     For example:
{
    "size": 0,
    "aggs": {
        "unique_terms": {
            "date_histogram": {
                "field": "@timestamp",
                "calendar_interval": "month"
            },
            "aggs": {
                "terms_field": {
                    "terms": {
                        "field": "status",
                        "size": 10
                    },
                    "aggs": {
                        "sum_s": {
                            "sum": {
                                "field": "status"
                            }
                        }
                    }
                }
            }
        }
    }
}

Response (redacted):

"aggregations": {
        "unique_terms": {
            "buckets": [
                {
                    "key_as_string": "2024-01-01T00:00:00.000Z",
                    "key": 1704067200000,
                    "doc_count": 208,
                    "terms_field": {
                        "doc_count_error_upper_bound": 0,
                        "sum_other_doc_count": 82,
                        "buckets": [
                            {
                                "key": 209,
                                "doc_count": 16,
                                "sum_s": {
                                    "value": 3344.0
                                }
                            },
                            {
                                "key": 202,
                                "doc_count": 14,
                                "sum_s": {
                                    "value": 2828.0
                                }
                            },
                            {
                                "key": 216,
                                "doc_count": 14,
                                "sum_s": {
                                    "value": 3024.0
                                }
                            }

Now, to update the sum bucket, we'd have to iterate over the date dimension and the field dimension again, along with the sum metric. This complicates the extensibility of solution 2, since each additional level of nesting would require maintaining multiple star-tree entry iterators. The initial solution 1, however, avoids keeping track of multiple iterators at each nesting level. The only catch is extending LeafCollector and calling it a StarTreePreComputeCollector or something similar.

Basically, a collector-like interface which holds its sub-collectors (where we recursively call collectStarTreeEntry, an analogue of the collect/collectBucket methods we have for Lucene documents) might simplify the solution, which is also what @bharath-techie proposed earlier.
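
To make solution 1 concrete, a minimal sketch of that recursive delegation (illustrative names only, not the PR's API):

import java.io.IOException;

// Sketch: each level resolves its own bucket for a star-tree entry and hands the
// same entry down to its sub-collector, so deeper nesting needs no extra iterators.
abstract class StarTreeEntryCollector {
    protected final StarTreeEntryCollector sub; // null at the innermost (metric) level

    protected StarTreeEntryCollector(StarTreeEntryCollector sub) {
        this.sub = sub;
    }

    public void collectStarTreeEntry(int entryId, long owningBucketOrd) throws IOException {
        long bucketOrd = resolveOwnBucket(entryId, owningBucketOrd);
        if (sub != null) {
            sub.collectStarTreeEntry(entryId, bucketOrd);
        }
    }

    // e.g. the date-histogram level rounds the entry's timestamp dimension into a month
    // bucket, the terms level keys on the status dimension value, and the sum level just
    // accumulates the metric and returns the owning bucket unchanged.
    protected abstract long resolveOwnBucket(int entryId, long owningBucketOrd) throws IOException;
}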

Let me know your thoughts on this.

@msfroh (Collaborator) commented Dec 18, 2024

> Now to update the sum bucket, we'd have to iterate over date dimension & field dimension again along with sum metric. This complicates the extensibility part of solution 2 as each nesting increment would require us maintaining multiple star-tree entry iterators. However, the initial solution 1 helps avoid keeping track of multiple iterators with each nesting. The only catch is extending LeafCollector and calling it a StarTreePreComputeCollector or something similar.

I don't understand what benefit you get from extending LeafCollector to do this. LeafCollector is a simple interface with (effectively) one method (collect(int)). What's stopping you from creating your own class that manages the star tree iterators?

@bharath-techie (Contributor) commented Dec 19, 2024

Hi @sandeshkr419,
The separate interface we have just needs an implementation similar to LeafCollector; we don't have to extend LeafCollector.

public interface StarTreeAggregator {
    // public void collectStarEntry(int starTreeEntryBit, long bucket) throws IOException;

    public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, LongKeyedBucketOrds bucketOrds) throws IOException;
}

to

public interface StarTreeAggregator {

    public void preCompute(LeafReaderContext ctx, CompositeIndexFieldInfo starTree, StarTreeAggregator sub) throws IOException;

// collect equivalent method
  public void aggregate(int doc, long owningBucketOrd) throws IOException;
}

Will this work?

Basically, the parent must check whether all subAggregators are of type StarTreeAggregator, and if so call the aggregate method, similar to the current leaf collectors.

So all aggregators will have preCompute and aggregate implementations. We will call preCompute on the first level of parent aggregators, which then calls aggregate on all the subAggregators.

[We can have better interfaces as well, but the idea is to enable this within our interfaces and make it work similarly to the existing LeafCollector interface.]

Edit [25/12]:
After further discussion with Sandesh, I think the bucket aggregators can implement and return a StarTreeAggregator, similar to LeafBucketCollector. Instead of the scorer calling collector.collect, here we can check whether a query shape can be solved by the star tree, and then the precompute method can call aggregate for each star-tree entry.

public class DateHistogramAggregator implements StarTreeAggregator {
    // ...
    public StarTreeAggregator getStarTreeAggregator(LeafReaderContext ctx, StarTreeAggregator sub) {
        return new StarTreeAggregator() {
            @Override
            public void aggregate(int starTreeEntry, long owningBucketOrd) throws IOException {
                // ... own bucketing logic ...
                sub.aggregate(starTreeEntry, owningBucketOrd);
                // ...
            }
        };
    }
}

public interface StarTreeAggregator {
    void aggregate(int starTreeEntry, long owningBucketOrd) throws IOException;
}

public abstract class BucketsAggregator {
    public StarTreeAggregator getStarTreeAggregator(LeafReaderContext ctx, StarTreeAggregator sub) {
        // default: no implementation
        return null;
    }

    void precompute() throws IOException {
        if (/* can be solved by the star tree */) {
            StarTreeAggregator st = getStarTreeAggregator(ctx, sub);
            for (int entry : starTreeEntries) {
                st.aggregate(entry, 0);
            }
        }
    }
}
