Skip to content

Commit

Permalink
Update unit and integration tests to use the watermark delay option
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 18, 2023
1 parent a63e913 commit a9b894e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ case class FlintSparkMaterializedView(
extends FlintSparkIndex
with StreamingRefresh {

/** TODO: add it to index option */
private val watermarkDelay = "0 Minute"

override val kind: String = MV_INDEX_TYPE

override def name(): String = getFlintIndexName(mvName)
Expand Down Expand Up @@ -81,16 +78,21 @@ case class FlintSparkMaterializedView(
* 2.Set isStreaming flag to true in Relation operator
*/
val streamingPlan = batchPlan transform {
case WindowingAggregate(agg, timeCol) =>
agg.copy(child = watermark(timeCol, watermarkDelay, agg.child))
case WindowingAggregate(aggregate, timeCol) =>
aggregate.copy(child = watermark(timeCol, aggregate.child))

case relation: UnresolvedRelation if !relation.isStreaming =>
relation.copy(isStreaming = true)
}
logicalPlanToDataFrame(spark, streamingPlan)
}

private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
/**
 * Wraps the given child plan with an `EventTimeWatermark` operator using the
 * watermark delay configured in the index options. The delay option is
 * mandatory for incremental refresh with aggregation, so fail fast with a
 * descriptive error when it is absent.
 *
 * @param timeCol event time column that the watermark is defined on
 * @param child   logical plan to wrap with the watermark operator
 * @return child plan wrapped with an event-time watermark
 */
private def watermark(timeCol: Attribute, child: LogicalPlan) = {
  // Evaluate the option exactly once and avoid Option.get: fold the absent
  // case into an IllegalArgumentException (the same exception type `require`
  // would have thrown).
  val delay = options
    .watermarkDelay()
    .getOrElse(
      throw new IllegalArgumentException(
        "watermark delay is required for incremental refresh with aggregation"))

  EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
}

Expand All @@ -107,7 +109,7 @@ case class FlintSparkMaterializedView(

if (winFuncs.size != 1) {
throw new IllegalStateException(
"A windowing function is required for streaming aggregation")
"A windowing function is required for incremental refresh with aggregation")
}

// Assume first aggregate item must be time column
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,24 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
| GROUP BY TUMBLE(time, '1 Minute')
|""".stripMargin

val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
val mv = FlintSparkMaterializedView(
testMvName,
testQuery,
Map.empty,
FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))

val actualPlan = mv.buildStream(spark).queryExecution.logical
assert(
actualPlan.sameSemantics(
streamingRelation(testTable)
.watermark($"time", "0 Minute")
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
count(1) as "count")))
}
}

test("build stream with filtering query") {
test("build stream with filtering aggregate query") {
val testTable = "mv_build_test"
withTable(testTable) {
sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV")
Expand All @@ -127,13 +132,18 @@ class FlintSparkMaterializedViewSuite extends FlintSuite {
| GROUP BY TUMBLE(time, '1 Minute')
|""".stripMargin

val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty)
val mv = FlintSparkMaterializedView(
testMvName,
testQuery,
Map.empty,
FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds")))

val actualPlan = mv.buildStream(spark).queryExecution.logical
assert(
actualPlan.sameSemantics(
streamingRelation(testTable)
.where($"age" > 30)
.watermark($"time", "0 Minute")
.watermark($"time", "30 Seconds")
.groupBy($"TUMBLE".function($"time", "1 Minute"))(
$"window.start" as "startTime",
count(1) as "count")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {

test("create materialized view with metadata successfully") {
val indexOptions =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> "s3://test/",
"watermark_delay" -> "30 Seconds"))
flint
.materializedView()
.name(testMvName)
Expand All @@ -70,7 +74,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| }],
| "options": {
| "auto_refresh": "true",
| "checkpoint_location": "s3://test/"
| "checkpoint_location": "s3://test/",
| "watermark_delay": "30 Seconds"
| },
| "properties": {}
| },
Expand Down Expand Up @@ -147,15 +152,15 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
}
}

test("incremental refresh materialized view with filtering query") {
test("incremental refresh materialized view with filtering aggregate query") {
val filterQuery =
s"""
| SELECT
| window.start AS startTime,
| COUNT(*) AS count
| FROM $testTable
| WHERE address = 'Seattle'
| GROUP BY TUMBLE(time, '10 Minutes')
| GROUP BY TUMBLE(time, '5 Minutes')
|""".stripMargin

withIncrementalMaterializedView(filterQuery) { indexData =>
Expand Down Expand Up @@ -190,7 +195,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
codeBlock: DataFrame => Unit): Unit = {
withTempDir { checkpointDir =>
val indexOptions = FlintSparkIndexOptions(
Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath,
"watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon

flint
.materializedView()
Expand Down

0 comments on commit a9b894e

Please sign in to comment.