Remove checkpoint folder when vacuuming index (#621)
* Add checkpoint abstraction and IT

Signed-off-by: Chen Dai <[email protected]>

* Add UT and more doc

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit 88ad15f)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
github-actions[bot] committed Sep 8, 2024
1 parent 2f23396 commit 7e37545
Showing 8 changed files with 278 additions and 25 deletions.
@@ -10,9 +10,8 @@ import scala.collection.JavaConverters._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.{FlintIndexMetadataService, FlintMetadata}
import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
@@ -272,8 +271,17 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
          .transientLog(latest => latest.copy(state = VACUUMING))
          .finalLog(_ => NO_LOG_ENTRY)
          .commit(_ => {
            val options = flintIndexMetadataService.getIndexMetadata(indexName).options.asScala
            flintClient.deleteIndex(indexName)
            flintIndexMetadataService.deleteIndexMetadata(indexName)

            // Remove checkpoint folder if defined
            val checkpoint = options
              .get(CHECKPOINT_LOCATION.toString)
              .map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String]))
            if (checkpoint.isDefined) {
              checkpoint.get.delete()
            }
            true
          })
      } else {
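Note that the index options are captured before flintIndexMetadataService.deleteIndexMetadata runs, because the checkpoint location has to be resolved from the index metadata before that metadata is removed. The Option handling above could equally be condensed as in the sketch below (options and spark are the names from the surrounding commit block; this is an illustrative equivalent, not the committed code):

    // Equivalent sketch: delete the checkpoint folder only when a location is configured.
    options
      .get(CHECKPOINT_LOCATION.toString)
      .map(path => new FlintSparkCheckpoint(spark, path.asInstanceOf[String]))
      .foreach(_.delete())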
@@ -0,0 +1,85 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark

import java.util.UUID

import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods

/**
 * Manages the checkpoint directory for Flint indexes.
 *
 * @param spark
 *   The SparkSession used for Hadoop configuration.
 * @param checkpointLocation
 *   The path to the checkpoint directory.
 */
class FlintSparkCheckpoint(spark: SparkSession, val checkpointLocation: String) extends Logging {

  /** Checkpoint root directory path */
  private val checkpointRootDir = new Path(checkpointLocation)

  /** Spark checkpoint manager */
  private val checkpointManager =
    CheckpointFileManager.create(checkpointRootDir, spark.sessionState.newHadoopConf())

  /**
   * Checks if the checkpoint directory exists.
   *
   * @return
   *   true if the checkpoint directory exists, false otherwise.
   */
  def exists(): Boolean = checkpointManager.exists(checkpointRootDir)

  /**
   * Creates the checkpoint directory and all necessary parent directories if they do not already
   * exist.
   *
   * @return
   *   The path to the created checkpoint directory.
   */
  def createDirectory(): Path = {
    checkpointManager.createCheckpointDirectory
  }

  /**
   * Creates a temporary file in the checkpoint directory.
   *
   * @return
   *   An optional FSDataOutputStream for the created temporary file, or None if creation fails.
   */
  def createTempFile(): Option[FSDataOutputStream] = {
    checkpointManager match {
      case manager: RenameHelperMethods =>
        val tempFilePath =
          new Path(createDirectory(), s"${UUID.randomUUID().toString}.tmp")
        Some(manager.createTempFile(tempFilePath))
      case _ =>
        logInfo(s"Cannot create temp file at checkpoint location: ${checkpointManager.getClass}")
        None
    }
  }

  /**
   * Deletes the checkpoint directory. This method attempts to delete the checkpoint directory and
   * captures any exceptions that occur. Exceptions are logged but ignored so as not to disrupt
   * the caller's workflow.
   */
  def delete(): Unit = {
    try {
      checkpointManager.delete(checkpointRootDir)
      logInfo(s"Checkpoint directory $checkpointRootDir deleted")
    } catch {
      case e: Exception =>
        logError(s"Error deleting checkpoint directory $checkpointRootDir", e)
    }
  }
}
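For orientation, here is a minimal usage sketch of the new abstraction. It assumes a local SparkSession and a writable example path; whether createTempFile returns a stream depends on the CheckpointFileManager implementation in use, since only managers exposing RenameHelperMethods support temp files. This is illustrative only, not part of the commit.

    import org.apache.spark.sql.SparkSession
    import org.opensearch.flint.spark.FlintSparkCheckpoint

    object FlintSparkCheckpointExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("checkpoint-example").getOrCreate()
        // Example path only; any Hadoop-compatible location works (e.g. S3 with the right configuration).
        val checkpoint = new FlintSparkCheckpoint(spark, "/tmp/flint/checkpoint-example")

        checkpoint.createDirectory() // creates the folder and any missing parents
        assert(checkpoint.exists())

        // Write access probe: returns None if the underlying manager cannot create temp files
        checkpoint.createTempFile().foreach(_.close())

        checkpoint.delete() // removes the whole checkpoint folder; errors are logged, not thrown
        spark.stop()
      }
    }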
@@ -5,9 +5,6 @@

package org.opensearch.flint.spark

import java.util.UUID

import org.apache.hadoop.fs.Path
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -16,8 +13,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.execution.streaming.CheckpointFileManager.RenameHelperMethods
import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

/**
@@ -72,35 +67,20 @@ trait FlintSparkValidationHelper extends Logging {
   */
  def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
    try {
      val checkpointManager =
        CheckpointFileManager.create(
          new Path(checkpointLocation),
          spark.sessionState.newHadoopConf())
      val checkpoint = new FlintSparkCheckpoint(spark, checkpointLocation)

      /*
       * Read permission check: The primary intent here is to catch any exceptions
       * during the accessibility check. The actual result is ignored, as the write
       * permission check below will create any necessary sub-folders.
       */
      checkpointManager.exists(new Path(checkpointLocation))
      checkpoint.exists()

      /*
       * Write permission check: Attempt to create a temporary file to verify write access.
       * The temporary file is left in place in case additional delete permissions are required.
       */
      checkpointManager match {
        case manager: RenameHelperMethods =>
          val tempFilePath =
            new Path(
              checkpointManager
                .createCheckpointDirectory(), // create all parent folders if needed
              s"${UUID.randomUUID().toString}.tmp")

          manager.createTempFile(tempFilePath).close()
        case _ =>
          logInfo(
            s"Bypass checkpoint location write permission check: ${checkpointManager.getClass}")
      }
      checkpoint.createTempFile().foreach(_.close())

      true
    } catch {
@@ -0,0 +1,70 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.scalatest.matchers.should.Matchers

import org.apache.spark.FlintSuite

class FlintSparkCheckpointSuite extends FlintSuite with Matchers {

  test("exists") {
    withCheckpoint { checkpoint =>
      checkpoint.exists() shouldBe false
      checkpoint.createDirectory()
      checkpoint.exists() shouldBe true
    }
  }

  test("create directory") {
    withTempPath { tempDir =>
      val checkpointDir = new Path(tempDir.getAbsolutePath, "sub/subsub")
      val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.toString)
      checkpoint.createDirectory()

      tempDir.exists() shouldBe true
    }
  }

  test("create temp file") {
    withCheckpoint { checkpoint =>
      val tempFile = checkpoint.createTempFile()
      tempFile shouldBe defined

      // Close the stream to ensure the file is flushed
      tempFile.get.close()

      // Assert that there is a .tmp file
      listFiles(checkpoint.checkpointLocation)
        .exists(isTempFile) shouldBe true
    }
  }

  test("delete") {
    withCheckpoint { checkpoint =>
      checkpoint.createDirectory()
      checkpoint.delete()
      checkpoint.exists() shouldBe false
    }
  }

  private def withCheckpoint(block: FlintSparkCheckpoint => Unit): Unit = {
    withTempPath { checkpointDir =>
      val checkpoint = new FlintSparkCheckpoint(spark, checkpointDir.getAbsolutePath)
      block(checkpoint)
    }
  }

  private def listFiles(dir: String): Array[FileStatus] = {
    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
    fs.listStatus(new Path(dir))
  }

  private def isTempFile(file: FileStatus): Boolean = {
    file.isFile && file.getPath.getName.endsWith(".tmp")
  }
}
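The withCheckpoint fixture gives each case its own temporary directory, so further cases slot in easily. As a hypothetical illustration (not part of the commit), deleting a checkpoint that was never created should also be safe, because delete() logs and swallows any failure:

    // Hypothetical extra case following the same fixture pattern (not in the commit).
    test("delete non-existent checkpoint") {
      withCheckpoint { checkpoint =>
        checkpoint.delete() // should not throw; errors are logged and ignored
        checkpoint.exists() shouldBe false
      }
    }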
@@ -468,6 +468,34 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
flint.describeIndex(testFlintIndex) shouldBe empty
}

test("vacuum covering index with checkpoint") {
withTempDir { checkpointDir =>
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testFlintIndex)
.create()
flint.refreshIndex(testFlintIndex)

val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testFlintIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM INDEX $testIndex ON $testTable")
flint.describeIndex(testFlintIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

private def awaitRefreshComplete(query: String): Unit = {
sql(query)

@@ -408,5 +408,33 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
flint.describeIndex(testFlintIndex) shouldBe empty
}

test("vacuum materialized view with checkpoint") {
withTempDir { checkpointDir =>
flint
.materializedView()
.name(testMvName)
.query(testQuery)
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath,
"watermark_delay" -> "1 Second")),
testFlintIndex)
.create()
flint.refreshIndex(testFlintIndex)

val job = spark.streams.active.find(_.name == testFlintIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testFlintIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM MATERIALIZED VIEW $testMvName")
flint.describeIndex(testFlintIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

private def timestamp(ts: String): Timestamp = Timestamp.valueOf(ts)
}
@@ -917,6 +917,33 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
}
}

test("vacuum skipping index with checkpoint") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addValueSet("address")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
flint.vacuumIndex(testIndex)
flint.describeIndex(testIndex) shouldBe None
checkpointDir.exists() shouldBe false
}
}

// Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex
def useFlintSparkSkippingFileIndex(
subMatcher: Matcher[FlintSparkSkippingFileIndex]): Matcher[SparkPlan] = {
@@ -580,6 +580,33 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
flint.describeIndex(testIndex) shouldBe empty
}

test("vacuum skipping index with checkpoint") {
withTempDir { checkpointDir =>
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year")
.options(
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()
flint.refreshIndex(testIndex)

val job = spark.streams.active.find(_.name == testIndex)
awaitStreamingComplete(job.get.id.toString)
flint.deleteIndex(testIndex)

// Checkpoint folder should be removed after vacuum
checkpointDir.exists() shouldBe true
sql(s"VACUUM SKIPPING INDEX ON $testTable")
flint.describeIndex(testIndex) shouldBe empty
checkpointDir.exists() shouldBe false
}
}

test("analyze skipping index with for supported data types") {
val result = sql(s"ANALYZE SKIPPING INDEX ON $testTable")

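Taken together, the integration tests exercise the user-facing lifecycle: create an auto-refresh index with a checkpoint_location, delete it, then vacuum it, after which both the Flint metadata and the checkpoint folder are gone. A condensed sketch of that flow through Spark SQL follows; the table name and checkpoint path are placeholders, and the CREATE syntax is based on the documented Flint SQL grammar rather than copied from this commit:

    // Sketch only: assumes a SparkSession with the Flint SQL extensions enabled.
    spark.sql("""
      CREATE SKIPPING INDEX ON alb_logs (elb_status_code VALUE_SET)
      WITH (auto_refresh = true, checkpoint_location = 's3://bucket/path/checkpoint')
      """)
    spark.sql("DROP SKIPPING INDEX ON alb_logs")   // stop auto refresh and mark the index deleted
    spark.sql("VACUUM SKIPPING INDEX ON alb_logs") // physically remove data, metadata, and now the checkpoint folder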
