Skip to content

Commit

Permalink
update tests with projected join query
Browse files: browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 26, 2024
1 parent 36038aa commit 9a84325
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ trait LogicalPlanTestUtils {
}
agg.copy(groupingExpressions = newGrouping, aggregateExpressions = newAggregations)

// Normalize CreateTableAsSelect by ignoring partitioning and other specific fields
// Normalize CreateTableAsSelect by ignoring partitioning
case ctas: CreateTableAsSelect =>
ctas.copy(
partitioning = Seq.empty, // Ignore partitioning by setting it to an empty sequence
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import org.opensearch.sql.ppl.{CatalystPlanContext, CatalystQueryPlanVisitor}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedIdentifier, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, EqualTo, GreaterThan, Literal, NamedExpression, Not, SortOrder}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, LessThan, Literal, NamedExpression, Not, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, IdentityTransform, NamedReference, Transform}

Expand All @@ -26,6 +26,9 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
with LogicalPlanTestUtils
with Matchers {

private val testTable1 = "spark_catalog.default.flint_ppl_test1"
private val testTable2 = "spark_catalog.default.flint_ppl_test2"

private val planTransformer = new CatalystQueryPlanVisitor()
private val pplParser = new PPLSyntaxParser()
private val viewFolderLocation = Paths.get(".", "spark-warehouse", "student_partition_bucket")
Expand Down Expand Up @@ -173,14 +176,13 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite

test(
"test project a simple search with only one table using parquet with location and Options with multiple partitioned fields ") {
// if successful build ppl logical plan and translate to catalyst logical plan
val viewLocation = viewFolderLocation.toAbsolutePath.toString
val context = new CatalystPlanContext
val logPlan = planTransformer.visit(
plan(
pplParser,
s"""
| project if not exists simpleView using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| project if not exists simpleView using parquet OPTIONS('parquet.bloom.filter.enabled'='true', 'parquet.bloom.filter.enabled#age'='false')
| partitioned by (age, country) location '$viewLocation' | source = table | where state != 'California'
""".stripMargin),
context)
Expand Down Expand Up @@ -212,4 +214,66 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
// Compare the two plans
assert(compareByString(logPlan) == expectedPlan.toString)
}

test(
  "test project with inner join: join condition with table names and predicates using parquet with location and Options with single partitioned fields") {
  // Translates a PPL `project ... partitioned by (name)` statement whose source is an
  // inner join of two aliased tables, then checks the resulting CreateTableAsSelect
  // piece by piece: the query, the target name and the table spec.
  val viewLocation = viewFolderLocation.toAbsolutePath.toString
  val context = new CatalystPlanContext
  val logPlan = planTransformer.visit(
    plan(
      pplParser,
      s"""
         | project if not exists simpleView using parquet OPTIONS('parquet.bloom.filter.enabled'='true')
         | partitioned by (name) location '$viewLocation' |
         | source = $testTable1| INNER JOIN left = l right = r ON $testTable1.id = $testTable2.id AND $testTable1.count > 10 AND lower($testTable2.name) = 'hello' $testTable2
         | """.stripMargin),
    context)

  // Expected join: both sides are aliased (l / r) relations joined on three conjuncts.
  val table1 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test1"))
  val table2 = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test2"))
  val leftPlan = SubqueryAlias("l", table1)
  val rightPlan = SubqueryAlias("r", table2)
  // NOTE(review): the conjuncts below are ordered and oriented differently from the query
  // text (`count > 10` is written as `10 < count`, the literal is on the left of the
  // `lower(...)` equality). This presumably relies on `comparePlans` normalizing
  // predicates before comparing - confirm against PlanTest.
  val joinCondition = And(
    And(
      EqualTo(UnresolvedAttribute(s"$testTable1.id"), UnresolvedAttribute(s"$testTable2.id")),
      EqualTo(
        Literal("hello"),
        UnresolvedFunction.apply(
          "lower",
          Seq(UnresolvedAttribute(s"$testTable2.name")),
          isDistinct = false))),
    LessThan(Literal(10), UnresolvedAttribute(s"$testTable1.count")))
  val joinPlan = Join(leftPlan, rightPlan, Inner, Some(joinCondition), JoinHint.NONE)

  // Typed as CreateTableAsSelect (not LogicalPlan) so its fields can be read without casts.
  val expectedCtas: CreateTableAsSelect =
    CreateTableAsSelect(
      UnresolvedIdentifier(Seq("simpleView")),
      // Partitioning is not compared by this test (only query, name and tableSpec are
      // asserted below), so the expected plan leaves it empty.
      Seq.empty,
      Project(Seq(UnresolvedStar(None)), joinPlan),
      UnresolvedTableSpec(
        Map.empty,
        Option("PARQUET"),
        OptionList(Seq(("parquet.bloom.filter.enabled", Literal("true")))),
        Option(viewLocation),
        Option.empty,
        Option.empty,
        external = false),
      Map.empty,
      ignoreIfExists = true,
      isAnalyzed = false)

  // Narrow the translated plan once; a wrong plan type fails with a readable message
  // instead of a ClassCastException.
  val actualCtas = logPlan match {
    case ctas: CreateTableAsSelect => ctas
    case other => fail(s"Expected a CreateTableAsSelect plan but got: $other")
  }

  // Compare the two plans component by component
  comparePlans(actualCtas.query, expectedCtas.query, checkAnalysis = false)
  comparePlans(actualCtas.name, expectedCtas.name, checkAnalysis = false)
  assert(actualCtas.tableSpec.toString == expectedCtas.tableSpec.toString)
}
}

0 comments on commit 9a84325

Please sign in to comment.