Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Join side aliases should be optional #862

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/ppl-lang/PPL-Example-Commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,11 @@ source = table | where ispresent(a) |
- `source = table1 | left semi join left = l right = r on l.a = r.a table2`
- `source = table1 | left anti join left = l right = r on l.a = r.a table2`
- `source = table1 | join left = l right = r [ source = table2 | where d > 10 | head 5 ]`

- `source = table1 | inner join on table1.a = table2.a table2 | fields table1.a, table2.a, table1.b, table1.c` (refers to fields directly by table name)
- `source = table1 | inner join on a = c table2 | fields a, b, c, d` (side aliases can be omitted as long as the field names are unambiguous)
- `source = table1 as t1 | join left = l right = r on l.a = r.a table2 as t2 | fields l.a, r.a` (side alias overrides table alias)
- `source = table1 as t1 | join left = l right = r on l.a = r.a table2 as t2 | fields t1.a, t2.a` (error, side alias overrides table alias)
- `source = table1 | join left = l right = r on l.a = r.a [ source = table2 ] as s | fields l.a, s.a` (error, side alias overrides subquery alias)

#### **Lookup**
[See additional command details](ppl-lookup-command.md)
Expand Down
23 changes: 12 additions & 11 deletions docs/ppl-lang/ppl-join-command.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ WHERE t1.serviceName = `order`
SEARCH source=<left-table>
| <other piped command>
| [joinType] JOIN
leftAlias
rightAlias
[leftAlias]
[rightAlias]
[joinHints]
ON joinCriteria
<right-table>
Expand All @@ -79,12 +79,12 @@ SEARCH source=<left-table>

**leftAlias**
- Syntax: `left = <leftAlias>`
- Required
- Optional
- Description: The subquery alias to use with the left join side, to avoid ambiguous naming.

**rightAlias**
- Syntax: `right = <rightAlias>`
- Required
- Optional
- Description: The subquery alias to use with the right join side, to avoid ambiguous naming.

