Skip to content

Commit

Permalink
Enhance Flint Spark API error reporting with centralized handler (#348)
Browse files Browse the repository at this point in the history
* Extract transaction template

Signed-off-by: Chen Dai <[email protected]>

* Handle runtime and other exception in new withTx

Signed-off-by: Chen Dai <[email protected]>

* Move to new tx support trait

Signed-off-by: Chen Dai <[email protected]>

* Add UT

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Add IT for index state error

Signed-off-by: Chen Dai <[email protected]>

* Refactor index monitor transaction

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Revert unused changes

Signed-off-by: Chen Dai <[email protected]>

* Fix broken IT

Signed-off-by: Chen Dai <[email protected]>

* Add index name to logging

Signed-off-by: Chen Dai <[email protected]>

* Remove root cause extraction

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Jun 27, 2024
1 parent 2d46cff commit 0c1ec6b
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
}

// Append optional transient log
Expand Down Expand Up @@ -126,7 +126,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
} catch (Exception ex) {
LOG.log(WARNING, "Failed to rollback transient log", ex);
}
throw new IllegalStateException("Failed to commit transaction operation");
throw new IllegalStateException("Failed to commit transaction operation", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.common.metadata.log.OptimisticTransaction
import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintMetadata
Expand All @@ -34,7 +35,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGN
/**
* Flint Spark integration API entrypoint.
*/
class FlintSpark(val spark: SparkSession) extends Logging {
class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {

/** Flint spark configuration */
private val flintSparkConf: FlintSparkConf =
Expand All @@ -46,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

private val flintMetadataLogService: FlintMetadataLogService = {
override protected val flintMetadataLogService: FlintMetadataLogService = {
FlintMetadataLogServiceBuilder.build(
flintSparkConf.flintOptions(),
spark.sparkContext.getConf)
Expand Down Expand Up @@ -97,18 +98,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
try {
flintMetadataLogService
.startTransaction(indexName, true)
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -119,14 +118,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}

/**
* Start refreshing index data according to the given mode.
Expand All @@ -136,15 +129,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintMetadataLogService
.startTransaction(indexName)
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, "Refresh Flint index") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
tx
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
Expand All @@ -160,12 +150,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

/**
* Describe all Flint indexes whose name matches the given pattern.
Expand Down Expand Up @@ -221,25 +206,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

val indexName = index.name()
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

Expand All @@ -251,12 +230,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintMetadataLogService
.startTransaction(indexName)
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest =>
latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
.transientLog(latest => latest.copy(state = DELETING))
Expand All @@ -267,16 +244,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}

/**
* Delete a Flint index physically.
Expand All @@ -286,81 +258,66 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintMetadataLogService
.startTransaction(indexName)
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Vacuum Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintMetadataLogService
.startTransaction(indexName)
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, "Recover Flint index") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ =>
.commit(_ => {
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintMetadataLogService
.startTransaction(indexName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
.start(spark, flintSparkConf)
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
}
false
}
}

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
Expand Down Expand Up @@ -460,11 +417,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintMetadataLogService
.startTransaction(indexName)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
Expand All @@ -478,12 +436,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
private def updateIndexManualToAuto(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintMetadataLogService
.startTransaction(indexName)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
Expand Down
Loading

0 comments on commit 0c1ec6b

Please sign in to comment.