diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 3eb36010e..88b8c38cc 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -10,9 +10,8 @@ import scala.collection.JavaConverters._
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
+import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
-import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
-import org.opensearch.flint.common.metadata.log.OptimisticTransaction
 import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
@@ -272,8 +271,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
           .transientLog(latest => latest.copy(state = VACUUMING))
           .finalLog(_ => NO_LOG_ENTRY)
           .commit(_ => {
+            val options = flintIndexMetadataService.getIndexMetadata(indexName).options.asScala
             flintClient.deleteIndex(indexName)
             flintIndexMetadataService.deleteIndexMetadata(indexName)
+
+            // Remove checkpoint folder if defined
+            val checkpoint = options
+              .get(CHECKPOINT_LOCATION.toString)
+              .map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String]))
+            if (checkpoint.isDefined) {
+              checkpoint.get.delete()
+            }
             true
           })
       } else {
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
new file mode 100644
index 000000000..4c18fea77
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import java.util.UUID
+
+import org.apache.hadoop.fs.{FSDataOutputStream, Path}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
+
+/**
+ * Manages the checkpoint directory for Flint indexes.
+ *
+ * @param spark
+ *   The SparkSession used for Hadoop configuration.
+ * @param checkpointLocation
+ *   The path to the checkpoint directory.
+ */
+class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) extends Logging {
+
+  /** Checkpoint root directory path */
+  private val checkpointRootDir = new Path(checkpointLocation)
+
+  /** Spark checkpoint manager */
+  private val checkpointManager =
+    CheckpointFileManager.create(checkpointRootDir, spark.sessionState.newHadoopConf())
+
+  /**
+   * Checks if the checkpoint directory exists.
+   *
+   * @return
+   *   true if the checkpoint directory exists, false otherwise.
+   */
+  def exists(): Boolean = checkpointManager.exists(checkpointRootDir)
+
+  /**
+   * Creates the checkpoint directory and all necessary parent directories if they do not already
+   * exist.
+   *
+   * @return
+   *   The path to the created checkpoint directory.
+   */
+  def createDirectory(): Path = {
+    checkpointManager.createCheckpointDirectory
+  }
+
+  /**
+   * Creates a temporary file in the checkpoint directory.
+   *
+   * @return
+   *   An optional FSDataOutputStream for the created temporary file, or None if creation fails.
+   */
+  def createTempFile(): Option[FSDataOutputStream] = {
+    checkpointManager match {
+      case manager: RenameHelperMethods =>
+        val tempFilePath =
+          new Path(createDirectory(), s"${UUID.randomUUID().toString}.tmp")
+        Some(manager.createTempFile(tempFilePath))
+      case _ =>
+        logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}")
+        None
+    }
+  }
+
+  /**
+   * Deletes the checkpoint directory. This method attempts to delete the checkpoint directory and
+   * captures any exceptions that occur. Exceptions are logged but ignored so as not to disrupt
+   * the caller's workflow.
+   */
+  def delete(): Unit = {
+    try {
+      checkpointManager.delete(checkpointRootDir)
+      logInfo(s"Checkpoint directory $checkpointRootDir deleted")
+    } catch {
+      case e: Exception =>
+        logError(s"Error deleting checkpoint directory $checkpointRootDir", e)
+    }
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala
index f615d9398..1aaa85075 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala
@@ -5,9 +5,6 @@

 package org.opensearch.flint.spark

-import java.util.UUID
-
-import org.apache.hadoop.fs.Path
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -16,8 +13,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager
-import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
 import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

 /**
@@ -72,35 +67,20 @@
    */
   def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
     try {
-      val checkpointManager =
-        CheckpointFileManager.create(
-          new Path(checkpointLocation),
-          spark.sessionState.newHadoopConf())
+      val checkpoint = new FlintSparkCheckpoint(spark, checkpointLocation)

       /*
        * Read permission check: The primary intent here is to catch any exceptions
        * during the accessibility check. The actual result is ignored, as the write
        * permission check below will create any necessary sub-folders.
        */
-      checkpointManager.exists(new Path(checkpointLocation))
+      checkpoint.exists()

       /*
        * Write permission check: Attempt to create a temporary file to verify write access.
        * The temporary file is left in place in case additional delete permissions required.
        */
-      checkpointManager match {
-        case manager: RenameHelperMethods =>
-          val tempFilePath =
-            new Path(
-              checkpointManager
-                .createCheckpointDirectory(), // create all parent folders if needed
-              s"${UUID.randomUUID().toString}.tmp")
-
-          manager.createTempFile(tempFilePath).close()
-        case _ =>
-          logInfo(
-            s"Bypass checkpoint location write permission check: ${checkpointManager.getClass}")
-      }
+      checkpoint.createTempFile().foreach(_.close())

       true
     } catch {
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala
new file mode 100644
index 000000000..85f5ba1ec
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.FlintSuite
+
+class FlintSparkCheckpointSuite extends FlintSuite with Matchers {
+
+  test("exists") {
+    withCheckpoint { checkpoint =>
+      checkpoint.exists() shouldBe false
+      checkpoint.createDirectory()
+      checkpoint.exists() shouldBe true
+    }
+  }
+
+  test("create directory") {
+    withTempPath { tempDir =>
+      val checkpointDir = new Path(tempDir.getAbsolutePath, "sub/subsub")
+      val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.toString)
+      checkpoint.createDirectory()
+
+      tempDir.exists() shouldBe true
+    }
+  }
+
+  test("create temp file") {
+    withCheckpoint { checkpoint =>
+      val tempFile = checkpoint.createTempFile()
+      tempFile shouldBe defined
+
+      // Close the stream to ensure the file is flushed
+      tempFile.get.close()
+
+      // Assert that there is a .tmp file
+      listFiles(checkpoint.checkpointLocation)
+        .exists(isTempFile) shouldBe true
+    }
+  }
+
+  test("delete") {
+    withCheckpoint { checkpoint =>
+      checkpoint.createDirectory()
+      checkpoint.delete()
+      checkpoint.exists() shouldBe false
+    }
+  }
+
+  private def withCheckpoint(block: FlintSparkCheckpoint => Unit): Unit = {
+    withTempPath { checkpointDir =>
+      val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.getAbsolutePath)
+      block(checkpoint)
+    }
+  }
+
+  private def listFiles(dir: String): Array[FileStatus] = {
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+    fs.listStatus(new Path(dir))
+  }
+
+  private def isTempFile(file: FileStatus): Boolean = {
+    file.isFile && file.getPath.getName.endsWith(".tmp")
+  }
+}
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 235cab4d2..9abebbd65 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -468,6 +468,34 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe empty
   }

