Skip to content

Commit

Permalink
Support InSubquery in PPL (#714)
Browse files Browse the repository at this point in the history
* Support InSubquery in PPL

Signed-off-by: Lantao Jin <[email protected]>

* Add more examples

Signed-off-by: Lantao Jin <[email protected]>

---------

Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin authored Oct 1, 2024
1 parent 7563edb commit 0aeed27
Show file tree
Hide file tree
Showing 10 changed files with 977 additions and 4 deletions.

Large diffs are not rendered by default.

91 changes: 91 additions & 0 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,97 @@ _- **Limitation: "REPLACE" or "APPEND" clause must contain "AS"**_

Details of Lookup command syntax, see [PPL-Lookup-Command](../docs/PPL-Lookup-command.md)

**InSubquery**
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
- `source = outer | where a not in [ source = inner | fields b ]`
- `source = outer | where (a) not in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) not in [ source = inner | fields d,e,f ]`
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

SQL Migration examples with IN-Subquery PPL:
1. tpch q4 (in-subquery with aggregation)
```sql
select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and o_orderkey in (
select
l_orderkey
from
lineitem
where l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority
```
Rewritten by PPL InSubquery query:
```sql
source = orders
| where o_orderdate >= "1993-07-01" and o_orderdate < "1993-10-01" and o_orderkey IN
[ source = lineitem
| where l_commitdate < l_receiptdate
| fields l_orderkey
]
| stats count(1) as order_count by o_orderpriority
| sort o_orderpriority
| fields o_orderpriority, order_count
```
2. tpch q20 (nested in-subquery)
```sql
select
s_name,
s_address
from
supplier,
nation
where
s_suppkey in (
select
ps_suppkey
from
partsupp
where
ps_partkey in (
select
p_partkey
from
part
where
p_name like 'forest%'
)
)
and s_nationkey = n_nationkey
and n_name = 'CANADA'
order by
s_name
```
Rewritten by PPL InSubquery query:
```sql
source = supplier
| where s_suppkey IN [
source = partsupp
| where ps_partkey IN [
source = part
| where like(p_name, "forest%")
| fields p_partkey
]
| fields ps_suppkey
]
| inner join left=l right=r on s_nationkey = n_nationkey and n_name = 'CANADA'
nation
| sort s_name
```

---
#### Experimental Commands:
- `correlation` - [See details](../docs/PPL-Correlation-command.md)
Expand Down
19 changes: 19 additions & 0 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ queryStatement
: pplCommands (PIPE commands)*
;

// A sub-search: a search command optionally followed by piped commands.
// Referenced by the inSubqueryExpr alternative of comparisonExpression.
subSearch
: searchCommand (PIPE commands)*
;

// commands
pplCommands
: searchCommand
Expand Down Expand Up @@ -339,6 +343,12 @@ logicalExpression
comparisonExpression
: left = valueExpression comparisonOperator right = valueExpression # compareExpr
| valueExpression IN valueList # inExpr
// [NOT] IN against a sub-search enclosed between LT_SQR_PRTHS and RT_SQR_PRTHS
// (presumably '[' and ']', as in the README examples); the left-hand side may be
// a single value expression or a parenthesized list (see valueExpressionList).
| valueExpressionList NOT? IN LT_SQR_PRTHS subSearch RT_SQR_PRTHS # inSubqueryExpr
;

// Left-hand side of an IN-subquery: either one bare value expression, or one or
// more comma-separated value expressions wrapped in LT_PRTHS / RT_PRTHS.
valueExpressionList
: valueExpression
| LT_PRTHS valueExpression (COMMA valueExpression)* RT_PRTHS
;

valueExpression
Expand Down Expand Up @@ -1004,4 +1014,13 @@ keywordsCanBeId
| SPARKLINE
| C
| DC
// JOIN TYPE
| OUTER
| INNER
| CROSS
| LEFT
| RIGHT
| FULL
| SEMI
| ANTI
;
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IsEmpty;
import org.opensearch.sql.ast.expression.Let;
Expand Down Expand Up @@ -289,4 +290,7 @@ public T visitExplain(Explain node, C context) {
return visitStatement(node, context);
}

/**
* Visit an {@link InSubquery} expression node.
* The default implementation delegates to {@code visitChildren(node, context)};
* subclasses override this method to handle the node themselves.
*/
public T visitInSubquery(InSubquery node, C context) {
return visitChildren(node, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.expression;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;

import java.util.Arrays;
import java.util.List;

/**
* Unresolved AST expression holding the two parts of an "in subquery" predicate:
* the list of value expressions and the unresolved plan of the subquery.
* The constructor, getters, equals/hashCode and toString are generated by the
* Lombok annotations below.
*/
@Getter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
public class InSubquery extends UnresolvedExpression {
// Value expressions tested for membership in the subquery result.
private final List<UnresolvedExpression> value;
// Unresolved plan of the subquery.
private final UnresolvedPlan query;

/**
* Returns the value expressions only; the subquery plan is not exposed as a
* child and must be read through {@code getQuery()}.
*/
@Override
public List<UnresolvedExpression> getChild() {
return value;
}

/** Dispatches to {@code AbstractNodeVisitor.visitInSubquery}. */
@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> nodeVisitor, C context) {
return nodeVisitor.visitInSubquery(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.spark.sql.catalyst.expressions.CaseWhen;
import org.apache.spark.sql.catalyst.expressions.Descending$;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.InSubquery$;
import org.apache.spark.sql.catalyst.expressions.ListQuery$;
import org.apache.spark.sql.catalyst.expressions.NamedExpression;
import org.apache.spark.sql.catalyst.expressions.Predicate;
import org.apache.spark.sql.catalyst.expressions.SortDirection;
Expand Down Expand Up @@ -41,6 +43,7 @@
import org.opensearch.sql.ast.expression.FieldsMapping;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.In;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IsEmpty;
import org.opensearch.sql.ast.expression.Let;
Expand Down Expand Up @@ -75,6 +78,7 @@
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.ast.tree.SubqueryAlias;
import org.opensearch.sql.ast.tree.TopAggregation;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.ppl.utils.AggregatorTranslator;
import org.opensearch.sql.ppl.utils.BuiltinFunctionTranslator;
Expand Down Expand Up @@ -124,6 +128,10 @@ public LogicalPlan visit(Statement plan, CatalystPlanContext context) {
return plan.accept(this, context);
}

/**
* Translate the unresolved plan of a sub-search into a logical plan by letting
* the plan accept this visitor with the supplied context.
* Invoked by {@code ExpressionAnalyzer.visitInSubquery} for the subquery of an
* {@code InSubquery} expression.
*/
public LogicalPlan visitSubSearch(UnresolvedPlan plan, CatalystPlanContext context) {
return plan.accept(this, context);
}

/**
* Handle Query Statement.
*/
Expand Down Expand Up @@ -487,7 +495,7 @@ public LogicalPlan visitDedupe(Dedupe node, CatalystPlanContext context) {
/**
* Expression Analyzer.
*/
public static class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {
public class ExpressionAnalyzer extends AbstractNodeVisitor<Expression, CatalystPlanContext> {

public Expression analyze(UnresolvedExpression unresolved, CatalystPlanContext context) {
return unresolved.accept(this, context);
Expand Down Expand Up @@ -734,5 +742,24 @@ public Expression visitRareTopN(RareTopN node, CatalystPlanContext context) {
public Expression visitWindowFunction(WindowFunction node, CatalystPlanContext context) {
throw new IllegalStateException("Not Supported operation : WindowFunction");
}

/**
 * Translate a PPL {@link InSubquery} node into a Catalyst in-subquery expression.
 *
 * <p>The value expressions and the sub-search are both visited with a newly created
 * {@link CatalystPlanContext} rather than the caller's context; only the finished
 * expression is pushed onto {@code outerContext}.
 *
 * @param node the PPL in-subquery expression
 * @param outerContext context of the enclosing query; receives the resulting expression
 * @return the expression that was pushed onto {@code outerContext}
 */
@Override
public Expression visitInSubquery(InSubquery node, CatalystPlanContext outerContext) {
    CatalystPlanContext subqueryContext = new CatalystPlanContext();

    // Resolve the left-hand value expressions and collect them from the fresh context.
    visitExpressionList(node.getChild(), subqueryContext);
    Seq<Expression> leftValues = subqueryContext.retainAllNamedParseExpressions(p -> p);

    // Translate the sub-search through the enclosing plan visitor, using the same fresh context.
    LogicalPlan subqueryPlan =
        CatalystQueryPlanVisitor.this.visitSubSearch(node.getQuery(), subqueryContext);

    // Apart from the plan, ListQuery receives only placeholder arguments here:
    // two empty sequences, a new expression id, -1 and Option.empty().
    Expression predicate = InSubquery$.MODULE$.apply(
        leftValues,
        ListQuery$.MODULE$.apply(
            subqueryPlan,
            seq(new java.util.ArrayList<Expression>()),
            NamedExpression.newExprId(),
            -1,
            seq(new java.util.ArrayList<Expression>()),
            Option.empty()));

    outerContext.getNamedParseExpressions().push(predicate);
    return predicate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementCont
return ctx.commands().stream().map(this::visit).reduce(pplCommand, (r, e) -> e.attach(r));
}

/**
 * Build the unresolved plan of a sub-search: the search command is the innermost
 * plan, and each piped command is attached on top of the plan built so far, in
 * source order.
 */
@Override
public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) {
    UnresolvedPlan plan = visit(ctx.searchCommand());
    for (OpenSearchPPLParser.CommandsContext commandCtx : ctx.commands()) {
        plan = visit(commandCtx).attach(plan);
    }
    return plan;
}

/** Search command. */
@Override
public UnresolvedPlan visitSearchFrom(OpenSearchPPLParser.SearchFromContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.sql.ast.expression.EqualTo;
import org.opensearch.sql.ast.expression.Field;
import org.opensearch.sql.ast.expression.Function;
import org.opensearch.sql.ast.expression.InSubquery;
import org.opensearch.sql.ast.expression.Interval;
import org.opensearch.sql.ast.expression.IntervalUnit;
import org.opensearch.sql.ast.expression.IsEmpty;
Expand Down Expand Up @@ -62,6 +63,13 @@ public class AstExpressionBuilder extends OpenSearchPPLParserBaseVisitor<Unresol

private static final int DEFAULT_TAKE_FUNCTION_SIZE_VALUE = 10;

private AstBuilder astBuilder;

/**
* Set the AstBuilder back on this AstExpressionBuilder so that the plan of a
* subquery inside a subquery expression can be resolved.
* Must be called before parsing a query that contains an IN-subquery expression,
* because {@code visitInSubqueryExpr} uses this builder to visit the subSearch rule.
*/
public void setAstBuilder(AstBuilder astBuilder) {
this.astBuilder = astBuilder;
}

/**
* The function name mapping between fronted and core engine.
*/
Expand Down Expand Up @@ -370,6 +378,15 @@ public UnresolvedExpression visitRightHint(OpenSearchPPLParser.RightHintContext
return new EqualTo(new Literal(ctx.rightHintKey.getText(), DataType.STRING), visit(ctx.rightHintValue));
}

/**
 * Build the AST for {@code <values> [NOT] IN [ <subSearch> ]}.
 *
 * <p>The left-hand value expressions are visited by this builder, while the
 * sub-search is delegated to the {@link AstBuilder} supplied through
 * {@link #setAstBuilder(AstBuilder)}. When the {@code NOT} token is present the
 * resulting {@link InSubquery} is wrapped in {@link Not}.
 *
 * @param ctx parse-tree context of the in-subquery expression
 * @return the {@link InSubquery} expression, wrapped in {@link Not} for {@code NOT IN}
 * @throws IllegalStateException if no {@link AstBuilder} has been set on this builder
 */
@Override
public UnresolvedExpression visitInSubqueryExpr(OpenSearchPPLParser.InSubqueryExprContext ctx) {
    // The AstBuilder is injected after construction; fail with an actionable message
    // instead of a bare NullPointerException when the wiring step was skipped.
    if (astBuilder == null) {
        throw new IllegalStateException(
            "AstBuilder is not set; call setAstBuilder(...) before parsing an IN-subquery expression");
    }
    List<UnresolvedExpression> values = ctx.valueExpressionList().valueExpression().stream()
        .map(this::visit)
        .collect(Collectors.toList());
    UnresolvedExpression expr = new InSubquery(values, astBuilder.visitSubSearch(ctx.subSearch()));
    return ctx.NOT() != null ? new Not(expr) : expr;
}

private QualifiedName visitIdentifiers(List<? extends ParserRuleContext> ctx) {
return new QualifiedName(
ctx.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ class PPLSyntaxParser extends Parser {

object PlaneUtils {
def plan(parser: PPLSyntaxParser, query: String): Statement = {
val builder = new AstStatementBuilder(
new AstBuilder(new AstExpressionBuilder(), query),
AstStatementBuilder.StatementBuilderContext.builder())
val astExpressionBuilder = new AstExpressionBuilder()
val astBuilder = new AstBuilder(astExpressionBuilder, query)
astExpressionBuilder.setAstBuilder(astBuilder)
val builder =
new AstStatementBuilder(astBuilder, AstStatementBuilder.StatementBuilderContext.builder())
builder.visit(parser.parse(query))
}
}
Loading

0 comments on commit 0aeed27

Please sign in to comment.