Update user manual and javadoc
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jan 24, 2024
1 parent 1835132 commit dd9965c
Showing 9 changed files with 62 additions and 35 deletions.
16 changes: 14 additions & 2 deletions docs/index.md
@@ -25,6 +25,17 @@ Please see the following example in which Index Building Logic and Query Rewrite
| ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;COLLECT_SET(elb_status_code) AS elb_status_code,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br>&nbsp;&nbsp;SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
| MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br> (<br>&nbsp;&nbsp;request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>&nbsp;&nbsp;MIN(request_processing_time) AS request_processing_time_min,<br>&nbsp;&nbsp;MAX(request_processing_time) AS request_processing_time_max,<br>&nbsp;&nbsp;input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>&nbsp;&nbsp;input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files = <br> SELECT file_path<br>&nbsp;&nbsp;FROM flint_alb_logs_skipping_index<br>&nbsp;&nbsp;WHERE request_processing_time_min <= 100<br>&nbsp;&nbsp;&nbsp;&nbsp;AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100

### Flint Index Refresh

- **Auto Refresh:**
  - The Flint index refreshes automatically. Users can configure settings such as the auto-refresh frequency.
- **Manual Refresh:**
  - Users trigger a refresh of the Flint index themselves, which gives them control over when the refresh occurs.
- **Full Refresh:**
  - Performs a comprehensive update of the Flint index, fetching all available data so that the index is fully up to date.
- **Incremental Refresh:**
  - Performs an incremental update by fetching only the data that is new since the last refresh. This reduces the time and resources a refresh needs.

### Flint Index Specification

#### Metadata
@@ -260,9 +271,10 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics

Users can provide the following options in the `WITH` clause of a create statement:

+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
+ `auto_refresh`: automatically refreshes the index if set to true. Otherwise, the user has to trigger a refresh manually with the `REFRESH` statement.
+ `refresh_interval`: a string specifying the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh is enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro-batch is generated as soon as the previous one completes processing.
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
+ `incremental_refresh`: incrementally refreshes the index if set to true. Otherwise, fully refreshes the entire index. This is only applicable when auto refresh is disabled.
+ `checkpoint_location`: a string specifying the location path for the refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS-compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory is used, and checkpoint data may be lost upon restart.
+ `watermark_delay`: a string time expression for how late data can arrive and still be processed, e.g. 1 minute, 10 seconds. This is required for incremental refresh on a materialized view whose query contains an aggregation.
+ `output_mode`: a mode string that describes how data is written to the streaming sink. If unspecified, the default append mode is applied.
+ `index_settings`: a JSON string specifying the index settings for the OpenSearch index that will be created. Please follow the format in the OpenSearch documentation. If unspecified, the default OpenSearch index settings are applied.
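
The way `auto_refresh` and `incremental_refresh` combine into a refresh mode, as described in the options above, can be sketched as follows. This is a minimal illustration and not Flint's actual code: `RefreshModeSketch`, `IndexOptions`, and `resolve` are hypothetical names, and the options are assumed to arrive as a plain string map.

```scala
object RefreshModeSketch {

  // Hypothetical stand-in for the refresh mode enumeration.
  object RefreshMode extends Enumeration {
    val AUTO, FULL, INCREMENTAL = Value
  }

  // Assumption: WITH-clause options are available as string key/value pairs.
  final case class IndexOptions(options: Map[String, String]) {
    // A missing option is treated as false.
    private def flag(name: String): Boolean =
      options.get(name).exists(_.trim.toLowerCase == "true")

    def autoRefresh: Boolean = flag("auto_refresh")
    def incrementalRefresh: Boolean = flag("incremental_refresh")
  }

  // auto_refresh takes precedence; incremental_refresh is only consulted
  // when auto refresh is disabled.
  def resolve(opts: IndexOptions): RefreshMode.Value =
    if (opts.autoRefresh) RefreshMode.AUTO
    else if (opts.incrementalRefresh) RefreshMode.INCREMENTAL
    else RefreshMode.FULL
}
```

Under these assumptions, an index created without either option resolves to a full refresh, which the user then triggers with the `REFRESH` statement.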
@@ -12,11 +12,11 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL}
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.AUTO
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer

@@ -129,16 +129,14 @@ class FlintSpark(val spark: SparkSession) extends Logging {
*
* @param indexName
* index name
* @param mode
* refresh mode
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val mode = if (index.options.autoRefresh()) AUTO else MANUAL
val refresher = FlintSparkIndexRefresher.create(indexName, index)

try {
flintClient
@@ -148,20 +146,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
if (mode == MANUAL) {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
} else {
// Schedule regular update and return log entry as refreshing state
if (refresher.refreshMode == AUTO) {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest
} else {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
}
})
.commit(_ =>
FlintSparkIndexRefresher
.create(indexName, index)
.start(spark, flintSparkConf))
.commit(_ => refresher.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
@@ -334,15 +328,3 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}
}

object FlintSpark {

/**
* Index refresh mode: FULL: refresh on current source data in batch style at one shot
* INCREMENTAL: auto refresh on new data in continuous streaming style
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
val MANUAL, AUTO = Value
}
}
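
The state handling in the `refreshIndex` hunk above can be summarized in a small standalone sketch. It is an illustration under stated assumptions, not the Flint implementation: `RefreshStateSketch`, `LogEntry`, and the `startMonitor` callback are hypothetical stand-ins for the metadata log entry and the index monitor.

```scala
object RefreshStateSketch {

  // Hypothetical stand-ins for the Flint enumerations and metadata log entry.
  object RefreshMode extends Enumeration {
    val AUTO, FULL, INCREMENTAL = Value
  }
  object IndexState extends Enumeration {
    val REFRESHING, ACTIVE = Value
  }
  final case class LogEntry(state: IndexState.Value)

  // Decide the final log entry once the refresh has been started.
  def finalLog(
      mode: RefreshMode.Value,
      latest: LogEntry,
      startMonitor: () => Unit): LogEntry =
    if (mode == RefreshMode.AUTO) {
      // Auto refresh keeps running: leave the entry unchanged and let a
      // monitor update the index state regularly.
      startMonitor()
      latest
    } else {
      // Any other mode marks the index as active.
      latest.copy(state = IndexState.ACTIVE)
    }
}
```

Comparing against `AUTO` rather than a `MANUAL` value means both full and incremental refresh take the same branch, which matches the condition `refresher.refreshMode == AUTO` in the diff.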
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{AUTO, RefreshMode}

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
@@ -25,6 +26,8 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
extends FlintSparkIndexRefresher {

override def refreshMode: RefreshMode = AUTO

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
val options = index.options
val tableName = index.metadata().source
@@ -42,8 +45,9 @@ class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
.addSinkOptions(options, flintSparkConf)
.start(indexName)
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case _ =>
// Otherwise, fall back to foreachBatch + batch refresh
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
@@ -93,8 +97,8 @@ class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
.getOrElse(dataStream)
}

def addAvailableNowTrigger(incremental: Boolean): DataStreamWriter[Row] = {
if (incremental) {
def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
if (incrementalRefresh) {
dataStream.trigger(Trigger.AvailableNow())
} else {
dataStream
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.RefreshMode

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
@@ -16,6 +17,12 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
*/
trait FlintSparkIndexRefresher extends Logging {

/**
* @return
* refresh mode
*/
def refreshMode: RefreshMode

/**
* Start refreshing the index.
*
@@ -31,6 +38,25 @@ trait FlintSparkIndexRefresher extends Logging {

object FlintSparkIndexRefresher {

/**
* Index refresh mode: AUTO: auto refresh on new data in continuous streaming style; FULL:
* refresh on current source data in batch style at one shot; INCREMENTAL: refresh only on
* new data since the last refresh
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
val AUTO, FULL, INCREMENTAL = Value
}

/**
* Create concrete index refresher for the given index.
*
* @param indexName
* Flint index name
* @param index
* Flint index
* @return
* index refresher
*/
def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresher = {
val options = index.options
if (options.autoRefresh()) {
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{FULL, RefreshMode}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.SaveMode.Overwrite
@@ -28,8 +29,10 @@ class FullIndexRefresher(
source: Option[DataFrame] = None)
extends FlintSparkIndexRefresher {

override def refreshMode: RefreshMode = FULL

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
logInfo(s"Start refreshing index $indexName in full-manual mode")
logInfo(s"Start refreshing index $indexName in full mode")
index
.build(spark, source)
.write
@@ -6,6 +6,7 @@
package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{INCREMENTAL, RefreshMode}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.flint.config.FlintSparkConf
@@ -21,8 +22,10 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
class IncrementalIndexRefresher(indexName: String, index: FlintSparkIndex)
extends FlintSparkIndexRefresher {

override def refreshMode: RefreshMode = INCREMENTAL

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
logInfo(s"Start refreshing index $indexName in incremental-manual mode")
logInfo(s"Start refreshing index $indexName in incremental mode")
val jobId =
new AutoIndexRefresher(indexName, index)
.start(spark, flintSparkConf)
@@ -7,7 +7,6 @@ package org.opensearch.flint.spark.sql.covering

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -9,7 +9,6 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
@@ -9,7 +9,6 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}