diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index b2e310cc7..9ea44b1ce 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -22,6 +22,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._ +import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy @@ -140,7 +141,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) .finalLog(latest => { // Change state to active if full, otherwise update index state regularly - if (indexRefresh.refreshMode == AUTO) { + if (indexRefresh.refreshMode == AUTO && SchedulerMode.INTERNAL == index.options + .schedulerMode()) { logInfo("Scheduling index state monitor") flintIndexMonitor.startMonitor(indexName) latest diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala index f74582d80..5d556a111 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala @@ -47,8 +47,12 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) // Checkpoint location is required if mandatory option set or external scheduler is used val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String]) val checkpointLocation = options.checkpointLocation() - if (flintSparkConf.isCheckpointMandatory || SchedulerMode.EXTERNAL == - options.schedulerMode()) { + if (SchedulerMode.EXTERNAL == options.schedulerMode()) { + require( + checkpointLocation.isDefined, + "Checkpoint location is required for external scheduler") + } + if (flintSparkConf.isCheckpointMandatory) { require( checkpointLocation.isDefined, s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/worksheet.sc b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/worksheet.sc new file mode 100644 index 000000000..7fe34cc86 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/worksheet.sc @@ -0,0 +1 @@ +refresh \ No newline at end of file diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala index cc0573d6c..2190bd417 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexValidationITSuite.scala @@ -83,6 +83,27 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup } } + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) + .foreach { statement => + test( + s"should fail to create auto refresh Flint index if scheduler_mode is external and no checkpoint location: $statement") { + withTable(testTable) { + sql(s"CREATE TABLE $testTable (name STRING) USING JSON") + + the[IllegalArgumentException] thrownBy { + sql(s""" + | $statement + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external' + | ) + |""".stripMargin) + } should have message + "requirement failed: Checkpoint location is required for external scheduler" + } + } + } + Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement) .foreach { statement => test( diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 83fe1546c..f824aab73 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -78,7 +78,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "auto_refresh": "true", | "incremental_refresh": "false", | "checkpoint_location": "${checkpointDir.getAbsolutePath}", - | "watermark_delay": "30 Seconds" + | "watermark_delay": "30 Seconds", + | "scheduler_mode":"internal" | }, | "latestId": "$testLatestId", | "properties": {} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 3a17cb8b1..fc4cdbeac 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -82,6 +82,36 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { } } + test("create materialized view with auto refresh and external scheduler") { + withTempDir { checkpointDir => + sql(s""" + | CREATE MATERIALIZED VIEW $testMvName + | AS $testQuery + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external', + | checkpoint_location = '${checkpointDir.getAbsolutePath}', + | watermark_delay = '1 Second' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver') + | """.stripMargin) + flint.queryIndex(testFlintIndex).count() shouldBe 3 + + // New data is refreshed incrementally + sql(s"REFRESH MATERIALIZED VIEW $testMvName") + flint.queryIndex(testFlintIndex).count() shouldBe 4 + } + } + test("create materialized view with streaming job options") { withTempDir { checkpointDir => sql(s""" diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index b2185a5a9..5e62c4e02 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -266,6 +266,39 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { indexData should have size 2 } + test("auto refresh skipping index successfully with external scheduler") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "scheduler_mode" -> "external", + "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .create() + + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 2 + + // Delete all index data intentionally and generate a new source file + openSearchClient.deleteByQuery( + new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()), + RequestOptions.DEFAULT) + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=4) + | VALUES ('Hello', 35, 'Vancouver') + | """.stripMargin) + + // Expect to only refresh the new file + flint.refreshIndex(testIndex) shouldBe empty + flint.queryIndex(testIndex).collect().toSet should have size 1 + } + } + test("update skipping index successfully") { // Create full refresh Flint index flint diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index e10e6a29b..af497eb2b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -62,6 +62,35 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit indexData.count() shouldBe 2 } + test("create skipping index with auto refresh and external scheduler") { + withTempDir { checkpointDir => + sql(s""" + | CREATE SKIPPING INDEX ON $testTable + | ( year PARTITION ) + | WITH ( + | auto_refresh = true, + | scheduler_mode = 'external', + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + | """.stripMargin) + + // Refresh all present source data as of now + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 2 + + // New data won't be refreshed until refresh statement triggered + sql(s""" + | INSERT INTO $testTable + | PARTITION (year=2023, month=5) + | VALUES ('Hello', 50, 'Vancouver') + |""".stripMargin) + flint.queryIndex(testIndex).count() shouldBe 2 + + sql(s"REFRESH SKIPPING INDEX ON $testTable") + flint.queryIndex(testIndex).count() shouldBe 3 + } + } + test("create skipping index with max size value set") { sql(s""" | CREATE SKIPPING INDEX ON $testTable