Skip to content

Commit

Permalink
Extract transaction template
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed May 20, 2024
1 parent 9de4f28 commit efe3046
Showing 1 changed file with 90 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
Expand Down Expand Up @@ -95,18 +96,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
try {
flintClient
.startTransaction(indexName, dataSourceName, true)
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
withTransaction[Unit](
index.name(),
s"Create Flint index $index with ignoreIfExists $ignoreIfExists",
forceInit = true) { tx =>
val indexName = index.name()
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
tx
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
Expand All @@ -117,14 +119,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}

/**
* Start refreshing index data according to the given mode.
Expand All @@ -134,15 +130,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] = {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, s"Refresh Flint index $indexName") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest => latest.state == ACTIVE)
.transientLog(latest =>
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
Expand All @@ -158,12 +152,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

/**
* Describe all Flint indexes whose name matches the given pattern.
Expand Down Expand Up @@ -213,28 +202,22 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] = {
logInfo(s"Updating Flint index $index")
val indexName = index.name
def updateIndex(index: FlintSparkIndex): Option[String] =
withTransaction[Option[String]](index.name(), s"Update Flint index $index") { tx =>
val indexName = index.name

validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
.options,
index.options)

try {
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

/**
* Delete index and refreshing job associated.
Expand All @@ -244,12 +227,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Delete Flint index $indexName") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
.transientLog(latest => latest.copy(state = DELETING))
.finalLog(latest => latest.copy(state = DELETED))
Expand All @@ -259,16 +240,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
} else {
logInfo("Flint index to be deleted doesn't exist")
false
}
}

/**
* Delete a Flint index physically.
Expand All @@ -278,81 +254,66 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* true if exist and deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Vacuum Flint index $indexName") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
.transientLog(latest => latest.copy(state = VACUUMING))
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
} else {
logInfo("Flint index to vacuum doesn't exist")
false
}
}

/**
* Recover index job.
*
* @param indexName
* index name
*/
def recoverIndex(indexName: String): Boolean = {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
flintClient
.startTransaction(indexName, dataSourceName)
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Recover Flint index $indexName") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
.transientLog(latest =>
latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ =>
.commit(_ => {
FlintSparkIndexRefresh
.create(indexName, index.get)
.start(spark, flintSparkConf))

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => {})
.start(spark, flintSparkConf)
true
})
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
if (index.isEmpty) {
/*
* If execution reaches this point, it indicates that the Flint index is corrupted.
* In such cases, clean up the metadata log, as the index data no longer exists.
* There is a very small possibility that users may recreate the index in the
* interim, but metadata log get deleted by this cleanup process.
*/
logWarning("Cleaning up metadata log as index data has been deleted")
tx
.initialLog(_ => true)
.finalLog(_ => NO_LOG_ENTRY)
.commit(_ => { false })
} else {
false
}
}
false
}
}

/**
* Build data frame for querying the given index. This is mostly for unit test convenience.
Expand Down Expand Up @@ -470,4 +431,26 @@ class FlintSpark(val spark: SparkSession) extends Logging {
indexRefresh.start(spark, flintSparkConf)
})
}

private def withTransaction[T](
indexName: String,
operationName: String,
forceInit: Boolean = false)(opBlock: OptimisticTransaction[T] => T): T = {
logInfo(s"Starting index operation [$operationName] with forceInit=$forceInit")
try {
val tx: OptimisticTransaction[T] =
flintClient.startTransaction(indexName, dataSourceName, forceInit)

val result = opBlock(tx)
logInfo(s"Index operation [$operationName] complete")
result
} catch {
case e: IllegalStateException =>
logError(s"Failed to execute index operation [$operationName]", e)
throw e // TODO: throw inner illegal state exception directly?
case e: Exception =>
logError(s"Failed to execute index operation [$operationName]", e)
throw new IllegalStateException(e)
}
}
}

0 comments on commit efe3046

Please sign in to comment.