Skip to content

Commit

Permalink
Join side aliases should be optional
Browse files Browse the repository at this point in the history
Signed-off-by: Lantao Jin <[email protected]>
  • Loading branch information
LantaoJin committed Nov 1, 2024
1 parent 950009b commit 94a546c
Show file tree
Hide file tree
Showing 13 changed files with 643 additions and 96 deletions.
6 changes: 5 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ source = table | where ispresent(a) |
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`

- `source = table1 | inner join on table1.a = table2.a table2 | fields table1.a, table2.a, table1.b, table1.c` (refer to the table names directly)
- `source = table1 | inner join on a = c table2 | fields a, b, c, d` (side aliases can be omitted as long as there is no ambiguity)
- `source = table1 as t1 | join left = l right = r on l.a = r.a table2 as t2 | fields l.a, r.a` (side alias overrides table alias)
- `source = table1 as t1 | join left = l right = r on l.a = r.a table2 as t2 | fields t1.a, t2.a` (error, side alias overrides table alias)
- `source = table1 | join left = l right = r on l.a = r.a [ source = table2 ] as s | fields l.a, s.a` (error, side alias overrides subquery alias)

#### **Lookup**
[See additional command details](ppl-lookup-command.md)
Expand Down
23 changes: 12 additions & 11 deletions docs/ppl-lang/ppl-join-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ WHERE t1.serviceName = `order`
SEARCH source=<left-table>
| <other piped command>
| [joinType] JOIN
leftAlias
rightAlias
[leftAlias]
[rightAlias]
[joinHints]
ON joinCriteria
<right-table>
Expand All @@ -79,12 +79,12 @@ SEARCH source=<left-table>

**leftAlias**
- Syntax: `left = <leftAlias>`
- Required
- Optional
- Description: The subquery alias to use with the left join side, to avoid ambiguous naming.

**rightAlias**
- Syntax: `right = <rightAlias>`
- Required
- Optional
- Description: The subquery alias to use with the right join side, to avoid ambiguous naming.

**joinHints**
Expand Down Expand Up @@ -138,11 +138,11 @@ Rewritten by PPL Join query:
```sql
SEARCH source=customer
| FIELDS c_custkey
| LEFT OUTER JOIN left = c, right = o
ON c.c_custkey = o.o_custkey AND o_comment NOT LIKE '%unusual%packages%'
| LEFT OUTER JOIN
ON c_custkey = o_custkey AND o_comment NOT LIKE '%unusual%packages%'
orders
| STATS count(o_orderkey) AS c_count BY c.c_custkey
| STATS count(1) AS custdist BY c_count
| STATS count(o_orderkey) AS c_count BY c_custkey
| STATS count() AS custdist BY c_count
| SORT - custdist, - c_count
```
_- **Limitation: sub-searches are unsupported on the right side of a join**_
Expand All @@ -151,14 +151,15 @@ If sub-searches is supported, above ppl query could be rewritten as:
```sql
SEARCH source=customer
| FIELDS c_custkey
| LEFT OUTER JOIN left = c, right = o ON c.c_custkey = o.o_custkey
| LEFT OUTER JOIN
ON c_custkey = o_custkey
[
SEARCH source=orders
| WHERE o_comment NOT LIKE '%unusual%packages%'
| FIELDS o_orderkey, o_custkey
]
| STATS count(o_orderkey) AS c_count BY c.c_custkey
| STATS count(1) AS custdist BY c_count
| STATS count(o_orderkey) AS c_count BY c_custkey
| STATS count() AS custdist BY c_count
| SORT - custdist, - c_count
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, GreaterThan, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
Expand Down Expand Up @@ -924,4 +924,271 @@ class FlintSparkPPLJoinITSuite
s
}.size == 13)
}

