Skip to content

Commit

Permalink
Fix logical plan assert
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 18, 2023
1 parent cd928e1 commit dae275b
Showing 1 changed file with 32 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.dsl.expressions.{count, intToLiteral, stringToLiteral, DslAttr, DslExpression, StringToAttributeConversionHelper}
import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
Expand All @@ -33,7 +32,7 @@ import org.apache.spark.unsafe.types.UTF8String
class FlintSparkMaterializedViewSuite extends FlintSuite {

/** Test table, MV name and query */
val testTable = "mv_build_test"
val testTable = "spark_catalog.default.mv_build_test"
val testMvName = "spark_catalog.default.mv"
val testQuery = "SELECT 1"

Expand Down Expand Up @@ -100,13 +99,15 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
val options = Map("watermark_delay" -> "30 Seconds")

withAggregateMaterializedView(testQuery, options) { actualPlan =>
assert(
actualPlan.sameSemantics(
streamingRelation(testTable)
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
count(1) as "count")))
comparePlans(
actualPlan,
streamingRelation(testTable)
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
$"COUNT".function(1) as "count"),
checkAnalysis = false
) // don't analyze due to full test table name
}
}

Expand All @@ -123,26 +124,28 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
val options = Map("watermark_delay" -> "30 Seconds")

withAggregateMaterializedView(testQuery, options) { actualPlan =>
assert(
actualPlan.sameSemantics(
streamingRelation(testTable)
.where($"age" > 30)
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
count(1) as "count")))
comparePlans(
actualPlan,
streamingRelation(testTable)
.where($"age" > 30)
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
$"COUNT".function(1) as "count"),
checkAnalysis = false)
}
}

test("build stream with non-aggregate query") {
val testQuery = s"SELECT name, age FROM $testTable WHERE age > 30"

withAggregateMaterializedView(testQuery, Map.empty) { actualPlan =>
assert(
actualPlan.sameSemantics(
streamingRelation(testTable)
.where($"age" > 30)
.select($"name", $"age")))
comparePlans(
actualPlan,
streamingRelation(testTable)
.where($"age" > 30)
.select($"name", $"age"),
checkAnalysis = false)
}
}

Expand All @@ -151,9 +154,11 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
val options = Map("extra_options" -> s"""{"$testTable": {"maxFilesPerTrigger": "1"}}""")

withAggregateMaterializedView(testQuery, options) { actualPlan =>
val expectPlan =
streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1")).select("name", "age")
assert(actualPlan.sameSemantics(expectPlan))
comparePlans(
actualPlan,
streamingRelation(testTable, Map("maxFilesPerTrigger" -> "1"))
.select($"name", $"age"),
checkAnalysis = false)
}
}

Expand Down Expand Up @@ -195,8 +200,8 @@ object FlintSparkMaterializedViewSuite {
def streamingRelation(
tableName: String,
extraOptions: Map[String, String] = Map.empty): UnresolvedRelation = {
UnresolvedRelation(
TableIdentifier(tableName),
new UnresolvedRelation(
tableName.split('.'),
new CaseInsensitiveStringMap(extraOptions.asJava),
isStreaming = true)
}
Expand Down

0 comments on commit dae275b

Please sign in to comment.