From 9c03d7d0b6217fed29481dba6e8fd445dc183cb7 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 3 Sep 2024 16:37:50 -0700 Subject: [PATCH 1/3] Add checkpoint abstraction and IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 10 ++- .../flint/spark/FlintSparkCheckpoint.scala | 72 +++++++++++++++++++ .../spark/FlintSparkValidationHelper.scala | 26 +------ .../FlintSparkSkippingIndexITSuite.scala | 26 +++++++ 4 files changed, 109 insertions(+), 25 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 3eb36010e..aee816c83 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -10,9 +10,8 @@ import scala.collection.JavaConverters._ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata} +import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction} import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._ -import org.opensearch.flint.common.metadata.log.FlintMetadataLogService -import org.opensearch.flint.common.metadata.log.OptimisticTransaction import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder @@ -272,8 +271,15 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = VACUUMING)) .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { + val options = flintIndexMetadataService.getIndexMetadata(indexName).options flintClient.deleteIndex(indexName) flintIndexMetadataService.deleteIndexMetadata(indexName) + + Option(options.get(CHECKPOINT_LOCATION.toString)) + .foreach { checkpointDir => + new FlintSparkCheckpoint(spark, checkpointDir.asInstanceOf[String]) + .delete() + } true }) } else { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala new file mode 100644 index 000000000..bb5d160de --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala @@ -0,0 +1,72 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.UUID + +import org.apache.hadoop.fs.{FSDataOutputStream, Path} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods + +/** + * Manages the checkpoint directory for Flint indexes. + * + * @param spark + * The SparkSession used for Hadoop configuration. + * @param checkpointLocation + * The path to the checkpoint directory. + */ +class FlintSparkCheckpoint(spark: SparkSession, checkpointLocation: String) extends Logging { + private val checkpointDir = new Path(checkpointLocation) + private val checkpointManager = + CheckpointFileManager.create(checkpointDir, spark.sessionState.newHadoopConf()) + + /** + * Checks if the checkpoint directory exists. + * + * @return + * true if the checkpoint directory exists, false otherwise. + */ + def exists(): Boolean = checkpointManager.exists(checkpointDir) + + /** + * Creates a temporary file in the checkpoint directory. + * + * @return + * An optional FSDataOutputStream for the created temporary file, or None if creation fails. + */ + def createTempFile(): Option[FSDataOutputStream] = { + checkpointManager match { + case manager: RenameHelperMethods => + val tempFilePath = + new Path( + checkpointManager.createCheckpointDirectory(), // create all parent folders if needed + s"${UUID.randomUUID().toString}.tmp") + Some(manager.createTempFile(tempFilePath)) + case _ => + logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}") + None + } + } + + /** + * Deletes the checkpoint directory. This method attempts to delete the checkpoint directory and + * captures any exceptions that occur. Exceptions are logged but ignored so as not to disrupt + * the caller's workflow. + */ + def delete(): Unit = { + try { + checkpointManager.delete(checkpointDir) + logInfo(s"Checkpoint directory $checkpointDir deleted.") + } catch { + case e: Exception => + logError(s"Error deleting checkpoint directory $checkpointDir", e) + } + } +} diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala index f615d9398..1aaa85075 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkValidationHelper.scala @@ -5,9 +5,6 @@ package org.opensearch.flint.spark -import java.util.UUID - -import org.apache.hadoop.fs.Path import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex import org.opensearch.flint.spark.mv.FlintSparkMaterializedView import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex @@ -16,8 +13,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.streaming.CheckpointFileManager -import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName} /** @@ -72,35 +67,20 @@ trait FlintSparkValidationHelper extends Logging { */ def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = { try { - val checkpointManager = - CheckpointFileManager.create( - new Path(checkpointLocation), - spark.sessionState.newHadoopConf()) + val checkpoint = new FlintSparkCheckpoint(spark, checkpointLocation) /* * Read permission check: The primary intent here is to catch any exceptions * during the accessibility check. The actual result is ignored, as the write * permission check below will create any necessary sub-folders. */ - checkpointManager.exists(new Path(checkpointLocation)) + checkpoint.exists() /* * Write permission check: Attempt to create a temporary file to verify write access. * The temporary file is left in place in case additional delete permissions required. */ - checkpointManager match { - case manager: RenameHelperMethods => - val tempFilePath = - new Path( - checkpointManager - .createCheckpointDirectory(), // create all parent folders if needed - s"${UUID.randomUUID().toString}.tmp") - - manager.createTempFile(tempFilePath).close() - case _ => - logInfo( - s"Bypass checkpoint location write permission check: ${checkpointManager.getClass}") - } + checkpoint.createTempFile().foreach(_.close()) true } catch { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 833a85a84..ed3a27846 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -917,6 +917,32 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } + test("should remove checkpoint folder when vacuum") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addValueSet("address") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + flint.refreshIndex(testIndex) + + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + + flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) + + flint.describeIndex(testIndex) shouldBe None + checkpointDir.exists() shouldBe false + } + } + // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex def useFlintSparkSkippingFileIndex( subMatcher: Matcher[FlintSparkSkippingFileIndex]): Matcher[SparkPlan] = { From 20a6935293b9775ed2ecb5f2cbbcab001c5da311 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 4 Sep 2024 10:06:13 -0700 Subject: [PATCH 2/3] Add UT and more doc Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkCheckpoint.scala | 33 ++++++--- .../spark/FlintSparkCheckpointSuite.scala | 70 +++++++++++++++++++ 2 files changed, 93 insertions(+), 10 deletions(-) create mode 100644 flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala index bb5d160de..4c18fea77 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkCheckpoint.scala @@ -22,10 +22,14 @@ import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelp * @param checkpointLocation * The path to the checkpoint directory. */ -class FlintSparkCheckpoint(spark: SparkSession, checkpointLocation: String) extends Logging { - private val checkpointDir = new Path(checkpointLocation) +class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) extends Logging { + + /** Checkpoint root directory path */ + private val checkpointRootDir = new Path(checkpointLocation) + + /** Spark checkpoint manager */ private val checkpointManager = - CheckpointFileManager.create(checkpointDir, spark.sessionState.newHadoopConf()) + CheckpointFileManager.create(checkpointRootDir, spark.sessionState.newHadoopConf()) /** * Checks if the checkpoint directory exists. @@ -33,7 +37,18 @@ class FlintSparkCheckpoint(spark: SparkSession, checkpointLocation: String) exte * @return * true if the checkpoint directory exists, false otherwise. */ - def exists(): Boolean = checkpointManager.exists(checkpointDir) + def exists(): Boolean = checkpointManager.exists(checkpointRootDir) + + /** + * Creates the checkpoint directory and all necessary parent directories if they do not already + * exist. + * + * @return + * The path to the created checkpoint directory. + */ + def createDirectory(): Path = { + checkpointManager.createCheckpointDirectory + } /** * Creates a temporary file in the checkpoint directory. @@ -45,9 +60,7 @@ class FlintSparkCheckpoint(spark: SparkSession, checkpointLocation: String) exte checkpointManager match { case manager: RenameHelperMethods => val tempFilePath = - new Path( - checkpointManager.createCheckpointDirectory(), // create all parent folders if needed - s"${UUID.randomUUID().toString}.tmp") + new Path(createDirectory(), s"${UUID.randomUUID().toString}.tmp") Some(manager.createTempFile(tempFilePath)) case _ => logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}") @@ -62,11 +75,11 @@ class FlintSparkCheckpoint(spark: SparkSession, checkpointLocation: String) exte */ def delete(): Unit = { try { - checkpointManager.delete(checkpointDir) - logInfo(s"Checkpoint directory $checkpointDir deleted.") + checkpointManager.delete(checkpointRootDir) + logInfo(s"Checkpoint directory $checkpointRootDir deleted") } catch { case e: Exception => - logError(s"Error deleting checkpoint directory $checkpointDir", e) + logError(s"Error deleting checkpoint directory $checkpointRootDir", e) } } } diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala new file mode 100644 index 000000000..85f5ba1ec --- /dev/null +++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkCheckpointSuite.scala @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.FlintSuite + +class FlintSparkCheckpointSuite extends FlintSuite with Matchers { + + test("exists") { + withCheckpoint { checkpoint => + checkpoint.exists() shouldBe false + checkpoint.createDirectory() + checkpoint.exists() shouldBe true + } + } + + test("create directory") { + withTempPath { tempDir => + val checkpointDir = new Path(tempDir.getAbsolutePath, "sub/subsub") + val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.toString) + checkpoint.createDirectory() + + tempDir.exists() shouldBe true + } + } + + test("create temp file") { + withCheckpoint { checkpoint => + val tempFile = checkpoint.createTempFile() + tempFile shouldBe defined + + // Close the stream to ensure the file is flushed + tempFile.get.close() + + // Assert that there is a .tmp file + listFiles(checkpoint.checkpointLocation) + .exists(isTempFile) shouldBe true + } + } + + test("delete") { + withCheckpoint { checkpoint => + checkpoint.createDirectory() + checkpoint.delete() + checkpoint.exists() shouldBe false + } + } + + private def withCheckpoint(block: FlintSparkCheckpoint => Unit): Unit = { + withTempPath { checkpointDir => + val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.getAbsolutePath) + block(checkpoint) + } + } + + private def listFiles(dir: String): Array[FileStatus] = { + val fs = FileSystem.get(spark.sessionState.newHadoopConf()) + fs.listStatus(new Path(dir)) + } + + private def isTempFile(file: FileStatus): Boolean = { + file.isFile && file.getPath.getName.endsWith(".tmp") + } +} From d5acdc8b9ef47036e2bd5f33488b744cceda9552 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Wed, 4 Sep 2024 13:24:45 -0700 Subject: [PATCH 3/3] Add more IT Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 14 ++++++---- .../FlintSparkCoveringIndexSqlITSuite.scala | 28 +++++++++++++++++++ ...FlintSparkMaterializedViewSqlITSuite.scala | 28 +++++++++++++++++++ .../FlintSparkSkippingIndexITSuite.scala | 7 +++-- .../FlintSparkSkippingIndexSqlITSuite.scala | 27 ++++++++++++++++++ 5 files changed, 95 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index aee816c83..88b8c38cc 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -271,15 +271,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w .transientLog(latest => latest.copy(state = VACUUMING)) .finalLog(_ => NO_LOG_ENTRY) .commit(_ => { - val options = flintIndexMetadataService.getIndexMetadata(indexName).options + val options = flintIndexMetadataService.getIndexMetadata(indexName).options.asScala flintClient.deleteIndex(indexName) flintIndexMetadataService.deleteIndexMetadata(indexName) - Option(options.get(CHECKPOINT_LOCATION.toString)) - .foreach { checkpointDir => - new FlintSparkCheckpoint(spark, checkpointDir.asInstanceOf[String]) - .delete() - } + // Remove checkpoint folder if defined + val checkpoint = options + .get(CHECKPOINT_LOCATION.toString) + .map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String])) + if (checkpoint.isDefined) { + checkpoint.get.delete() + } true }) } else { diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 235cab4d2..9abebbd65 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -468,6 +468,34 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe empty } + test("vacuum covering index with checkpoint") { + withTempDir { checkpointDir => + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testFlintIndex) + .create() + flint.refreshIndex(testFlintIndex) + + val job = spark.streams.active.find(_.name == testFlintIndex) + awaitStreamingComplete(job.get.id.toString) + flint.deleteIndex(testFlintIndex) + + // Checkpoint folder should be removed after vacuum + checkpointDir.exists() shouldBe true + sql(s"VACUUM INDEX $testIndex ON $testTable") + flint.describeIndex(testFlintIndex) shouldBe empty + checkpointDir.exists() shouldBe false + } + } + private def awaitRefreshComplete(query: String): Unit = { sql(query) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index 66d6e0779..ba073bd23 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -408,5 +408,33 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { flint.describeIndex(testFlintIndex) shouldBe empty } + test("vacuum materialized view with checkpoint") { + withTempDir { checkpointDir => + flint + .materializedView() + .name(testMvName) + .query(testQuery) + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath, + "watermark_delay" -> "1 Second")), + testFlintIndex) + .create() + flint.refreshIndex(testFlintIndex) + + val job = spark.streams.active.find(_.name == testFlintIndex) + awaitStreamingComplete(job.get.id.toString) + flint.deleteIndex(testFlintIndex) + + // Checkpoint folder should be removed after vacuum + checkpointDir.exists() shouldBe true + sql(s"VACUUM MATERIALIZED VIEW $testMvName") + flint.describeIndex(testFlintIndex) shouldBe empty + checkpointDir.exists() shouldBe false + } + } + private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts) } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index ed3a27846..e6de21210 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -917,7 +917,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { } } - test("should remove checkpoint folder when vacuum") { + test("vacuum skipping index with checkpoint") { withTempDir { checkpointDir => flint .skippingIndex() @@ -934,10 +934,11 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { val job = spark.streams.active.find(_.name == testIndex) awaitStreamingComplete(job.get.id.toString) - flint.deleteIndex(testIndex) - flint.vacuumIndex(testIndex) + // Checkpoint folder should be removed after vacuum + checkpointDir.exists() shouldBe true + flint.vacuumIndex(testIndex) flint.describeIndex(testIndex) shouldBe None checkpointDir.exists() shouldBe false } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index ff114b8e2..4dfe1a5d5 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -580,6 +580,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit flint.describeIndex(testIndex) shouldBe empty } + test("vacuum skipping index with checkpoint") { + withTempDir { checkpointDir => + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options( + FlintSparkIndexOptions( + Map( + "auto_refresh" -> "true", + "checkpoint_location" -> checkpointDir.getAbsolutePath)), + testIndex) + .create() + flint.refreshIndex(testIndex) + + val job = spark.streams.active.find(_.name == testIndex) + awaitStreamingComplete(job.get.id.toString) + flint.deleteIndex(testIndex) + + // Checkpoint folder should be removed after vacuum + checkpointDir.exists() shouldBe true + sql(s"VACUUM SKIPPING INDEX ON $testTable") + flint.describeIndex(testIndex) shouldBe empty + checkpointDir.exists() shouldBe false + } + } + test("analyze skipping index with for supported data types") { val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")