Skip to content

Commit

Permalink
Support RelationSubquery PPL
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin committed Oct 12, 2024
1 parent d83f61d commit 64685fb
Show file tree
Hide file tree
Showing 7 changed files with 523 additions and 41 deletions.
12 changes: 10 additions & 2 deletions docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,7 @@ source = table | where ispresent(a) |
- `source = table1 | cross join left = l right = r table2`
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`

_- **Limitation: sub-searches is unsupported in join right side now**_
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`


#### **Lookup**
Expand Down Expand Up @@ -349,6 +348,15 @@ Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table in
- `source = outer | where a = [ source = inner | stats max(c) | sort c ] OR b = [ source = inner | where c = 1 | stats min(d) | sort d ]`
- `source = outer | where a = [ source = inner | where c = [ source = nested | stats max(e) by f | sort f ] | stats max(d) by c | sort c | head 1 ]`

#### **(Relation) Subquery**
[See additional command details](ppl-subquery-command.md)

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

_- **Limitation: another command usage of (relation) subquery is in `appendcols` commands which is unsupported**_

---
#### Experimental Commands:
Expand Down
63 changes: 57 additions & 6 deletions docs/ppl-lang/ppl-subquery-command.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## PPL SubQuery Commands:

**Syntax**
### Syntax
The subquery command should be implemented using a clean, logical syntax that integrates with existing PPL structure.

```sql
Expand All @@ -21,7 +21,7 @@ For additional info See [Issue](https://github.com/opensearch-project/opensearch

---

**InSubquery usage**
### InSubquery usage
- `source = outer | where a in [ source = inner | fields b ]`
- `source = outer | where (a) in [ source = inner | fields b ]`
- `source = outer | where (a,b,c) in [ source = inner | fields d,e,f ]`
Expand Down Expand Up @@ -111,8 +111,9 @@ source = supplier
nation
| sort s_name
```
---

**ExistsSubquery usage**
### ExistsSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table inner2

Expand Down Expand Up @@ -163,8 +164,9 @@ source = orders
| sort o_orderpriority
| fields o_orderpriority, order_count
```
---

**ScalarSubquery usage**
### ScalarSubquery usage

Assumptions: `a`, `b` are fields of table outer, `c`, `d` are fields of table inner, `e`, `f` are fields of table nested

Expand Down Expand Up @@ -240,10 +242,59 @@ source = spark_catalog.default.outer
source = spark_catalog.default.inner | where c = 1 | stats min(d) | sort d
]
```
---

### (Relation) Subquery
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expressions. But `RelationSubquery` is not a subquery expression, it is a subquery plan which is common used in Join or From clause.

- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]` (subquery in join right side)
- `source = [ source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ] | stats count(a) by b ] as outer | head 1`

**_SQL Migration examples with Exists-Subquery PPL:_**

tpch q13
```sql
select
c_count,
count(*) as custdist
from
(
select
c_custkey,
count(o_orderkey) as c_count
from
customer left outer join orders on
c_custkey = o_custkey
and o_comment not like '%special%requests%'
group by
c_custkey
) as c_orders
group by
c_count
order by
custdist desc,
c_count desc
```
Rewritten by PPL (Relation) Subquery:
```sql
SEARCH source = [
SEARCH source = customer
| LEFT OUTER JOIN left = c right = o ON c_custkey = o_custkey
[
SEARCH source = orders
| WHERE not like(o_comment, '%special%requests%')
]
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
] AS c_orders
| STATS COUNT(o_orderkey) AS c_count BY c_custkey
| STATS COUNT(1) AS custdist BY c_count
| SORT - custdist, - c_count
```
---

### **Additional Context**
### Additional Context

`InSubquery`, `ExistsSubquery` and `ScalarSubquery` are all subquery expression. The common usage of subquery expression is in `where` clause:
`InSubquery`, `ExistsSubquery` and `ScalarSubquery` as subquery expressions, their common usage is in `where` clause.

The `where` command syntax is:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, GreaterThan, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, JoinHint, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, GlobalLimit, Join, JoinHint, LocalLimit, LogicalPlan, Project, Sort, SubqueryAlias}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkPPLJoinITSuite
Expand Down Expand Up @@ -738,4 +738,190 @@ class FlintSparkPPLJoinITSuite
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
}

test("test inner join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, Inner, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test left outer join with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'USA' OR country = 'England'
| | left join left=a, right=b
| ON a.name = b.name
| [
| source = $testTable2
| | where salary > 0
| | fields name, country, salary
| | sort salary
| | head 3
| ]
| | stats avg(salary) by span(age, 10) as age_span, b.country
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] =
Array(Row(70000.0, "USA", 30), Row(100000.0, "England", 70), Row(null, null, 40))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Double](_.getAs[Double](0))
assert(results.sorted.sameElements(expectedResults.sorted))

val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
val filterExpr = Or(
EqualTo(UnresolvedAttribute("country"), Literal("USA")),
EqualTo(UnresolvedAttribute("country"), Literal("England")))
val plan1 = SubqueryAlias("a", Filter(filterExpr, table1))
val rightSubquery =
GlobalLimit(
Literal(3),
LocalLimit(
Literal(3),
Sort(
Seq(SortOrder(UnresolvedAttribute("salary"), Ascending)),
global = true,
Project(
Seq(
UnresolvedAttribute("name"),
UnresolvedAttribute("country"),
UnresolvedAttribute("salary")),
Filter(GreaterThan(UnresolvedAttribute("salary"), Literal(0)), table2)))))
val plan2 = SubqueryAlias("b", rightSubquery)

val joinCondition = EqualTo(UnresolvedAttribute("a.name"), UnresolvedAttribute("b.name"))
val joinPlan = Join(plan1, plan2, LeftOuter, Some(joinCondition), JoinHint.NONE)

val salaryField = UnresolvedAttribute("salary")
val countryField = UnresolvedAttribute("b.country")
val countryAlias = Alias(countryField, "b.country")()
val star = Seq(UnresolvedStar(None))
val aggregateExpressions =
Alias(UnresolvedFunction(Seq("AVG"), Seq(salaryField), isDistinct = false), "avg(salary)")()
val span = Alias(
Multiply(Floor(Divide(UnresolvedAttribute("age"), Literal(10))), Literal(10)),
"age_span")()
val aggregatePlan =
Aggregate(Seq(countryAlias, span), Seq(aggregateExpressions, countryAlias, span), joinPlan)

val expectedPlan = Project(star, aggregatePlan)
val logicalPlan: LogicalPlan = frame.queryExecution.logical

comparePlans(expectedPlan, logicalPlan, checkAnalysis = false)
}

test("test multiple joins with relation subquery") {
val frame = sql(s"""
| source = $testTable1
| | where country = 'Canada' OR country = 'England'
| | inner join left=a, right=b
| ON a.name = b.name AND a.year = 2023 AND a.month = 4 AND b.year = 2023 AND b.month = 4
| [
| source = $testTable2
| ]
| | eval a_name = a.name
| | eval a_country = a.country
| | eval b_country = b.country
| | fields a_name, age, state, a_country, occupation, b_country, salary
| | left join left=a, right=b
| ON a.a_name = b.name
| [
| source = $testTable3
| ]
| | eval aa_country = a.a_country
| | eval ab_country = a.b_country
| | eval bb_country = b.country
| | fields a_name, age, state, aa_country, occupation, ab_country, salary, bb_country, hobby, language
| | cross join left=a, right=b
| [
| source = $testTable2
| ]
| | eval new_country = a.aa_country
| | eval new_salary = b.salary
| | stats avg(new_salary) as avg_salary by span(age, 5) as age_span, state
| | left semi join left=a, right=b
| ON a.state = b.state
| [
| source = $testTable1
| ]
| | eval new_avg_salary = floor(avg_salary)
| | fields state, age_span, new_avg_salary
| """.stripMargin)
val results: Array[Row] = frame.collect()
val expectedResults: Array[Row] = Array(Row("Quebec", 20, 83333), Row("Ontario", 25, 83333))

implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, String](_.getAs[String](0))
assert(results.sorted.sameElements(expectedResults.sorted))

assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Cross, None, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, LeftOuter, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.optimizedPlan.collect {
case j @ Join(_, _, Inner, _, JoinHint.NONE) => j
}.size == 1)
assert(frame.queryExecution.analyzed.collect { case s: SubqueryAlias =>
s
}.size == 13)
}
}
34 changes: 22 additions & 12 deletions ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ commands
;

searchCommand
: (SEARCH)? fromClause # searchFrom
| (SEARCH)? fromClause logicalExpression # searchFromFilter
| (SEARCH)? logicalExpression fromClause # searchFilterFrom
: (SEARCH | FROM)? fromClause # searchFrom
| (SEARCH | FROM)? fromClause logicalExpression # searchFromFilter
| (SEARCH | FROM)? logicalExpression fromClause # searchFilterFrom
;

describeCommand
Expand Down Expand Up @@ -247,17 +247,27 @@ mlArg

// clauses
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
: SOURCE EQUAL tableOrSubqueryClause
| INDEX EQUAL tableOrSubqueryClause
;

tableOrSubqueryClause
: LT_SQR_PRTHS subSearch RT_SQR_PRTHS (AS alias = qualifiedName)?
| tableSourceClause
;

// One tableSourceClause will generate one Relation node with/without one alias
// even if the relation contains more than one table sources.
// These table sources in one relation will be readed one by one in OpenSearch.
// But it may have different behaivours in different execution backends.
// For example, a Spark UnresovledRelation node only accepts one data source.
tableSourceClause
: tableSource (COMMA tableSource)*
: tableSource (COMMA tableSource)* (AS alias = qualifiedName)?
;

// join
joinCommand
: (joinType) JOIN sideAlias joinHintList? joinCriteria? right = tableSource
: (joinType) JOIN sideAlias joinHintList? joinCriteria? right = tableOrSubqueryClause
;

joinType
Expand All @@ -279,13 +289,13 @@ joinCriteria
;

joinHintList
: hintPair (COMMA? hintPair)*
;
: hintPair (COMMA? hintPair)*
;

hintPair
: leftHintKey = LEFT_HINT DOT ID EQUAL leftHintValue = ident #leftHint
| rightHintKey = RIGHT_HINT DOT ID EQUAL rightHintValue = ident #rightHint
;
: leftHintKey = LEFT_HINT DOT ID EQUAL leftHintValue = ident #leftHint
| rightHintKey = RIGHT_HINT DOT ID EQUAL rightHintValue = ident #rightHint
;

renameClasue
: orignalField = wcFieldExpression AS renamedField = wcFieldExpression
Expand Down
Loading

0 comments on commit 64685fb

Please sign in to comment.