diff --git a/docs/index.md b/docs/index.md index 2c93418fa..3924fd932 100644 --- a/docs/index.md +++ b/docs/index.md @@ -538,7 +538,8 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i - `spark.datasource.flint.read.support_shard`: default is true. set to false if index does not support shard (AWS OpenSearch Serverless collection). Do not use in production, this setting will be removed in later version. - `spark.flint.optimizer.enabled`: default is true. enable the Flint optimizer for improving query performance. - `spark.flint.optimizer.covering.enabled`: default is true. enable the Flint covering index optimizer for improving query performance. -- `spark.flint.index.hybridscan.enabled`: default is false. +- `spark.flint.index.hybridscan.enabled`: default is false. +- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '//' to isolate checkpoint data. - `spark.flint.index.checkpoint.mandatory`: default is true. - `spark.datasource.flint.socket_timeout_millis`: default value is 60000. - `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15. diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala index 1d12d004e..0bfaf38e6 100644 --- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala +++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala @@ -167,6 +167,10 @@ object FlintSparkConf { .doc("Enable hybrid scan to include latest source data not refreshed to index yet") .createWithDefault("false") + val CHECKPOINT_LOCATION_ROOT_DIR = FlintConfig("spark.flint.index.checkpointLocation.rootDir") + .doc("Root directory of a user specified checkpoint location for index refresh") + .createOptional() + val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory") .doc("Checkpoint location for incremental refresh index will be mandatory if enabled") .createWithDefault("true") @@ -261,6 +265,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean + def checkpointLocationRootDir: Option[String] = CHECKPOINT_LOCATION_ROOT_DIR.readFrom(reader) + def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean def monitorInitialDelaySeconds(): Int = MONITOR_INITIAL_DELAY_SECONDS.readFrom(reader).toInt diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 106df276d..c03dece89 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -5,14 +5,18 @@ package org.opensearch.flint.spark +import java.util.{Collections, UUID} + import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh import org.apache.spark.sql.catalog.Column import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.flint.{findField, loadTable, parseTableName, qualifyTableName} +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.types.{StructField, StructType} /** @@ -48,8 +52,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { * @return * builder */ - def options(options: FlintSparkIndexOptions): this.type = { - this.indexOptions = options + def options(options: FlintSparkIndexOptions, indexName: String): this.type = { + val updatedOptions = updateOptionWithDefaultCheckpointLocation(indexName, options) + this.indexOptions = updatedOptions this } @@ -139,4 +144,37 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { isPartition = false, // useless for now so just set to false isBucket = false) } + + /** + * Updates the options with a default checkpoint location if not already set. + * + * @param indexName + * The index name string + * @param options + * The original FlintSparkIndexOptions + * @return + * Updated FlintSparkIndexOptions + */ + private def updateOptionWithDefaultCheckpointLocation( + indexName: String, + options: FlintSparkIndexOptions): FlintSparkIndexOptions = { + + val checkpointLocationRootDirOption = new FlintSparkConf( + Collections.emptyMap[String, String]).checkpointLocationRootDir + + if (options.checkpointLocation().isEmpty) { + checkpointLocationRootDirOption match { + case Some(checkpointLocationRootDir) => + // Currently, deleting and recreating the flint index will enter same checkpoint dir. + // Use a UUID to isolate checkpoint data. + val checkpointLocation = + s"${checkpointLocationRootDir.stripSuffix("/")}/$indexName/${UUID.randomUUID().toString}" + FlintSparkIndexOptions( + options.options + (CHECKPOINT_LOCATION.toString -> checkpointLocation)) + case None => options + } + } else { + options + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index daac87395..46974a105 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -47,7 +47,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder - .options(indexOptions) + .options(indexOptions, indexName) .create(ignoreIfExists) // Trigger auto refresh if enabled and not using external scheduler diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala index 4ef0b003e..9d567c86a 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/mv/FlintSparkMaterializedViewAstBuilder.scala @@ -39,14 +39,15 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) + val flintIndexName = getFlintIndexName(flint, ctx.mvName) + mvBuilder - .options(indexOptions) + .options(indexOptions, flintIndexName) .create(ignoreIfExists) // Trigger auto refresh if enabled and not using external scheduler if (indexOptions .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) { - val flintIndexName = getFlintIndexName(flint, ctx.mvName) flint.refreshIndex(flintIndexName) } Seq.empty diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index 5c4613504..cb1489a27 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -72,14 +72,15 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) + val indexName = getSkippingIndexName(flint, ctx.tableName) + indexBuilder - .options(indexOptions) + .options(indexOptions, indexName) .create(ignoreIfExists) // Trigger auto refresh if enabled and not using external scheduler if (indexOptions .autoRefresh() && SchedulerMode.INTERNAL == indexOptions.schedulerMode()) { - val indexName = getSkippingIndexName(flint, ctx.tableName) flint.refreshIndex(indexName) } Seq.empty diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala index a4ca4430a..21469c87b 100644 --- a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexBuilderSuite.scala @@ -5,12 +5,17 @@ package org.opensearch.flint.spark +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.FlintSuite +import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkIndexBuilderSuite extends FlintSuite { + val indexName: String = "test_index" + val testCheckpointLocation = "/test/checkpoints/" + override def beforeAll(): Unit = { super.beforeAll() @@ -31,6 +36,56 @@ class FlintSparkIndexBuilderSuite extends FlintSuite { super.afterAll() } + test("indexOptions should not have checkpoint location when no conf") { + assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)) + + val options = FlintSparkIndexOptions(Map.empty) + val builder = new FakeFlintSparkIndexBuilder + + val updatedOptions = builder.options(options, indexName).testOptions + updatedOptions.checkpointLocation() shouldBe None + } + + test("indexOptions should not override existing checkpoint location when no conf") { + assert(!conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)) + + val options = + FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation)) + val builder = new FakeFlintSparkIndexBuilder + + val updatedOptions = builder.options(options, indexName).testOptions + updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation) + } + + test("indexOptions should not override existing checkpoint location with conf") { + conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation) + assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)) + + val options = + FlintSparkIndexOptions(Map(CHECKPOINT_LOCATION.toString -> testCheckpointLocation)) + val builder = new FakeFlintSparkIndexBuilder + + val updatedOptions = builder.options(options, indexName).testOptions + updatedOptions.checkpointLocation() shouldBe Some(testCheckpointLocation) + } + + test("indexOptions should have default checkpoint location with conf") { + conf.setConfString(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, testCheckpointLocation) + assert(conf.contains(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key)) + + val options = FlintSparkIndexOptions(Map.empty) + val builder = new FakeFlintSparkIndexBuilder + + val updatedOptions = builder.options(options, indexName).testOptions + assert(updatedOptions.checkpointLocation().isDefined, "Checkpoint location should be defined") + assert( + updatedOptions + .checkpointLocation() + .get + .startsWith(s"${testCheckpointLocation}${indexName}"), + s"Checkpoint location should start with ${testCheckpointLocation}${indexName}") + } + test("find column type") { builder() .onTable("test") @@ -94,6 +149,7 @@ class FlintSparkIndexBuilderSuite extends FlintSuite { * Fake builder that have access to internal method for assertion */ class FakeFlintSparkIndexBuilder extends FlintSparkIndexBuilder(new FlintSpark(spark)) { + def testOptions: FlintSparkIndexOptions = this.indexOptions def onTable(tableName: String): FakeFlintSparkIndexBuilder = { this.tableName = tableName diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 9c91a129e..648c21419 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -15,6 +15,7 @@ import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.sql.Row +import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { @@ -104,7 +105,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .name(testIndex) .onTable(testTable) .addIndexColumns("name", "age") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() val jobId = flint.refreshIndex(testFlintIndex) @@ -117,6 +118,47 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { val indexData = flint.queryIndex(testFlintIndex) checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) + + val indexOptions = flint.describeIndex(testFlintIndex) + indexOptions shouldBe defined + indexOptions.get.options.checkpointLocation() shouldBe None + } + + test("create covering index with default checkpoint location successfully") { + withTempDir { checkpointDir => + conf.setConfString( + FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, + checkpointDir.getAbsolutePath) + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex) + .create() + + val jobId = flint.refreshIndex(testFlintIndex) + jobId shouldBe defined + + val job = spark.streams.get(jobId.get) + failAfter(streamingTimeout) { + job.processAllAvailable() + } + + val indexData = flint.queryIndex(testFlintIndex) + checkAnswer(indexData, Seq(Row("Hello", 30), Row("World", 25))) + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + + val checkpointLocation = index.get.options.checkpointLocation() + assert(checkpointLocation.isDefined, "Checkpoint location should be defined") + assert( + checkpointLocation.get.contains(testFlintIndex), + s"Checkpoint location dir should contain ${testFlintIndex}") + + conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key) + } } test("auto refresh covering index successfully with external scheduler") { @@ -131,7 +173,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "true", "scheduler_mode" -> "external", - "checkpoint_location" -> checkpointDir.getAbsolutePath))) + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) .create() val jobId = flint.refreshIndex(testFlintIndex) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index f676d9ad5..a7e508596 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -51,7 +51,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers .skippingIndex() .onTable(testTable) .addPartitions("year") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() flint.recoverIndex(testIndex) shouldBe true @@ -65,7 +65,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers .skippingIndex() .onTable(testTable) .addPartitions("year") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() updateLatestLogEntry( @@ -88,7 +88,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers .skippingIndex() .onTable(testTable) .addPartitions("year") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() updateLatestLogEntry( diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 8d8311b11..88c24dc95 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -53,7 +53,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc .skippingIndex() .onTable(testTable) .addValueSet("name") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex) .create() flint.refreshIndex(testFlintIndex) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala index ce1cbb2ea..2e84118ec 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala @@ -109,7 +109,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { .skippingIndex() .onTable(testTableQualifiedName) .addValueSet("name") - .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true"))) + .options( + FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")), + testSkippingFlintIndex) .create() flint.refreshIndex(testSkippingFlintIndex) val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex) @@ -153,7 +155,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { .name(testCoveringIndex) .onTable(testTableQualifiedName) .addIndexColumns("name", "age") - .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true"))) + .options( + FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")), + testCoveringFlintIndex) .create() flint.refreshIndex(testCoveringFlintIndex) @@ -228,7 +232,9 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers { .name(testCoveringIndexSpecial) .onTable(testTableQualifiedName) .addIndexColumns("name", "age") - .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true"))) + .options( + FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")), + testCoveringFlintIndexSpecial) .create() flint.refreshIndex(testCoveringFlintIndexSpecial) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 605975af6..fd06d8eeb 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -11,11 +11,13 @@ import java.util.Base64 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.common.FlintVersion.current import org.opensearch.flint.core.storage.FlintOpenSearchIndexMetadataService +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.CHECKPOINT_LOCATION import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.flint.config.FlintSparkConf class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { @@ -55,7 +57,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .materializedView() .name(testMvName) .query(testQuery) - .options(indexOptions) + .options(indexOptions, testFlintIndex) .create() val index = flint.describeIndex(testFlintIndex) @@ -99,6 +101,34 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { } } + test("create materialized view with default checkpoint location successfully") { + withTempDir { checkpointDir => + conf.setConfString( + FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, + checkpointDir.getAbsolutePath) + + val indexOptions = + FlintSparkIndexOptions(Map("auto_refresh" -> "true", "watermark_delay" -> "30 Seconds")) + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .options(indexOptions, testFlintIndex) + .create() + + val index = flint.describeIndex(testFlintIndex) + index shouldBe defined + + val checkpointLocation = index.get.options.checkpointLocation() + assert(checkpointLocation.isDefined, "Checkpoint location should be defined") + assert( + checkpointLocation.get.contains(testFlintIndex), + s"Checkpoint location dir should contain ${testFlintIndex}") + + conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key) + } + } + test("full refresh materialized view") { flint .materializedView() @@ -255,7 +285,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { .materializedView() .name(testMvName) .query(query) - .options(indexOptions) + .options(indexOptions, testFlintIndex) .create() flint diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 968f09345..833a85a84 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf._ import org.apache.spark.sql.functions.{col, isnull, lit, xxhash64} import org.apache.spark.sql.internal.SQLConf @@ -146,11 +147,13 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addValueSet("address") - .options(FlintSparkIndexOptions(Map( - "auto_refresh" -> "true", - "refresh_interval" -> "1 Minute", - "checkpoint_location" -> checkpointDir.getAbsolutePath, - "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) + .options( + FlintSparkIndexOptions(Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")), + testIndex) .create() val index = flint.describeIndex(testIndex) @@ -181,6 +184,37 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + test("create skipping index with default checkpoint location successfully") { + withTempDir { checkpointDir => + conf.setConfString( + FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key, + checkpointDir.getAbsolutePath) + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")), + testIndex) + .create() + + val index = flint.describeIndex(testIndex) + index shouldBe defined + + val checkpointLocation = index.get.options.checkpointLocation() + assert(checkpointLocation.isDefined, "Checkpoint location should be defined") + assert( + checkpointLocation.get.contains(testIndex), + s"Checkpoint location dir should contain ${testIndex}") + + conf.unsetConf(FlintSparkConf.CHECKPOINT_LOCATION_ROOT_DIR.key) + } + } + test("should not have ID column in index data") { flint .skippingIndex() @@ -218,7 +252,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { FlintSparkIndexOptions( Map( "incremental_refresh" -> "true", - "checkpoint_location" -> checkpointDir.getAbsolutePath))) + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) .create() flint.refreshIndex(testIndex) shouldBe empty @@ -246,7 +281,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("incremental_refresh" -> "true")), testIndex) .create() } should have message "requirement failed: Checkpoint location is required" } @@ -257,7 +292,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() val jobId = flint.refreshIndex(testIndex) @@ -283,7 +318,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { Map( "auto_refresh" -> "true", "scheduler_mode" -> "external", - "checkpoint_location" -> checkpointDir.getAbsolutePath))) + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) .create() flint.refreshIndex(testIndex) shouldBe empty diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 741db4bd1..b543ba87c 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -87,7 +87,8 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match FlintSparkIndexOptions( Map( "incremental_refresh" -> "true", - "checkpoint_location" -> checkpointDir.getAbsolutePath))) + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testFlintIndex) .create() flint.refreshIndex(testFlintIndex) @@ -102,7 +103,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex) .create() flint.refreshIndex(testFlintIndex) @@ -143,7 +144,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex) .create() flint.refreshIndex(testFlintIndex) @@ -217,7 +218,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testFlintIndex) .create() flint.refreshIndex(testFlintIndex) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index f2ed92adc..7bbf24567 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -40,12 +40,14 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addValueSet("address") - .options(FlintSparkIndexOptions(Map( - "auto_refresh" -> "false", - "incremental_refresh" -> "true", - "refresh_interval" -> "1 Minute", - "checkpoint_location" -> checkpointDir.getAbsolutePath, - "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}"))) + .options( + FlintSparkIndexOptions(Map( + "auto_refresh" -> "false", + "incremental_refresh" -> "true", + "refresh_interval" -> "1 Minute", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "index_settings" -> "{\"number_of_shards\": 3,\"number_of_replicas\": 2}")), + testIndex) .create() val indexInitial = flint.describeIndex(testIndex).get @@ -148,7 +150,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .get("checkpoint_location") .map(_ => initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) - .getOrElse(initialOptionsMap))) + .getOrElse(initialOptionsMap)), + testIndex) .create() flint.refreshIndex(testIndex) @@ -280,7 +283,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .get("checkpoint_location") .map(_ => initialOptionsMap.updated("checkpoint_location", checkpointDir.getAbsolutePath)) - .getOrElse(initialOptionsMap))) + .getOrElse(initialOptionsMap)), + testIndex) .create() flint.refreshIndex(testIndex) @@ -397,7 +401,8 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { FlintSparkIndexOptions( Map( "incremental_refresh" -> "true", - "checkpoint_location" -> checkpointDir.getAbsolutePath))) + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) .create() flint.refreshIndex(testIndex) shouldBe empty @@ -440,7 +445,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true")), testIndex) .create() val jobId = flint.refreshIndex(testIndex) @@ -484,8 +489,12 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") - .options(FlintSparkIndexOptions( - Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))) + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) .create() val jobId = flint.refreshIndex(testIndex)