From f2fff0a6711be4134ac8c202568ef36ad4f19635 Mon Sep 17 00:00:00 2001 From: YANGDB Date: Tue, 13 Aug 2024 13:00:51 -0700 Subject: [PATCH] fix scalafmtAll formatting Signed-off-by: YANGDB Signed-off-by: YANGDB --- .../flint/spark/sql/FlintSparkSqlParser.scala | 7 +- .../flint/spark/FlintSparkSuite.scala | 2 +- .../FlintSparkPPLNestedFieldsITSuite.scala | 150 ++++++++++-------- ...lPlanBasicQueriesTranslatorTestSuite.scala | 12 +- 4 files changed, 96 insertions(+), 75 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala index 0c871813f..a854d1d72 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlParser.scala @@ -31,7 +31,9 @@ import org.antlr.v4.runtime._ import org.antlr.v4.runtime.atn.PredictionMode import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException} import org.antlr.v4.runtime.tree.TerminalNodeImpl +import org.opensearch.flint.core.logging.CustomLogging.{logError, logInfo} import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._ + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression @@ -40,7 +42,6 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.types.{DataType, StructType} -import org.opensearch.flint.core.logging.CustomLogging.{logError, logInfo} /** * Flint SQL parser that extends Spark SQL parser with Flint SQL statements. @@ -63,7 +64,9 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface // Fall back to Spark parse plan logic if flint cannot parse case e: ParseException => // Log the issue - logInfo(s"Failed to parse PPL with PPL parser. Falling back to Spark parser. PPL: $sqlText", e) + logInfo( + s"Failed to parse PPL with PPL parser. Falling back to Spark parser. PPL: $sqlText", + e) // Fall back to Spark parse plan logic sparkParser.parsePlan(sqlText) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 77f8fedcc..e93eee790 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -390,7 +390,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit | VALUES ( 50, STRUCT(STRUCT("value3"),789) ) |""".stripMargin) } - + protected def createStructNestedTable(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLNestedFieldsITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLNestedFieldsITSuite.scala index 514312e7e..084d889b7 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLNestedFieldsITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/ppl/FlintSparkPPLNestedFieldsITSuite.scala @@ -5,13 +5,13 @@ package org.opensearch.flint.spark.ppl +import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar} import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.DescribeTableCommand import org.apache.spark.sql.streaming.StreamTest -import org.apache.spark.sql.{QueryTest, Row} class FlintSparkPPLNestedFieldsITSuite extends QueryTest @@ -81,11 +81,11 @@ class FlintSparkPPLNestedFieldsITSuite val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = Array( - Row(30, Row(Row("value1"), 123), Row(Row("valueA"), 23)), - Row(40, Row(Row("value5"), 123), Row(Row("valueB"), 33)), - Row(30, Row(Row("value4"), 823), Row(Row("valueC"), 83)), - Row(40, Row(Row("value2"), 456), Row(Row("valueD"), 46)), - Row(50, Row(Row("value3"), 789), Row(Row("valueE"), 89))) + Row(30, Row(Row("value1"), 123), Row(Row("valueA"), 23)), + Row(40, Row(Row("value5"), 123), Row(Row("valueB"), 33)), + Row(30, Row(Row("value4"), 823), Row(Row("valueC"), 83)), + Row(40, Row(Row("value2"), 456), Row(Row("valueD"), 46)), + Row(50, Row(Row("value3"), 789), Row(Row("valueE"), 89))) // Compare the results implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) assert(results.sorted.sameElements(expectedResults.sorted)) @@ -183,11 +183,12 @@ class FlintSparkPPLNestedFieldsITSuite val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row( 30, 123, 23), - Row( 30, 823, 83), - Row( 40, 123, 33), - Row( 40, 456, 46), - Row( 50, 789, 89)) + Array( + Row(30, 123, 23), + Row(30, 823, 83), + Row(40, 123, 33), + Row(40, 456, 46), + Row(50, 789, 89)) // Compare the results implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0)) assert(results.sorted.sameElements(expectedResults.sorted)) @@ -196,7 +197,10 @@ class FlintSparkPPLNestedFieldsITSuite val logicalPlan: LogicalPlan = frame.queryExecution.logical // Define the expected logical plan val expectedPlan: LogicalPlan = Project( - Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2"), UnresolvedAttribute("struct_col2.field2")), + Seq( + UnresolvedAttribute("int_col"), + UnresolvedAttribute("struct_col.field2"), + UnresolvedAttribute("struct_col2.field2")), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Compare the two plans assert(expectedPlan === logicalPlan) @@ -211,11 +215,7 @@ class FlintSparkPPLNestedFieldsITSuite val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row(50, 89), - Row(30, 83), - Row(40, 46), - Row(40, 33), - Row(30, 23)) + Array(Row(50, 89), Row(30, 83), Row(40, 46), Row(40, 33), Row(30, 23)) assert(results === expectedResults) // Retrieve the logical plan @@ -229,7 +229,9 @@ class FlintSparkPPLNestedFieldsITSuite // Define the expected logical plan val expectedPlan: LogicalPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), sortedPlan) + Project( + Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), + sortedPlan) // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) @@ -244,11 +246,7 @@ class FlintSparkPPLNestedFieldsITSuite val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row(30, 823), - Row(50, 789), - Row(40, 456), - Row(40, 123), - Row(30, 123)) + Array(Row(30, 823), Row(50, 789), Row(40, 456), Row(40, 123), Row(30, 123)) assert(results === expectedResults) // Retrieve the logical plan @@ -256,20 +254,23 @@ class FlintSparkPPLNestedFieldsITSuite val sortedPlan: LogicalPlan = Sort( - Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending), - SortOrder(UnresolvedAttribute("int_col"), Descending)), + Seq( + SortOrder(UnresolvedAttribute("struct_col.field2"), Descending), + SortOrder(UnresolvedAttribute("int_col"), Descending)), global = true, UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))) // Define the expected logical plan val expectedPlan: LogicalPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), sortedPlan) + Project( + Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), + sortedPlan) // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } -test("create ppl simple query with nested field 1 range filter test") { + test("create ppl simple query with nested field 1 range filter test") { val frame = sql(s""" | source = $testTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2 | """.stripMargin) @@ -278,10 +279,8 @@ test("create ppl simple query with nested field 1 range filter test") { val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row(30, 823), - Row(50, 789), - Row(40, 456)) - assert(results === expectedResults) + Array(Row(30, 823), Row(50, 789), Row(40, 456)) + assert(results === expectedResults) // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical @@ -289,19 +288,23 @@ test("create ppl simple query with nested field 1 range filter test") { // Define the expected logical plan val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) // Define the expected logical plan components - val filterPlan = Filter( - GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), - table) + val filterPlan = + Filter(GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), table) val sortedPlan: LogicalPlan = - Sort(Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending)), global = true, filterPlan) + Sort( + Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending)), + global = true, + filterPlan) val expectedPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), sortedPlan) - + Project( + Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), + sortedPlan) + // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - -test("create ppl simple query with nested field 2 range filter test") { + + test("create ppl simple query with nested field 2 range filter test") { val frame = sql(s""" | source = $testTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2 | """.stripMargin) @@ -310,27 +313,30 @@ test("create ppl simple query with nested field 2 range filter test") { val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row(50, 89), - Row(30, 83)) - assert(results === expectedResults) + Array(Row(50, 89), Row(30, 83)) + assert(results === expectedResults) // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - // Define the expected logical plan - val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) - // Define the expected logical plan components - val filterPlan = Filter( - GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)), - table) - val sortedPlan: LogicalPlan = - Sort(Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)), global = true, filterPlan) - val expectedPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), sortedPlan) - - // Compare the two plans - assert(compareByString(expectedPlan) === compareByString(logicalPlan)) -} + // Define the expected logical plan + val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) + // Define the expected logical plan components + val filterPlan = + Filter(GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)), table) + val sortedPlan: LogicalPlan = + Sort( + Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)), + global = true, + filterPlan) + val expectedPlan = + Project( + Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), + sortedPlan) + + // Compare the two plans + assert(compareByString(expectedPlan) === compareByString(logicalPlan)) + } test("create ppl simple query with nested field string match test") { val frame = sql(s""" @@ -347,22 +353,25 @@ test("create ppl simple query with nested field 2 range filter test") { // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - // Define the expected logical plan val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) // Define the expected logical plan components - val filterPlan = Filter( - EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")), - table) + val filterPlan = + Filter(EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")), table) val sortedPlan: LogicalPlan = Sort(Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)), global = true, filterPlan) val expectedPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field1.subfield"), UnresolvedAttribute("struct_col2.field1.subfield")), sortedPlan) + Project( + Seq( + UnresolvedAttribute("int_col"), + UnresolvedAttribute("struct_col.field1.subfield"), + UnresolvedAttribute("struct_col2.field1.subfield")), + sortedPlan) // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) } - + test("create ppl simple query with nested field string filter test") { val frame = sql(s""" | source = $testTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield @@ -372,16 +381,16 @@ test("create ppl simple query with nested field 2 range filter test") { val results: Array[Row] = frame.collect() // Define the expected results val expectedResults: Array[Row] = - Array(Row(30, "value4", "valueC"), - Row(40, "value5", "valueB"), - Row(40, "value2", "valueD"), - Row(50, "value3", "valueE")) + Array( + Row(30, "value4", "valueC"), + Row(40, "value5", "valueB"), + Row(40, "value2", "valueD"), + Row(50, "value3", "valueE")) assert(results === expectedResults) // Retrieve the logical plan val logicalPlan: LogicalPlan = frame.queryExecution.logical - // Define the expected logical plan val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")) // Define the expected logical plan components @@ -391,7 +400,12 @@ test("create ppl simple query with nested field 2 range filter test") { val sortedPlan: LogicalPlan = Sort(Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)), global = true, filterPlan) val expectedPlan = - Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field1.subfield"), UnresolvedAttribute("struct_col2.field1.subfield")), sortedPlan) + Project( + Seq( + UnresolvedAttribute("int_col"), + UnresolvedAttribute("struct_col.field1.subfield"), + UnresolvedAttribute("struct_col2.field1.subfield")), + sortedPlan) // Compare the two plans assert(compareByString(expectedPlan) === compareByString(logicalPlan)) diff --git a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala index d641ad696..6d34e3db9 100644 --- a/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala +++ b/ppl-spark-integration/src/test/scala/org/opensearch/flint/spark/ppl/PPLLogicalPlanBasicQueriesTranslatorTestSuite.scala @@ -109,7 +109,9 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite test("test simple search with schema.table and one nested field projected") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit(plan(pplParser, "source=schema.table | fields A.nested", false), context) + planTransformer.visit( + plan(pplParser, "source=schema.table | fields A.nested", false), + context) val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A.nested")) val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table"))) @@ -150,11 +152,13 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite comparePlans(expectedPlan, logPlan, false) } - - test("test simple search with one table with two fields projected sorted by one nested field") { + + test("test simple search with one table with two fields projected sorted by one nested field") { val context = new CatalystPlanContext val logPlan = - planTransformer.visit(plan(pplParser, "source=t | sort A.nested | fields A.nested, B", false), context) + planTransformer.visit( + plan(pplParser, "source=t | sort A.nested | fields A.nested, B", false), + context) val table = UnresolvedRelation(Seq("t")) val projectList = Seq(UnresolvedAttribute("A.nested"), UnresolvedAttribute("B"))