Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Feb 7, 2024
1 parent 4848476 commit 694d603
Showing 3 changed files with 28 additions and 3 deletions.
12 changes: 10 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -36,6 +36,14 @@ Please see the following example in which Index Building Logic and Query Rewrite
- **Incremental Refresh:**
- Performs an incremental update by fetching only the new data since the last refresh. This is useful for optimizing the refresh process and reducing resource usage.

The refresh mode is influenced by the index options specified during index creation, particularly the `auto_refresh` and `incremental_refresh` options. These options collectively define the behavior of the refresh mode when creating an index as below. Find more details in [Create Index Options](#create-index-options).

| Refresh Mode | auto_refresh | incremental_refresh |
|---------------------|--------------|---------------------|
| Auto Refresh | true | |
| Full Refresh | false | false |
| Incremental Refresh | false | true |

### Flint Index Specification

#### Metadata
@@ -271,9 +279,9 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics

User can provide the following options in `WITH` clause of create statement:

+ `auto_refresh`: automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `incremental_refresh`: incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
Original file line number Diff line number Diff line change
@@ -27,6 +27,11 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
logInfo(s"Start refreshing index $indexName in incremental mode")

// TODO: move this to validation method together in future
if (index.options.checkpointLocation().isEmpty) {
throw new IllegalStateException("Checkpoint location is required by incremental refresh")
}

// Reuse auto refresh which uses AvailableNow trigger and will stop once complete
val jobId =
new AutoIndexRefresh(indexName, index)
Original file line number Diff line number Diff line change
@@ -179,7 +179,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}

test("incremental refresh skipping index successfully") {
// Incremental refresh requires checkpoint
withTempDir { checkpointDir =>
flint
.skippingIndex()
@@ -211,6 +210,19 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("should fail if incremental refresh without checkpoint location") {
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
.create()

assertThrows[IllegalStateException] {
flint.refreshIndex(testIndex)
}
}

test("auto refresh skipping index successfully") {
// Create Flint index and wait for complete
flint

0 comments on commit 694d603

Please sign in to comment.