Skip to content

Commit

Permalink
Support Eventstats in PPL
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin committed Oct 22, 2024
1 parent 8ab81ae commit 74512f1
Show file tree
Hide file tree
Showing 9 changed files with 750 additions and 9 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ WHERE: 'WHERE';
FIELDS: 'FIELDS';
RENAME: 'RENAME';
STATS: 'STATS';
EVENTSTATS: 'EVENTSTATS';
DEDUP: 'DEDUP';
SORT: 'SORT';
EVAL: 'EVAL';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ renameCommand
;

statsCommand
: STATS (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
: (STATS | EVENTSTATS) (PARTITIONS EQUAL partitions = integerLiteral)? (ALLNUM EQUAL allnum = booleanLiteral)? (DELIM EQUAL delim = stringLiteral)? statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (DEDUP_SPLITVALUES EQUAL dedupsplit = booleanLiteral)?
;

dedupCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,8 @@ public T visitScalarSubquery(ScalarSubquery node, C context) {
public T visitExistsSubquery(ExistsSubquery node, C context) {
return visitChildren(node, context);
}

public T visitWindow(Window node, C context) {
return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;

@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class Window extends UnresolvedPlan {
private final List<UnresolvedExpression> windowFunctionList;
private final List<UnresolvedExpression> partExprList;
private final List<UnresolvedExpression> sortExprList;
@Setter private UnresolvedExpression span;
private UnresolvedPlan child;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.child = child;
return this;
}

@Override
public List<UnresolvedPlan> getChild() {
return ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitWindow(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TopAggregation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
import org.opensearch.sql.ppl.utils.ComparatorTransformer;
import org.opensearch.sql.ppl.utils.ParseStrategy;
import org.opensearch.sql.ppl.utils.SortUtils;
import org.opensearch.sql.ppl.utils.WindowSpecTransformer;
import scala.Option;
import scala.Tuple2;
import scala.collection.IterableLike;
Expand Down Expand Up @@ -115,6 +117,7 @@
import static org.opensearch.sql.ppl.utils.RelationUtils.getTableIdentifier;
import static org.opensearch.sql.ppl.utils.RelationUtils.resolveField;
import static org.opensearch.sql.ppl.utils.WindowSpecTransformer.window;
import static scala.collection.JavaConverters.seqAsJavaList;

/**
* Utility class to traverse PPL logical plan and translate it into catalyst logical plan
Expand Down Expand Up @@ -326,6 +329,30 @@ private static LogicalPlan extractedAggregation(CatalystPlanContext context) {
return context.apply(p -> new Aggregate(groupingExpression, aggregateExpressions, p));
}

@Override
public LogicalPlan visitWindow(Window node, CatalystPlanContext context) {
node.getChild().get(0).accept(this, context);
List<Expression> windowFunctionExpList = visitExpressionList(node.getWindowFunctionList(), context);
Seq<Expression> windowFunctionExpressions = context.retainAllNamedParseExpressions(p -> p);
List<Expression> partitionExpList = visitExpressionList(node.getPartExprList(), context);
UnresolvedExpression span = node.getSpan();
if (!Objects.isNull(span)) {
visitExpression(span, context);
}
Seq<Expression> partitionSpec = context.retainAllNamedParseExpressions(p -> p);
Seq<SortOrder> orderSpec = seq(new ArrayList<SortOrder>());
Seq<NamedExpression> aggregatorFunctions = seq(
seqAsJavaList(windowFunctionExpressions).stream()
.map(w -> WindowSpecTransformer.buildAggregateWindowFunction(w, partitionSpec, orderSpec))
.collect(Collectors.toList()));
return context.apply(p ->
new org.apache.spark.sql.catalyst.plans.logical.Window(
aggregatorFunctions,
partitionSpec,
orderSpec,
p));
}

@Override
public LogicalPlan visitAlias(Alias node, CatalystPlanContext context) {
expressionAnalyzer.visitAlias(node, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,24 @@ public UnresolvedPlan visitStatsCommand(OpenSearchPPLParser.StatsCommandContext
.map(this::internalVisitExpression)
.orElse(null);

Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
if (ctx.STATS() != null) {
Aggregation aggregation =
new Aggregation(
aggListBuilder.build(),
emptyList(),
groupList,
span,
ArgumentFactory.getArgumentList(ctx));
return aggregation;
} else {
Window window =
new Window(
aggListBuilder.build(),
groupList,
emptyList());
window.setSpan(span);
return window;
}
}

/** Dedup command. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.ppl.utils;

import org.apache.spark.sql.catalyst.expressions.Alias;
import org.apache.spark.sql.catalyst.expressions.CurrentRow$;
import org.apache.spark.sql.catalyst.expressions.Divide;
import org.apache.spark.sql.catalyst.expressions.Expression;
Expand All @@ -16,6 +17,7 @@
import org.apache.spark.sql.catalyst.expressions.SortOrder;
import org.apache.spark.sql.catalyst.expressions.SpecifiedWindowFrame;
import org.apache.spark.sql.catalyst.expressions.TimeWindow;
import org.apache.spark.sql.catalyst.expressions.UnboundedFollowing$;
import org.apache.spark.sql.catalyst.expressions.UnboundedPreceding$;
import org.apache.spark.sql.catalyst.expressions.WindowExpression;
import org.apache.spark.sql.catalyst.expressions.WindowSpecDefinition;
Expand Down Expand Up @@ -79,4 +81,21 @@ static NamedExpression buildRowNumber(Seq<Expression> partitionSpec, Seq<SortOrd
Option.empty(),
seq(new ArrayList<String>()));
}

static NamedExpression buildAggregateWindowFunction(Expression aggregator, Seq<Expression> partitionSpec, Seq<SortOrder> orderSpec) {
Alias aggregatorAlias = (Alias) aggregator;
WindowExpression aggWindowExpression = new WindowExpression(
aggregatorAlias.child(),
new WindowSpecDefinition(
partitionSpec,
orderSpec,
new SpecifiedWindowFrame(RowFrame$.MODULE$, UnboundedPreceding$.MODULE$, UnboundedFollowing$.MODULE$)));
return org.apache.spark.sql.catalyst.expressions.Alias$.MODULE$.apply(
aggWindowExpression,
aggregatorAlias.name(),
NamedExpression.newExprId(),
seq(new ArrayList<String>()),
Option.empty(),
seq(new ArrayList<String>()));
}
}
Loading

0 comments on commit 74512f1

Please sign in to comment.