Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support on-demand incremental refresh #234

24 changes: 22 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,25 @@ Please see the following example in which Index Building Logic and Query Rewrite
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
| BloomFilter | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;client_ip BLOOM_FILTER<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;BLOOM_FILTER_AGG(client_ip) AS client_ip,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE client_ip = '127.0.0.1'<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE BLOOM_FILTER_MIGHT_CONTAIN(client_ip, '127.0.0.1') = true<br>)<br>WHERE client_ip = '127.0.0.1' |

### Flint Index Refresh

- **Auto Refresh:**
- This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences.
- **Manual Refresh:**
- Users have the option to manually trigger a refresh for the Flint Index. This provides flexibility and control over when the refresh occurs.
- **Full Refresh:**
- Initiates a comprehensive update of the Flint Index, fetching all available data and ensuring the most up-to-date information is displayed.
- **Incremental Refresh:**
- Performs an incremental update by fetching only the new data since the last refresh. This is useful for optimizing the refresh process and reducing resource usage.

The refresh mode is influenced by the index options specified during index creation, particularly the `auto_refresh` and `incremental_refresh` options. These options collectively define the behavior of the refresh mode when creating an index as below. Find more details in [Create Index Options](#create-index-options).

| Refresh Mode | auto_refresh | incremental_refresh |
|---------------------|--------------|---------------------|
| Auto Refresh | true | |
| Full Refresh | false | false |
| Incremental Refresh | false | true |

### Flint Index Specification

#### Metadata
Expand Down Expand Up @@ -263,9 +282,10 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics

User can provide the following options in `WITH` clause of create statement:

+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
+ `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
+ `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.AUTO
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}

/**
* Flint Spark integration API entrypoint.
Expand Down Expand Up @@ -130,16 +129,14 @@ class FlintSpark(val spark: SparkSession) extends Logging {
*
* @param indexName
* index name
* @param mode
* refresh mode
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val mode = if (index.options.autoRefresh()) AUTO else MANUAL
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintClient
Expand All @@ -149,17 +146,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
if (mode == MANUAL) {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
} else {
// Schedule regular update and return log entry as refreshing state
if (indexRefresh.refreshMode == AUTO) {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest
} else {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
}
})
.commit(_ => doRefreshIndex(index, indexName, mode))
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
Expand Down Expand Up @@ -292,7 +288,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ => doRefreshIndex(index.get, indexName, AUTO))
.commit(_ =>
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
Expand Down Expand Up @@ -333,67 +332,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
spark.read.format(FLINT_DATASOURCE).load(indexName)
}

// TODO: move to separate class
private def doRefreshIndex(
index: FlintSparkIndex,
indexName: String,
mode: RefreshMode): Option[String] = {
logInfo(s"Refreshing index $indexName in $mode mode")
val options = index.options
val tableName = index.metadata().source

// Batch refresh Flint index from the given source data frame
def batchRefresh(df: Option[DataFrame] = None): Unit = {
index
.build(spark, df)
.write
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.mode(Overwrite)
.save(indexName)
}

val jobId = mode match {
case MANUAL =>
logInfo("Start refreshing index in batch style")
batchRefresh()
None

// Flint index has specialized logic and capability for incremental refresh
case AUTO if index.isInstanceOf[StreamingRefresh] =>
logInfo("Start refreshing index in streaming style")
val job =
index
.asInstanceOf[StreamingRefresh]
.buildStream(spark)
.writeStream
.queryName(indexName)
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.addSinkOptions(options)
.start(indexName)
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case AUTO =>
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(quotedTableName(tableName))
.writeStream
.queryName(indexName)
.addSinkOptions(options)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
batchRefresh(Some(batchDF))
}
.start()
Some(job.id.toString)
}

logInfo("Refresh index complete")
jobId
}

private def stopRefreshingJob(indexName: String): Unit = {
logInfo(s"Terminating refreshing job $indexName")
val job = spark.streams.active.find(_.name == indexName)
Expand All @@ -403,48 +341,4 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logWarning("Refreshing job not found")
}
}

// Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API
private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {

def addSinkOptions(options: FlintSparkIndexOptions): DataStreamWriter[Row] = {
dataStream
.addCheckpointLocation(options.checkpointLocation())
.addRefreshInterval(options.refreshInterval())
.addOutputMode(options.outputMode())
.options(options.extraSinkOptions())
}

def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
checkpointLocation match {
case Some(location) => dataStream.option("checkpointLocation", location)
case None if flintSparkConf.isCheckpointMandatory =>
throw new IllegalStateException(
s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled")
case _ => dataStream
}
}

def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
refreshInterval
.map(interval => dataStream.trigger(Trigger.ProcessingTime(interval)))
.getOrElse(dataStream)
}

def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = {
outputMode.map(dataStream.outputMode).getOrElse(dataStream)
}
}
}

object FlintSpark {

/**
* Index refresh mode: FULL: refresh on current source data in batch style at one shot
* INCREMENTAL: auto refresh on new data in continuous streaming style
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
val MANUAL, AUTO = Value
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ package org.opensearch.flint.spark
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames

/**
Expand Down Expand Up @@ -39,6 +39,15 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def refreshInterval(): Option[String] = getOptionValue(REFRESH_INTERVAL)

/**
* Is refresh incremental or full. This only applies to manual refresh.
*
* @return
* incremental option value
*/
def incrementalRefresh(): Boolean =
getOptionValue(INCREMENTAL_REFRESH).getOrElse("false").toBoolean

