Skip to content

Commit

Permalink
Translate Eval Command (#499)
Browse files Browse the repository at this point in the history
* Support Eval Command

Signed-off-by: Lantao Jin <[email protected]>

* add more ITs and documentation

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin authored Jul 31, 2024
1 parent 1f81ddf commit 24d3b81
Show file tree
Hide file tree
Showing 11 changed files with 803 additions and 17 deletions.

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,23 @@ The next samples of PPL queries are currently supported:
- `source = table | where c != 'test' OR a > 1 | fields a,b,c | head 1`
- `source = table | where c = 'test' NOT a > 1 | fields a,b,c`


**Eval**

Assumptions: `a`, `b`, `c` are existing fields in `table`
- `source = table | eval f = 1 | fields a,b,c,f`
- `source = table | eval f = 1` (output a,b,c,f fields)
- `source = table | eval n = now() | eval t = unix_timestamp(a) | fields n,t`
- `source = table | eval f = a | where f > 1 | sort f | fields a,b,c | head 5`
- `source = table | eval f = a * 2 | eval h = f * 2 | fields a,f,h`
- `source = table | eval f = a * 2, h = f * 2 | fields a,f,h` (Spark 3.4.0+ required)
- `source = table | eval f = a * 2, h = b | stats avg(f) by h`

Limitation: Overriding existing field is unsupported, following queries throw exceptions with "Reference 'a' is ambiguous"
- `source = table | eval a = 10 | fields a,b,c`
- `source = table | eval a = a * 2 | stats avg(a)`
- `source = table | eval a = abs(a) | where a > 0`

**Aggregations**
- `source = table | stats avg(a) `
- `source = table | where a < 50 | stats avg(c) `
Expand All @@ -261,6 +278,7 @@ The next samples of PPL queries are currently supported:
- `search` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/search.rst)
- `where` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/where.rst)
- `fields` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/fields.rst)
- `eval` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/eval.rst)
- `head` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/head.rst)
- `stats` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/stats.rst) (supports AVG, COUNT, DISTINCT_COUNT, MAX, MIN and SUM aggregation functions)
- `sort` - [See details](https://github.com/opensearch-project/sql/blob/main/docs/user/ppl/cmd/sort.rst)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ commands
| statsCommand
| sortCommand
| headCommand
| evalCommand
;

searchCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
/** Argument. */
public class Argument extends UnresolvedExpression {
private final String name;
private String argName;
private Literal value;
private final Literal value;

public Argument(String name, Literal value) {
this.name = name;
Expand All @@ -27,8 +26,8 @@ public List<UnresolvedExpression> getChild() {
return Arrays.asList(value);
}

public String getArgName() {
return argName;
public String getName() {
return name;
}

public Literal getValue() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,25 @@
import com.google.common.collect.ImmutableList;
import org.opensearch.sql.ast.AbstractNodeVisitor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class Field extends UnresolvedExpression {
private final UnresolvedExpression field;
private final QualifiedName field;
private final List<Argument> fieldArgs;

/** Constructor of Field. */
public Field(UnresolvedExpression field) {
public Field(QualifiedName field) {
this(field, Collections.emptyList());
}

/** Constructor of Field. */
public Field(UnresolvedExpression field, List<Argument> fieldArgs) {
public Field(QualifiedName field, List<Argument> fieldArgs) {
this.field = field;
this.fieldArgs = fieldArgs;
}

public UnresolvedExpression getField() {
public QualifiedName getField() {
return field;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.QualifiedName;
import org.opensearch.sql.ast.expression.Scope;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.List;

/** Logical plan node of correlation , the interface for building the searching sources. */

public class Correlation extends UnresolvedPlan {
private final CorrelationType correlationType;
private final List<UnresolvedExpression> fieldsList;
private final CorrelationType correlationType;
private final List<QualifiedName> fieldsList;
private final Scope scope;
private final FieldsMapping mappingListContext;
private UnresolvedPlan child ;
public Correlation(String correlationType, List<UnresolvedExpression> fieldsList, Scope scope, FieldsMapping mappingListContext) {
public Correlation(String correlationType, List<QualifiedName> fieldsList, Scope scope, FieldsMapping mappingListContext) {
this.correlationType = CorrelationType.valueOf(correlationType);
this.fieldsList = fieldsList;
this.scope = scope;
Expand All @@ -45,7 +44,7 @@ public CorrelationType getCorrelationType() {
return correlationType;
}

public List<UnresolvedExpression> getFieldsList() {
public List<QualifiedName> getFieldsList() {
return fieldsList;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.Let;
import org.opensearch.sql.ast.expression.Literal;
import org.opensearch.sql.ast.expression.Not;
import org.opensearch.sql.ast.expression.Or;
Expand Down Expand Up @@ -60,6 +61,7 @@
import scala.Option;
import scala.collection.Seq;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -224,7 +226,22 @@ private Expression visitExpression(UnresolvedExpression expression, CatalystPlan

@Override
public LogicalPlan visitEval(Eval node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : Eval");
LogicalPlan child = node.getChild().get(0).accept(this, context);
List<UnresolvedExpression> aliases = new ArrayList<>();
List<Let> letExpressions = node.getExpressionList();
for(Let let : letExpressions) {
Alias alias = new Alias(let.getVar().getField().toString(), let.getExpression());
aliases.add(alias);
}
if (context.getNamedParseExpressions().isEmpty()) {
// Create an UnresolvedStar for all-fields projection
context.getNamedParseExpressions().push(UnresolvedStar$.MODULE$.apply(Option.<Seq<String>>empty()));
}
List<Expression> expressionList = visitExpressionList(aliases, context);
Seq<NamedExpression> projectExpressions = context.retainAllNamedParseExpressions(p -> (NamedExpression) p);
// build the plan with the projection step
child = context.apply(p -> new org.apache.spark.sql.catalyst.plans.logical.Project(projectExpressions, p));
return child;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ public UnresolvedPlan visitWhereCommand(OpenSearchPPLParser.WhereCommandContext
public UnresolvedPlan visitCorrelateCommand(OpenSearchPPLParser.CorrelateCommandContext ctx) {
return new Correlation(ctx.correlationType().getText(),
ctx.fieldList().fieldExpression().stream()
.map(OpenSearchPPLParser.FieldExpressionContext::qualifiedName)
.map(this::internalVisitExpression)
.map(u -> (QualifiedName) u)
.collect(Collectors.toList()),
Objects.isNull(ctx.scopeClause()) ? null : new Scope(expressionBuilder.visit(ctx.scopeClause().fieldExpression()),
expressionBuilder.visit(ctx.scopeClause().value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public UnresolvedExpression visitWcFieldExpression(OpenSearchPPLParser.WcFieldEx

@Override
public UnresolvedExpression visitSortField(OpenSearchPPLParser.SortFieldContext ctx) {
return new Field(
return new Field((QualifiedName)
visit(ctx.sortFieldExpression().fieldExpression().qualifiedName()),
ArgumentFactory.getArgumentList(ctx));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.common.utils;

import static org.junit.Assert.assertEquals;
Expand Down
Loading

0 comments on commit 24d3b81

Please sign in to comment.