From b8e02fc18d029bc1b18414aec70b2cb5a0ad6e31 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Mon, 25 Nov 2024 16:38:42 -0800 Subject: [PATCH] update tests & command spec changes Signed-off-by: YANGDB --- ...FlintSparkPPLProjectStatementITSuite.scala | 103 ++++++++++++------ 1 file changed, 70 insertions(+), 33 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala index 36929430d..3ab4228be 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLProjectStatementITSuite.scala @@ -48,11 +48,11 @@ class FlintSparkPPLProjectStatementITSuite } } - test("project sql test using csv") { + ignore("project sql test using csv") { val frame = sql(s""" | CREATE TABLE student_partition_bucket - | USING csv - | PARTITIONED BY (age) + | USING parquet + | PARTITIONED BY (age, country) | AS SELECT * FROM $testTable; | """.stripMargin) @@ -166,7 +166,7 @@ class FlintSparkPPLProjectStatementITSuite ) } - test("project using csv partition by age and state") { + test("project using csv partition by state and country") { val frame = sql(s""" |project simpleView using csv partitioned by ('state', 'country') | source = $testTable | dedup name | fields name, state, country | """.stripMargin) @@ -244,45 +244,82 @@ class FlintSparkPPLProjectStatementITSuite ) } - test("project using parquet partition by state & location") { + test("project using parquet partition by state & country") { val frame = sql(s""" - | source = $testTable| head 2 - | """.stripMargin) - - // Retrieve the results - val results: Array[Row] = frame.collect() - assert(results.length == 2) + |project simpleView using parquet partitioned by ('state', 'country') | source = $testTable | dedup name | 
fields name, state, country + | """.stripMargin) - // Retrieve the logical plan - val logicalPlan: LogicalPlan = frame.queryExecution.logical - // Define the expected logical plan - val limitPlan: LogicalPlan = - Limit(Literal(2), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) - val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan) + frame.collect() + // verify new view was created correctly + val results = sql(s""" + | source = simpleView + | """.stripMargin).collect() - // Compare the two plans - assert(compareByString(expectedPlan) === compareByString(logicalPlan)) - } + // Define the expected results + val expectedResults: Array[Row] = Array(Row("Jane", "Quebec", "Canada"), Row("John", "Ontario", "Canada"), Row("Jake", "California", "USA"), Row("Hello", "New York", "USA")) + // Convert actual results to a Set for quick lookup + val resultsSet: Set[Row] = results.toSet + // Check that each expected row is present in the actual results + expectedResults.foreach { expectedRow => + assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results") + } - test("create ppl simple query two with fields and head (limit) test") { - val frame = sql(s""" - | source = $testTable| fields name, age | head 1 - | """.stripMargin) + // verify new view was created correctly + val describe = sql(s""" + | describe simpleView + | """.stripMargin).collect() - // Retrieve the results - val results: Array[Row] = frame.collect() - assert(results.length == 1) + // Define the expected results + val expectedDescribeResults: Array[Row] = Array( + Row("Database", "default"), + Row("Partition Provider", "Catalog"), + Row("Type", "MANAGED"), + Row("country", "string", "null"), + Row("Catalog", "spark_catalog"), + Row("state", "string", "null"), + Row("# Partition Information", ""), + Row("Created By", "Spark 3.5.1"), + Row("Provider", "PARQUET"), + Row("# Detailed Table Information", ""), + Row("Table", "simpleview"), + Row("Last 
Access", "UNKNOWN"), + Row("# col_name", "data_type", "comment"), + Row("name", "string", "null")) + // Convert actual results to a Set for quick lookup + val describeResults: Set[Row] = describe.toSet + // Check that each expected row is present in the actual results + expectedDescribeResults.foreach { expectedRow => + assert(describeResults.contains(expectedRow), s"Expected row $expectedRow not found in results") + } // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - val project = Project( - Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")), - UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan - val limitPlan: LogicalPlan = Limit(Literal(1), project) - val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), limitPlan) + val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + val nameAttribute = UnresolvedAttribute("name") + val dedup = + Deduplicate(Seq(nameAttribute), Filter(IsNotNull(nameAttribute), relation)) + val expectedPlan: LogicalPlan = + CreateTableAsSelect( + UnresolvedIdentifier(Seq("simpleView")), + // Seq(IdentityTransform.apply(FieldReference.apply("age")), IdentityTransform.apply(FieldReference.apply("state")), + Seq(), + Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("state"), UnresolvedAttribute("country")), dedup), + UnresolvedTableSpec( + Map.empty, + Option("PARQUET"), + OptionList(Seq()), + Option.empty, + Option.empty, + Option.empty, + external = false), + Map.empty, + ignoreIfExists = false, + isAnalyzed = false) // Compare the two plans - assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + assert( + compareByString(logicalPlan) === compareByString(expectedPlan) + ) } }