Add scheduler_mode index option
Signed-off-by: Louis Chu <[email protected]>
noCharger committed Jul 10, 2024
1 parent b5715f6 commit d74bcad
Showing 10 changed files with 131 additions and 16 deletions.
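In short, this commit threads a new scheduler_mode index option through option parsing, auto refresh, and the SQL AST builders: "internal" is the default, and "external" hands refresh scheduling to a component outside Spark. As orientation, a hedged sketch of how a caller opts in, using only names that appear in the diff below:

    import org.opensearch.flint.spark.FlintSparkIndexOptions

    // Sketch: request external scheduling for an auto-refresh index.
    // scheduler_mode only takes effect when auto_refresh is enabled.
    val options = FlintSparkIndexOptions(
      Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
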
FlintSpark.scala
@@ -22,6 +22,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
@@ -140,7 +141,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
-        if (indexRefresh.refreshMode == AUTO) {
+        if (indexRefresh.refreshMode == AUTO && SchedulerMode.INTERNAL.equals(
+            index.options.schedulerMode().get)) {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest
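The guard above means the background index state monitor is attached only when Flint itself owns a long-running streaming query. Condensed, assuming the same names as in the hunk:

    // Monitor only auto refresh driven by the internal scheduler;
    // externally scheduled refreshes are short-lived batch runs.
    val shouldMonitor =
      indexRefresh.refreshMode == AUTO &&
        SchedulerMode.INTERNAL.equals(index.options.schedulerMode().get)
    if (shouldMonitor) flintIndexMonitor.startMonitor(indexName)
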
FlintSparkIndexOptions.scala
@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

/**
* Flint Spark index configurable options.
@@ -31,6 +32,17 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
*/
def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean

/**
* The scheduler mode for the Flint index refresh.
*
* @return
* scheduler mode option value
*/
def schedulerMode(): Option[String] = {
// TODO: Change default value to external once the external scheduler is enabled
getOptionValue(SCHEDULER_MODE).orElse(Some("internal"))
}

/**
* The refresh interval (only valid if auto refresh enabled).
*
@@ -112,6 +124,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
if (!options.contains(AUTO_REFRESH.toString)) {
map += (AUTO_REFRESH.toString -> autoRefresh().toString)
}

    // Add default options only when auto refresh is enabled
    if (autoRefresh()) {
if (!options.contains(SCHEDULER_MODE.toString)) {
map += (SCHEDULER_MODE.toString -> schedulerMode().get)
}

      // With the internal scheduler, the query runs continuously in micro-batch mode;
      // with the external scheduler, refreshes are triggered on an interval
      // (default: 15 minutes).
if (SchedulerMode.EXTERNAL.equals(schedulerMode().get) && !options.contains(
REFRESH_INTERVAL.toString)) {
map += (REFRESH_INTERVAL.toString -> "15 minutes")
}
}

if (!options.contains(INCREMENTAL_REFRESH.toString)) {
map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString)
}
@@ -142,6 +169,7 @@ object FlintSparkIndexOptions {
object OptionName extends Enumeration {
type OptionName = Value
val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
val SCHEDULER_MODE: OptionName.Value = Value("scheduler_mode")
val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh")
val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
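The defaulting rules in optionsWithDefault are easiest to read from their outputs. A minimal sketch, mirroring the unit tests near the end of this commit:

    import org.opensearch.flint.spark.FlintSparkIndexOptions

    // auto_refresh=true alone: scheduler_mode defaults to "internal".
    val internal = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))
    internal.schedulerMode() // Some("internal")
    internal.optionsWithDefault
    // Map("auto_refresh" -> "true", "scheduler_mode" -> "internal",
    //     "incremental_refresh" -> "false")

    // scheduler_mode=external also pulls in the 15-minute default interval.
    val external = FlintSparkIndexOptions(
      Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
    external.optionsWithDefault
    // Map("auto_refresh" -> "true", "scheduler_mode" -> "external",
    //     "refresh_interval" -> "15 minutes", "incremental_refresh" -> "false")
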
AutoIndexRefresh.scala
@@ -10,6 +10,7 @@ import java.util.Collections
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
@@ -43,10 +44,11 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
!isTableProviderSupported(spark, index),
"Index auto refresh doesn't support Hive table")

-    // Checkpoint location is required if mandatory option set
+    // Checkpoint location is required if mandatory option set or external scheduler is used
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = options.checkpointLocation()
-    if (flintSparkConf.isCheckpointMandatory) {
+    if (flintSparkConf.isCheckpointMandatory || SchedulerMode.EXTERNAL.equals(
+        options.schedulerMode().get)) {
require(
checkpointLocation.isDefined,
s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
@@ -63,6 +65,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
val options = index.options
val tableName = index.metadata().source
    var jobId: Option[String] = None // Capture the job ID so it can be awaited or returned below

index match {
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
@@ -76,7 +80,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.options(flintSparkConf.properties)
.addSinkOptions(options, flintSparkConf)
.start(indexName)
-        Some(job.id.toString)
+        jobId = Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case _ =>
@@ -90,10 +94,20 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.foreachBatch { (batchDF: DataFrame, _: Long) =>
new FullIndexRefresh(indexName, index, Some(batchDF))
.start(spark, flintSparkConf)
-            () // discard return value above and return unit to use right overridden method
+            () // discard return value above and return unit to use the right overridden method
}
.start()
-        Some(job.id.toString)
+        jobId = Some(job.id.toString)
}

// If EXTERNAL scheduling is set, await termination and return None
if (SchedulerMode.EXTERNAL.equals(options.schedulerMode().get) && jobId.isDefined) {
spark.streams
.get(jobId.get)
.awaitTermination()
None
} else {
jobId
}
}

@@ -103,10 +117,12 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
def addSinkOptions(
options: FlintSparkIndexOptions,
flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
// For incremental refresh, the refresh_interval option is overridden by Trigger.AvailableNow().
dataStream
.addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
.addRefreshInterval(options.refreshInterval())
-      .addAvailableNowTrigger(options.incrementalRefresh())
+      .addAvailableNowTrigger(SchedulerMode.EXTERNAL.equals(
+        options.schedulerMode().get) || options.incrementalRefresh())
.addOutputMode(options.outputMode())
.options(options.extraSinkOptions())
}
@@ -129,8 +145,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
.getOrElse(dataStream)
}

-  def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
-    if (incrementalRefresh) {
+  def addAvailableNowTrigger(setAvailableNow: Boolean): DataStreamWriter[Row] = {
+    if (setAvailableNow) {
dataStream.trigger(Trigger.AvailableNow())
} else {
dataStream
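Net effect on the caller-visible contract of start(), inferred from the hunks above and the covering-index integration test at the end of the commit (a sketch, assuming the constructor arguments are in scope):

    val jobId = new AutoIndexRefresh(indexName, index).start(spark, flintSparkConf)
    // Internal scheduler: Some(streamingQueryId); the query keeps running.
    // External scheduler: the query runs once with Trigger.AvailableNow(),
    // start() blocks until that micro-batch completes, then returns None.
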
FlintSparkIndexRefresh.scala
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark.refresh

import java.util.Locale

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode

@@ -59,6 +61,24 @@ object FlintSparkIndexRefresh {
val AUTO, FULL, INCREMENTAL = Value
}

object SchedulerMode extends Enumeration {
type SchedulerMode = Value
val INTERNAL, EXTERNAL = Value

implicit class SchedulerModeValue(value: SchedulerMode) {
override def toString: String = value.toString.toLowerCase(Locale.ROOT)

override def equals(obj: Any): Boolean = obj match {
case str: String =>
str.toLowerCase(Locale.ROOT) == value.toString.toLowerCase(Locale.ROOT)
case mode: SchedulerMode => mode.toString == value.toString
case _ => false
}

override def hashCode(): Int = value.toString.toLowerCase(Locale.ROOT).hashCode
}
}

/**
* Create concrete index refresh implementation for the given index.
*
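The SchedulerModeValue wrapper is meant to let a mode be compared with raw option strings case-insensitively. A hedged sketch, invoking the wrapper explicitly rather than relying on the implicit conversion:

    import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
    import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode.SchedulerModeValue

    // equals accepts a String (case-insensitive) or another SchedulerMode value.
    new SchedulerModeValue(SchedulerMode.EXTERNAL).equals("External") // true
    new SchedulerModeValue(SchedulerMode.EXTERNAL).equals(SchedulerMode.INTERNAL) // false
    new SchedulerModeValue(SchedulerMode.INTERNAL).toString // "internal"
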
IncrementalIndexRefresh.scala
@@ -50,6 +50,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
new AutoIndexRefresh(indexName, index)
.start(spark, flintSparkConf)

// Blocks the calling thread until the streaming query finishes
spark.streams
.get(jobId.get)
.awaitTermination()
FlintSparkCoveringIndexAstBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql.covering
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -49,8 +50,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions)
.create(ignoreIfExists)

-      // Trigger auto refresh if enabled
-      if (indexOptions.autoRefresh()) {
+      // Trigger auto refresh if enabled and not using external scheduler
+      if (indexOptions
+          .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
flint.refreshIndex(flintIndexName)
}
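With scheduler_mode set to external, the CREATE statement persists the index but no longer starts the refresh itself; something outside Spark is expected to drive refresh cycles. The same guard appears in the materialized-view and skipping-index builders below. A sketch of the resulting flow (the Flint index name here is hypothetical, following Flint's usual naming scheme):

    // CREATE INDEX ... WITH (auto_refresh = true, scheduler_mode = 'external')
    // creates the index without starting a streaming job. Later, the
    // external scheduler triggers each refresh, for example:
    val flintIndexName = "flint_spark_catalog_default_test_name_age_index" // hypothetical
    flint.refreshIndex(flintIndexName)
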
FlintSparkMaterializedViewAstBuilder.scala
@@ -10,6 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -42,8 +43,9 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
.options(indexOptions)
.create(ignoreIfExists)

-      // Trigger auto refresh if enabled
-      if (indexOptions.autoRefresh()) {
+      // Trigger auto refresh if enabled and not using external scheduler
+      if (indexOptions
+          .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName)
}
FlintSparkSkippingIndexAstBuilder.scala
@@ -10,6 +10,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
import org.antlr.v4.runtime.tree.RuleNode
import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
@@ -75,8 +76,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
.options(indexOptions)
.create(ignoreIfExists)

-      // Trigger auto refresh if enabled
-      if (indexOptions.autoRefresh()) {
+      // Trigger auto refresh if enabled and not using external scheduler
+      if (indexOptions
+          .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
val indexName = getSkippingIndexName(flint, ctx.tableName)
flint.refreshIndex(indexName)
}
FlintSparkIndexOptionsSuite.scala
@@ -14,6 +14,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {

test("should return lowercase name as option name") {
AUTO_REFRESH.toString shouldBe "auto_refresh"
SCHEDULER_MODE.toString shouldBe "scheduler_mode"
REFRESH_INTERVAL.toString shouldBe "refresh_interval"
INCREMENTAL_REFRESH.toString shouldBe "incremental_refresh"
CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
@@ -27,6 +28,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
val options = FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"refresh_interval" -> "1 Minute",
"incremental_refresh" -> "true",
"checkpoint_location" -> "s3://test/",
@@ -45,6 +47,7 @@
| }""".stripMargin))

options.autoRefresh() shouldBe true
options.schedulerMode() shouldBe Some("external")
options.refreshInterval() shouldBe Some("1 Minute")
options.incrementalRefresh() shouldBe true
options.checkpointLocation() shouldBe Some("s3://test/")
@@ -73,6 +76,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
val options = FlintSparkIndexOptions(Map.empty)

options.autoRefresh() shouldBe false
options.schedulerMode() shouldBe Some("internal")
options.refreshInterval() shouldBe empty
options.checkpointLocation() shouldBe empty
options.watermarkDelay() shouldBe empty
@@ -92,6 +96,27 @@
"refresh_interval" -> "1 Minute")
}

test("should return include default scheduler_mode option when auto refresh is set to true") {
val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "internal",
"incremental_refresh" -> "false")
}

  test(
    "should include default refresh_interval option with auto_refresh=true and scheduler_mode=external") {
val options =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))

options.optionsWithDefault shouldBe Map(
"auto_refresh" -> "true",
"scheduler_mode" -> "external",
"refresh_interval" -> "15 minutes",
"incremental_refresh" -> "false")
}

test("should report error if any unknown option name") {
the[IllegalArgumentException] thrownBy
FlintSparkIndexOptions(Map("autoRefresh" -> "true"))
FlintSparkCoveringIndexITSuite.scala
@@ -118,6 +118,23 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}

test("auto refresh covering index successfully with external scheduler") {
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external")))
.create()

val jobId = flint.refreshIndex(testFlintIndex)
jobId shouldBe None

val indexData = flint.queryIndex(testFlintIndex)
checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
}

test("update covering index successfully") {
// Create full refresh Flint index
flint
