Add UT for build stream function
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 16, 2023
1 parent 2b34cdd commit 7e81d73
Showing 1 changed file with 86 additions and 24 deletions.
@@ -15,10 +15,10 @@ import org.scalatestplus.mockito.MockitoSugar.mock
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation}
-import org.apache.spark.sql.catalyst.dsl.expressions.{DslAttr, DslExpression}
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.dsl.expressions.{count, DslAttr, DslExpression, StringToAttributeConversionHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -95,26 +95,88 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
       val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
       val actualPlan = mv.buildStream(spark).queryExecution.logical
 
-      val timeColumn = UnresolvedAttribute("time")
-      val expectPlan =
-        Aggregate(
-          Seq(
-            UnresolvedFunction(
-              "TUMBLE",
-              Seq(timeColumn, Literal("1 Minute")),
-              isDistinct = false)),
-          Seq(
-            UnresolvedAttribute("window.start") as "startTime",
-            UnresolvedFunction("COUNT", Seq(Literal(1)), isDistinct = false) as "count"),
-          EventTimeWatermark(
-            timeColumn,
-            IntervalUtils.stringToInterval(UTF8String.fromString("0 Minute")),
-            UnresolvedRelation(
-              TableIdentifier(testTable),
-              CaseInsensitiveStringMap.empty(),
-              isStreaming = true)))
-
-      actualPlan.sameSemantics(expectPlan)
+      assert(
+        actualPlan.sameSemantics(Aggregate(
+          Seq($"TUMBLE".function($"time", Literal("1 Minute"))),
+          Seq($"window.start" as "startTime", count(Literal(1)) as "count"),
+          watermark($"time", "0 Minute", streamingRelation(testTable)))))
     }
   }
+
test("build stream with filtering query") {
val testTable = "mv_build_test"
withTable(testTable) {
sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

val testQuery =
s"""
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| WHERE age > 30
| GROUP BY TUMBLE(time, '1 Minute')
|""".stripMargin

val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
val actualPlan = mv.buildStream(spark).queryExecution.logical

assert(
actualPlan.sameSemantics(Aggregate(
Seq($"TUMBLE".function($"time", Literal("1 Minute"))),
Seq($"window.start" as "startTime", count(Literal(1)) as "count"),
watermark(
$"time",
"0 Minute",
Filter($"age" > Literal(30), streamingRelation(testTable))))))
}
}

test("build stream should fail if there is aggregation without windowing function") {
val testTable = "mv_build_test"
withTable(testTable) {
sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

val mv = FlintSparkMaterializedView(
testMvName,
s"SELECT name, COUNT(*) AS count FROM $testTable GROUP BY name",
Map.empty)

the[IllegalStateException] thrownBy
mv.buildStream(spark)
}
}

// TODO: should we add this restriction?
ignore("build stream should fail if there is no aggregation") {
val testTable = "mv_build_test"
withTable(testTable) {
sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")

val mv = FlintSparkMaterializedView(
testMvName,
s"SELECT COUNT(*) AS count FROM $testTable",
Map.empty)

the[IllegalStateException] thrownBy
mv.buildStream(spark)
}
}

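+  /** Builds an unresolved relation that reads the given table as a stream. */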
+  private def streamingRelation(tableName: String): UnresolvedRelation = {
+    UnresolvedRelation(
+      TableIdentifier(tableName),
+      CaseInsensitiveStringMap.empty(),
+      isStreaming = true)
+  }
+
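+  /** Wraps the child plan with an event-time watermark on the given column and delay interval. */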
+  private def watermark(
+      colName: Attribute,
+      interval: String,
+      child: LogicalPlan): EventTimeWatermark = {
+    EventTimeWatermark(
+      colName,
+      IntervalUtils.stringToInterval(UTF8String.fromString(interval)),
+      child)
+  }
 }
