diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index ee58ec7f5..112de680f 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -44,9 +44,6 @@ case class FlintSparkMaterializedView( extends FlintSparkIndex with StreamingRefresh { - /** TODO: add it to index option */ - private val watermarkDelay = "0 Minute" - override val kind: String = MV_INDEX_TYPE override def name(): String = getFlintIndexName(mvName) @@ -81,8 +78,8 @@ case class FlintSparkMaterializedView( * 2.Set isStreaming flag to true in Relation operator */ val streamingPlan = batchPlan transform { - case WindowingAggregate(agg, timeCol) => - agg.copy(child = watermark(timeCol, watermarkDelay, agg.child)) + case WindowingAggregate(aggregate, timeCol) => + aggregate.copy(child = watermark(timeCol, aggregate.child)) case relation: UnresolvedRelation if !relation.isStreaming => relation.copy(isStreaming = true) @@ -90,7 +87,12 @@ case class FlintSparkMaterializedView( logicalPlanToDataFrame(spark, streamingPlan) } - private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = { + private def watermark(timeCol: Attribute, child: LogicalPlan) = { + require( + options.watermarkDelay().isDefined, + "watermark delay is required for incremental refresh with aggregation") + + val delay = options.watermarkDelay().get EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child) } @@ -107,7 +109,7 @@ case class FlintSparkMaterializedView( if (winFuncs.size != 1) { throw new IllegalStateException( - "A windowing function is required for streaming aggregation") + "A windowing function is required for incremental refresh with aggregation") } // Assume first aggregate item must be time column diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala index c28495c69..1f9b52963 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedViewSuite.scala @@ -100,19 +100,24 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView( + testMvName, + testQuery, + Map.empty, + FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds"))) + val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( streamingRelation(testTable) - .watermark($"time", "0 Minute") + .watermark($"time", "30 Seconds") .groupBy($"TUMBLE".function($"time", "1 Minute"))( $"window.start" as "startTime", count(1) as "count"))) } } - test("build stream with filtering query") { + test("build stream with filtering aggregate query") { val testTable = "mv_build_test" withTable(testTable) { sql(s"CREATE TABLE $testTable (time TIMESTAMP, name STRING, age INT) USING CSV") @@ -127,13 +132,18 @@ class FlintSparkMaterializedViewSuite extends FlintSuite { | GROUP BY TUMBLE(time, '1 Minute') |""".stripMargin - val mv = FlintSparkMaterializedView(testMvName, testQuery, Map.empty) + val mv = FlintSparkMaterializedView( + testMvName, + testQuery, + Map.empty, + FlintSparkIndexOptions(Map("watermark_delay" -> "30 Seconds"))) + val actualPlan = mv.buildStream(spark).queryExecution.logical assert( actualPlan.sameSemantics( streamingRelation(testTable) .where($"age" > 30) - .watermark($"time", "0 Minute") + .watermark($"time", "30 Seconds") .groupBy($"TUMBLE".function($"time", "1 Minute"))( $"window.start" as "startTime", count(1) as "count"))) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 29ab433c6..29ce4e248 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -43,7 +43,11 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { test("create materialized view with metadata successfully") { val indexOptions = - FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/")) + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> "s3://test/", + "watermark_delay" -> "30 Seconds")) flint .materializedView() .name(testMvName) @@ -70,7 +74,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | }], | "options": { | "auto_refresh": "true", - | "checkpoint_location": "s3://test/" + | "checkpoint_location": "s3://test/", + | "watermark_delay": "30 Seconds" | }, | "properties": {} | }, @@ -147,7 +152,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } - test("incremental refresh materialized view with filtering query") { + test("incremental refresh materialized view with filtering aggregate query") { val filterQuery = s""" | SELECT @@ -155,7 +160,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | COUNT(*) AS count | FROM $testTable | WHERE address = 'Seattle' - | GROUP BY TUMBLE(time, '10 Minutes') + | GROUP BY TUMBLE(time, '5 Minutes') |""".stripMargin withIncrementalMaterializedView(filterQuery) { indexData => @@ -190,7 +195,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { codeBlock: DataFrame => Unit): Unit = { withTempDir { checkpointDir => val indexOptions = FlintSparkIndexOptions( - Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)) + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "watermark_delay" -> "1 Minute")) // This must be small to ensure window closed soon flint .materializedView()