From dd9965cb81fac5e8158ecf9b1950487f4fa1c037 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Tue, 23 Jan 2024 16:41:01 -0800
Subject: [PATCH] Update user manual and javadoc

Signed-off-by: Chen Dai
---
 docs/index.md                                 | 16 ++++++++--
 .../opensearch/flint/spark/FlintSpark.scala   | 32 ++++---------------
 .../spark/refresh/AutoIndexRefresher.scala    | 10 ++++--
 .../refresh/FlintSparkIndexRefresher.scala    | 26 +++++++++++++++
 .../spark/refresh/FullIndexRefresher.scala    |  5 ++-
 .../refresh/IncrementalIndexRefresher.scala   |  5 ++-
 .../FlintSparkCoveringIndexAstBuilder.scala   |  1 -
 ...FlintSparkMaterializedViewAstBuilder.scala |  1 -
 .../FlintSparkSkippingIndexAstBuilder.scala   |  1 -
 9 files changed, 62 insertions(+), 35 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 5f9d594de..0038a9b50 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -25,6 +25,17 @@ Please see the following example in which Index Building Logic and Query Rewrite
 | ValueSet | CREATE SKIPPING INDEX<br>ON alb_logs<br>(<br>  elb_status_code VALUE_SET<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>  COLLECT_SET(elb_status_code) AS elb_status_code,<br>  input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>  input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE elb_status_code = 404<br>=><br>SELECT *<br>FROM alb_logs (input_files =<br>  SELECT file_path<br>  FROM flint_alb_logs_skipping_index<br>  WHERE ARRAY_CONTAINS(elb_status_code, 404)<br>)<br>WHERE elb_status_code = 404 |
 | MinMax | CREATE SKIPPING INDEX<br>ON alb_logs<br>(<br>  request_processing_time MIN_MAX<br>) | INSERT INTO flint_alb_logs_skipping_index<br>SELECT<br>  MIN(request_processing_time) AS request_processing_time_min,<br>  MAX(request_processing_time) AS request_processing_time_max,<br>  input_file_name() AS file_path<br>FROM alb_logs<br>GROUP BY<br>  input_file_name() | SELECT *<br>FROM alb_logs<br>WHERE request_processing_time = 100<br>=><br>SELECT *<br>FROM alb_logs (input_files =<br>  SELECT file_path<br>  FROM flint_alb_logs_skipping_index<br>  WHERE request_processing_time_min <= 100<br>    AND 100 <= request_processing_time_max<br>)<br>WHERE request_processing_time = 100 |
 
+### Flint Index Refresh
+
+- **Auto Refresh:**
+  - The Flint index is refreshed automatically. Users can configure options such as the refresh frequency based on their preferences.
+- **Manual Refresh:**
+  - Users trigger a refresh of the Flint index explicitly with the `REFRESH` statement, which gives full control over when the refresh occurs.
+  - **Full Refresh:**
+    - Rebuilds the index from all available source data, ensuring it reflects the most up-to-date information.
+  - **Incremental Refresh:**
+    - Fetches only the data added since the last refresh, which shortens the refresh process and reduces resource usage.
+
 ### Flint Index Specification
 
 #### Metadata
@@ -260,9 +271,10 @@ VACUUM MATERIALIZED VIEW alb_logs_metrics
 
 User can provide the following options in `WITH` clause of create statement:
 
-+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
++ `auto_refresh`: automatically refresh the index if set to true. Otherwise, the user has to trigger a refresh manually with the `REFRESH` statement.
 + `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
-+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
++ `incremental_refresh`: incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This is only applicable when auto refresh is disabled.
++ `checkpoint_location`: a string as the location path for the refresh job checkpoint (auto or incremental refresh). The location has to be a path in an HDFS compatible file system and is only applicable when auto or incremental refresh is enabled. If unspecified, a temporary checkpoint directory will be used, which may result in checkpoint data loss upon restart.
 + `watermark_delay`: a string as time expression for how late data can come and still be processed, e.g. 1 minute, 10 seconds. This is required by incremental refresh on materialized view if it has aggregation in the query.
 + `output_mode`: a mode string that describes how data will be written to streaming sink. If unspecified, default append mode will be applied.
 + `index_settings`: a JSON string as index settings for OpenSearch index that will be created. Please follow the format in OpenSearch documentation. If unspecified, default OpenSearch index settings will be applied.
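To make the option semantics above concrete, the sketch below drives them through `spark.sql`. This is a minimal sketch: the `alb_logs` table, the index name, and the S3 checkpoint paths are illustrative, and the statements assume the index SQL grammar documented elsewhere in this manual.

```scala
import org.apache.spark.sql.SparkSession

object RefreshOptionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("flint-refresh-options").getOrCreate()

    // Auto refresh: the index keeps refreshing in streaming style; a new micro
    // batch starts every 10 minutes instead of immediately after the previous one.
    spark.sql("""
      CREATE SKIPPING INDEX ON alb_logs (elb_status_code VALUE_SET)
      WITH (
        auto_refresh = true,
        refresh_interval = '10 minutes',
        checkpoint_location = 's3://bucket/alb_logs/skipping-checkpoint'
      )""")

    // Manual incremental refresh: auto refresh stays off, and every REFRESH
    // statement processes only data added since the last run.
    spark.sql("""
      CREATE INDEX elb_and_time ON alb_logs (elb_status_code, request_processing_time)
      WITH (
        incremental_refresh = true,
        checkpoint_location = 's3://bucket/alb_logs/covering-checkpoint'
      )""")
    spark.sql("REFRESH INDEX elb_and_time ON alb_logs")
  }
}
```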
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index df269cf78..629b5104c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -12,11 +12,11 @@ import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
-import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.AUTO
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 
@@ -129,8 +129,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    *
    * @param indexName
    *   index name
-   * @param mode
-   *   refresh mode
    * @return
    *   refreshing job ID (empty if batch job for now)
    */
@@ -138,7 +136,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     logInfo(s"Refreshing Flint index $indexName")
     val index = describeIndex(indexName)
       .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
-    val mode = if (index.options.autoRefresh()) AUTO else MANUAL
+    val refresher = FlintSparkIndexRefresher.create(indexName, index)
 
     try {
       flintClient
@@ -148,20 +146,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
             latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
         .finalLog(latest => {
           // Change state to active if full, otherwise update index state regularly
-          if (mode == MANUAL) {
-            logInfo("Updating index state to active")
-            latest.copy(state = ACTIVE)
-          } else {
-            // Schedule regular update and return log entry as refreshing state
+          if (refresher.refreshMode == AUTO) {
             logInfo("Scheduling index state monitor")
             flintIndexMonitor.startMonitor(indexName)
             latest
+          } else {
+            logInfo("Updating index state to active")
+            latest.copy(state = ACTIVE)
           }
         })
-        .commit(_ =>
-          FlintSparkIndexRefresher
-            .create(indexName, index)
-            .start(spark, flintSparkConf))
+        .commit(_ => refresher.start(spark, flintSparkConf))
     } catch {
       case e: Exception =>
         logError("Failed to refresh Flint index", e)
@@ -334,15 +328,3 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       }
     }
   }
-
-object FlintSpark {
-
-  /**
-   * Index refresh mode: FULL: refresh on current source data in batch style at one shot
-   * INCREMENTAL: auto refresh on new data in continuous streaming style
-   */
-  object RefreshMode extends Enumeration {
-    type RefreshMode = Value
-    val MANUAL, AUTO = Value
-  }
-}
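Stripped of the Flint transaction and monitor plumbing, the refactored flow above reads roughly like the sketch below. The types here are stubs, not the real Flint classes; only the `refreshMode`/`start` shape mirrors the actual trait.

```scala
// Sketch of the refactored refreshIndex flow: the refresher is created once
// and drives both the final-state decision and the actual refresh work.
object RefreshFlowSketch {
  object RefreshMode extends Enumeration { val AUTO, FULL, INCREMENTAL = Value }
  import RefreshMode._

  trait Refresher {
    def refreshMode: RefreshMode.Value
    def start(): Option[String] // returns streaming job ID, if any
  }

  def refreshIndex(indexName: String, refresher: Refresher): Option[String] = {
    // A streaming (auto) refresh never "finishes": the index stays in
    // REFRESHING state while a background monitor keeps its log entry fresh.
    // A batch (full or incremental) refresh completes, so the index goes ACTIVE.
    val finalState = if (refresher.refreshMode == AUTO) "REFRESHING" else "ACTIVE"
    println(s"index $indexName will end in state $finalState")
    refresher.start()
  }
}
```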
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresher.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresher.scala
index 76560ea71..c4240ab59 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresher.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresher.scala
@@ -7,6 +7,7 @@ package org.opensearch.flint.spark.refresh
 
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{AUTO, RefreshMode}
 
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
@@ -25,6 +26,8 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
 class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
     extends FlintSparkIndexRefresher {
 
+  override def refreshMode: RefreshMode = AUTO
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     val options = index.options
     val tableName = index.metadata().source
@@ -42,8 +45,9 @@ class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
             .addSinkOptions(options, flintSparkConf)
             .start(indexName)
         Some(job.id.toString)
+
+      // Otherwise, fall back to foreachBatch + batch refresh
       case _ =>
-        // Otherwise, fall back to foreachBatch + batch refresh
         logInfo("Start refreshing index in foreach streaming style")
         val job = spark.readStream
           .options(options.extraSourceOptions(tableName))
@@ -93,8 +97,8 @@ class AutoIndexRefresher(indexName: String, index: FlintSparkIndex)
         .getOrElse(dataStream)
     }
 
-    def addAvailableNowTrigger(incremental: Boolean): DataStreamWriter[Row] = {
-      if (incremental) {
+    def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
+      if (incrementalRefresh) {
         dataStream.trigger(Trigger.AvailableNow())
       } else {
         dataStream
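The renamed `addAvailableNowTrigger` parameter is easier to see against plain Structured Streaming. Below is a standalone sketch where the rate source and console sink stand in for the Flint source and sink: with incremental refresh the stream drains currently-available input and stops (`Trigger.AvailableNow()`); otherwise it keeps running micro batches.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

object TriggerSketch {
  // Mirrors the helper in AutoIndexRefresher: only attach the one-shot
  // AvailableNow trigger when the index is configured for incremental refresh.
  def addAvailableNowTrigger(
      dataStream: DataStreamWriter[Row],
      incrementalRefresh: Boolean): DataStreamWriter[Row] =
    if (incrementalRefresh) dataStream.trigger(Trigger.AvailableNow()) else dataStream

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("trigger-sketch").getOrCreate()
    val stream = spark.readStream.format("rate").load()
      .writeStream.format("console")
    // One-shot drain of available input, like an incremental refresh run:
    addAvailableNowTrigger(stream, incrementalRefresh = true).start().awaitTermination()
  }
}
```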
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresher.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresher.scala
index 2457a35b8..e7e95d2c5 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresher.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresher.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark.refresh
 
 import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.RefreshMode
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -16,6 +17,12 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
  */
 trait FlintSparkIndexRefresher extends Logging {
 
+  /**
+   * @return
+   *   refresh mode
+   */
+  def refreshMode: RefreshMode
+
   /**
    * Start refreshing the index.
    *
@@ -31,6 +38,25 @@ trait FlintSparkIndexRefresher extends Logging {
 
 object FlintSparkIndexRefresher {
 
+  /**
+   * Index refresh mode: AUTO: refresh continuously on new data in streaming style; FULL: refresh
+   * on all current source data in batch style at one shot; INCREMENTAL: refresh on new data since last refresh.
+   */
+  object RefreshMode extends Enumeration {
+    type RefreshMode = Value
+    val AUTO, FULL, INCREMENTAL = Value
+  }
+
+  /**
+   * Create concrete index refresher for the given index.
+   *
+   * @param indexName
+   *   Flint index name
+   * @param index
+   *   Flint index
+   * @return
+   *   index refresher
+   */
   def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresher = {
     val options = index.options
     if (options.autoRefresh()) {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresher.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresher.scala
index a9c912d59..6230705f8 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresher.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FullIndexRefresher.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark.refresh
 
 import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{FULL, RefreshMode}
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.SaveMode.Overwrite
@@ -28,8 +29,10 @@ class FullIndexRefresher(
     source: Option[DataFrame] = None)
     extends FlintSparkIndexRefresher {
 
+  override def refreshMode: RefreshMode = FULL
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
-    logInfo(s"Start refreshing index $indexName in full-manual mode")
+    logInfo(s"Start refreshing index $indexName in full mode")
     index
       .build(spark, source)
       .write
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresher.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresher.scala
index e01b7583d..11d507c06 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresher.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresher.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark.refresh
 
 import org.opensearch.flint.spark.FlintSparkIndex
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresher.RefreshMode.{INCREMENTAL, RefreshMode}
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.flint.config.FlintSparkConf
@@ -21,8 +22,10 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
 class IncrementalIndexRefresher(indexName: String, index: FlintSparkIndex)
     extends FlintSparkIndexRefresher {
 
+  override def refreshMode: RefreshMode = INCREMENTAL
+
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
-    logInfo(s"Start refreshing index $indexName in incremental-manual mode")
+    logInfo(s"Start refreshing index $indexName in incremental mode")
     val jobId =
       new AutoIndexRefresher(indexName, index)
         .start(spark, flintSparkConf)
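The `create` factory body is cut off at the end of its hunk. Based on the three refreshers in this patch, the selection presumably falls through like the sketch below; this is not the verbatim method body, and it assumes `FlintSparkIndexOptions` exposes an `incrementalRefresh()` accessor alongside `autoRefresh()`.

```scala
// Presumed completion of FlintSparkIndexRefresher.create (the hunk truncates
// after the autoRefresh branch): auto refresh takes priority, then
// incremental, otherwise a one-shot full refresh of the whole index.
def create(indexName: String, index: FlintSparkIndex): FlintSparkIndexRefresher = {
  val options = index.options
  if (options.autoRefresh()) {
    new AutoIndexRefresher(indexName, index) // continuous streaming refresh
  } else if (options.incrementalRefresh()) { // assumed accessor, mirrors autoRefresh()
    new IncrementalIndexRefresher(indexName, index) // drain new data, then stop
  } else {
    new FullIndexRefresher(indexName, index) // batch overwrite from all source data
  }
}
```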
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index eae401a69..14fa21240 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -7,7 +7,6 @@ package org.opensearch.flint.spark.sql.covering
 
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index a67803a18..5b31890bb 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -9,7 +9,6 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 73bff5cba..9b638f36f 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -9,7 +9,6 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
-import org.opensearch.flint.spark.FlintSpark.RefreshMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}