From 7f6c7a78b360317e80af4a263f7ce69bee75688e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 17 Apr 2024 13:31:44 -0700 Subject: [PATCH] Add more IT for MV validation Signed-off-by: Chen Dai --- .../spark/mv/FlintSparkMaterializedView.scala | 16 ++++++-- ...FlintSparkMaterializedViewSqlITSuite.scala | 39 ++++++++++++++++++- 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 1ab8f77bf..e9c762ddd 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -17,6 +17,8 @@ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generat import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.function.TumbleFunction import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.{getFlintIndexName, MV_INDEX_TYPE} +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, INCREMENTAL} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.FunctionIdentifier @@ -122,10 +124,9 @@ case class FlintSparkMaterializedView( func } - if (winFuncs.size != 1) { - throw new IllegalStateException( - "A windowing function is required for incremental refresh with aggregation") - } + require( + winFuncs.size == 1, + "A windowing function is required for incremental refresh with aggregation") // Assume first aggregate item must be time column val winFunc = winFuncs.head @@ -198,6 +199,13 @@ object FlintSparkMaterializedView { override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = { super.validateIndex(index) + + // Pre-build to ensure windowing function and watermark valid + val refreshMode = FlintSparkIndexRefresh.create(index.name(), index).refreshMode + if (refreshMode == AUTO || refreshMode == INCREMENTAL) { + index.asInstanceOf[StreamingRefresh].buildStream(flint.spark) + } + index } override protected def buildIndex(): FlintSparkIndex = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index ec9691261..653c56efa 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -198,16 +198,18 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { test("should fail if materialized view query has syntax error") { the[ParseException] thrownBy { + // Wrong syntax due to incomplete WITH clause sql(s""" | CREATE MATERIALIZED VIEW $testMvName | AS - | SELECT time FROM $testTable WITH ( + | SELECT time FROM $testTable WITH | """.stripMargin) } } test("should fail if materialized view query has semantic error") { the[AnalysisException] thrownBy { + // Non-existent time1 column sql(s""" | CREATE MATERIALIZED VIEW $testMvName | AS @@ -216,6 +218,41 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("should fail if no windowing function for aggregated query") { + withTempDir { checkpointDir => + the[IllegalArgumentException] thrownBy { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS + | SELECT COUNT(*) FROM $testTable GROUP BY time + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' + | ) + | """.stripMargin) + } should have message + "requirement failed: A windowing function is required for incremental refresh with aggregation" + } + } + + test("should fail if no watermark delay for aggregated query") { + withTempDir { checkpointDir => + the[IllegalArgumentException] thrownBy { + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS + | $testQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + } should have message + "requirement failed: watermark delay is required for auto refresh and incremental refresh with aggregation" + } + } + test("issue 112, https://github.com/opensearch-project/opensearch-spark/issues/112") { val tableName = "spark_catalog.default.issue112" createTableIssue112(tableName)