Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Eventstats in PPL #800

Merged
merged 5 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,34 @@ source = table | where ispresent(a) |
- `source = table | stats avg(age) as avg_state_age by country, state | stats avg(avg_state_age) as avg_country_age by country`
- `source = table | stats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | stats avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | stats avg(avg_state_age) as avg_adult_country_age by country`

#### **Event Aggregations**
[See additional command details](ppl-eventstats-command.md)

- `source = table | eventstats avg(a) `
- `source = table | where a < 50 | eventstats avg(c) `
- `source = table | eventstats max(c) by b`
- `source = table | eventstats count(c) by b | head 5`
- `source = table | eventstats stddev_samp(c)`
- `source = table | eventstats stddev_pop(c)`
- `source = table | eventstats percentile(c, 90)`
- `source = table | eventstats percentile_approx(c, 99)`

**Limitation: distinct aggregation could not used in `eventstats`:**_
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please also add it cant be used in conjunction with stats - probably obvious but still need to be noted...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This limitation is only for the new command eventstats, not for stats. If we add a limitation note for conjunction with stats, similar, a limitation note for conjunction with every other commands would be considered. Would it be gilding the lily?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok make sense...

- `source = table | eventstats distinct_count(c)` (throw exception)

**Aggregations With Span**
- `source = table | eventstats count(a) by span(a, 10) as a_span`
- `source = table | eventstats sum(age) by span(age, 5) as age_span | head 2`
- `source = table | eventstats avg(age) by span(age, 20) as age_span, country | sort - age_span | head 2`

**Aggregations With TimeWindow Span (tumble windowing function)**
- `source = table | eventstats sum(productsAmount) by span(transactionDate, 1d) as age_date | sort age_date`
- `source = table | eventstats sum(productsAmount) by span(transactionDate, 1w) as age_date, productId`

**Aggregations Group by Multiple Times**
- `source = table | eventstats avg(age) as avg_state_age by country, state | eventstats avg(avg_state_age) as avg_country_age by country`
- `source = table | eventstats avg(age) as avg_city_age by country, state, city | eval new_avg_city_age = avg_city_age - 1 | eventstats avg(new_avg_city_age) as avg_state_age by country, state | where avg_state_age > 18 | eventstats avg(avg_state_age) as avg_adult_country_age by country`

#### **Dedup**

[See additional command details](ppl-dedup-command.md)
Expand Down
2 changes: 2 additions & 0 deletions docs/ppl-lang/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ For additional examples see the next [documentation](PPL-Example-Commands.md).

- [`stats command`](ppl-stats-command.md)

- [`eventstats command`](ppl-eventstats-command.md)

- [`where command`](ppl-where-command.md)

- [`head command`](ppl-head-command.md)
Expand Down
327 changes: 327 additions & 0 deletions docs/ppl-lang/ppl-eventstats-command.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an excellent document - can u plz create a similar doc for the stats command ?
in a new PR ...
thanks!!

Copy link
Member Author

@LantaoJin LantaoJin Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stats command already had one https://github.com/opensearch-project/opensearch-spark/blob/main/docs/ppl-lang/ppl-stats-command.md. And stats command is more straightforward, we even have a website doc https://opensearch.org/docs/latest/search-plugins/sql/ppl/functions/#stats to introduce what it is.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ WHERE: 'WHERE';
FIELDS: 'FIELDS';
RENAME: 'RENAME';
STATS: 'STATS';
EVENTSTATS: 'EVENTSTATS';
DEDUP: 'DEDUP';
SORT: 'SORT';
EVAL: 'EVAL';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ renameCommand
;

