From 5c190b3891852013bf983c923af18537c4e302ca Mon Sep 17 00:00:00 2001
From: Louis Chu <clingzhi@amazon.com>
Date: Tue, 20 Aug 2024 20:08:01 -0700
Subject: [PATCH] Add conf for specifying flint checkpoint location (#577)

* Add conf for specifying flint checkpoint location

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add IT and public doc

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
---
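Usage sketch (the S3 path below is an illustrative placeholder; the conf key
and the '<rootDir>/<indexName>/<UUID>' format are the ones introduced here):

    // Set once per Spark session; any index created afterwards without an
    // explicit checkpoint_location option gets a generated location under it.
    spark.conf.set(
      "spark.flint.index.checkpointLocation.rootDir",
      "s3://my-bucket/flint/checkpoints")
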
 docs/index.md                                 |  4 +++-
 .../sql/flint/config/FlintSparkConf.scala     |  6 ++
 .../flint/spark/FlintSparkIndexBuilder.scala  | 46 +++++++++++++++-
 .../FlintSparkCoveringIndexAstBuilder.scala   |  2 +-
 ...FlintSparkMaterializedViewAstBuilder.scala |  5 +-
 .../FlintSparkSkippingIndexAstBuilder.scala   |  5 +-
 .../spark/FlintSparkIndexBuilderSuite.scala   | 56 +++++++++++++++++++
 .../FlintSparkCoveringIndexITSuite.scala      | 47 +++++++++++++++-
 .../spark/FlintSparkIndexJobITSuite.scala     |  6 +-
 .../spark/FlintSparkIndexMonitorITSuite.scala |  2 +-
 .../spark/FlintSparkIndexSqlITSuite.scala     | 12 +++-
 .../FlintSparkMaterializedViewITSuite.scala   | 34 ++++++++++-
 .../FlintSparkSkippingIndexITSuite.scala      | 54 +++++++++++++++---
 .../spark/FlintSparkTransactionITSuite.scala  |  9 +--
 .../spark/FlintSparkUpdateIndexITSuite.scala  | 33 +++++++----
 15 files changed, 277 insertions(+), 44 deletions(-)

diff --git a/docs/index.md b/docs/index.md
index 3bfa0b468..e0f8dd406 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -538,7 +538,9 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. 
 - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance.
 - `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance.
-- `spark.flint.index.hybridscan.enabled`: default is false.
+- `spark.flint.index.hybridscan.enabled`: default is false. enable hybrid scan to include the latest source data not yet refreshed to the index.
+- `spark.flint.index.checkpointLocation.rootDir`: default is None. When set, Flint creates a default checkpoint location in the format '<rootDir>/<indexName>/<UUID>' to isolate checkpoint data per index.
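+  For example, with the root dir set to 's3://my-bucket/checkpoints' (an illustrative path), an index named 'my_index' is assigned a checkpoint location like 's3://my-bucket/checkpoints/my_index/<UUID>'.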
 - `spark.flint.index.checkpoint.mandatory`: default is true.
 - `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
 - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 1d12d004e..0bfaf38e6 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -167,6 +167,10 @@ object FlintSparkConf {
     .doc("Enable hybrid scan to include latest source data not refreshed to index yet")
     .createWithDefault("false")
 
+  val CHECKPOINT_LOCATION_ROOT_DIR = FlintConfig("spark.flint.index.checkpointLocation.rootDir")
+    .doc("Root directory of a user specified checkpoint location for index refresh")
+    .createOptional()
+
   val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
     .doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
     .createWithDefault("true")
@@ -261,6 +265,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean
 
+  def checkpointLocationRootDir: Option[String] = CHECKPOINT_LOCATION_ROOT_DIR.readFrom(reader)
+
   def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean
 
   def monitorInitialDelaySeconds(): Int = MONITOR_INITIAL_DELAY_SECONDS.readFrom(reader).toInt
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
index 106df276d..c03dece89 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala
@@ -5,14 +5,18 @@
 
 package org.opensearch.flint.spark
 
+import java.util.{Collections, UUID}
+
 import scala.collection.JavaConverters.mapAsJavaMapConverter
 
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
 import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
 import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
 
 import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.flint.{findField, loadTable, parseTableName, qualifyTableName}
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -48,8 +52,11 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
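+   * @param indexName
+   *   Flint index name used to derive the default checkpoint location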
    * @return
    *   builder
    */
-  def options(options: FlintSparkIndexOptions): this.type = {
-    this.indexOptions = options
+  def options(options: FlintSparkIndexOptions, indexName: String): this.type = {
+    val updatedOptions = updateOptionWithDefaultCheckpointLocation(indexName, options)
+    this.indexOptions = updatedOptions
     this
   }
 
@@ -139,4 +144,39 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
       isPartition = false, // useless for now so just set to false
       isBucket = false)
   }
+
+  /**
+   * Updates the options with a default checkpoint location if not already set.
+   *
+   * @param indexName
+   *   The index name string
+   * @param options
+   *   The original FlintSparkIndexOptions
+   * @return
+   *   Updated FlintSparkIndexOptions
+   */
+  private def updateOptionWithDefaultCheckpointLocation(
+      indexName: String,
+      options: FlintSparkIndexOptions): FlintSparkIndexOptions = {
+
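+    // The properties map is left empty on purpose: unset keys fall back to the
+    // active Spark session conf, where the root dir conf is expected to be set.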
+    val checkpointLocationRootDirOption = new FlintSparkConf(
+      Collections.emptyMap[String, String]).checkpointLocationRootDir
+
+    if (options.checkpointLocation().isEmpty) {
+      checkpointLocationRootDirOption match {
+        case Some(checkpointLocationRootDir) =>
+          // Currently, deleting and recreating the Flint index reuses the same
+          // checkpoint dir, so append a UUID to isolate checkpoint data.
+          val checkpointLocation =
+            s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}"
+          FlintSparkIndexOptions(
+            options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation))
+        case None => options
+      }
+    } else {
+      options
+    }
+  }
 }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
index daac87395..46974a105 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala
@@ -47,7 +47,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
       val ignoreIfExists = ctx.EXISTS() != null
       val indexOptions = visitPropertyList(ctx.propertyList())
       indexBuilder
-        .options(indexOptions)
+        .options(indexOptions, indexName)
         .create(ignoreIfExists)
 
       // Trigger auto refresh if enabled and not using external scheduler
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
index 4ef0b003e..9d567c86a 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala
@@ -39,14 +39,15 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
 
       val ignoreIfExists = ctx.EXISTS() != null
       val indexOptions = visitPropertyList(ctx.propertyList())
+      val flintIndexName = getFlintIndexName(flint, ctx.mvName)
+
       mvBuilder
-        .options(indexOptions)
+        .options(indexOptions, flintIndexName)
         .create(ignoreIfExists)
 
       // Trigger auto refresh if enabled and not using external scheduler
       if (indexOptions
           .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
-        val flintIndexName = getFlintIndexName(flint, ctx.mvName)
         flint.refreshIndex(flintIndexName)
       }
       Seq.empty
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
index 5c4613504..cb1489a27 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala
@@ -72,14 +72,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
 
       val ignoreIfExists = ctx.EXISTS() != null
       val indexOptions = visitPropertyList(ctx.propertyList())
+      val indexName = getSkippingIndexName(flint, ctx.tableName)
+
       indexBuilder
-        .options(indexOptions)
+        .options(indexOptions, indexName)
         .create(ignoreIfExists)
 
       // Trigger auto refresh if enabled and not using external scheduler
       if (indexOptions
           .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) {
-        val indexName = getSkippingIndexName(flint, ctx.tableName)
         flint.refreshIndex(indexName)
       }
       Seq.empty
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
index a4ca4430a..21469c87b 100644
--- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala
@@ -5,12 +5,17 @@
 
 package org.opensearch.flint.spark
 
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
+import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkIndexBuilderSuite extends FlintSuite {
 
+  val indexName: String = "test_index"
+  val testCheckpointLocation = "/test/checkpoints/"
+
   override def beforeAll(): Unit = {
     super.beforeAll()
 
@@ -31,6 +36,56 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
     super.afterAll()
   }
 
+  test("indexOptions should not have checkpoint location when no conf") {
+    assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
+
+    val options = FlintSparkIndexOptions(Map.empty)
+    val builder = new FakeFlintSparkIndexBuilder
+
+    val updatedOptions = builder.options(options, indexName).testOptions
+    updatedOptions.checkpointLocation() shouldBe None
+  }
+
+  test("indexOptions should not override existing checkpoint location when no conf") {
+    assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
+
+    val options =
+      FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation))
+    val builder = new FakeFlintSparkIndexBuilder
+
+    val updatedOptions = builder.options(options, indexName).testOptions
+    updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation)
+  }
+
+  test("indexOptions should not override existing checkpoint location with conf") {
+    conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+    assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
+
+    val options =
+      FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation))
+    val builder = new FakeFlintSparkIndexBuilder
+
+    val updatedOptions = builder.options(options, indexName).testOptions
+    updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation)
+  }
+
+  test("indexOptions should have default checkpoint location with conf") {
+    conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation)
+    assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key))
+
+    val options = FlintSparkIndexOptions(Map.empty)
+    val builder = new FakeFlintSparkIndexBuilder
+
+    val updatedOptions = builder.options(options, indexName).testOptions
+    assert(updatedOptions.checkpointLocation().isDefined, "Checkpoint location should be defined")
+    assert(
+      updatedOptions
+        .checkpointLocation()
+        .get
+        .startsWith(s"${testCheckpointLocation}${indexName}"),
+      s"Checkpoint location should start with ${testCheckpointLocation}${indexName}")
+  }
+
   test("find column type") {
     builder()
       .onTable("test")
@@ -94,6 +149,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite {
    * Fake builder that have access to internal method for assertion
    */
   class FakeFlintSparkIndexBuilder extends FlintSparkIndexBuilder(new FlintSpark(spark)) {
+    def testOptions: FlintSparkIndexOptions = this.indexOptions
 
     def onTable(tableName: String): FakeFlintSparkIndexBuilder = {
       this.tableName = tableName
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
index 9c91a129e..648c21419 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala
@@ -15,6 +15,7 @@ import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
@@ -104,7 +105,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .name(testIndex)
       .onTable(testTable)
       .addIndexColumns("name", "age")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
       .create()
 
     val jobId = flint.refreshIndex(testFlintIndex)
@@ -117,6 +118,47 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
 
     val indexData = flint.queryIndex(testFlintIndex)
     checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+
+    val indexOptions = flint.describeIndex(testFlintIndex)
+    indexOptions shouldBe defined
+    indexOptions.get.options.checkpointLocation() shouldBe None
+  }
+
+  test("create covering index with default checkpoint location successfully") {
+    withTempDir { checkpointDir =>
+      conf.setConfString(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+        checkpointDir.getAbsolutePath)
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable)
+        .addIndexColumns("name", "age")
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
+        .create()
+
+      val jobId = flint.refreshIndex(testFlintIndex)
+      jobId shouldBe defined
+
+      val job = spark.streams.get(jobId.get)
+      failAfter(streamingTimeout) {
+        job.processAllAvailable()
+      }
+
+      val indexData = flint.queryIndex(testFlintIndex)
+      checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25)))
+
+      val index = flint.describeIndex(testFlintIndex)
+      index shouldBe defined
+
+      val checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testFlintIndex),
+        s"Checkpoint location dir should contain ${testFlintIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
   }
 
   test("auto refresh covering index successfully with external scheduler") {
@@ -131,7 +173,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
             Map(
               "auto_refresh" -> "true",
               "scheduler_mode" -> "external",
-              "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
         .create()
 
       val jobId = flint.refreshIndex(testFlintIndex)
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
index f676d9ad5..a7e508596 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala
@@ -51,7 +51,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
       .create()
 
     flint.recoverIndex(testIndex) shouldBe true
@@ -65,7 +65,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
         .skippingIndex()
         .onTable(testTable)
         .addPartitions("year")
-        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
         .create()
 
       updateLatestLogEntry(
@@ -88,7 +88,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers
         .skippingIndex()
         .onTable(testTable)
         .addPartitions("year")
-        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+        .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
         .create()
 
       updateLatestLogEntry(
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 8d8311b11..88c24dc95 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -53,7 +53,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
       .skippingIndex()
       .onTable(testTable)
       .addValueSet("name")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
       .create()
     flint.refreshIndex(testFlintIndex)
 
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
index ce1cbb2ea..2e84118ec 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala
@@ -109,7 +109,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
       .skippingIndex()
       .onTable(testTableQualifiedName)
       .addValueSet("name")
-      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .options(
+        FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+        testSkippingFlintIndex)
       .create()
     flint.refreshIndex(testSkippingFlintIndex)
     val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
@@ -153,7 +155,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
       .name(testCoveringIndex)
       .onTable(testTableQualifiedName)
       .addIndexColumns("name", "age")
-      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .options(
+        FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+        testCoveringFlintIndex)
       .create()
     flint.refreshIndex(testCoveringFlintIndex)
 
@@ -228,7 +232,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {
       .name(testCoveringIndexSpecial)
       .onTable(testTableQualifiedName)
       .addIndexColumns("name", "age")
-      .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
+      .options(
+        FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")),
+        testCoveringFlintIndexSpecial)
       .create()
     flint.refreshIndex(testCoveringFlintIndexSpecial)
 
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
index 605975af6..fd06d8eeb 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala
@@ -11,11 +11,13 @@ import java.util.Base64
 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
 import org.opensearch.flint.common.FlintVersion.current
 import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService
+import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.flint.config.FlintSparkConf
 
 class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
 
@@ -55,7 +57,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         .materializedView()
         .name(testMvName)
         .query(testQuery)
-        .options(indexOptions)
+        .options(indexOptions, testFlintIndex)
         .create()
 
       val index = flint.describeIndex(testFlintIndex)
@@ -99,6 +101,34 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
     }
   }
 
+  test("create materialized view with default checkpoint location successfully") {
+    withTempDir { checkpointDir =>
+      conf.setConfString(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+        checkpointDir.getAbsolutePath)
+
+      val indexOptions =
+        FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "30 Seconds"))
+      flint
+        .materializedView()
+        .name(testMvName)
+        .query(testQuery)
+        .options(indexOptions, testFlintIndex)
+        .create()
+
+      val index = flint.describeIndex(testFlintIndex)
+      index shouldBe defined
+
+      val checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testFlintIndex),
+        s"Checkpoint location dir should contain ${testFlintIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
+  }
+
   test("full refresh materialized view") {
     flint
       .materializedView()
@@ -255,7 +285,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
         .materializedView()
         .name(testMvName)
         .query(query)
-        .options(indexOptions)
+        .options(indexOptions, testFlintIndex)
         .create()
 
       flint
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 968f09345..833a85a84 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf._
 import org.apache.spark.sql.functions.{col, isnull, lit, xxhash64}
 import org.apache.spark.sql.internal.SQLConf
@@ -146,11 +147,13 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
         .skippingIndex()
         .onTable(testTable)
         .addValueSet("address")
-        .options(FlintSparkIndexOptions(Map(
-          "auto_refresh" -> "true",
-          "refresh_interval" -> "1 Minute",
-          "checkpoint_location" -> checkpointDir.getAbsolutePath,
-          "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")))
+        .options(
+          FlintSparkIndexOptions(Map(
+            "auto_refresh" -> "true",
+            "refresh_interval" -> "1 Minute",
+            "checkpoint_location" -> checkpointDir.getAbsolutePath,
+            "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")),
+          testIndex)
         .create()
 
       val index = flint.describeIndex(testIndex)
@@ -181,6 +184,37 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }
 
+  test("create skipping index with default checkpoint location successfully") {
+    withTempDir { checkpointDir =>
+      conf.setConfString(
+        FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key,
+        checkpointDir.getAbsolutePath)
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addValueSet("address")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "refresh_interval" -> "1 Minute",
+              "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")),
+          testIndex)
+        .create()
+
+      val index = flint.describeIndex(testIndex)
+      index shouldBe defined
+
+      val checkpointLocation = index.get.options.checkpointLocation()
+      assert(checkpointLocation.isDefined, "Checkpoint location should be defined")
+      assert(
+        checkpointLocation.get.contains(testIndex),
+        s"Checkpoint location dir should contain ${testIndex}")
+
+      conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)
+    }
+  }
+
   test("should not have ID column in index data") {
     flint
       .skippingIndex()
@@ -218,7 +252,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
           FlintSparkIndexOptions(
             Map(
               "incremental_refresh" -> "true",
-              "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
         .create()
 
       flint.refreshIndex(testIndex) shouldBe empty
@@ -246,7 +281,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
         .skippingIndex()
         .onTable(testTable)
         .addPartitions("year", "month")
-        .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")))
+        .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")), testIndex)
         .create()
     } should have message "requirement failed: Checkpoint location is required"
   }
@@ -257,7 +292,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
       .create()
 
     val jobId = flint.refreshIndex(testIndex)
@@ -283,7 +318,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
             Map(
               "auto_refresh" -> "true",
               "scheduler_mode" -> "external",
-              "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
         .create()
 
       flint.refreshIndex(testIndex) shouldBe empty
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
index 741db4bd1..b543ba87c 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
@@ -87,7 +87,8 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
           FlintSparkIndexOptions(
             Map(
               "incremental_refresh" -> "true",
-              "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testFlintIndex)
         .create()
       flint.refreshIndex(testFlintIndex)
 
@@ -102,7 +103,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
       .create()
     flint.refreshIndex(testFlintIndex)
 
@@ -143,7 +144,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
       .create()
     flint.refreshIndex(testFlintIndex)
 
@@ -217,7 +218,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex)
       .create()
     flint.refreshIndex(testFlintIndex)
 
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
index f2ed92adc..7bbf24567 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala
@@ -40,12 +40,14 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
         .skippingIndex()
         .onTable(testTable)
         .addValueSet("address")
-        .options(FlintSparkIndexOptions(Map(
-          "auto_refresh" -> "false",
-          "incremental_refresh" -> "true",
-          "refresh_interval" -> "1 Minute",
-          "checkpoint_location" -> checkpointDir.getAbsolutePath,
-          "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")))
+        .options(
+          FlintSparkIndexOptions(Map(
+            "auto_refresh" -> "false",
+            "incremental_refresh" -> "true",
+            "refresh_interval" -> "1 Minute",
+            "checkpoint_location" -> checkpointDir.getAbsolutePath,
+            "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")),
+          testIndex)
         .create()
       val indexInitial = flint.describeIndex(testIndex).get
 
@@ -148,7 +150,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
                 .get("checkpoint_location")
                 .map(_ =>
                   initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath))
-                .getOrElse(initialOptionsMap)))
+                .getOrElse(initialOptionsMap)),
+              testIndex)
             .create()
           flint.refreshIndex(testIndex)
 
@@ -280,7 +283,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
                 .get("checkpoint_location")
                 .map(_ =>
                   initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath))
-                .getOrElse(initialOptionsMap)))
+                .getOrElse(initialOptionsMap)),
+              testIndex)
             .create()
           flint.refreshIndex(testIndex)
 
@@ -397,7 +401,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
           FlintSparkIndexOptions(
             Map(
               "incremental_refresh" -> "true",
-              "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
         .create()
 
       flint.refreshIndex(testIndex) shouldBe empty
@@ -440,7 +445,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
       .skippingIndex()
       .onTable(testTable)
       .addPartitions("year", "month")
-      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
+      .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex)
       .create()
 
     val jobId = flint.refreshIndex(testIndex)
@@ -484,8 +489,12 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
         .skippingIndex()
         .onTable(testTable)
         .addPartitions("year", "month")
-        .options(FlintSparkIndexOptions(
-          Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath)))
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
         .create()
 
       val jobId = flint.refreshIndex(testIndex)