Skip to content

Commit

Permalink
fix scalafmtAll formatting
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Aug 13, 2024
1 parent 1e6dcd6 commit f2fff0a
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import org.opensearch.flint.core.logging.CustomLogging.{logError, logInfo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
Expand All @@ -40,7 +42,6 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.withOrigin
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.types.{DataType, StructType}
import org.opensearch.flint.core.logging.CustomLogging.{logError, logInfo}

/**
* Flint SQL parser that extends Spark SQL parser with Flint SQL statements.
Expand All @@ -63,7 +64,9 @@ class FlintSparkSqlParser(sparkParser: ParserInterface) extends ParserInterface
// Fall back to Spark parse plan logic if flint cannot parse
case e: ParseException =>
// Log the issue
logInfo(s"Failed to parse PPL with PPL parser. Falling back to Spark parser. PPL: $sqlText", e)
logInfo(
s"Failed to parse PPL with PPL parser. Falling back to Spark parser. PPL: $sqlText",
e)
// Fall back to Spark parse plan logic
sparkParser.parsePlan(sqlText)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
| VALUES ( 50, STRUCT(STRUCT("value3"),789) )
|""".stripMargin)
}

protected def createStructNestedTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

package org.opensearch.flint.spark.ppl

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, Literal, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.DescribeTableCommand
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.{QueryTest, Row}

class FlintSparkPPLNestedFieldsITSuite
extends QueryTest
Expand Down Expand Up @@ -81,11 +81,11 @@ class FlintSparkPPLNestedFieldsITSuite
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] = Array(
Row(30, Row(Row("value1"), 123), Row(Row("valueA"), 23)),
Row(40, Row(Row("value5"), 123), Row(Row("valueB"), 33)),
Row(30, Row(Row("value4"), 823), Row(Row("valueC"), 83)),
Row(40, Row(Row("value2"), 456), Row(Row("valueD"), 46)),
Row(50, Row(Row("value3"), 789), Row(Row("valueE"), 89)))
Row(30, Row(Row("value1"), 123), Row(Row("valueA"), 23)),
Row(40, Row(Row("value5"), 123), Row(Row("valueB"), 33)),
Row(30, Row(Row("value4"), 823), Row(Row("valueC"), 83)),
Row(40, Row(Row("value2"), 456), Row(Row("valueD"), 46)),
Row(50, Row(Row("value3"), 789), Row(Row("valueE"), 89)))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
Expand Down Expand Up @@ -183,11 +183,12 @@ class FlintSparkPPLNestedFieldsITSuite
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row( 30, 123, 23),
Row( 30, 823, 83),
Row( 40, 123, 33),
Row( 40, 456, 46),
Row( 50, 789, 89))
Array(
Row(30, 123, 23),
Row(30, 823, 83),
Row(40, 123, 33),
Row(40, 456, 46),
Row(50, 789, 89))
// Compare the results
implicit val rowOrdering: Ordering[Row] = Ordering.by[Row, Int](_.getAs[Int](0))
assert(results.sorted.sameElements(expectedResults.sorted))
Expand All @@ -196,7 +197,10 @@ class FlintSparkPPLNestedFieldsITSuite
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val expectedPlan: LogicalPlan = Project(
Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2"), UnresolvedAttribute("struct_col2.field2")),
Seq(
UnresolvedAttribute("int_col"),
UnresolvedAttribute("struct_col.field2"),
UnresolvedAttribute("struct_col2.field2")),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
// Compare the two plans
assert(expectedPlan === logicalPlan)
Expand All @@ -211,11 +215,7 @@ class FlintSparkPPLNestedFieldsITSuite
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row(50, 89),
Row(30, 83),
Row(40, 46),
Row(40, 33),
Row(30, 23))
Array(Row(50, 89), Row(30, 83), Row(40, 46), Row(40, 33), Row(30, 23))
assert(results === expectedResults)

// Retrieve the logical plan
Expand All @@ -229,7 +229,9 @@ class FlintSparkPPLNestedFieldsITSuite

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), sortedPlan)
Project(
Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
Expand All @@ -244,32 +246,31 @@ class FlintSparkPPLNestedFieldsITSuite
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row(30, 823),
Row(50, 789),
Row(40, 456),
Row(40, 123),
Row(30, 123))
Array(Row(30, 823), Row(50, 789), Row(40, 456), Row(40, 123), Row(30, 123))
assert(results === expectedResults)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical

val sortedPlan: LogicalPlan =
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending),
SortOrder(UnresolvedAttribute("int_col"), Descending)),
Seq(
SortOrder(UnresolvedAttribute("struct_col.field2"), Descending),
SortOrder(UnresolvedAttribute("int_col"), Descending)),
global = true,
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))

// Define the expected logical plan
val expectedPlan: LogicalPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), sortedPlan)
Project(
Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("create ppl simple query with nested field 1 range filter test") {
test("create ppl simple query with nested field 1 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col.field2 > 200 | sort - struct_col.field2 | fields int_col, struct_col.field2
| """.stripMargin)
Expand All @@ -278,30 +279,32 @@ test("create ppl simple query with nested field 1 range filter test") {
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row(30, 823),
Row(50, 789),
Row(40, 456))
assert(results === expectedResults)
Array(Row(30, 823), Row(50, 789), Row(40, 456))
assert(results === expectedResults)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
// Define the expected logical plan components
val filterPlan = Filter(
GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)),
table)
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col.field2"), Literal(200)), table)
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending)), global = true, filterPlan)
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col.field2"), Descending)),
global = true,
filterPlan)
val expectedPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")), sortedPlan)

Project(
Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field2")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
test("create ppl simple query with nested field 2 range filter test") {

test("create ppl simple query with nested field 2 range filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field2 > 50 | sort - struct_col2.field2 | fields int_col, struct_col2.field2
| """.stripMargin)
Expand All @@ -310,27 +313,30 @@ test("create ppl simple query with nested field 2 range filter test") {
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row(50, 89),
Row(30, 83))
assert(results === expectedResults)
Array(Row(50, 89), Row(30, 83))
assert(results === expectedResults)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical

// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
// Define the expected logical plan components
val filterPlan = Filter(
GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)),
table)
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)), global = true, filterPlan)
val expectedPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")), sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
// Define the expected logical plan components
val filterPlan =
Filter(GreaterThan(UnresolvedAttribute("struct_col2.field2"), Literal(50)), table)
val sortedPlan: LogicalPlan =
Sort(
Seq(SortOrder(UnresolvedAttribute("struct_col2.field2"), Descending)),
global = true,
filterPlan)
val expectedPlan =
Project(
Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col2.field2")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("create ppl simple query with nested field string match test") {
val frame = sql(s"""
Expand All @@ -347,22 +353,25 @@ test("create ppl simple query with nested field 2 range filter test") {
// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical


// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
// Define the expected logical plan components
val filterPlan = Filter(
EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")),
table)
val filterPlan =
Filter(EqualTo(UnresolvedAttribute("struct_col.field1.subfield"), Literal("value1")), table)
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)), global = true, filterPlan)
val expectedPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field1.subfield"), UnresolvedAttribute("struct_col2.field1.subfield")), sortedPlan)
Project(
Seq(
UnresolvedAttribute("int_col"),
UnresolvedAttribute("struct_col.field1.subfield"),
UnresolvedAttribute("struct_col2.field1.subfield")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}

test("create ppl simple query with nested field string filter test") {
val frame = sql(s"""
| source = $testTable| where struct_col2.field1.subfield > 'valueA' | sort int_col | fields int_col, struct_col.field1.subfield, struct_col2.field1.subfield
Expand All @@ -372,16 +381,16 @@ test("create ppl simple query with nested field 2 range filter test") {
val results: Array[Row] = frame.collect()
// Define the expected results
val expectedResults: Array[Row] =
Array(Row(30, "value4", "valueC"),
Row(40, "value5", "valueB"),
Row(40, "value2", "valueD"),
Row(50, "value3", "valueE"))
Array(
Row(30, "value4", "valueC"),
Row(40, "value5", "valueB"),
Row(40, "value2", "valueD"),
Row(50, "value3", "valueE"))
assert(results === expectedResults)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical


// Define the expected logical plan
val table = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
// Define the expected logical plan components
Expand All @@ -391,7 +400,12 @@ test("create ppl simple query with nested field 2 range filter test") {
val sortedPlan: LogicalPlan =
Sort(Seq(SortOrder(UnresolvedAttribute("int_col"), Ascending)), global = true, filterPlan)
val expectedPlan =
Project(Seq(UnresolvedAttribute("int_col"), UnresolvedAttribute("struct_col.field1.subfield"), UnresolvedAttribute("struct_col2.field1.subfield")), sortedPlan)
Project(
Seq(
UnresolvedAttribute("int_col"),
UnresolvedAttribute("struct_col.field1.subfield"),
UnresolvedAttribute("struct_col2.field1.subfield")),
sortedPlan)

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite
test("test simple search with schema.table and one nested field projected") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "source=schema.table | fields A.nested", false), context)
planTransformer.visit(
plan(pplParser, "source=schema.table | fields A.nested", false),
context)

val projectList: Seq[NamedExpression] = Seq(UnresolvedAttribute("A.nested"))
val expectedPlan = Project(projectList, UnresolvedRelation(Seq("schema", "table")))
Expand Down Expand Up @@ -150,11 +152,13 @@ class PPLLogicalPlanBasicQueriesTranslatorTestSuite

comparePlans(expectedPlan, logPlan, false)
}
test("test simple search with one table with two fields projected sorted by one nested field") {

test("test simple search with one table with two fields projected sorted by one nested field") {
val context = new CatalystPlanContext
val logPlan =
planTransformer.visit(plan(pplParser, "source=t | sort A.nested | fields A.nested, B", false), context)
planTransformer.visit(
plan(pplParser, "source=t | sort A.nested | fields A.nested, B", false),
context)

val table = UnresolvedRelation(Seq("t"))
val projectList = Seq(UnresolvedAttribute("A.nested"), UnresolvedAttribute("B"))
Expand Down

0 comments on commit f2fff0a

Please sign in to comment.