statsCommand
: STATS (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
: (STATS | EVENTSTATS) (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
;

dedupCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,8 @@ public T visitScalarSubquery(ScalarSubquery node, C context) {
public T visitExistsSubquery(ExistsSubquery node, C context) {
return visitChildren(node, context);
}

public T visitWindow(Window node, C context) {
return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Window extends UnresolvedPlan {
private final List<UnresolvedExpression> windowFunctionList;
private final List<UnresolvedExpression> partExprList;
private final List<UnresolvedExpression> sortExprList;
@Setter private UnresolvedExpression span;
private UnresolvedPlan child;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitWindow(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TopAggregation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.ParseStrategy;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
import scala.Option;
import scala.Tuple2;
import scala.collection.IterableLike;
Expand Down Expand Up @@ -115,6 +117,7 @@
import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;
import static scala.collection.JavaConverters.seqAsJavaList;

/**
* Utility class to traverse PPL logical plan and translate it into catalyst logical plan
Expand Down Expand Up @@ -326,6 +329,30 @@ private static LogicalPlan extractedAggregation(CatalystPlanContext context) {
return context.apply(p -> new Aggregate(groupingExpression, aggregateExpressions, p));
}

@Override
public LogicalPlan visitWindow(Window node, CatalystPlanContext context) {
node.getChild().get(0).accept(this, context);
List<Expression> windowFunctionExpList = visitExpressionList(node.getWindowFunctionList(), context);
Seq<Expression> windowFunctionExpressions = context.retainAllNamedParseExpressions(p -> p);
List<Expression> partitionExpList = visitExpressionList(node.getPartExprList(), context);
UnresolvedExpression span = node.getSpan();
if (!Objects.isNull(span)) {
visitExpression(span, context);
}
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(p -> p);
Seq<SortOrder> orderSpec = seq(new ArrayList<SortOrder>());
Seq<NamedExpression> aggregatorFunctions = seq(
seqAsJavaList(windowFunctionExpressions).stream()
.map(w -> WindowSpecTransformer.buildAggregateWindowFunction(w, partitionSpec, orderSpec))
.collect(Collectors.toList()));
return context.apply(p ->
new org.apache.spark.sql.catalyst.plans.logical.Window(
aggregatorFunctions,
partitionSpec,
orderSpec,
p));
}

@Override
public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) {
expressionAnalyzer.visitAlias(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,24 @@ public UnresolvedPlan visitStatsCommand(OpenSearchPPLParser.StatsCommandContext
.map(this::internalVisitExpression)
.orElse(null);

Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
if (ctx.STATS() != null) {
Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
} else {
Window window =
new Window(
aggListBuilder.build(),
groupList,
emptyList());
window.setSpan(span);
return window;
}
}

/** Dedup command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.CurrentRow$;
import org.apache.spark.sql.catalyst.expressions.Divide;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand All @@ -16,6 +17,7 @@
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame;
import org.apache.spark.sql.catalyst.expressions.TimeWindow;
import org.apache.spark.sql.catalyst.expressions.UnboundedFollowing$;
import org.apache.spark.sql.catalyst.expressions.UnboundedPreceding$;
import org.apache.spark.sql.catalyst.expressions.WindowExpression;
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition;
Expand Down Expand Up @@ -79,4 +81,21 @@ static NamedExpression buildRowNumber(Seq<Expression> partitionSpec, Seq<SortOrd
Option.empty(),
seq(new ArrayList<String>()));
}

static NamedExpression buildAggregateWindowFunction(Expression aggregator, Seq<Expression> partitionSpec, Seq<SortOrder> orderSpec) {
Alias aggregatorAlias = (Alias) aggregator;
WindowExpression aggWindowExpression = new WindowExpression(
aggregatorAlias.child(),
new WindowSpecDefinition(
partitionSpec,
orderSpec,
new SpecifiedWindowFrame(RowFrame$.MODULE$, UnboundedPreceding$.MODULE$, UnboundedFollowing$.MODULE$)));
return org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(
aggWindowExpression,
aggregatorAlias.name(),
NamedExpression.newExprId(),
seq(new ArrayList<String>()),
Option.empty(),
seq(new ArrayList<String>()));
}
}
Loading
Loading