Skip to content

Commit

Permalink
Add more examples
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin committed Sep 30, 2024
1 parent 91636fa commit 8797a82
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 4 deletions.
81 changes: 81 additions & 0 deletions ppl-spark-integration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,87 @@ Details of Lookup command syntax, see [PPL-Lookup-Command](../docs/PPL-Lookup-co
- `source = outer | where a in [ source = inner1 | where b not in [ source = inner2 | fields c ] | fields b ]` (nested)
- `source = table1 | inner join left = l right = r on l.a = r.a AND r.a in [ source = inner | fields d ] | fields l.a, r.a, b, c` (as join filter)

SQL Migration examples with IN-Subquery PPL:
1. tpch q4 (in-subquery with aggregation)
```sql
select
o_orderpriority,
count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-07-01' + interval '3' month
and o_orderkey in (
select
l_orderkey
from
lineitem
where l_commitdate < l_receiptdate
)
group by
o_orderpriority
order by
o_orderpriority
```
Rewritten by PPL InSubquery query:
```sql
source = orders
| where o_orderdate >= "1993-07-01" and o_orderdate < "1993-10-01" and o_orderkey IN
[ source = lineitem
| where l_commitdate < l_receiptdate
| fields l_orderkey
]
| stats count(1) as order_count by o_orderpriority
| sort o_orderpriority
| fields o_orderpriority, order_count
```
2.tpch q20 (nested in-subquery)
```sql
select
s_name,
s_address
from
supplier,
nation
where
s_suppkey in (
select
ps_suppkey
from
partsupp
where
ps_partkey in (
select
p_partkey
from
part
where
p_name like 'forest%'
)
)
and s_nationkey = n_nationkey
and n_name = 'CANADA'
order by
s_name
```
Rewritten by PPL InSubquery query:
```sql
source = supplier
| where s_suppkey IN [
source = partsupp
| where ps_partkey IN [
source = part
| where like(p_name, "forest%")
| fields p_partkey
]
| fields ps_suppkey
]
| inner join left=l right=r on s_nationkey = n_nationkey and n_name = 'CANADA'
nation
| sort s_name
```

---
#### Experimental Commands:
- `correlation` - [See details](../docs/PPL-Correlation-command.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Descending, InSubquery, ListQuery, Not, SortOrder}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Sort}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Descending, EqualTo, GreaterThanOrEqual, InSubquery, LessThan, ListQuery, Literal, Not, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, JoinHint, LogicalPlan, Project, Sort, SubqueryAlias}

class PPLLogicalPlanInSubqueryTranslatorTestSuite
extends SparkFunSuite
Expand Down Expand Up @@ -245,4 +245,121 @@ class PPLLogicalPlanInSubqueryTranslatorTestSuite
}
assert(ex.getMessage === "The number of columns not match output of subquery")
}

test("test tpch q4: in-subquery with aggregation") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
s"""
| source = orders
| | where o_orderdate >= "1993-07-01" AND o_orderdate < "1993-10-01" AND o_orderkey IN
| [ source = lineitem
| | where l_commitdate < l_receiptdate
| | fields l_orderkey
| ]
| | stats count(1) as order_count by o_orderpriority
| | sort o_orderpriority
| | fields o_orderpriority, order_count
| """.stripMargin),
context)

val outer = UnresolvedRelation(Seq("orders"))
val inner = UnresolvedRelation(Seq("lineitem"))
val inSubquery =
Filter(
And(
And(
GreaterThanOrEqual(UnresolvedAttribute("o_orderdate"), Literal("1993-07-01")),
LessThan(UnresolvedAttribute("o_orderdate"), Literal("1993-10-01"))),
InSubquery(
Seq(UnresolvedAttribute("o_orderkey")),
ListQuery(
Project(
Seq(UnresolvedAttribute("l_orderkey")),
Filter(
LessThan(
UnresolvedAttribute("l_commitdate"),
UnresolvedAttribute("l_receiptdate")),
inner))))),
outer)
val o_orderpriorityAlias = Alias(UnresolvedAttribute("o_orderpriority"), "o_orderpriority")()
val groupByAttributes = Seq(o_orderpriorityAlias)
val aggregateExpressions =
Alias(
UnresolvedFunction(Seq("COUNT"), Seq(Literal(1)), isDistinct = false),
"order_count")()
val aggregatePlan =
Aggregate(groupByAttributes, Seq(aggregateExpressions, o_orderpriorityAlias), inSubquery)
val sortedPlan: LogicalPlan =
Sort(
Seq(SortOrder(UnresolvedAttribute("o_orderpriority"), Ascending)),
global = true,
aggregatePlan)
val expectedPlan = Project(
Seq(UnresolvedAttribute("o_orderpriority"), UnresolvedAttribute("order_count")),
sortedPlan)
comparePlans(expectedPlan, logPlan, false)
}

test("test tpch q20 (partial): nested in-subquery") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(
plan(
pplParser,
s"""
| source = supplier
| | where s_suppkey IN [
| source = partsupp
| | where ps_partkey IN [
| source = part
| | where like(p_name, "forest%")
| | fields p_partkey
| ]
| | fields ps_suppkey
| ]
| | inner join left=l right=r on s_nationkey = n_nationkey and n_name = 'CANADA'
| nation
| | sort s_name
| """.stripMargin),
context)

val outer = UnresolvedRelation(Seq("supplier"))
val inner = UnresolvedRelation(Seq("partsupp"))
val nestedInner = UnresolvedRelation(Seq("part"))
val right = UnresolvedRelation(Seq("nation"))
val inSubquery =
Filter(
InSubquery(
Seq(UnresolvedAttribute("s_suppkey")),
ListQuery(
Project(
Seq(UnresolvedAttribute("ps_suppkey")),
Filter(
InSubquery(
Seq(UnresolvedAttribute("ps_partkey")),
ListQuery(Project(
Seq(UnresolvedAttribute("p_partkey")),
Filter(
UnresolvedFunction(
"like",
Seq(UnresolvedAttribute("p_name"), Literal("forest%")),
isDistinct = false),
nestedInner)))),
inner)))),
outer)
val leftPlan = SubqueryAlias("l", inSubquery)
val rightPlan = SubqueryAlias("r", right)
val joinCondition =
And(
EqualTo(UnresolvedAttribute("s_nationkey"), UnresolvedAttribute("n_nationkey")),
EqualTo(UnresolvedAttribute("n_name"), Literal("CANADA")))
val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE)
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("s_name"), Ascending)), global = true, joinPlan)
val expectedPlan = Project(Seq(UnresolvedStar(None)), sortedPlan)
comparePlans(expectedPlan, logPlan, false)
}
}

0 comments on commit 8797a82

Please sign in to comment.