test("test multiple joins without table aliases") {
  // No side alias is declared, so every field is qualified with its full table name.
  val query =
    s"""
       | source = $testTable1
       | | JOIN ON $testTable1.name = $testTable2.name $testTable2
       | | JOIN ON $testTable2.name = $testTable3.name $testTable3
       | | fields $testTable1.name, $testTable2.name, $testTable3.name
       | """.stripMargin
  val result = sql(query)

  // Each expected row repeats the same name once per joined table.
  val matchedNames = Array("Jake", "Hello", "John", "David", "David", "Jane")
  assertSameRows(matchedNames.map(name => Row(name, name, name)), result)

  def relation(table: String) = UnresolvedRelation(Seq("spark_catalog", "default", table))
  def nameOf(qualifier: String) = UnresolvedAttribute(s"$qualifier.name")

  // The expected plan joins the bare relations: no SubqueryAlias node wraps either side.
  val firstJoin = Join(
    relation("flint_ppl_test1"),
    relation("flint_ppl_test2"),
    Inner,
    Some(EqualTo(nameOf(testTable1), nameOf(testTable2))),
    JoinHint.NONE)
  val secondJoin = Join(
    firstJoin,
    relation("flint_ppl_test3"),
    Inner,
    Some(EqualTo(nameOf(testTable2), nameOf(testTable3))),
    JoinHint.NONE)
  val projectedFields = Seq(nameOf(testTable1), nameOf(testTable2), nameOf(testTable3))

  comparePlans(
    Project(projectedFields, secondJoin),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with part subquery aliases") {
  // The second JOIN declares only a right-side alias and keeps referring to t1 from the first JOIN.
  val query =
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
       | | JOIN right = t3 ON t1.name = t3.name $testTable3
       | | fields t1.name, t2.name, t3.name
       | """.stripMargin
  val result = sql(query)

  // Each expected row repeats the same name once per joined table.
  val matchedNames = Array("Jake", "Hello", "John", "David", "David", "Jane")
  assertSameRows(matchedNames.map(name => Row(name, name, name)), result)

  def aliased(alias: String, table: String) =
    SubqueryAlias(alias, UnresolvedRelation(Seq("spark_catalog", "default", table)))
  def nameOf(alias: String) = UnresolvedAttribute(s"$alias.name")

  // The left input of the second join is the first join itself, without an extra alias node.
  val firstJoin = Join(
    aliased("t1", "flint_ppl_test1"),
    aliased("t2", "flint_ppl_test2"),
    Inner,
    Some(EqualTo(nameOf("t1"), nameOf("t2"))),
    JoinHint.NONE)
  val secondJoin = Join(
    firstJoin,
    aliased("t3", "flint_ppl_test3"),
    Inner,
    Some(EqualTo(nameOf("t1"), nameOf("t3"))),
    JoinHint.NONE)
  val projectedFields = Seq("t1", "t2", "t3").map(nameOf)

  comparePlans(
    Project(projectedFields, secondJoin),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with self join 1") {
  // The last JOIN brings the source table back in under the right-side alias t4.
  val query =
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
       | | JOIN right = t3 ON t1.name = t3.name $testTable3
       | | JOIN right = t4 ON t1.name = t4.name $testTable1
       | | fields t1.name, t2.name, t3.name, t4.name
       | """.stripMargin
  val result = sql(query)

  // Each expected row repeats the same name once per joined side.
  val matchedNames = Array("Jake", "Hello", "John", "David", "David", "Jane")
  assertSameRows(matchedNames.map(name => Row(name, name, name, name)), result)

  def aliased(alias: String, table: String) =
    SubqueryAlias(alias, UnresolvedRelation(Seq("spark_catalog", "default", table)))
  def nameOf(alias: String) = UnresolvedAttribute(s"$alias.name")
  def onT1(alias: String) = Some(EqualTo(nameOf("t1"), nameOf(alias)))

  // Joins nest to the left; t1 and t4 both wrap the same relation (flint_ppl_test1).
  val firstJoin = Join(
    aliased("t1", "flint_ppl_test1"),
    aliased("t2", "flint_ppl_test2"),
    Inner,
    onT1("t2"),
    JoinHint.NONE)
  val secondJoin =
    Join(firstJoin, aliased("t3", "flint_ppl_test3"), Inner, onT1("t3"), JoinHint.NONE)
  val thirdJoin =
    Join(secondJoin, aliased("t4", "flint_ppl_test1"), Inner, onT1("t4"), JoinHint.NONE)
  val projectedFields = Seq("t1", "t2", "t3", "t4").map(nameOf)

  comparePlans(
    Project(projectedFields, thirdJoin),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with self join 2") {
  // Same self join as the previous test, but t4 is introduced as a subquery alias
  // (`[ ... ] as t4`) instead of a `right = t4` side alias.
  val query =
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
       | | JOIN right = t3 ON t1.name = t3.name $testTable3
       | | JOIN ON t1.name = t4.name
       | [
       | source = $testTable1
       | ] as t4
       | | fields t1.name, t2.name, t3.name, t4.name
       | """.stripMargin
  val result = sql(query)

  // Each expected row repeats the same name once per joined side.
  val matchedNames = Array("Jake", "Hello", "John", "David", "David", "Jane")
  assertSameRows(matchedNames.map(name => Row(name, name, name, name)), result)

  def aliased(alias: String, table: String) =
    SubqueryAlias(alias, UnresolvedRelation(Seq("spark_catalog", "default", table)))
  def nameOf(alias: String) = UnresolvedAttribute(s"$alias.name")
  def onT1(alias: String) = Some(EqualTo(nameOf("t1"), nameOf(alias)))

  // Build the left-deep join chain one right-hand side at a time.
  val firstJoin = Join(
    aliased("t1", "flint_ppl_test1"),
    aliased("t2", "flint_ppl_test2"),
    Inner,
    onT1("t2"),
    JoinHint.NONE)
  val remainingSides = Seq("t3" -> "flint_ppl_test3", "t4" -> "flint_ppl_test1")
  val fullJoin = remainingSides.foldLeft(firstJoin) { case (leftSide, (alias, table)) =>
    Join(leftSide, aliased(alias, table), Inner, onT1(alias), JoinHint.NONE)
  }
  val projectedFields = Seq("t1", "t2", "t3", "t4").map(nameOf)

  comparePlans(
    Project(projectedFields, fullJoin),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("check access the reference by aliases") {
  // Each query introduces t1 and t2 through a different mix of side alias, table alias
  // and subquery alias; all of them are expected to resolve and return at least one row.
  // The queries are kept in an immutable list instead of reassigning a shared `var`.
  val resolvableQueries = Seq(
    // side alias on the left, table alias on the right
    s"""
       | source = $testTable1
       | | JOIN left = t1 ON t1.name = t2.name $testTable2 as t2
       | | fields t1.name, t2.name
       | """.stripMargin,
    // table aliases on both sides, no side alias
    s"""
       | source = $testTable1 as t1
       | | JOIN ON t1.name = t2.name $testTable2 as t2
       | | fields t1.name, t2.name
       | """.stripMargin,
    // side alias on the left, alias on the right subquery
    s"""
       | source = $testTable1
       | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 ] as t2
       | | fields t1.name, t2.name
       | """.stripMargin,
    // side alias on the left, table alias inside the right subquery
    s"""
       | source = $testTable1
       | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 as t2 ]
       | | fields t1.name, t2.name
       | """.stripMargin)

  resolvableQueries.foreach { query =>
    // The clue names the failing query, since the assertion line is shared by all cases.
    withClue(s"PPL query: $query") {
      assert(sql(query).collect().length > 0)
    }
  }
}

test("access the reference by override aliases should throw exception") {
  // Each query declares the alias `tt` on a table or subquery and then projects `tt.name`.
  // All of them are expected to fail with an AnalysisException saying `tt`.`name` cannot be
  // resolved. The queries are kept in an immutable list instead of reassigning a shared `var`.
  val unresolvableQueries = Seq(
    // right table alias combined with both side aliases
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 as tt
       | | fields tt.name
       | """.stripMargin,
    // left table alias combined with both side aliases
    s"""
       | source = $testTable1 as tt
       | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
       | | fields tt.name
       | """.stripMargin,
    // table alias inside the right subquery combined with both side aliases
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 as tt ]
       | | fields tt.name
       | """.stripMargin,
    // table alias inside the right subquery, with the subquery itself aliased as t2
    s"""
       | source = $testTable1
       | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 as tt ] as t2
       | | fields tt.name
       | """.stripMargin,
    // right subquery alias combined with both side aliases
    s"""
       | source = $testTable1
       | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 ] as tt
       | | fields tt.name
       | """.stripMargin,
    // left table alias combined with a left side alias
    s"""
       | source = $testTable1 as tt
       | | JOIN left = t1 ON t1.name = t2.name $testTable2 as t2
       | | fields tt.name
       | """.stripMargin)

  unresolvableQueries.foreach { query =>
    // The clue names the failing query, since the assertion lines are shared by all cases.
    withClue(s"PPL query: $query") {
      val ex = intercept[AnalysisException](sql(query))
      assert(ex.getMessage.contains("`tt`.`name` cannot be resolved"))
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ joinType
;

sideAlias
: LEFT EQUAL leftAlias = ident COMMA? RIGHT EQUAL rightAlias = ident
: (LEFT EQUAL leftAlias = ident)? COMMA? (RIGHT EQUAL rightAlias = ident)?
;

joinCriteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import lombok.ToString;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.Collections;

/**
* Extend Relation to describe the table itself
*/
@ToString
public class DescribeRelation extends Relation{
public DescribeRelation(UnresolvedExpression tableName) {
super(tableName);
super(Collections.singletonList(tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
public class Join extends UnresolvedPlan {
private UnresolvedPlan left;
private final UnresolvedPlan right;
private final String leftAlias;
private final String rightAlias;
private final Optional<String> leftAlias;
private final Optional<String> rightAlias;
private final JoinType joinType;
private final Optional<UnresolvedExpression> joinCondition;
private final JoinHint joinHint;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.left = new SubqueryAlias(leftAlias, child);
this.left = leftAlias.isEmpty() ? child : new SubqueryAlias(leftAlias.get(), child);
return this;
}

Expand Down
Loading

0 comments on commit 94a546c

Please sign in to comment.