+  test("vacuum covering index with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .coveringIndex()
+        .name(testIndex)
+        .onTable(testTable)
+        .addIndexColumns("name", "age")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testFlintIndex)
+        .create()
+      flint.refreshIndex(testFlintIndex)
+
+      val job = spark.streams.active.find(_.name == testFlintIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testFlintIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM INDEX $testIndex ON $testTable")
+      flint.describeIndex(testFlintIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   private def awaitRefreshComplete(query: String): Unit = {
     sql(query)

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
index 11581e731..c3cbbcc7e 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala
@@ -404,5 +404,33 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
     flint.describeIndex(testFlintIndex) shouldBe empty
   }

+  test("vacuum materialized view with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .materializedView()
+        .name(testMvName)
+        .query(testQuery)
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath,
+              "watermark_delay" -> "1 Second")),
+          testFlintIndex)
+        .create()
+      flint.refreshIndex(testFlintIndex)
+
+      val job = spark.streams.active.find(_.name == testFlintIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testFlintIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM MATERIALIZED VIEW $testMvName")
+      flint.describeIndex(testFlintIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
 }
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
index 833a85a84..e6de21210 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala
@@ -917,6 +917,33 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
     }
   }

+  test("vacuum skipping index with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addValueSet("address")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
+        .create()
+      flint.refreshIndex(testIndex)
+
+      val job = spark.streams.active.find(_.name == testIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      flint.vacuumIndex(testIndex)
+      flint.describeIndex(testIndex) shouldBe None
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
   def useFlintSparkSkippingFileIndex(
       subMatcher: Matcher[FlintSparkSkippingFileIndex]): Matcher[SparkPlan] = {
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index ebefa4e89..9ddde32bd 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -569,6 +569,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
     flint.describeIndex(testIndex) shouldBe empty
   }

+  test("vacuum skipping index with checkpoint") {
+    withTempDir { checkpointDir =>
+      flint
+        .skippingIndex()
+        .onTable(testTable)
+        .addPartitions("year")
+        .options(
+          FlintSparkIndexOptions(
+            Map(
+              "auto_refresh" -> "true",
+              "checkpoint_location" -> checkpointDir.getAbsolutePath)),
+          testIndex)
+        .create()
+      flint.refreshIndex(testIndex)
+
+      val job = spark.streams.active.find(_.name == testIndex)
+      awaitStreamingComplete(job.get.id.toString)
+      flint.deleteIndex(testIndex)
+
+      // Checkpoint folder should be removed after vacuum
+      checkpointDir.exists() shouldBe true
+      sql(s"VACUUM SKIPPING INDEX ON $testTable")
+      flint.describeIndex(testIndex) shouldBe empty
+      checkpointDir.exists() shouldBe false
+    }
+  }
+
   test("analyze skipping index with for supported data types") {
     val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")