Skip to content

Commit

Permalink
update tests & command spec changes
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 26, 2024
1 parent 95e9504 commit b8e02fc
Showing 1 changed file with 70 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ class FlintSparkPPLProjectStatementITSuite
}
}

test("project sql test using csv") {
ignore("project sql test using csv") {
val frame = sql(s"""
| CREATE TABLE student_partition_bucket
| USING csv
| PARTITIONED BY (age)
| USING parquet
| PARTITIONED BY (age, country)
| AS SELECT * FROM $testTable;
| """.stripMargin)

Expand Down Expand Up @@ -166,7 +166,7 @@ class FlintSparkPPLProjectStatementITSuite
)
}

test("project using csv partition by age and state") {
test("project using csv partition by state and country") {
val frame = sql(s"""
|project simpleView using csv partitioned by ('state', 'country') | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)
Expand Down Expand Up @@ -244,45 +244,82 @@ class FlintSparkPPLProjectStatementITSuite
)
}

test("project using parquet partition by state & location") {
test("project using parquet partition by state & country") {
val frame = sql(s"""
| source = $testTable| head 2
| """.stripMargin)

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 2)
|project simpleView using parquet partitioned by ('state', 'country') | source = $testTable | dedup name | fields name, state, country
| """.stripMargin)

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
// Define the expected logical plan
val limitPlan: LogicalPlan =
Limit(Literal(2), UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
val expectedPlan = Project(Seq(UnresolvedStar(None)), limitPlan)
frame.collect()
// verify new view was created correctly
val results = sql(s"""
| source = simpleView
| """.stripMargin).collect()

// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
}
// Define the expected results
val expectedResults: Array[Row] = Array(Row("Jane", "Quebec", "Canada"), Row("John", "Ontario", "Canada"), Row("Jake", "California", "USA"), Row("Hello", "New York", "USA"))
// Convert actual results to a Set for quick lookup
val resultsSet: Set[Row] = results.toSet
// Check that each expected row is present in the actual results
expectedResults.foreach { expectedRow =>
assert(resultsSet.contains(expectedRow), s"Expected row $expectedRow not found in results")
}

test("create ppl simple query two with fields and head (limit) test") {
val frame = sql(s"""
| source = $testTable| fields name, age | head 1
| """.stripMargin)
// verify new view was created correctly
val describe = sql(s"""
| describe simpleView
| """.stripMargin).collect()

// Retrieve the results
val results: Array[Row] = frame.collect()
assert(results.length == 1)
// Define the expected results
val expectedDescribeResults: Array[Row] = Array(
Row("Database", "default"),
Row("Partition Provider", "Catalog"),
Row("Type", "MANAGED"),
Row("country", "string", "null"),
Row("Catalog", "spark_catalog"),
Row("state", "string", "null"),
Row("# Partition Information", ""),
Row("Created By", "Spark 3.5.1"),
Row("Provider", "PARQUET"),
Row("# Detailed Table Information", ""),
Row("Table", "simpleview"),
Row("Last Access", "UNKNOWN"),
Row("# col_name", "data_type", "comment"),
Row("name", "string", "null"))
// Convert actual results to a Set for quick lookup
val describeResults: Set[Row] = describe.toSet
// Check that each expected row is present in the actual results
expectedDescribeResults.foreach { expectedRow =>
assert(expectedDescribeResults.contains(expectedRow), s"Expected row $expectedRow not found in results")
}

// Retrieve the logical plan
val logicalPlan: LogicalPlan = frame.queryExecution.logical
val project = Project(
Seq(UnresolvedAttribute("name"), UnresolvedAttribute("age")),
UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test")))
// Define the expected logical plan
val limitPlan: LogicalPlan = Limit(Literal(1), project)
val expectedPlan: LogicalPlan = Project(Seq(UnresolvedStar(None)), limitPlan)
val relation = UnresolvedRelation(Seq("spark_catalog", "default", "flint_ppl_test"))
val nameAttribute = UnresolvedAttribute("name")
val dedup =
Deduplicate(Seq(nameAttribute), Filter(IsNotNull(nameAttribute), relation))
val expectedPlan: LogicalPlan =
CreateTableAsSelect(
UnresolvedIdentifier(Seq("simpleView")),
// Seq(IdentityTransform.apply(FieldReference.apply("age")), IdentityTransform.apply(FieldReference.apply("state")),
Seq(),
Project(Seq(UnresolvedAttribute("name"), UnresolvedAttribute("state"), UnresolvedAttribute("country")), dedup),
UnresolvedTableSpec(
Map.empty,
Option("PARQUET"),
OptionList(Seq()),
Option.empty,
Option.empty,
Option.empty,
external = false),
Map.empty,
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(expectedPlan) === compareByString(logicalPlan))
assert(
compareByString(logicalPlan) == expectedPlan.toString
)
}

}

0 comments on commit b8e02fc

Please sign in to comment.