From 694d603a64727d570ec34e28d2b2da6d564c27e4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 7 Feb 2024 11:23:25 -0800 Subject: [PATCH] Address PR comments Signed-off-by: Chen Dai --- docs/index.md | 12 ++++++++++-- .../spark/refresh/IncrementalIndexRefresh.scala | 5 +++++ .../spark/FlintSparkSkippingIndexITSuite.scala | 14 +++++++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/docs/index.md b/docs/index.md index 6388557dd..669fa25e0 100644 --- a/docs/index.md +++ b/docs/index.md @@ -36,6 +36,14 @@ Please see the following example in which Index Building Logic and Query Rewrite - **Incremental Refresh:** - Performs an incremental update by fetching only the new data since the last refresh. This is useful for optimizing the refresh process and reducing resource usage. +The refresh mode is influenced by the index options specified during index creation, particularly the `auto_refresh` and `incremental_refresh` options. These options collectively define the behavior of the refresh mode when creating an index as below. Find more details in [Create Index Options](#create-index-options). + +| Refresh Mode | auto_refresh | incremental_refresh | +|---------------------|--------------|---------------------| +| Auto Refresh | true | | +| Full Refresh | false | false | +| Incremental Refresh | false | true | + ### Flint Index Specification #### Metadata @@ -271,9 +279,9 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics User can provide the following options in `WITH` clause of create statement: -+ `auto_refresh`: automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually. ++ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually. + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. 
Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing. -+ `incremental_refresh`: incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled. ++ `incremental_refresh`: default value is false. Incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This is only applicable when auto refresh is disabled. + `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart. + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query. + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied. 
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala index 67d25a7c9..418ada902 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala @@ -27,6 +27,11 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex) override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = { logInfo(s"Start refreshing index $indexName in incremental mode") + // TODO: move this to a validation method in the future + if (index.options.checkpointLocation().isEmpty) { + throw new IllegalStateException("Checkpoint location is required by incremental refresh") + } + // Reuse auto refresh which uses AvailableNow trigger and will stop once complete val jobId = new AutoIndexRefresh(indexName, index) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index c0af6cb6f..7a6ba99e1 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -179,7 +179,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } test("incremental refresh skipping index successfully") { - // Incremental refresh requires checkpoint withTempDir { checkpointDir => flint .skippingIndex() .onTable(testTable) .addPartitions("year", "month") @@ -211,6 +210,19 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + test("should fail if incremental refresh without checkpoint location") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year", "month") + 
.options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + .create() + + assertThrows[IllegalStateException] { + flint.refreshIndex(testIndex) + } + } + test("auto refresh skipping index successfully") { // Create Flint index and wait for complete flint