/**
* The checkpoint location which maybe required by Flint index's refresh.
*
Expand Down Expand Up @@ -103,6 +112,9 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
if (!options.contains(AUTO_REFRESH.toString)) {
map += (AUTO_REFRESH.toString -> autoRefresh().toString)
}
if (!options.contains(INCREMENTAL_REFRESH.toString)) {
map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString)
}
map.result()
}

Expand Down Expand Up @@ -131,6 +143,7 @@ object FlintSparkIndexOptions {
type OptionName = Value
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
val WATERMARK_DELAY: OptionName.Value = Value("watermark_delay")
val OUTPUT_MODE: OptionName.Value = Value("output_mode")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

/**
* Index refresh that auto refreshes the index by index options provided.
*
* @param indexName
* Flint index name
* @param index
* Flint index
*/
class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh {

override def refreshMode: RefreshMode = AUTO

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
val options = index.options
val tableName = index.metadata().source
index match {
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
logInfo("Start refreshing index in streaming style")
val job =
refresh
.buildStream(spark)
.writeStream
.queryName(indexName)
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.addSinkOptions(options, flintSparkConf)
.start(indexName)
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case _ =>
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(quotedTableName(tableName))
.writeStream
.queryName(indexName)
.addSinkOptions(options, flintSparkConf)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
new FullIndexRefresh(indexName, index, Some(batchDF))
.start(spark, flintSparkConf)
() // discard return value above and return unit to use right overridden method
}
.start()
Some(job.id.toString)
}
}

// Using Scala implicit class to avoid breaking method chaining of Spark data frame fluent API
private implicit class FlintDataStreamWriter(val dataStream: DataStreamWriter[Row]) {

def addSinkOptions(
options: FlintSparkIndexOptions,
flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
dataStream
.addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
.addRefreshInterval(options.refreshInterval())
.addAvailableNowTrigger(options.incrementalRefresh())
.addOutputMode(options.outputMode())
.options(options.extraSinkOptions())
}

def addCheckpointLocation(
checkpointLocation: Option[String],
isCheckpointMandatory: Boolean): DataStreamWriter[Row] = {
checkpointLocation match {
case Some(location) => dataStream.option("checkpointLocation", location)
case None if isCheckpointMandatory =>
throw new IllegalStateException(
s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled")
case _ => dataStream
}
}

def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
refreshInterval
.map(interval => dataStream.trigger(Trigger.ProcessingTime(interval)))
.getOrElse(dataStream)
}

def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
if (incrementalRefresh) {
dataStream.trigger(Trigger.AvailableNow())
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
} else {
dataStream
}
}

def addOutputMode(outputMode: Option[String]): DataStreamWriter[Row] = {
outputMode.map(dataStream.outputMode).getOrElse(dataStream)
}
}
}
Loading
Loading