Skip to content

Commit

Permalink
Add scheduler_mode index option
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 10, 2024
1 parent b5715f6 commit 9f6a0de
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 17 deletions.
11 changes: 9 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ Please see the following example in which Index Building Logic and Query Rewrite
### Flint Index Refresh

- **Auto Refresh:**
- This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences.
- This feature allows the Flint Index to automatically refresh. Users can configure such as frequency of auto-refresh based on their preferences. There are two modes available for scheduling the auto-refresh:
- **Internal Scheduler:**
- Description: The data refresh is executed in micro-batch mode using the internal scheduler.
- Recommended Use-Case: This mode is ideal for low-latency use-cases where data needs to be refreshed frequently and quickly.
- **External Scheduler:**
- Description: The data refresh is executed using an external scheduler.
- Recommended Use-Case: This mode is suitable for scenarios where data responsiveness is less critical, helping to reduce the cost of maintaining a long-running Spark cluster.
- **Manual Refresh:**
- Users have the option to manually trigger a refresh for the Flint Index. This provides flexibility and control over when the refresh occurs.
- **Full Refresh:**
Expand Down Expand Up @@ -369,7 +375,8 @@ fetched rows / total rows = 5/5

User can provide the following options in `WITH` clause of create statement:

+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `auto_refresh`: default value is false. Automatically refresh the index if set to true. Otherwise, user has to trigger refresh by `REFRESH` statement manually.
+ `scheduler_mode`: A mode string (`internal` or `external`) that describes how `auto_refresh` is scheduled. `checkpoint_location` is required for the external scheduler.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `incremental_refresh`: default value is false. incrementally refresh the index if set to true. Otherwise, fully refresh the entire index. This only applicable when auto refresh disabled.
+ `checkpoint_location`: a string as the location path for refresh job checkpoint (auto or incremental). The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ package org.opensearch.flint.spark
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

/**
* Flint Spark index configurable options.
Expand All @@ -22,6 +23,7 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
implicit val formats: Formats = Serialization.formats(NoTypeHints)

validateOptionNames(options)
validateOptionSchedulerModeValue()

/**
* Is Flint index auto refreshed or manual refreshed.
Expand All @@ -31,6 +33,19 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean

/**
* The scheduler mode for the Flint index refresh.
*
* @return
* scheduler mode option value
*/
def schedulerMode(): SchedulerMode.Value = {
// TODO: Change default value to external once the external scheduler is enabled
val defaultMode = "internal"
val modeStr = getOptionValue(SCHEDULER_MODE).getOrElse(defaultMode)
SchedulerMode.fromString(modeStr)
}

/**
* The refresh interval (only valid if auto refresh enabled).
*
Expand Down Expand Up @@ -112,6 +127,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
if (!options.contains(AUTO_REFRESH.toString)) {
map += (AUTO_REFRESH.toString -> autoRefresh().toString)
}

// Add default option only when auto refresh is TRUE
if (autoRefresh() == true) {
if (!options.contains(SCHEDULER_MODE.toString)) {
map += (SCHEDULER_MODE.toString -> schedulerMode().toString)
}

// The query will be executed in micro-batch mode using the internal scheduler
// The default interval for the external scheduler is 15 minutes.
if (SchedulerMode.EXTERNAL == schedulerMode() && !options.contains(
REFRESH_INTERVAL.toString)) {
map += (REFRESH_INTERVAL.toString -> "15 minutes")
}
}

if (!options.contains(INCREMENTAL_REFRESH.toString)) {
map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString)
}
Expand All @@ -127,6 +157,14 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
.map(opt => (parse(opt) \ key).extract[Map[String, String]])
.getOrElse(Map.empty)
}

private def validateOptionSchedulerModeValue(): Unit = {
getOptionValue(SCHEDULER_MODE) match {
case Some(modeStr) =>
SchedulerMode.fromString(modeStr) // Will throw an exception if the mode is invalid
case None => // no action needed if modeStr is empty
}
}
}

object FlintSparkIndexOptions {
Expand All @@ -142,6 +180,7 @@ object FlintSparkIndexOptions {
object OptionName extends Enumeration {
type OptionName = Value
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val SCHEDULER_MODE: OptionName.Value = Value("scheduler_mode")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import java.util.Collections
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
Expand Down Expand Up @@ -43,10 +44,11 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
!isTableProviderSupported(spark, index),
"Index auto refresh doesn't support Hive table")

// Checkpoint location is required if mandatory option set
// Checkpoint location is required if mandatory option set or external scheduler is used
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = options.checkpointLocation()
if (flintSparkConf.isCheckpointMandatory) {
if (flintSparkConf.isCheckpointMandatory || SchedulerMode.EXTERNAL ==
options.schedulerMode()) {
require(
checkpointLocation.isDefined,
s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
Expand All @@ -63,6 +65,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
val options = index.options
val tableName = index.metadata().source
var jobId: Option[String] = None // Store the job ID here to use later

index match {
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
Expand All @@ -76,7 +80,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.options(flintSparkConf.properties)
.addSinkOptions(options, flintSparkConf)
.start(indexName)
Some(job.id.toString)
jobId = Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case _ =>
Expand All @@ -90,10 +94,20 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
new FullIndexRefresh(indexName, index, Some(batchDF))
.start(spark, flintSparkConf)
() // discard return value above and return unit to use right overridden method
() // discard return value above and return unit to use the right overridden method
}
.start()
Some(job.id.toString)
jobId = Some(job.id.toString)
}

// If EXTERNAL scheduling is set, await termination and return None
if (SchedulerMode.EXTERNAL == options.schedulerMode() && jobId.isDefined) {
spark.streams
.get(jobId.get)
.awaitTermination()
None
} else {
jobId
}
}

Expand All @@ -103,10 +117,12 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
def addSinkOptions(
options: FlintSparkIndexOptions,
flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
// For incremental refresh, the refresh_interval option is overridden by Trigger.AvailableNow().
dataStream
.addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
.addRefreshInterval(options.refreshInterval())
.addAvailableNowTrigger(options.incrementalRefresh())
.addAvailableNowTrigger(
SchedulerMode.EXTERNAL == options.schedulerMode() || options.incrementalRefresh())
.addOutputMode(options.outputMode())
.options(options.extraSinkOptions())
}
Expand All @@ -129,8 +145,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.getOrElse(dataStream)
}

def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
if (incrementalRefresh) {
def addAvailableNowTrigger(setAvailableNow: Boolean): DataStreamWriter[Row] = {
if (setAvailableNow) {
dataStream.trigger(Trigger.AvailableNow())
} else {
dataStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ object FlintSparkIndexRefresh {
val AUTO, FULL, INCREMENTAL = Value
}

/** Index scheduler mode for auto refresh */
object SchedulerMode extends Enumeration {
type SchedulerMode = Value
val INTERNAL: SchedulerMode.Value = Value("internal")
val EXTERNAL: SchedulerMode.Value = Value("external")

def fromString(s: String): SchedulerMode.Value = {
require(
SchedulerMode.values.exists(_.toString.equalsIgnoreCase(s)),
s"Scheduler mode $s is invalid. Must be 'internal' or 'external'.")
SchedulerMode.values.find(_.toString.equalsIgnoreCase(s)).get
}
}

