Enhance Flint Spark API error reporting with centralized handler #348

Merged: 16 commits, Jun 27, 2024
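The diff below replaces the per-method try/catch blocks in FlintSpark with a single `withTransaction` handler inherited from `FlintSparkTransactionSupport`. That trait itself is not part of this excerpt, so the following sketch is an inference from the call sites visible in the diff (the `flintClient` and `dataSourceName` members and the `startTransaction(indexName, dataSourceName, forceInit)` overload all appear below); the body and log wording are assumptions, not the merged implementation.

```scala
import org.apache.spark.internal.Logging

import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.log.OptimisticTransaction

// Hypothetical sketch of the centralized handler named in the PR title.
trait FlintSparkTransactionSupport extends Logging {

  /** Flint client for low-level index operations, supplied by the mixing class. */
  protected def flintClient: FlintClient

  /** Data source name, supplied by the mixing class. */
  protected def dataSourceName: String

  /**
   * Starts a transaction on the given index, runs opBlock with it, and
   * rethrows any failure as one uniformly worded IllegalStateException
   * that names the failed operation.
   */
  def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
      opBlock: OptimisticTransaction[T] => T): T = {
    logInfo(s"Starting index operation [$opName]")
    try {
      val tx: OptimisticTransaction[T] =
        flintClient.startTransaction(indexName, dataSourceName, forceInit)
      val result = opBlock(tx)
      logInfo(s"Index operation [$opName] complete")
      result
    } catch {
      case e: Exception =>
        logError(s"Failed to execute index operation [$opName]", e)
        throw new IllegalStateException(s"Failed to execute index operation [$opName]", e)
    }
  }
}
```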
@@ -81,7 +81,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
if (!initialCondition.test(latest)) {
LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
throw new IllegalStateException(
"Transaction failed due to initial log precondition not satisfied");
String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
}

// Append optional transient log
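The new message surfaces the actual index state that broke the precondition rather than the old generic wording. A quick illustration of the resulting text, where the state value "deleting" is an assumed example, not taken from this diff:

```scala
// Illustration only: mirrors the String.format call above with an assumed
// state value; latest.state() would supply the real one.
val message = String.format("Index state [%s] doesn't satisfy precondition", "deleting")
// message == "Index state [deleting] doesn't satisfy precondition"
```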
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
@@ -31,7 +32,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGN
/**
* Flint Spark integration API entrypoint.
*/
class FlintSpark(val spark: SparkSession) extends Logging {
class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {

/** Flint spark configuration */
private val flintSparkConf: FlintSparkConf =
@@ -41,7 +42,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)

/** Flint client for low-level index operation */
private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
override protected val flintClient: FlintClient =
FlintClientBuilder.build(flintSparkConf.flintOptions())

/** Required by json4s parse function */
implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
@@ -50,7 +52,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* Data source name. Assign an empty string for backward compatibility. TODO: remove this in
* the future
*/
private val dataSourceName: String =
override protected val dataSourceName: String =
spark.conf.getOption("spark.flint.datasource.name").getOrElse("")

/** Flint Spark index monitor */
@@ -96,17 +98,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, dataSourceName, true)
val opName = s"Create Flint index $indexName with ignoreIfExists $ignoreIfExists"
withTransaction[Unit](indexName, opName, forceInit = true) { tx =>
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
@@ -117,11 +118,6 @@
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}
@@ -134,15 +130,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, s"Refresh Flint index $indexName") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
@@ -158,12 +152,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

/**
* Describe all Flint indexes whose name matches the given pattern.
@@ -214,25 +203,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name

val indexName = index.name()
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
withTransaction[Option[String]](indexName, s"Update Flint index $indexName") { tx =>
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

@@ -244,12 +227,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if the index existed and was deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Delete Flint index $indexName") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
.transientLog(latest => latest.copy(state = DELETING))
.finalLog(latest => latest.copy(state = DELETED))
Expand All @@ -259,16 +240,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}

/**
* Delete a Flint index physically.
@@ -278,81 +254,66 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if the index existed and was deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Vacuum Flint index $indexName") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Recover Flint index $indexName") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ =>
.commit(_ => {
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but the metadata log would get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
.start(spark, flintSparkConf)
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but the metadata log would get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
}
false
}
}

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
@@ -431,11 +392,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
Expand All @@ -449,12 +411,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
private def updateIndexManualToAuto(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
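The public API surface is unchanged by this refactoring; only failure handling and reporting are centralized. A hypothetical caller's view, where the app name and index name are placeholders rather than values from this PR:

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSpark

// Hypothetical usage sketch: the app name and index name are placeholders.
val spark = SparkSession.builder().appName("flint-example").getOrCreate()
val flint = new FlintSpark(spark)

// Transaction failures now surface as a single IllegalStateException raised
// by the centralized handler, rather than by per-method catch blocks.
val recovered: Boolean = flint.recoverIndex("flint_mydatasource_default_logs_skipping_index")
if (!recovered) {
  // Either the index doesn't exist or it is not auto-refreshed.
  println("Nothing to recover")
}
```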