diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index a20bd137f..29ab433c6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -14,7 +14,7 @@ import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexNam import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper -import org.apache.spark.sql.Row +import org.apache.spark.sql.{DataFrame, Row} class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { @@ -87,6 +87,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { |""".stripMargin) } + // TODO: fix this windowing function unable to be used in GROUP BY ignore("full refresh materialized view") { flint .materializedView() @@ -107,13 +108,94 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } test("incremental refresh materialized view") { + withIncrementalMaterializedView(testQuery) { indexData => + checkAnswer( + indexData.select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 1), + Row(timestamp("2023-10-01 00:10:00"), 2), + Row(timestamp("2023-10-01 01:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 02:00:00"), 1) + */ + )) + } + } + + test("incremental refresh materialized view with larger window") { + val largeWindowQuery = + s""" + | SELECT + | window.start AS startTime, + | COUNT(*) AS count + | FROM $testTable + | GROUP BY TUMBLE(time, '1 Hour') + |""".stripMargin + + withIncrementalMaterializedView(largeWindowQuery) { indexData => + checkAnswer( + indexData.select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 3), + Row(timestamp("2023-10-01 01:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 02:00:00"), 1) + */ + )) + } + } + + test("incremental refresh materialized view with filtering query") { + val filterQuery = + s""" + | SELECT + | window.start AS startTime, + | COUNT(*) AS count + | FROM $testTable + | WHERE address = 'Seattle' + | GROUP BY TUMBLE(time, '10 Minutes') + |""".stripMargin + + withIncrementalMaterializedView(filterQuery) { indexData => + checkAnswer( + indexData.select("startTime", "count"), + Seq( + Row(timestamp("2023-10-01 00:00:00"), 1) + /* + * The last row is pending to fire upon watermark + * Row(timestamp("2023-10-01 00:10:00"), 1) + */ + )) + } + } + + test("incremental refresh materialized view with non-aggregate query") { + val nonAggQuery = + s""" + | SELECT name, age + | FROM $testTable + | WHERE age <= 30 + |""".stripMargin + + withIncrementalMaterializedView(nonAggQuery) { indexData => + checkAnswer(indexData.select("name", "age"), Seq(Row("A", 30), Row("B", 20), Row("E", 15))) + } + } + + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) + + private def withIncrementalMaterializedView(query: String)( + codeBlock: DataFrame => Unit): Unit = { withTempDir { checkpointDir => val indexOptions = FlintSparkIndexOptions( Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)) + flint .materializedView() .name(testMvName) - .query(testQuery) + .query(query) .options(indexOptions) .create() @@ -122,20 +204,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .map(awaitStreamingComplete) .orElse(throw new RuntimeException) - val indexData = flint.queryIndex(testFlintIndex).select("startTime", "count") - checkAnswer( - indexData, - Seq( - Row(timestamp("2023-10-01 00:00:00"), 1), - Row(timestamp("2023-10-01 00:10:00"), 2), - Row(timestamp("2023-10-01 01:00:00"), 1) - /* - * The last row is pending to fire upon watermark - * Row(timestamp("2023-10-01 02:00:00"), 1) - */ - )) + val indexData = flint.queryIndex(testFlintIndex) + + // Execute the code block + codeBlock(indexData) } } - - private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) }