diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9cd5f60a7..39a476491 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.native.Serialization import org.opensearch.flint.core.{FlintClient, FlintClientBuilder} import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.core.metadata.log.OptimisticTransaction import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._ @@ -95,18 +96,19 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @param ignoreIfExists * Ignore existing index */ - def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { - logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists") - val indexName = index.name() - if (flintClient.exists(indexName)) { - if (!ignoreIfExists) { - throw new IllegalStateException(s"Flint index $indexName already exists") - } - } else { - val metadata = index.metadata() - try { - flintClient - .startTransaction(indexName, dataSourceName, true) + def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = + withTransaction[Unit]( + index.name(), + s"Create Flint index $index with ignoreIfExists $ignoreIfExists", + forceInit = true) { tx => + val indexName = index.name() + if (flintClient.exists(indexName)) { + if (!ignoreIfExists) { + throw new IllegalStateException(s"Flint index $indexName already exists") + } + } else { + val metadata = index.metadata() + tx .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) .transientLog(latest => latest.copy(state = CREATING)) .finalLog(latest => latest.copy(state = ACTIVE)) @@ -117,14 +119,8 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Creating index with metadata log entry ID ${latest.id}") flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) }) - logInfo("Create index complete") - } catch { - case e: Exception => - logError("Failed to create Flint index", e) - throw new IllegalStateException("Failed to create Flint index") } } - } /** * Start refreshing index data according to the given mode. @@ -134,15 +130,13 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * refreshing job ID (empty if batch job for now) */ - def refreshIndex(indexName: String): Option[String] = { - logInfo(s"Refreshing Flint index $indexName") - val index = describeIndex(indexName) - .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) + def refreshIndex(indexName: String): Option[String] = + withTransaction[Option[String]](indexName, s"Refresh Flint index $indexName") { tx => + val index = describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) - try { - flintClient - .startTransaction(indexName, dataSourceName) + tx .initialLog(latest => latest.state == ACTIVE) .transientLog(latest => latest.copy(state = REFRESHING, createTime = System.currentTimeMillis())) @@ -158,12 +152,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { } }) .commit(_ => indexRefresh.start(spark, flintSparkConf)) - } catch { - case e: Exception => - logError("Failed to refresh Flint index", e) - throw new IllegalStateException("Failed to refresh Flint index") } - } /** * Describe all Flint indexes whose name matches the given pattern. @@ -213,28 +202,22 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * refreshing job ID (empty if no job) */ - def updateIndex(index: FlintSparkIndex): Option[String] = { - logInfo(s"Updating Flint index $index") - val indexName = index.name + def updateIndex(index: FlintSparkIndex): Option[String] = + withTransaction[Option[String]](index.name(), s"Update Flint index $index") { tx => + val indexName = index.name - validateUpdateAllowed( - describeIndex(indexName) - .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) - .options, - index.options) + validateUpdateAllowed( + describeIndex(indexName) + .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) + .options, + index.options) - try { // Relies on validation to forbid auto-to-auto and manual-to-manual updates index.options.autoRefresh() match { case true => updateIndexManualToAuto(index) case false => updateIndexAutoToManual(index) } - } catch { - case e: Exception => - logError("Failed to update Flint index", e) - throw new IllegalStateException("Failed to update Flint index") } - } /** * Delete index and refreshing job associated. @@ -244,12 +227,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * true if exist and deleted, otherwise false */ - def deleteIndex(indexName: String): Boolean = { - logInfo(s"Deleting Flint index $indexName") - if (flintClient.exists(indexName)) { - try { - flintClient - .startTransaction(indexName, dataSourceName) + def deleteIndex(indexName: String): Boolean = + withTransaction[Boolean](indexName, s"Delete Flint index $indexName") { tx => + if (flintClient.exists(indexName)) { + tx .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING) .transientLog(latest => latest.copy(state = DELETING)) .finalLog(latest => latest.copy(state = DELETED)) @@ -259,16 +240,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { stopRefreshingJob(indexName) true }) - } catch { - case e: Exception => - logError("Failed to delete Flint index", e) - throw new IllegalStateException("Failed to delete Flint index") + } else { + logInfo("Flint index to be deleted doesn't exist") + false } - } else { - logInfo("Flint index to be deleted doesn't exist") - false } - } /** * Delete a Flint index physically. @@ -278,12 +254,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * true if exist and deleted, otherwise false */ - def vacuumIndex(indexName: String): Boolean = { - logInfo(s"Vacuuming Flint index $indexName") - if (flintClient.exists(indexName)) { - try { - flintClient - .startTransaction(indexName, dataSourceName) + def vacuumIndex(indexName: String): Boolean = + withTransaction[Boolean](indexName, s"Vacuum Flint index $indexName") { tx => + if (flintClient.exists(indexName)) { + tx .initialLog(latest => latest.state == DELETED) .transientLog(latest => latest.copy(state = VACUUMING)) .finalLog(_ => NO_LOG_ENTRY) @@ -291,16 +265,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient.deleteIndex(indexName) true }) - } catch { - case e: Exception => - logError("Failed to vacuum Flint index", e) - throw new IllegalStateException("Failed to vacuum Flint index") + } else { + logInfo("Flint index to vacuum doesn't exist") + false } - } else { - logInfo("Flint index to vacuum doesn't exist") - false } - } /** * Recover index job. @@ -308,13 +277,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @param indexName * index name */ - def recoverIndex(indexName: String): Boolean = { - logInfo(s"Recovering Flint index $indexName") - val index = describeIndex(indexName) - if (index.exists(_.options.autoRefresh())) { - try { - flintClient - .startTransaction(indexName, dataSourceName) + def recoverIndex(indexName: String): Boolean = + withTransaction[Boolean](indexName, s"Recover Flint index $indexName") { tx => + val index = describeIndex(indexName) + if (index.exists(_.options.autoRefresh())) { + tx .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) .transientLog(latest => latest.copy(state = RECOVERING, createTime = System.currentTimeMillis())) @@ -322,37 +289,31 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintIndexMonitor.startMonitor(indexName) latest.copy(state = REFRESHING) }) - .commit(_ => + .commit(_ => { FlintSparkIndexRefresh .create(indexName, index.get) - .start(spark, flintSparkConf)) - - logInfo("Recovery complete") - true - } catch { - case e: Exception => - logError("Failed to recover Flint index", e) - throw new IllegalStateException("Failed to recover Flint index") - } - } else { - logInfo("Index to be recovered either doesn't exist or not auto refreshed") - if (index.isEmpty) { - /* - * If execution reaches this point, it indicates that the Flint index is corrupted. - * In such cases, clean up the metadata log, as the index data no longer exists. - * There is a very small possibility that users may recreate the index in the - * interim, but metadata log get deleted by this cleanup process. - */ - logWarning("Cleaning up metadata log as index data has been deleted") - flintClient - .startTransaction(indexName, dataSourceName) - .initialLog(_ => true) - .finalLog(_ => NO_LOG_ENTRY) - .commit(_ => {}) + .start(spark, flintSparkConf) + true + }) + } else { + logInfo("Index to be recovered either doesn't exist or not auto refreshed") + if (index.isEmpty) { + /* + * If execution reaches this point, it indicates that the Flint index is corrupted. + * In such cases, clean up the metadata log, as the index data no longer exists. + * There is a very small possibility that users may recreate the index in the + * interim, but metadata log get deleted by this cleanup process. + */ + logWarning("Cleaning up metadata log as index data has been deleted") + tx + .initialLog(_ => true) + .finalLog(_ => NO_LOG_ENTRY) + .commit(_ => { false }) + } else { + false + } } - false } - } /** * Build data frame for querying the given index. This is mostly for unit test convenience. @@ -470,4 +431,26 @@ class FlintSpark(val spark: SparkSession) extends Logging { indexRefresh.start(spark, flintSparkConf) }) } + + private def withTransaction[T]( + indexName: String, + operationName: String, + forceInit: Boolean = false)(opBlock: OptimisticTransaction[T] => T): T = { + logInfo(s"Starting index operation [$operationName] with forceInit=$forceInit") + try { + val tx: OptimisticTransaction[T] = + flintClient.startTransaction(indexName, dataSourceName, forceInit) + + val result = opBlock(tx) + logInfo(s"Index operation [$operationName] complete") + result + } catch { + case e: IllegalStateException => + logError(s"Failed to execute index operation [$operationName]", e) + throw e // TODO: throw inner illegal state exception directly? + case e: Exception => + logError(s"Failed to execute index operation [$operationName]", e) + throw new IllegalStateException(e) + } + } }