/**
* Create concrete index refresh implementation for the given index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
new AutoIndexRefresh(indexName, index)
.start(spark, flintSparkConf)

// Blocks the calling thread until the streaming query finishes
spark.streams
.get(jobId.get)
.awaitTermination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql.covering
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
Expand Down Expand Up @@ -49,8 +50,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
Expand Down Expand Up @@ -42,8 +43,9 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
Expand Down Expand Up @@ -75,8 +76,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
// Trigger auto refresh if enabled and not using external scheduler
if (indexOptions
.autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite
Expand All @@ -14,6 +15,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {

test("should return lowercase name as option name") {
AUTO_REFRESH.toString shouldBe "auto_refresh"
SCHEDULER_MODE.toString shouldBe "scheduler_mode"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
INCREMENTAL_REFRESH.toString shouldBe "incremental_refresh"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
Expand All @@ -27,6 +29,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
val options = FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"refresh_interval" -> "1 Minute",
"incremental_refresh" -> "true",
"checkpoint_location" -> "s3://test/",
Expand All @@ -45,6 +48,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
| }""".stripMargin))

options.autoRefresh() shouldBe true
options.schedulerMode() shouldBe SchedulerMode.EXTERNAL
options.refreshInterval() shouldBe Some("1 Minute")
options.incrementalRefresh() shouldBe true
options.checkpointLocation() shouldBe Some("s3://test/")
Expand Down Expand Up @@ -73,6 +77,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
val options = FlintSparkIndexOptions(Map.empty)

options.autoRefresh() shouldBe false
options.schedulerMode() shouldBe SchedulerMode.INTERNAL
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.watermarkDelay() shouldBe empty
Expand All @@ -92,6 +97,27 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
"refresh_interval" -> "1 Minute")
}

test("should return include default scheduler_mode option when auto refresh is set to true") {
val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "internal",
"incremental_refresh" -> "false")
}

test(
"should return include default refresh_interval option with auto_refresh=true and scheduler_mode=external") {
val options =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"refresh_interval" -> "15 minutes",
"incremental_refresh" -> "false")
}

test("should report error if any unknown option name") {
the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("autoRefresh" -> "true"))
Expand All @@ -102,5 +128,9 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
the[IllegalArgumentException] thrownBy {
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "indexSetting" -> "test"))
} should have message "requirement failed: option name indexSetting is invalid"

the[IllegalArgumentException] thrownBy {
FlintSparkIndexOptions(Map("scheduler_mode" -> "invalid_mode"))
} should have message "requirement failed: Scheduler mode invalid_mode is invalid. Must be 'internal' or 'external'."
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,29 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}

test("auto refresh covering index successfully with external scheduler") {
withTempDir { checkpointDir =>
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))
.create()

val jobId = flint.refreshIndex(testFlintIndex)
jobId shouldBe None

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}
}

test("update covering index successfully") {
// Create full refresh Flint index
flint
Expand Down
Loading

0 comments on commit 9f6a0de

Please sign in to comment.