Lazy clean up dangling index metadata log entry #558

Merged
FlintSpark.scala
@@ -48,7 +48,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
override protected val flintClient: FlintClient =
FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintIndexMetadataService: FlintIndexMetadataService = {
FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
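Note: the change above turns FlintSpark's private flintClient into an override of the abstract flintClient member that the reworked FlintSparkTransactionSupport trait (third file below) now declares, so the trait can check index existence itself before running an operation.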
@@ -170,7 +171,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
}
}.flatten

/**
* Describe all Flint indexes whose name matches the given pattern.
@@ -242,7 +243,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
case (true, false) => updateIndexManualToAuto(index, tx)
case (false, false) => updateIndexAutoToManual(index, tx)
}
}
}.flatten
}

/**
@@ -276,7 +277,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to be deleted doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Delete a Flint index physically.
@@ -319,7 +320,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
logInfo("Flint index to vacuum doesn't exist")
false
}
}
}.getOrElse(false)

/**
* Recover index job.
@@ -356,24 +357,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
logInfo("Index to be recovered is not auto refreshed")
false
}
}
}.getOrElse(false)

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
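With withTransaction now returning Option[T] (see FlintSparkTransactionSupport below), each call site above unwraps the result explicitly: blocks that already yield an Option are flattened with .flatten, boolean operations fall back with .getOrElse(false), and recoverIndex drops its inline corrupted-index cleanup because the trait now performs it. A minimal sketch of the two unwrapping patterns; startRefreshJob and markDeleted are hypothetical stand-ins for the real operation blocks:

```scala
// Sketch only: assumes withTransaction[T](...): Option[T] as in this PR.

// A block that itself returns an Option (e.g. an optional refresh job ID)
// would yield Option[Option[String]], so the call site flattens it:
val jobId: Option[String] =
  withTransaction[Option[String]]("my_index", "refreshIndex") { tx =>
    startRefreshJob() // hypothetical: Some(jobId) once the job starts
  }.flatten

// A boolean block treats a bypassed (corrupted-index) operation as false:
val deleted: Boolean =
  withTransaction[Boolean]("my_index", "deleteIndex") { tx =>
    markDeleted() // hypothetical: true once the index is marked deleted
  }.getOrElse(false)
```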
FlintSparkIndexMonitor.scala
@@ -159,18 +159,22 @@ class FlintSparkIndexMonitor(
override def run(): Unit = {
logInfo(s"Scheduler trigger index monitor task for $indexName")
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintMetadataLogService.recordHeartbeat(indexName)
val isJobActive = isStreamingJobActive(indexName)
val indexExists = flintClient.exists(indexName)

if (!flintClient.exists(indexName)) {
logWarning("Streaming job is active but data is deleted")
(isJobActive, indexExists) match {
case (true, true) =>
logInfo("Streaming job is active and index exists")
flintMetadataLogService.recordHeartbeat(indexName)

case (true, false) =>
logWarning("Streaming job is active but index is deleted")
stopStreamingJobAndMonitor(indexName)
}
} else {
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
logInfo("Index monitor task is cancelled")

case (false, _) =>
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
logInfo("Index monitor task is cancelled")
}
errorCnt = 0 // Reset counter if no error
} catch {
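The monitor task above now evaluates the (job active, index exists) pair once and branches on all three reachable cases instead of nesting ifs. stopStreamingJobAndMonitor is referenced but not shown in this diff; a hypothetical sketch, assuming the streaming query is registered under the index name:

```scala
// Hypothetical sketch, not part of this PR: stop the streaming query first,
// then cancel the monitor task so no further heartbeats are recorded.
private def stopStreamingJobAndMonitor(indexName: String): Unit = {
  spark.streams.active
    .find(_.name == indexName) // assumes the query is named after the index
    .foreach(_.stop())
  stopMonitor(indexName)
}
```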
FlintSparkTransactionSupport.scala
@@ -6,26 +6,33 @@
package org.opensearch.flint.spark

import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{CREATING, EMPTY, VACUUMING}
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.FlintClient

import org.apache.spark.internal.Logging

/**
* Provides transaction support with proper error handling and logging capabilities.
*
* @note
* This trait requires the mixing class to extend Spark's `Logging` to utilize its logging
* functionalities. Meanwhile it needs to provide `FlintClient` and data source name so this
* trait can help create transaction context.
* This trait requires the mixing class to provide both `FlintClient` and
* `FlintMetadataLogService` so this trait can help create transaction context.
*/
trait FlintSparkTransactionSupport { self: Logging =>
trait FlintSparkTransactionSupport extends Logging {

/** Flint client defined in the mixing class */
protected def flintClient: FlintClient

/** Flint metadata log service defined in the mixing class */
protected def flintMetadataLogService: FlintMetadataLogService

/**
* Executes a block of code within a transaction context, handling and logging errors
* appropriately. This method logs the start and completion of the transaction and captures any
* exceptions that occur, enriching them with detailed error messages before re-throwing.
* exceptions that occur, enriching them with detailed error messages before re-throwing. If the
* index data is missing (excluding index creation actions), the operation is bypassed, and any
* dangling metadata log entries are cleaned up.
*
* @param indexName
* the name of the index on which the operation is performed
@@ -39,19 +46,31 @@ trait FlintSparkTransactionSupport { self: Logging =>
* @tparam T
* the type of the result produced by the operation block
* @return
* the result of the operation block
* Some(result) of the operation block if the operation is executed, or None if the
* operation is bypassed because the index is corrupted
*/
def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
opBlock: OptimisticTransaction[T] => T): T = {
opBlock: OptimisticTransaction[T] => T): Option[T] = {
logInfo(s"Starting index operation [$opName $indexName] with forceInit=$forceInit")
try {
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
val isCorrupted = isIndexCorrupted(indexName)
if (isCorrupted) {
cleanupCorruptedIndex(indexName)
}

// Execute the action if this is an index creation (indicated by forceInit) or the index is not corrupted
if (forceInit || !isCorrupted) {

val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
result
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
flintMetadataLogService.startTransaction(indexName, forceInit)
val result = opBlock(tx)
logInfo(s"Index operation [$opName $indexName] complete")
Some(result)
} else {
logWarning(s"Bypassing index operation [$opName $indexName]")
None
}
} catch {
case e: Exception =>
logError(s"Failed to execute index operation [$opName $indexName]", e)
@@ -60,4 +79,42 @@ trait FlintSparkTransactionSupport { self: Logging =>
throw e
}
}

/**
* Determines whether the index is corrupted, meaning the metadata log entry exists but the
* corresponding data index does not. For indexes in the creating or vacuuming state, the
* corruption check is skipped to reduce the possibility of a race condition, since the index
* may be in a transitional phase where the data index is temporarily missing before the
* process completes.
*/
private def isIndexCorrupted(indexName: String): Boolean = {
val logEntry =
flintMetadataLogService
.getIndexMetadataLog(indexName)
.flatMap(_.getLatest)
val logEntryExists = logEntry.isPresent
val dataIndexExists = flintClient.exists(indexName)
val isCreatingOrVacuuming =
logEntry
.filter(e => e.state == EMPTY || e.state == CREATING || e.state == VACUUMING)
.isPresent
val isCorrupted = logEntryExists && !dataIndexExists && !isCreatingOrVacuuming

if (isCorrupted) {
logWarning(s"""
| Cleaning up corrupted index:
| - logEntryExists [$logEntryExists]
| - dataIndexExists [$dataIndexExists]
| - isCreatingOrVacuuming [$isCreatingOrVacuuming]
|""".stripMargin)
}
isCorrupted
}

private def cleanupCorruptedIndex(indexName: String): Unit = {
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
}
}
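To make the new contract concrete, here is a minimal sketch of a class mixing in the trait; MyFlintOps and its forceCleanup operation are illustrative only, and the imports match those at the top of the file above. The trait now extends Logging itself, so an implementor supplies just the two abstract members:

```scala
// Illustrative sketch: a mixing class supplies the two abstract members;
// withTransaction handles logging, error enrichment, and the lazy
// corrupted-index cleanup before the block runs.
class MyFlintOps(
    override protected val flintClient: FlintClient,
    override protected val flintMetadataLogService: FlintMetadataLogService)
    extends FlintSparkTransactionSupport {

  // Hypothetical op mirroring cleanupCorruptedIndex above: commits a
  // transaction whose final log is NO_LOG_ENTRY, deleting the entry.
  // Returns Some(()) if executed, or None if bypassed due to corruption.
  def forceCleanup(indexName: String): Option[Unit] =
    withTransaction[Unit](indexName, "forceCleanup") { tx =>
      tx.initialLog(_ => true)
        .finalLog(_ => NO_LOG_ENTRY)
        .commit(_ => {})
    }
}
```

Note the cleanup path itself: cleanupCorruptedIndex commits a transaction whose final log is NO_LOG_ENTRY, which deletes the dangling metadata log entry, the same effect as the inline block this PR removes from recoverIndex.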