From 10eb8a1116b2d354a5e087223bb6d4b3986bf579 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 26 Nov 2024 17:31:39 -0800 Subject: [PATCH] update tests with projected partitioning verification of correctness Signed-off-by: YANGDB --- ...FlintSparkPPLProjectStatementITSuite.scala | 155 +++++++++++++++++- ...lanProjectQueriesTranslatorTestSuite.scala | 128 ++++++++++++++- 2 files changed, 268 insertions(+), 15 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala index 54d4aff1b..b639faf7a 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala @@ -13,7 +13,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFu import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, IsNotNull, Literal, Multiply, Not, Or, SortOrder} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform} +import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, NamedReference, Transform} import org.apache.spark.sql.execution.ExplainMode import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand} import org.apache.spark.sql.streaming.StreamTest @@ -150,7 +150,22 @@ class FlintSparkPPLProjectStatementITSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - comparePlans(logicalPlan, expectedPlan, checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.isEmpty, + "Partitioning does not contain ay FieldReferences") } test("project using csv partition by age") { @@ -197,7 +212,29 @@ class FlintSparkPPLProjectStatementITSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - assert(compareByString(logicalPlan) == expectedPlan.toString) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => fieldRef.fieldNames().contains("age") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1, + "Partitioning does not contain a FieldReferences: 'age'") } test("project using csv partition by state and country") { @@ -284,7 +321,30 @@ class FlintSparkPPLProjectStatementITSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - assert(compareByString(logicalPlan) == expectedPlan.toString) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test("project using parquet partition by state & country") { @@ -371,7 +431,30 @@ class FlintSparkPPLProjectStatementITSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - assert(compareByString(logicalPlan) == expectedPlan.toString) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test("project using parquet with options & partition by state & country") { @@ -462,7 +545,30 @@ class FlintSparkPPLProjectStatementITSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - assert(compareByString(logicalPlan) == expectedPlan.toString) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test("project using parquet with options & location with partition by state & country") { @@ -553,8 +659,30 @@ class FlintSparkPPLProjectStatementITSuite Map.empty, ignoreIfExists = false, isAnalyzed = false) - // Compare the two plans - assert(compareByString(logicalPlan) == expectedPlan.toString) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logicalPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test("test inner join with relation subquery") { @@ -656,6 +784,15 @@ class FlintSparkPPLProjectStatementITSuite .asInstanceOf[CreateTableAsSelect] .tableSpec .toString) + assert( + logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => fieldRef.fieldNames().contains("age_span") + case _ => false + } + case _ => false + } && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1, + "Partitioning does not contain a FieldReferences: 'name'") } - } diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala index ad27083cd..c3fb99a8a 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanProjectQueriesTranslatorTestSuite.scala @@ -14,7 +14,7 @@ import org.scalatest.matchers.should.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation, UnresolvedStar} -import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, LessThan, Literal, NamedExpression, Not, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{And, Ascending, AttributeReference, Descending, EqualTo, GreaterThan, LessThan, Literal, NamedExpression, Not, SortOrder} import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} import org.apache.spark.sql.catalyst.plans.logical._ @@ -63,7 +63,22 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite ignoreIfExists = false, isAnalyzed = false) // Compare the two plans - comparePlans(logPlan, expectedPlan, checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.isEmpty, + "Partitioning does not contain ay FieldReferences") } test("test project a simple search with only one table using csv and partitioned field ") { @@ -97,7 +112,29 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite ignoreIfExists = true, isAnalyzed = false) // Compare the two plans - assert(compareByString(logPlan) == expectedPlan.toString) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => fieldRef.fieldNames().contains("age") + case _ => false + } + case _ => false + } && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1, + "Partitioning does not contain a FieldReferences: 'name'") } test( @@ -132,7 +169,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite ignoreIfExists = true, isAnalyzed = false) // Compare the two plans - assert(compareByString(logPlan) == expectedPlan.toString) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age") + case _ => false + } + case _ => false + } && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test( @@ -171,7 +231,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite ignoreIfExists = true, isAnalyzed = false) // Compare the two plans - assert(compareByString(logPlan) == expectedPlan.toString) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age") + case _ => false + } + case _ => false + } && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test( @@ -212,7 +295,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite ignoreIfExists = true, isAnalyzed = false) // Compare the two plans - assert(compareByString(logPlan) == expectedPlan.toString) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].query, + expectedPlan.asInstanceOf[CreateTableAsSelect].query, + checkAnalysis = false) + comparePlans( + logPlan.asInstanceOf[CreateTableAsSelect].name, + expectedPlan.asInstanceOf[CreateTableAsSelect].name, + checkAnalysis = false) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan + .asInstanceOf[CreateTableAsSelect] + .tableSpec + .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => + fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age") + case _ => false + } + case _ => false + } && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2, + "Partitioning does not contain a FieldReferences: 'name'") } test( @@ -275,5 +381,15 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite .asInstanceOf[CreateTableAsSelect] .tableSpec .toString) + assert( + logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists { + case transform: Transform => + transform.arguments().exists { + case fieldRef: NamedReference => fieldRef.fieldNames().contains("name") + case _ => false + } + case _ => false + } && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1, + "Partitioning does not contain a FieldReferences: 'name'") } }