From d74bcad1ce2c01a70eecf0c3f0a05a7d69df8438 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Tue, 9 Jul 2024 17:35:17 -0700
Subject: [PATCH] Add scheduler_mode index option

Signed-off-by: Louis Chu
---
 .../opensearch/flint/spark/FlintSpark.scala   |  4 ++-
 .../flint/spark/FlintSparkIndexOptions.scala  | 30 ++++++++++++++++-
 .../spark/refresh/AutoIndexRefresh.scala      | 32 ++++++++++++++-----
 .../refresh/FlintSparkIndexRefresh.scala      | 20 ++++++++++++
 .../refresh/IncrementalIndexRefresh.scala     |  1 +
 .../FlintSparkCoveringIndexAstBuilder.scala   |  6 ++--
 ...FlintSparkMaterializedViewAstBuilder.scala |  6 ++--
 .../FlintSparkSkippingIndexAstBuilder.scala   |  6 ++--
 .../spark/FlintSparkIndexOptionsSuite.scala   | 25 +++++++++++++++
 .../FlintSparkCoveringIndexITSuite.scala      | 17 ++++++++++
 10 files changed, 131 insertions(+), 16 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index b2e310cc7..0fab94c49 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -22,6 +22,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
@@ -140,7 +141,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {
         latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
       .finalLog(latest => {
         // Change state to active if full, otherwise update index state regularly
-        if (indexRefresh.refreshMode == AUTO) {
+        if (indexRefresh.refreshMode == AUTO && SchedulerMode.INTERNAL.equals(
+            index.options.schedulerMode().get)) {
           logInfo("Scheduling index state monitor")
           flintIndexMonitor.startMonitor(indexName)
           latest
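The change above means the in-process index state monitor now starts only when Flint itself owns the refresh schedule (AUTO refresh on the internal scheduler); externally scheduled jobs terminate after each run, so there is no long-lived query to monitor. As a standalone illustration of the gating condition only (every name below is illustrative, not part of this patch):

    object MonitorGateSketch {
      sealed trait RefreshMode
      case object Auto extends RefreshMode
      case object Full extends RefreshMode

      sealed trait Scheduler
      case object Internal extends Scheduler
      case object External extends Scheduler

      // The monitor only makes sense when a long-lived streaming job runs in this JVM.
      def shouldStartMonitor(mode: RefreshMode, scheduler: Scheduler): Boolean =
        mode == Auto && scheduler == Internal

      def main(args: Array[String]): Unit = {
        assert(shouldStartMonitor(Auto, Internal))
        assert(!shouldStartMonitor(Auto, External)) // externally triggered jobs exit after each run
        assert(!shouldStartMonitor(Full, Internal))
      }
    }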
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index 9107a8a66..3f3f2dcdc 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -8,8 +8,9 @@ package org.opensearch.flint.spark
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
-import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, WATERMARK_DELAY}
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRESH, CHECKPOINT_LOCATION, EXTRA_OPTIONS, INCREMENTAL_REFRESH, INDEX_SETTINGS, OptionName, OUTPUT_MODE, REFRESH_INTERVAL, SCHEDULER_MODE, WATERMARK_DELAY}
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 
 /**
  * Flint Spark index configurable options.
 */
@@ -31,6 +32,17 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
    */
   def autoRefresh(): Boolean = getOptionValue(AUTO_REFRESH).getOrElse("false").toBoolean
 
+  /**
+   * The scheduler mode for the Flint index refresh.
+   *
+   * @return
+   *   scheduler mode option value
+   */
+  def schedulerMode(): Option[String] = {
+    // TODO: Change the default value to external once the external scheduler is enabled
+    getOptionValue(SCHEDULER_MODE).orElse(Some("internal"))
+  }
+
   /**
    * The refresh interval (only valid if auto refresh enabled).
    *
@@ -112,6 +124,21 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
     if (!options.contains(AUTO_REFRESH.toString)) {
       map += (AUTO_REFRESH.toString -> autoRefresh().toString)
     }
+
+    // Add default options only when auto refresh is enabled
+    if (autoRefresh()) {
+      if (!options.contains(SCHEDULER_MODE.toString)) {
+        map += (SCHEDULER_MODE.toString -> schedulerMode().get)
+      }
+
+      // The internal scheduler runs the query in micro-batch mode and needs no default interval;
+      // the external scheduler defaults to a 15-minute refresh interval.
+      if (SchedulerMode.EXTERNAL.equals(schedulerMode().get) && !options.contains(
+          REFRESH_INTERVAL.toString)) {
+        map += (REFRESH_INTERVAL.toString -> "15 minutes")
+      }
+    }
+
     if (!options.contains(INCREMENTAL_REFRESH.toString)) {
       map += (INCREMENTAL_REFRESH.toString -> incrementalRefresh().toString)
     }
@@ -142,6 +169,7 @@ object FlintSparkIndexOptions {
   object OptionName extends Enumeration {
     type OptionName = Value
     val AUTO_REFRESH: OptionName.Value = Value("auto_refresh")
+    val SCHEDULER_MODE: OptionName.Value = Value("scheduler_mode")
     val REFRESH_INTERVAL: OptionName.Value = Value("refresh_interval")
     val INCREMENTAL_REFRESH: OptionName.Value = Value("incremental_refresh")
     val CHECKPOINT_LOCATION: OptionName.Value = Value("checkpoint_location")
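The defaulting rules added to optionsWithDefault above can be summarized in a small self-contained sketch (a simplified model for illustration; the real FlintSparkIndexOptions handles many more options):

    object OptionDefaultsSketch {
      def withDefaults(options: Map[String, String]): Map[String, String] = {
        var map = options
        if (!map.contains("auto_refresh")) map += ("auto_refresh" -> "false")
        if (map("auto_refresh").toBoolean) {
          // scheduler_mode currently defaults to internal (see the TODO above)
          if (!map.contains("scheduler_mode")) map += ("scheduler_mode" -> "internal")
          // only the external scheduler gets a default 15-minute interval
          if (map("scheduler_mode") == "external" && !map.contains("refresh_interval"))
            map += ("refresh_interval" -> "15 minutes")
        }
        if (!map.contains("incremental_refresh")) map += ("incremental_refresh" -> "false")
        map
      }

      def main(args: Array[String]): Unit = {
        println(withDefaults(Map("auto_refresh" -> "true", "scheduler_mode" -> "external")))
        // Map(auto_refresh -> true, scheduler_mode -> external,
        //     refresh_interval -> 15 minutes, incremental_refresh -> false)
      }
    }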
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
index dd565fd18..b86b4c1e9 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/AutoIndexRefresh.scala
@@ -10,6 +10,7 @@ import java.util.Collections
 import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
 import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
@@ -43,10 +44,11 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       !isTableProviderSupported(spark, index),
       "Index auto refresh doesn't support Hive table")
 
-    // Checkpoint location is required if mandatory option set
+    // Checkpoint location is required if mandatory option set or external scheduler is used
     val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
     val checkpointLocation = options.checkpointLocation()
-    if (flintSparkConf.isCheckpointMandatory) {
+    if (flintSparkConf.isCheckpointMandatory || SchedulerMode.EXTERNAL.equals(
+        options.schedulerMode().get)) {
       require(
         checkpointLocation.isDefined,
         s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
@@ -63,6 +65,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
   override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
     val options = index.options
     val tableName = index.metadata().source
+    var jobId: Option[String] = None // Store the job ID here to use later
+
     index match {
       // Flint index has specialized logic and capability for incremental refresh
       case refresh: StreamingRefresh =>
@@ -76,7 +80,7 @@
           .options(flintSparkConf.properties)
           .addSinkOptions(options, flintSparkConf)
           .start(indexName)
-        Some(job.id.toString)
+        jobId = Some(job.id.toString)
 
       // Otherwise, fall back to foreachBatch + batch refresh
       case _ =>
@@ -90,10 +94,20 @@
           .foreachBatch { (batchDF: DataFrame, _: Long) =>
             new FullIndexRefresh(indexName, index, Some(batchDF))
               .start(spark, flintSparkConf)
-            () // discard return value above and return unit to use right overridden method
+            () // discard return value above and return unit to use the right overridden method
           }
           .start()
-        Some(job.id.toString)
+        jobId = Some(job.id.toString)
+    }
+
+    // If EXTERNAL scheduling is set, await termination and return None
+    if (SchedulerMode.EXTERNAL.equals(options.schedulerMode().get) && jobId.isDefined) {
+      spark.streams
+        .get(jobId.get)
+        .awaitTermination()
+      None
+    } else {
+      jobId
     }
   }
@@ -103,10 +117,12 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
   def addSinkOptions(
       options: FlintSparkIndexOptions,
       flintSparkConf: FlintSparkConf): DataStreamWriter[Row] = {
+    // With incremental refresh or an external scheduler, Trigger.AvailableNow() overrides the refresh_interval trigger.
     dataStream
       .addCheckpointLocation(options.checkpointLocation(), flintSparkConf.isCheckpointMandatory)
       .addRefreshInterval(options.refreshInterval())
-      .addAvailableNowTrigger(options.incrementalRefresh())
+      .addAvailableNowTrigger(SchedulerMode.EXTERNAL.equals(
+        options.schedulerMode().get) || options.incrementalRefresh())
       .addOutputMode(options.outputMode())
       .options(options.extraSinkOptions())
   }
@@ -129,8 +145,8 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
       .getOrElse(dataStream)
   }
 
-  def addAvailableNowTrigger(incrementalRefresh: Boolean): DataStreamWriter[Row] = {
-    if (incrementalRefresh) {
+  def addAvailableNowTrigger(setAvailableNow: Boolean): DataStreamWriter[Row] = {
+    if (setAvailableNow) {
       dataStream.trigger(Trigger.AvailableNow())
     } else {
       dataStream
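The external-scheduler branch above leans on a standard Structured Streaming pattern: Trigger.AvailableNow() drains the input that is currently available and then stops the query, so awaitTermination() returns after a single batch and no job id needs to be handed back. A minimal runnable sketch of that pattern with stand-in sources and sinks (Flint reads the source table and writes to the Flint index instead):

    import java.nio.file.Files

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object OneShotRefreshSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("one-shot").getOrCreate()

        // Stand-in source directory with one small file
        val src = Files.createTempDirectory("one-shot-src")
        Files.write(src.resolve("data.txt"), "hello\nworld\n".getBytes)

        val query = spark.readStream
          .text(src.toString)
          .writeStream
          .format("noop") // stand-in sink
          .option("checkpointLocation", Files.createTempDirectory("one-shot-ckpt").toString)
          .trigger(Trigger.AvailableNow()) // same trigger addAvailableNowTrigger sets above
          .start()

        query.awaitTermination() // returns once the single batch completes
        spark.stop()
      }
    }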
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
index 0c6adb0bd..570a942d4 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/FlintSparkIndexRefresh.scala
@@ -5,6 +5,8 @@
 package org.opensearch.flint.spark.refresh
 
+import java.util.Locale
+
 import org.opensearch.flint.spark.FlintSparkIndex
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.RefreshMode
 
@@ -59,6 +61,24 @@ object FlintSparkIndexRefresh {
     val AUTO, FULL, INCREMENTAL = Value
   }
 
+  object SchedulerMode extends Enumeration {
+    type SchedulerMode = Value
+
+    // Use a custom Val: an implicit wrapper class cannot override equals, because AnyRef#equals on Value is always applicable and the conversion would never fire at call sites.
+    protected class SchedulerVal extends Val {
+      override def toString: String = super.toString.toLowerCase(Locale.ROOT)
+
+      override def equals(obj: Any): Boolean = obj match {
+        case str: String => str.toLowerCase(Locale.ROOT) == toString
+        case mode: SchedulerMode => mode.toString == toString
+        case _ => false
+      }
+
+      override def hashCode(): Int = toString.hashCode
+    }
+    val INTERNAL, EXTERNAL = new SchedulerVal
+  }
+
   /**
    * Create concrete index refresh implementation for the given index.
    *
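Assuming the SchedulerVal variant above, the intended case-insensitive comparison can be spot-checked as follows (illustrative assertions only):

    import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

    object SchedulerModeSpotCheck {
      def main(args: Array[String]): Unit = {
        assert(SchedulerMode.EXTERNAL.equals("external")) // string comparison works at call sites
        assert(SchedulerMode.EXTERNAL.equals("EXTERNAL")) // and is case-insensitive
        assert(!SchedulerMode.INTERNAL.equals("external"))
        assert(SchedulerMode.INTERNAL.equals(SchedulerMode.INTERNAL))
      }
    }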
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
index b52f1ae79..d9b7aea47 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/refresh/IncrementalIndexRefresh.scala
@@ -50,6 +50,7 @@ class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
     new AutoIndexRefresh(indexName, index)
       .start(spark, flintSparkConf)
 
+    // Blocks the calling thread until the streaming query finishes
     spark.streams
       .get(jobId.get)
       .awaitTermination()
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index fc200aebf..79444d658 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.sql.covering
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -49,8 +50,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       .options(indexOptions)
       .create(ignoreIfExists)
 
-    // Trigger auto refresh if enabled
-    if (indexOptions.autoRefresh()) {
+    // Trigger auto refresh if enabled and not using external scheduler
+    if (indexOptions
+        .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
       val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
       flint.refreshIndex(flintIndexName)
     }
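With the covering-index builder change above, a statement that opts into the external scheduler registers the index but no longer kicks off an in-process streaming refresh. A hypothetical usage sketch (table, index, and checkpoint names are illustrative, and a session with Flint's SQL extensions is assumed):

    import org.apache.spark.sql.SparkSession

    object CreateIndexWithSchedulerModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .master("local[2]")
          .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
          .getOrCreate()

        // checkpoint_location is mandatory with scheduler_mode = 'external' (see AutoIndexRefresh above)
        spark.sql("""
          CREATE INDEX name_and_age ON spark_catalog.default.http_logs (name, age)
          WITH (
            auto_refresh = true,
            scheduler_mode = 'external',
            checkpoint_location = 's3://test/checkpoints/'
          )
        """)
      }
    }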
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index cd4f84028..91df2128d 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -10,6 +10,7 @@ import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.spark.FlintSpark
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder}
 import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, getSqlText, IndexBelongsTo}
 import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._
@@ -42,8 +43,9 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       .options(indexOptions)
       .create(ignoreIfExists)
 
-    // Trigger auto refresh if enabled
-    if (indexOptions.autoRefresh()) {
+    // Trigger auto refresh if enabled and not using external scheduler
+    if (indexOptions
+        .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
       val flintIndexName = getFlintIndexName(flint, ctx.mvName)
       flint.refreshIndex(flintIndexName)
     }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 521898aa2..faf377440 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -10,6 +10,7 @@ import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 import org.antlr.v4.runtime.tree.RuleNode
 import org.opensearch.flint.core.field.bloomfilter.BloomFilterFactory._
 import org.opensearch.flint.spark.FlintSpark
+import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{BLOOM_FILTER, MIN_MAX, PARTITION, VALUE_SET}
@@ -75,8 +76,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
       .options(indexOptions)
       .create(ignoreIfExists)
 
-    // Trigger auto refresh if enabled
-    if (indexOptions.autoRefresh()) {
+    // Trigger auto refresh if enabled and not using external scheduler
+    if (indexOptions
+        .autoRefresh() && SchedulerMode.INTERNAL.equals(indexOptions.schedulerMode().get)) {
       val indexName = getSkippingIndexName(flint, ctx.tableName)
       flint.refreshIndex(indexName)
     }
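The same two-clause guard now appears in all three AST builders above; a shared helper would keep them in sync if the default ever changes. A possible refactor sketch (the helper object is hypothetical, not part of this patch):

    import org.opensearch.flint.spark.FlintSparkIndexOptions
    import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode

    object RefreshTriggerHelper {
      // True when the CREATE statement itself should start the streaming refresh
      def shouldTriggerInlineRefresh(options: FlintSparkIndexOptions): Boolean =
        options.autoRefresh() && SchedulerMode.INTERNAL.equals(options.schedulerMode().get)
    }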
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
index 212d91e13..cd8d11517 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexOptionsSuite.scala
@@ -14,6 +14,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
   test("should return lowercase name as option name") {
     AUTO_REFRESH.toString shouldBe "auto_refresh"
+    SCHEDULER_MODE.toString shouldBe "scheduler_mode"
     REFRESH_INTERVAL.toString shouldBe "refresh_interval"
     INCREMENTAL_REFRESH.toString shouldBe "incremental_refresh"
     CHECKPOINT_LOCATION.toString shouldBe "checkpoint_location"
@@ -27,6 +28,7 @@
     val options = FlintSparkIndexOptions(
       Map(
         "auto_refresh" -> "true",
+        "scheduler_mode" -> "external",
         "refresh_interval" -> "1 Minute",
         "incremental_refresh" -> "true",
         "checkpoint_location" -> "s3://test/",
@@ -45,6 +47,7 @@
           | }""".stripMargin))
 
     options.autoRefresh() shouldBe true
+    options.schedulerMode() shouldBe Some("external")
     options.refreshInterval() shouldBe Some("1 Minute")
     options.incrementalRefresh() shouldBe true
     options.checkpointLocation() shouldBe Some("s3://test/")
@@ -73,6 +76,7 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
     val options = FlintSparkIndexOptions(Map.empty)
 
     options.autoRefresh() shouldBe false
+    options.schedulerMode() shouldBe Some("internal")
     options.refreshInterval() shouldBe empty
     options.checkpointLocation() shouldBe empty
     options.watermarkDelay() shouldBe empty
@@ -92,6 +96,27 @@ class FlintSparkIndexOptionsSuite extends FlintSuite with Matchers {
       "refresh_interval" -> "1 Minute")
   }
 
+  test("should include default scheduler_mode option when auto_refresh is true") {
+    val options = FlintSparkIndexOptions(Map("auto_refresh" -> "true"))
+
+    options.optionsWithDefault shouldBe Map(
+      "auto_refresh" -> "true",
+      "scheduler_mode" -> "internal",
+      "incremental_refresh" -> "false")
+  }
+
+  test(
+    "should include default refresh_interval option when auto_refresh=true and scheduler_mode=external") {
+    val options =
+      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "scheduler_mode" -> "external"))
+
+    options.optionsWithDefault shouldBe Map(
+      "auto_refresh" -> "true",
+      "scheduler_mode" -> "external",
+      "refresh_interval" -> "15 minutes",
+      "incremental_refresh" -> "false")
+  }
+
   test("should report error if any unknown option name") {
     the[IllegalArgumentException] thrownBy
       FlintSparkIndexOptions(Map("autoRefresh" -> "true"))
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index e5aa7b4d1..73ab393eb 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -118,6 +118,27 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
   }
 
+  test("auto refresh covering index successfully with external scheduler") {
+    withTempDir { checkpointDir =>
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable)
+        .addIndexColumns("name", "age")
+        .options(FlintSparkIndexOptions(Map(
+          "auto_refresh" -> "true",
+          "scheduler_mode" -> "external",
+          "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+        .create()
+
+      val jobId = flint.refreshIndex(testFlintIndex)
+      jobId shouldBe None
+
+      val indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+    }
+  }
+
   test("update covering index successfully") {
     // Create full refresh Flint index
     flint
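For completeness, the external scheduler itself is not part of this patch. A periodic trigger that drives the refresh from outside the Spark job could look roughly like the sketch below (class name and wiring are hypothetical; refreshIndex returns None once the one-shot batch completes):

    import java.util.concurrent.{Executors, TimeUnit}

    import org.opensearch.flint.spark.FlintSpark

    class ExternalRefreshTask(flint: FlintSpark, indexName: String) {
      private val executor = Executors.newSingleThreadScheduledExecutor()

      // Mirrors the 15-minute default refresh_interval set in optionsWithDefault
      def start(): Unit =
        executor.scheduleAtFixedRate(
          () => { flint.refreshIndex(indexName); () }, // runs one AvailableNow batch
          0, 15, TimeUnit.MINUTES)

      def stop(): Unit = executor.shutdown()
    }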