**joinHints**
Expand Down Expand Up @@ -138,11 +138,11 @@ Rewritten by PPL Join query:
```sql
SEARCH source=customer
| FIELDS c_custkey
| LEFT OUTER JOIN left = c, right = o
ON c.c_custkey = o.o_custkey AND o_comment NOT LIKE '%unusual%packages%'
| LEFT OUTER JOIN
ON c_custkey = o_custkey AND o_comment NOT LIKE '%unusual%packages%'
orders
| STATS count(o_orderkey) AS c_count BY c.c_custkey
| STATS count(1) AS custdist BY c_count
| STATS count(o_orderkey) AS c_count BY c_custkey
| STATS count() AS custdist BY c_count
| SORT - custdist, - c_count
```
_- **Limitation: sub-searches are unsupported on the right side of a join**_
Expand All @@ -151,14 +151,15 @@ If sub-searches is supported, above ppl query could be rewritten as:
```sql
SEARCH source=customer
| FIELDS c_custkey
| LEFT OUTER JOIN left = c, right = o ON c.c_custkey = o.o_custkey
| LEFT OUTER JOIN
ON c_custkey = o_custkey
[
SEARCH source=orders
| WHERE o_comment NOT LIKE '%unusual%packages%'
| FIELDS o_orderkey, o_custkey
]
| STATS count(o_orderkey) AS c_count BY c.c_custkey
| STATS count(1) AS custdist BY c_count
| STATS count(o_orderkey) AS c_count BY c_custkey
| STATS count() AS custdist BY c_count
| SORT - custdist, - c_count
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Ascending, Divide, EqualTo, Floor, GreaterThan, LessThan, Literal, Multiply, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
Expand Down Expand Up @@ -924,4 +924,271 @@ class FlintSparkPPLJoinITSuite
s
}.size == 13)
}

test("test multiple joins without table aliases") {
  // Neither side aliases nor table aliases are given, so every field is
  // referenced through its fully qualified table name.
  val result = sql(s"""
    | source = $testTable1
    | | JOIN ON $testTable1.name = $testTable2.name $testTable2
    | | JOIN ON $testTable2.name = $testTable3.name $testTable3
    | | fields $testTable1.name, $testTable2.name, $testTable3.name
    | """.stripMargin)

  // Every matching row carries the same name in all three projected columns.
  val expectedRows =
    Array("Jake", "Hello", "John", "David", "David", "Jane").map(name => Row(name, name, name))
  assertSameRows(expectedRows, result)

  // Qualified name attributes, shared by the join conditions and the projection.
  val name1 = UnresolvedAttribute(s"$testTable1.name")
  val name2 = UnresolvedAttribute(s"$testTable2.name")
  val name3 = UnresolvedAttribute(s"$testTable3.name")

  // Without side aliases the relations are joined directly (no SubqueryAlias wrappers).
  val firstJoin = Join(
    UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1")),
    UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2")),
    Inner,
    Some(EqualTo(name1, name2)),
    JoinHint.NONE)
  val secondJoin = Join(
    firstJoin,
    UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test3")),
    Inner,
    Some(EqualTo(name2, name3)),
    JoinHint.NONE)

  comparePlans(
    Project(Seq(name1, name2, name3), secondJoin),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with part subquery aliases") {
  // The first join names both sides (t1, t2); the second join only names its
  // right side (t3) and keeps referring to t1 from the first join.
  val result = sql(s"""
    | source = $testTable1
    | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
    | | JOIN right = t3 ON t1.name = t3.name $testTable3
    | | fields t1.name, t2.name, t3.name
    | """.stripMargin)

  val expectedRows =
    Array("Jake", "Hello", "John", "David", "David", "Jane").map(name => Row(name, name, name))
  assertSameRows(expectedRows, result)

  // Small local builders for the repeated plan fragments.
  def relation(table: String) = UnresolvedRelation(Seq("spark_catalog", "default", table))
  def sameName(leftAlias: String, rightAlias: String) =
    Some(
      EqualTo(UnresolvedAttribute(s"$leftAlias.name"), UnresolvedAttribute(s"$rightAlias.name")))

  val t1JoinT2 = Join(
    SubqueryAlias("t1", relation("flint_ppl_test1")),
    SubqueryAlias("t2", relation("flint_ppl_test2")),
    Inner,
    sameName("t1", "t2"),
    JoinHint.NONE)
  // No left alias on the second join: the previous join is used as-is on the left.
  val joinWithT3 = Join(
    t1JoinT2,
    SubqueryAlias("t3", relation("flint_ppl_test3")),
    Inner,
    sameName("t1", "t3"),
    JoinHint.NONE)

  val projection = Seq("t1.name", "t2.name", "t3.name").map(col => UnresolvedAttribute(col))
  comparePlans(
    Project(projection, joinWithT3),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with self join 1") {
  // The last join brings the first table in again, this time under the
  // right-side alias t4 (a self join expressed with a side alias).
  val result = sql(s"""
    | source = $testTable1
    | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
    | | JOIN right = t3 ON t1.name = t3.name $testTable3
    | | JOIN right = t4 ON t1.name = t4.name $testTable1
    | | fields t1.name, t2.name, t3.name, t4.name
    | """.stripMargin)

  val expectedRows = Array("Jake", "Hello", "John", "David", "David", "Jane")
    .map(name => Row(name, name, name, name))
  assertSameRows(expectedRows, result)

  // Small local builders for the repeated plan fragments.
  def relation(table: String) = UnresolvedRelation(Seq("spark_catalog", "default", table))
  def sameName(leftAlias: String, rightAlias: String) =
    Some(
      EqualTo(UnresolvedAttribute(s"$leftAlias.name"), UnresolvedAttribute(s"$rightAlias.name")))

  // The same relation backs both the t1 and the t4 alias.
  val firstTable = relation("flint_ppl_test1")

  val t1JoinT2 = Join(
    SubqueryAlias("t1", firstTable),
    SubqueryAlias("t2", relation("flint_ppl_test2")),
    Inner,
    sameName("t1", "t2"),
    JoinHint.NONE)
  val joinWithT3 = Join(
    t1JoinT2,
    SubqueryAlias("t3", relation("flint_ppl_test3")),
    Inner,
    sameName("t1", "t3"),
    JoinHint.NONE)
  val joinWithT4 = Join(
    joinWithT3,
    SubqueryAlias("t4", firstTable),
    Inner,
    sameName("t1", "t4"),
    JoinHint.NONE)

  val projection =
    Seq("t1.name", "t2.name", "t3.name", "t4.name").map(col => UnresolvedAttribute(col))
  comparePlans(
    Project(projection, joinWithT4),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("test multiple joins with self join 2") {
  // Same self join as the previous case, but t4 is introduced as the alias of a
  // sub-search on the right side instead of a `right = t4` side alias.
  val result = sql(s"""
    | source = $testTable1
    | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
    | | JOIN right = t3 ON t1.name = t3.name $testTable3
    | | JOIN ON t1.name = t4.name
    | [
    | source = $testTable1
    | ] as t4
    | | fields t1.name, t2.name, t3.name, t4.name
    | """.stripMargin)

  val expectedRows = Array("Jake", "Hello", "John", "David", "David", "Jane")
    .map(name => Row(name, name, name, name))
  assertSameRows(expectedRows, result)

  // Small local builders for the repeated plan fragments.
  def relation(table: String) = UnresolvedRelation(Seq("spark_catalog", "default", table))
  def sameName(leftAlias: String, rightAlias: String) =
    Some(
      EqualTo(UnresolvedAttribute(s"$leftAlias.name"), UnresolvedAttribute(s"$rightAlias.name")))

  // The same relation backs both the t1 and the t4 alias.
  val firstTable = relation("flint_ppl_test1")

  val t1JoinT2 = Join(
    SubqueryAlias("t1", firstTable),
    SubqueryAlias("t2", relation("flint_ppl_test2")),
    Inner,
    sameName("t1", "t2"),
    JoinHint.NONE)
  val joinWithT3 = Join(
    t1JoinT2,
    SubqueryAlias("t3", relation("flint_ppl_test3")),
    Inner,
    sameName("t1", "t3"),
    JoinHint.NONE)
  // The aliased sub-search is expected as a SubqueryAlias directly over the relation.
  val joinWithT4 = Join(
    joinWithT3,
    SubqueryAlias("t4", firstTable),
    Inner,
    sameName("t1", "t4"),
    JoinHint.NONE)

  val projection =
    Seq("t1.name", "t2.name", "t3.name", "t4.name").map(col => UnresolvedAttribute(col))
  comparePlans(
    Project(projection, joinWithT4),
    result.queryExecution.logical,
    checkAnalysis = false)
}

test("check access the reference by aliases") {
  // Each query exposes the left side as t1 (side alias or table alias) and the
  // right side as t2 (table alias, subquery alias, or alias inside the sub-search).
  // Both names must be usable in the ON clause and in `fields`.
  val queries = Seq(
    // left side alias + right table alias
    s"""
      | source = $testTable1
      | | JOIN left = t1 ON t1.name = t2.name $testTable2 as t2
      | | fields t1.name, t2.name
      | """.stripMargin,
    // table aliases on both sides, no side aliases
    s"""
      | source = $testTable1 as t1
      | | JOIN ON t1.name = t2.name $testTable2 as t2
      | | fields t1.name, t2.name
      | """.stripMargin,
    // left side alias + alias on the right sub-search
    s"""
      | source = $testTable1
      | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 ] as t2
      | | fields t1.name, t2.name
      | """.stripMargin,
    // left side alias + table alias inside the right sub-search
    s"""
      | source = $testTable1
      | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 as t2 ]
      | | fields t1.name, t2.name
      | """.stripMargin)

  // Queries run in declaration order; the clue identifies the failing one.
  queries.foreach { query =>
    assert(sql(query).collect().length > 0, s"no rows returned for query: $query")
  }
}

test("access the reference by override aliases should throw exception") {
  // In every query the alias `tt` is declared on a side that also has another
  // alias (a side alias, or an outer subquery alias) and is then referenced in
  // `fields`. Each query is expected to fail with an AnalysisException saying
  // that `tt`.`name` cannot be resolved.
  val queries = Seq(
    // right table alias `tt` together with side alias `right = t2`
    s"""
      | source = $testTable1
      | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2 as tt
      | | fields tt.name
      | """.stripMargin,
    // left table alias `tt` together with side alias `left = t1`
    s"""
      | source = $testTable1 as tt
      | | JOIN left = t1 right = t2 ON t1.name = t2.name $testTable2
      | | fields tt.name
      | """.stripMargin,
    // alias `tt` inside the right sub-search together with side alias `right = t2`
    s"""
      | source = $testTable1
      | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 as tt ]
      | | fields tt.name
      | """.stripMargin,
    // alias `tt` inside the right sub-search together with subquery alias `t2`
    s"""
      | source = $testTable1
      | | JOIN left = t1 ON t1.name = t2.name [ source = $testTable2 as tt ] as t2
      | | fields tt.name
      | """.stripMargin,
    // subquery alias `tt` together with side alias `right = t2`
    s"""
      | source = $testTable1
      | | JOIN left = t1 right = t2 ON t1.name = t2.name [ source = $testTable2 ] as tt
      | | fields tt.name
      | """.stripMargin,
    // left table alias `tt` together with side alias `left = t1`, right aliased as t2
    s"""
      | source = $testTable1 as tt
      | | JOIN left = t1 ON t1.name = t2.name $testTable2 as t2
      | | fields tt.name
      | """.stripMargin)

  // Queries run in declaration order; the clue reports the failing query and
  // the message that was actually produced.
  queries.foreach { query =>
    val ex = intercept[AnalysisException](sql(query))
    assert(
      ex.getMessage.contains("`tt`.`name` cannot be resolved"),
      s"unexpected error message '${ex.getMessage}' for query: $query")
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ joinType
;

sideAlias
: LEFT EQUAL leftAlias = ident COMMA? RIGHT EQUAL rightAlias = ident
: (LEFT EQUAL leftAlias = ident)? COMMA? (RIGHT EQUAL rightAlias = ident)?
;

joinCriteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import lombok.ToString;
import org.opensearch.sql.ast.expression.UnresolvedExpression;

import java.util.Collections;

/**
* Extend Relation to describe the table itself
*/
@ToString
public class DescribeRelation extends Relation{
public DescribeRelation(UnresolvedExpression tableName) {
super(tableName);
super(Collections.singletonList(tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
public class Join extends UnresolvedPlan {
private UnresolvedPlan left;
private final UnresolvedPlan right;
private final String leftAlias;
private final String rightAlias;
private final Optional<String> leftAlias;
private final Optional<String> rightAlias;
private final JoinType joinType;
private final Optional<UnresolvedExpression> joinCondition;
private final JoinHint joinHint;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.left = new SubqueryAlias(leftAlias, child);
this.left = leftAlias.isEmpty() ? child : new SubqueryAlias(leftAlias.get(), child);
return this;
}

Expand Down
Loading
Loading