Skip to content

Commit

Permalink
update tests with projected partitioning verification of correctness
Browse files Browse the repository at this point in the history
Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Nov 27, 2024
1 parent 726ae24 commit 10eb8a1
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFu
import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Divide, EqualTo, Floor, GreaterThan, IsNotNull, Literal, Multiply, Not, Or, SortOrder}
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, NamedReference, Transform}
import org.apache.spark.sql.execution.ExplainMode
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExplainCommand}
import org.apache.spark.sql.streaming.StreamTest
Expand Down Expand Up @@ -150,7 +150,22 @@ class FlintSparkPPLProjectStatementITSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
comparePlans(logicalPlan, expectedPlan, checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.isEmpty,
"Partitioning does not contain ay FieldReferences")
}

test("project using csv partition by age") {
Expand Down Expand Up @@ -197,7 +212,29 @@ class FlintSparkPPLProjectStatementITSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logicalPlan) == expectedPlan.toString)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference => fieldRef.fieldNames().contains("age")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1,
"Partitioning does not contain a FieldReferences: 'age'")
}

test("project using csv partition by state and country") {
Expand Down Expand Up @@ -284,7 +321,30 @@ class FlintSparkPPLProjectStatementITSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logicalPlan) == expectedPlan.toString)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet partition by state & country") {
Expand Down Expand Up @@ -371,7 +431,30 @@ class FlintSparkPPLProjectStatementITSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logicalPlan) == expectedPlan.toString)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet with options & partition by state & country") {
Expand Down Expand Up @@ -462,7 +545,30 @@ class FlintSparkPPLProjectStatementITSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logicalPlan) == expectedPlan.toString)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test("project using parquet with options & location with partition by state & country") {
Expand Down Expand Up @@ -553,8 +659,30 @@ class FlintSparkPPLProjectStatementITSuite
Map.empty,
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logicalPlan) == expectedPlan.toString)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logicalPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("state") || fieldRef.fieldNames().contains("country")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test("test inner join with relation subquery") {
Expand Down Expand Up @@ -656,6 +784,15 @@ class FlintSparkPPLProjectStatementITSuite
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference => fieldRef.fieldNames().contains("age_span")
case _ => false
}
case _ => false
} && logicalPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1,
"Partitioning does not contain a FieldReferences: 'name'")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import org.scalatest.matchers.should.Matchers

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedIdentifier, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, Descending, EqualTo, GreaterThan, LessThan, Literal, NamedExpression, Not, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{And, Ascending, AttributeReference, Descending, EqualTo, GreaterThan, LessThan, Literal, NamedExpression, Not, SortOrder}
import org.apache.spark.sql.catalyst.expressions.aggregate.AnyValue
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -63,7 +63,22 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
ignoreIfExists = false,
isAnalyzed = false)
// Compare the two plans
comparePlans(logPlan, expectedPlan, checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.isEmpty,
"Partitioning does not contain ay FieldReferences")
}

test("test project a simple search with only one table using csv and partitioned field ") {
Expand Down Expand Up @@ -97,7 +112,29 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
ignoreIfExists = true,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logPlan) == expectedPlan.toString)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference => fieldRef.fieldNames().contains("age")
case _ => false
}
case _ => false
} && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1,
"Partitioning does not contain a FieldReferences: 'name'")
}

test(
Expand Down Expand Up @@ -132,7 +169,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
ignoreIfExists = true,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logPlan) == expectedPlan.toString)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age")
case _ => false
}
case _ => false
} && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test(
Expand Down Expand Up @@ -171,7 +231,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
ignoreIfExists = true,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logPlan) == expectedPlan.toString)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age")
case _ => false
}
case _ => false
} && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test(
Expand Down Expand Up @@ -212,7 +295,30 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
ignoreIfExists = true,
isAnalyzed = false)
// Compare the two plans
assert(compareByString(logPlan) == expectedPlan.toString)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].query,
expectedPlan.asInstanceOf[CreateTableAsSelect].query,
checkAnalysis = false)
comparePlans(
logPlan.asInstanceOf[CreateTableAsSelect].name,
expectedPlan.asInstanceOf[CreateTableAsSelect].name,
checkAnalysis = false)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].tableSpec.toString == expectedPlan
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference =>
fieldRef.fieldNames().contains("country") || fieldRef.fieldNames().contains("age")
case _ => false
}
case _ => false
} && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 2,
"Partitioning does not contain a FieldReferences: 'name'")
}

test(
Expand Down Expand Up @@ -275,5 +381,15 @@ class PPLLogicalPlanProjectQueriesTranslatorTestSuite
.asInstanceOf[CreateTableAsSelect]
.tableSpec
.toString)
assert(
logPlan.asInstanceOf[CreateTableAsSelect].partitioning.exists {
case transform: Transform =>
transform.arguments().exists {
case fieldRef: NamedReference => fieldRef.fieldNames().contains("name")
case _ => false
}
case _ => false
} && logPlan.asInstanceOf[CreateTableAsSelect].partitioning.length == 1,
"Partitioning does not contain a FieldReferences: 'name'")
}
}

0 comments on commit 10eb8a1

Please sign in to comment.