diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index c03dece89..eeac3e2cd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -5,7 +5,7 @@
 
 package org.opensearch.flint.spark
 
-import java.util.{Collections, UUID}
+import java.util.Collections
 
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
@@ -81,8 +81,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
       index: FlintSparkIndex,
       updateOptions: FlintSparkIndexOptions): FlintSparkIndex = {
     val originalOptions = index.options
-    val updatedOptions =
-      originalOptions.copy(options = originalOptions.options ++ updateOptions.options)
+    val updatedOptions = updateOptionWithDefaultCheckpointLocation(
+      index.name(),
+      originalOptions.copy(options = originalOptions.options ++ updateOptions.options))
     val updatedMetadata = index
       .metadata()
       .copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
@@ -159,22 +160,13 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
       indexName: String,
       options: FlintSparkIndexOptions): FlintSparkIndexOptions = {
-    val checkpointLocationRootDirOption = new FlintSparkConf(
-      Collections.emptyMap[String, String]).checkpointLocationRootDir
-
-    if (options.checkpointLocation().isEmpty) {
-      checkpointLocationRootDirOption match {
-        case Some(checkpointLocationRootDir) =>
-          // Currently, deleting and recreating the flint index will enter same checkpoint dir.
-          // Use a UUID to isolate checkpoint data.
-          val checkpointLocation =
-            s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
-          FlintSparkIndexOptions(
-            options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation))
-        case None => options
-      }
-    } else {
-      options
-    }
+    val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
+    val checkpointLocation = options.checkpointLocation(indexName, flintSparkConf)
+
+    checkpointLocation match {
+      case Some(location) =>
+        FlintSparkIndexOptions(options.options + (CHECKPOINT_LOCATION.toString -> location))
+      case None => options
+    }
   }
 }
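Note on the refactor above: the builder no longer computes the default checkpoint path itself; it delegates to the new FlintSparkIndexOptions.checkpointLocation(indexName, flintSparkConf) overload (next file) and merges the result back into the option map. Below is a minimal sketch of the intended resolution, assuming a FlintSparkConf built from an empty map falls back to the active session configuration; the index name and paths are illustrative:

    import java.util.Collections
    import org.apache.spark.sql.flint.config.FlintSparkConf

    val conf = new FlintSparkConf(Collections.emptyMap[String, String])

    // An explicit checkpoint_location option always wins and is returned unchanged.
    FlintSparkIndexOptions(Map("checkpoint_location" -> "s3://bucket/my-checkpoint"))
      .checkpointLocation("flint_test_index", conf)
    // => Some("s3://bucket/my-checkpoint")

    // Otherwise a default is derived from the configured root dir, isolated by a
    // random UUID; None if no root dir is configured either.
    FlintSparkIndexOptions(Map.empty).checkpointLocation("flint_test_index", conf)
    // => Some("<rootDir>/flint_test_index/<random UUID>") or None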
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
index 8bf09caf9..a7f00da1e 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexOptions.scala
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.spark
 
+import java.util.UUID
+
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.JsonMethods._
 import org.json4s.native.Serialization
@@ -12,6 +14,8 @@ import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.{AUTO_REFRES
 import org.opensearch.flint.spark.FlintSparkIndexOptions.validateOptionNames
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
 
+import org.apache.spark.sql.flint.config.FlintSparkConf
+
 /**
  * Flint Spark index configurable options.
  *
@@ -165,6 +169,18 @@ case class FlintSparkIndexOptions(options: Map[String, String]) {
       case None => // no action needed if modeStr is empty
     }
   }
+
+  def checkpointLocation(indexName: String, flintSparkConf: FlintSparkConf): Option[String] = {
+    options.get(CHECKPOINT_LOCATION.toString) match {
+      case Some(location) => Some(location)
+      case None =>
+        // Currently, deleting and recreating the Flint index would reuse the same checkpoint dir.
+        // Use a UUID to isolate checkpoint data.
+        flintSparkConf.checkpointLocationRootDir.map { rootDir =>
+          s"${rootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
+        }
+    }
+  }
 }
 
 object FlintSparkIndexOptions {
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
index 21469c87b..5287919ce 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
@@ -58,7 +58,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
   }
 
   test("indexOptions should not override existing checkpoint location with conf") {
-    conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+    setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
     assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
 
     val options =
@@ -70,7 +70,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
   }
 
   test("indexOptions should have default checkpoint location with conf") {
-    conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+    setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)
     assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
 
     val options = FlintSparkIndexOptions(Map.empty)
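The test-suite changes here and below are mechanical: raw conf.setConfString calls are replaced with the setFlintSparkConf helper from the shared test base, which is assumed to resolve the config entry's key internally. Sketch of the equivalence:

    // Before: stringly-typed, repeats .key at every call site
    conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)

    // After: pass the config entry itself (helper assumed to call .key internally)
    setFlintSparkConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR, testCheckpointLocation)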
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 648c21419..149686051 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -126,8 +126,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
   test("create covering index with default checkpoint location successfully") {
     withTempDir { checkpointDir =>
-      conf.setConfString(
-        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
         checkpointDir.getAbsolutePath)
       flint
         .coveringIndex()
@@ -213,6 +213,59 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
   }
 
+  test("update covering index successfully with custom checkpoint location") {
+    withTempDir { checkpointDir =>
+      // 1. Create full refresh CV
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable)
+        .addIndexColumns("name", "age")
+        .options(FlintSparkIndexOptions.empty, testFlintIndex)
+        .create()
+      var indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(indexData, Seq())
+
+      var index = flint.describeIndex(testFlintIndex)
+      var checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 2. Update the Spark conf with a custom checkpoint location root dir
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
+        checkpointDir.getAbsolutePath)
+
+      index = flint.describeIndex(testFlintIndex)
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 3. Update index to auto refresh
+      val updatedIndex = flint
+        .coveringIndex()
+        .copyWithUpdate(index.get, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      val jobId = flint.updateIndex(updatedIndex)
+      jobId shouldBe defined
+
+      val job = spark.streams.get(jobId.get)
+      failAfter(streamingTimeout) {
+        job.processAllAvailable()
+      }
+
+      indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+
+      index = flint.describeIndex(testFlintIndex)
+
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testFlintIndex),
+        s"Checkpoint location dir should contain ${testFlintIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
+  }
+
   test("can have multiple covering indexes on a table") {
     flint
       .coveringIndex()
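The final assertion in the new test above only checks that the generated location contains the index name, because the trailing UUID segment is random on every update. A stricter check (a sketch, not part of this change) could pin the full prefix:

    // Expected shape: <rootDir>/<indexName>/<random UUID>
    checkpointLocation.get should startWith(
      s"${checkpointDir.getAbsolutePath.stripSuffix("/")}/$testFlintIndex/")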
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index fd06d8eeb..ff55e3020 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -103,8 +103,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
 
   test("create materialized view with default checkpoint location successfully") {
     withTempDir { checkpointDir =>
-      conf.setConfString(
-        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
         checkpointDir.getAbsolutePath)
 
       val indexOptions =
@@ -268,6 +268,70 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
     }
   }
 
+  test("update materialized view successfully with custom checkpoint location") {
+    withTempDir { checkpointDir =>
+      // 1. Create full refresh MV
+      flint
+        .materializedView()
+        .name(testMvName)
+        .query(testQuery)
+        .options(FlintSparkIndexOptions.empty, testFlintIndex)
+        .create()
+      var indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(indexData, Seq())
+
+      var index = flint.describeIndex(testFlintIndex)
+      var checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 2. Update the Spark conf with a custom checkpoint location root dir
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
+        checkpointDir.getAbsolutePath)
+
+      index = flint.describeIndex(testFlintIndex)
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 3. Update Flint index to auto refresh and wait for completion
+      val updatedIndex = flint
+        .materializedView()
+        .copyWithUpdate(
+          index.get,
+          FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "1 Minute")))
+      val jobId = flint.updateIndex(updatedIndex)
+      jobId shouldBe defined
+
+      val job = spark.streams.get(jobId.get)
+      failAfter(streamingTimeout) {
+        job.processAllAvailable()
+      }
+
+      indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(
+        indexData.select("startTime", "count"),
+        Seq(
+          Row(timestamp("2023-10-01 00:00:00"), 1),
+          Row(timestamp("2023-10-01 00:10:00"), 2),
+          Row(timestamp("2023-10-01 01:00:00"), 1)
+          /*
+           * The last row is pending to fire upon watermark
+           * Row(timestamp("2023-10-01 02:00:00"), 1)
+           */
+        ))
+
+      index = flint.describeIndex(testFlintIndex)
+
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testFlintIndex),
+        s"Checkpoint location dir should contain ${testFlintIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 
   private def withIncrementalMaterializedView(query: String)(
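On the commented-out expected row in the materialized view test above: in Spark Structured Streaming's append mode, a windowed aggregate is emitted only once the watermark passes the window's end, and the watermark trails the maximum event time seen by watermark_delay (1 minute here). No event later than 2023-10-01 02:00:00 arrives in the test data, so that window remains pending:

    // watermark = max(event time seen) - watermark_delay
    // a window [start, end) fires in append mode only when watermark >= end,
    // hence the 02:00:00 window never fires within this test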
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index e6de21210..84594fd24 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -186,8 +186,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
 
   test("create skipping index with default checkpoint location successfully") {
     withTempDir { checkpointDir =>
-      conf.setConfString(
-        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
         checkpointDir.getAbsolutePath)
       flint
         .skippingIndex()
@@ -369,6 +369,58 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     flint.queryIndex(testIndex).collect().toSet should have size 2
   }
 
+  test("update skipping index successfully with custom checkpoint location") {
+    withTempDir { checkpointDir =>
+      // 1. Create full refresh SI
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year", "month")
+        .options(FlintSparkIndexOptions.empty, testIndex)
+        .create()
+
+      flint.queryIndex(testIndex).collect().toSet should have size 0
+
+      var index = flint.describeIndex(testIndex)
+      var checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 2. Update the Spark conf with a custom checkpoint location root dir
+      setFlintSparkConf(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR,
+        checkpointDir.getAbsolutePath)
+
+      index = flint.describeIndex(testIndex)
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isEmpty, "Checkpoint location should not be defined")
+
+      // 3. Update Flint index to auto refresh and wait for completion
+      val updatedIndex =
+        flint
+          .skippingIndex()
+          .copyWithUpdate(index.get, FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      val jobId = flint.updateIndex(updatedIndex)
+      jobId shouldBe defined
+
+      val job = spark.streams.get(jobId.get)
+      failAfter(streamingTimeout) {
+        job.processAllAvailable()
+      }
+
+      flint.queryIndex(testIndex).collect().toSet should have size 2
+
+      index = flint.describeIndex(testIndex)
+
+      checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testIndex),
+        s"Checkpoint location dir should contain ${testIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
+  }
+
   test("can have only 1 skipping index on a table") {
     flint
       .skippingIndex()