diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
index 1992e173d..bfe99c0cd 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/DefaultOptimisticTransaction.java
@@ -85,7 +85,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
     if (!initialCondition.test(latest)) {
       LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
       throw new IllegalStateException(
-          "Transaction failed due to initial log precondition not satisfied");
+          String.format("Index state [%s] doesn't satisfy precondition", latest.state()));
     }

     // Append optional transient log
@@ -126,7 +126,7 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
       } catch (Exception ex) {
         LOG.log(WARNING, "Failed to rollback transient log", ex);
       }
-      throw new IllegalStateException("Failed to commit transaction operation");
+      throw new IllegalStateException("Failed to commit transaction operation", e);
     }
   }
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 0ab24032d..b2e310cc7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -11,6 +11,7 @@ import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState._
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.common.metadata.log.OptimisticTransaction
 import org.opensearch.flint.common.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintMetadata
@@ -34,7 +35,7 @@ import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}

 /**
  * Flint Spark integration API entrypoint.
  */
-class FlintSpark(val spark: SparkSession) extends Logging {
+class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport with Logging {

   /** Flint spark configuration */
   private val flintSparkConf: FlintSparkConf =
@@ -46,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
   /** Flint client for low-level index operation */
   private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())

-  private val flintMetadataLogService: FlintMetadataLogService = {
+  override protected val flintMetadataLogService: FlintMetadataLogService = {
     FlintMetadataLogServiceBuilder.build(
       flintSparkConf.flintOptions(),
       spark.sparkContext.getConf)
@@ -97,18 +98,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    * @param ignoreIfExists
    *   Ignore existing index
    */
-  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
-    logInfo(s"Creating Flint index $index with ignoreIfExists $ignoreIfExists")
-    val indexName = index.name()
-    if (flintClient.exists(indexName)) {
-      if (!ignoreIfExists) {
-        throw new IllegalStateException(s"Flint index $indexName already exists")
-      }
-    } else {
-      val metadata = index.metadata()
-      try {
-        flintMetadataLogService
-          .startTransaction(indexName, true)
+  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
+    withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
+      val indexName = index.name()
+      if (flintClient.exists(indexName)) {
+        if (!ignoreIfExists) {
+          throw new IllegalStateException(s"Flint index $indexName already exists")
+        }
+      } else {
+        val metadata = index.metadata()
+        tx
           .initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
           .transientLog(latest => latest.copy(state = CREATING))
           .finalLog(latest => latest.copy(state = ACTIVE))
@@ -119,14 +118,8 @@ class FlintSpark(val spark: SparkSession) extends Logging {
             logInfo(s"Creating index with metadata log entry ID ${latest.id}")
             flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
           })
-        logInfo("Create index complete")
-      } catch {
-        case e: Exception =>
-          logError("Failed to create Flint index", e)
-          throw new IllegalStateException("Failed to create Flint index")
       }
     }
-  }

   /**
    * Start refreshing index data according to the given mode.
@@ -136,15 +129,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    * @return
    *   refreshing job ID (empty if batch job for now)
    */
-  def refreshIndex(indexName: String): Option[String] = {
-    logInfo(s"Refreshing Flint index $indexName")
-    val index = describeIndex(indexName)
-      .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
-    val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
-
-    try {
-      flintMetadataLogService
-        .startTransaction(indexName)
+  def refreshIndex(indexName: String): Option[String] =
+    withTransaction[Option[String]](indexName, "Refresh Flint index") { tx =>
+      val index = describeIndex(indexName)
+        .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
+      val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
+      tx
         .initialLog(latest => latest.state == ACTIVE)
         .transientLog(latest =>
           latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
@@ -160,12 +150,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
           }
         })
         .commit(_ => indexRefresh.start(spark, flintSparkConf))
-    } catch {
-      case e: Exception =>
-        logError("Failed to refresh Flint index", e)
-        throw new IllegalStateException("Failed to refresh Flint index")
     }
-  }

   /**
    * Describe all Flint indexes whose name matches the given pattern.
@@ -221,25 +206,19 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    *   refreshing job ID (empty if no job)
    */
   def updateIndex(index: FlintSparkIndex): Option[String] = {
-    logInfo(s"Updating Flint index $index")
-    val indexName = index.name
-
+    val indexName = index.name()
     validateUpdateAllowed(
       describeIndex(indexName)
         .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
         .options,
       index.options)

-    try {
+    withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
       // Relies on validation to forbid auto-to-auto and manual-to-manual updates
       index.options.autoRefresh() match {
-        case true => updateIndexManualToAuto(index)
-        case false => updateIndexAutoToManual(index)
+        case true => updateIndexManualToAuto(index, tx)
+        case false => updateIndexAutoToManual(index, tx)
       }
-    } catch {
-      case e: Exception =>
-        logError("Failed to update Flint index", e)
-        throw new IllegalStateException("Failed to update Flint index")
     }
   }
@@ -251,12 +230,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    * @return
    *   true if exist and deleted, otherwise false
    */
-  def deleteIndex(indexName: String): Boolean = {
-    logInfo(s"Deleting Flint index $indexName")
-    if (flintClient.exists(indexName)) {
-      try {
-        flintMetadataLogService
-          .startTransaction(indexName)
+  def deleteIndex(indexName: String): Boolean =
+    withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
+      if (flintClient.exists(indexName)) {
+        tx
           .initialLog(latest =>
             latest.state == ACTIVE || latest.state == REFRESHING || latest.state == FAILED)
           .transientLog(latest => latest.copy(state = DELETING))
@@ -267,16 +244,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
             stopRefreshingJob(indexName)
             true
           })
-      } catch {
-        case e: Exception =>
-          logError("Failed to delete Flint index", e)
-          throw new IllegalStateException("Failed to delete Flint index")
+      } else {
+        logInfo("Flint index to be deleted doesn't exist")
+        false
       }
-    } else {
-      logInfo("Flint index to be deleted doesn't exist")
-      false
     }
-  }

   /**
    * Delete a Flint index physically.
@@ -286,12 +258,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    * @return
    *   true if exist and deleted, otherwise false
    */
-  def vacuumIndex(indexName: String): Boolean = {
-    logInfo(s"Vacuuming Flint index $indexName")
-    if (flintClient.exists(indexName)) {
-      try {
-        flintMetadataLogService
-          .startTransaction(indexName)
+  def vacuumIndex(indexName: String): Boolean =
+    withTransaction[Boolean](indexName, "Vacuum Flint index") { tx =>
+      if (flintClient.exists(indexName)) {
+        tx
           .initialLog(latest => latest.state == DELETED)
           .transientLog(latest => latest.copy(state = VACUUMING))
           .finalLog(_ => NO_LOG_ENTRY)
           .commit(_ => {
             flintClient.deleteIndex(indexName)
             true
           })
-      } catch {
-        case e: Exception =>
-          logError("Failed to vacuum Flint index", e)
-          throw new IllegalStateException("Failed to vacuum Flint index")
+      } else {
+        logInfo("Flint index to vacuum doesn't exist")
+        false
       }
-    } else {
-      logInfo("Flint index to vacuum doesn't exist")
-      false
     }
-  }

   /**
    * Recover index job.
@@ -316,13 +281,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {
    * @param indexName
    *   index name
    */
-  def recoverIndex(indexName: String): Boolean = {
-    logInfo(s"Recovering Flint index $indexName")
-    val index = describeIndex(indexName)
-    if (index.exists(_.options.autoRefresh())) {
-      try {
-        flintMetadataLogService
-          .startTransaction(indexName)
+  def recoverIndex(indexName: String): Boolean =
+    withTransaction[Boolean](indexName, "Recover Flint index") { tx =>
+      val index = describeIndex(indexName)
+      if (index.exists(_.options.autoRefresh())) {
+        tx
           .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
           .transientLog(latest =>
             latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
@@ -330,37 +293,31 @@ class FlintSpark(val spark: SparkSession) extends Logging {
             flintIndexMonitor.startMonitor(indexName)
             latest.copy(state = REFRESHING)
           })
-          .commit(_ =>
+          .commit(_ => {
             FlintSparkIndexRefresh
               .create(indexName, index.get)
-              .start(spark, flintSparkConf))
-
-        logInfo("Recovery complete")
-        true
-      } catch {
-        case e: Exception =>
-          logError("Failed to recover Flint index", e)
-          throw new IllegalStateException("Failed to recover Flint index")
-      }
-    } else {
-      logInfo("Index to be recovered either doesn't exist or not auto refreshed")
-      if (index.isEmpty) {
-        /*
-         * If execution reaches this point, it indicates that the Flint index is corrupted.
-         * In such cases, clean up the metadata log, as the index data no longer exists.
-         * There is a very small possibility that users may recreate the index in the
-         * interim, but metadata log get deleted by this cleanup process.
-         */
-        logWarning("Cleaning up metadata log as index data has been deleted")
-        flintMetadataLogService
-          .startTransaction(indexName)
-          .initialLog(_ => true)
-          .finalLog(_ => NO_LOG_ENTRY)
-          .commit(_ => {})
+              .start(spark, flintSparkConf)
+            true
+          })
+      } else {
+        logInfo("Index to be recovered either doesn't exist or is not auto refreshed")
+        if (index.isEmpty) {
+          /*
+           * If execution reaches this point, it indicates that the Flint index is corrupted.
+           * In such cases, clean up the metadata log, as the index data no longer exists.
+           * There is a very small possibility that users may recreate the index in the
+           * interim, but the metadata log would get deleted by this cleanup process.
+           */
+          logWarning("Cleaning up metadata log as index data has been deleted")
+          tx
+            .initialLog(_ => true)
+            .finalLog(_ => NO_LOG_ENTRY)
+            .commit(_ => { false })
+        } else {
+          false
+        }
       }
-      false
     }
-  }

   /**
    * Build data frame for querying the given index. This is mostly for unit test convenience.
@@ -460,11 +417,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     }
   }

-  private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
+  private def updateIndexAutoToManual(
+      index: FlintSparkIndex,
+      tx: OptimisticTransaction[Option[String]]): Option[String] = {
     val indexName = index.name
     val indexLogEntry = index.latestLogEntry.get
-    flintMetadataLogService
-      .startTransaction(indexName)
+    tx
       .initialLog(latest =>
         latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo &&
           latest.primaryTerm == indexLogEntry.primaryTerm)
       .transientLog(latest => latest.copy(state = UPDATING))
@@ -478,12 +436,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       })
   }

-  private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
+  private def updateIndexManualToAuto(
+      index: FlintSparkIndex,
+      tx: OptimisticTransaction[Option[String]]): Option[String] = {
     val indexName = index.name
     val indexLogEntry = index.latestLogEntry.get
     val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
-    flintMetadataLogService
-      .startTransaction(indexName)
+    tx
       .initialLog(latest =>
         latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo &&
           latest.primaryTerm == indexLogEntry.primaryTerm)
       .transientLog(latest =>
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
new file mode 100644
index 000000000..045c527c5
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkTransactionSupport.scala
@@ -0,0 +1,63 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import org.opensearch.flint.common.metadata.log.{FlintMetadataLogService, OptimisticTransaction}
+
+import org.apache.spark.internal.Logging
+
+/**
+ * Provides transaction support with proper error handling and logging capabilities.
+ *
+ * @note
+ *   This trait requires the mixing class to extend Spark's `Logging` to utilize its logging
+ *   functionalities. It also needs to provide a `FlintMetadataLogService` so this trait
+ *   can create the transaction context.
+ */
+trait FlintSparkTransactionSupport { self: Logging =>
+
+  /** Flint metadata log service defined in the mixing class */
+  protected def flintMetadataLogService: FlintMetadataLogService
+
+  /**
+   * Executes a block of code within a transaction context, handling and logging errors
+   * appropriately. This method logs the start and completion of the transaction, logs any
+   * exception that occurs, and re-throws the original exception for the caller to handle.
+   *
+   * @param indexName
+   *   the name of the index on which the operation is performed
+   * @param opName
+   *   the name of the operation, used for logging
+   * @param forceInit
+   *   a boolean flag indicating whether to force initialization of the metadata log
+   * @param opBlock
+   *   the operation block to execute within the transaction context, which takes an
+   *   `OptimisticTransaction` and returns a value of type `T`
+   * @tparam T
+   *   the type of the result produced by the operation block
+   * @return
+   *   the result of the operation block
+   */
+  def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
+      opBlock: OptimisticTransaction[T] => T): T = {
+    logInfo(s"Starting index operation [$opName $indexName] with forceInit=$forceInit")
+    try {
+      // Create the transaction (only has side effects if forceInit is true)
+      val tx: OptimisticTransaction[T] =
+        flintMetadataLogService.startTransaction(indexName, forceInit)
+
+      val result = opBlock(tx)
+      logInfo(s"Index operation [$opName $indexName] complete")
+      result
+    } catch {
+      case e: Exception =>
+        logError(s"Failed to execute index operation [$opName $indexName]", e)
+
+        // Re-throw the original exception for high-level logic to handle
+        throw e
+    }
+  }
+}
diff --git a/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
new file mode 100644
index 000000000..7646dde60
--- /dev/null
+++ b/flint-spark-integration/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionSupportSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import org.mockito.Mockito.{times, verify}
+import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.scalatest.matchers.should.Matchers
+import org.scalatestplus.mockito.MockitoSugar.mock
+
+import org.apache.spark.FlintSuite
+import org.apache.spark.internal.Logging
+
+class FlintSparkTransactionSupportSuite extends FlintSuite with Matchers {
+
+  private val mockFlintMetadataLogService: FlintMetadataLogService = mock[FlintMetadataLogService]
+  private val testIndex = "test_index"
+  private val testOpName = "test operation"
+
+  /** Create a fake FlintSparkTransactionSupport implementation for testing */
+  private val transactionSupport = new FlintSparkTransactionSupport with Logging {
+    override protected def flintMetadataLogService: FlintMetadataLogService =
+      mockFlintMetadataLogService
+  }
+
+  test("with transaction without force initialization") {
+    transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ => }
+
+    verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, false)
+  }
+
+  test("with transaction with force initialization") {
+    transactionSupport.withTransaction[Unit](testIndex, testOpName, forceInit = true) { _ => }
+
+    verify(mockFlintMetadataLogService, times(1)).startTransaction(testIndex, true)
+  }
+
+  test("should throw original exception") {
+    the[RuntimeException] thrownBy {
+      transactionSupport.withTransaction[Unit](testIndex, testOpName) { _ =>
+        val rootCause = new IllegalArgumentException("Fake root cause")
+        val cause = new RuntimeException("Fake cause", rootCause)
+        throw cause
+      }
+    } should have message "Fake cause"
+  }
+}
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
index b2d489c81..741db4bd1 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala
@@ -177,6 +177,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
       RequestOptions.DEFAULT) shouldBe false
   }

+  test("should fail to vacuum index if index is not logically deleted") {
+    flint
+      .skippingIndex()
+      .onTable(testTable)
+      .addPartitions("year", "month")
+      .create()
+
+    the[IllegalStateException] thrownBy {
+      flint.vacuumIndex(testFlintIndex)
+    } should have message
+      s"Index state [active] doesn't satisfy precondition"
+  }
+
   test("should not recreate index if index data still exists") {
     flint
       .skippingIndex()
@@ -195,7 +208,8 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Matchers {
       .onTable(testTable)
       .addValueSet("name")
       .create()
-    } should have message s"Flint index $testFlintIndex already exists"
+    } should have message
+      s"Flint index $testFlintIndex already exists"
   }

   test("should clean up metadata log entry if index data has been deleted") {
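
Reviewer note: after this change, adding a new index operation reduces to one `withTransaction` call around the transaction builder. Below is a minimal sketch of a hypothetical operation; the method name `suspendIndex` and the chosen state transitions are illustrative only, while `withTransaction` and the `initialLog`/`transientLog`/`finalLog`/`commit` builder are exactly the APIs shown in this diff.

```scala
// Hypothetical example, not part of this change. Assumes it is defined inside
// FlintSpark, so withTransaction and the IndexState values are in scope.
def suspendIndex(indexName: String): Boolean =
  withTransaction[Boolean](indexName, "Suspend Flint index") { tx =>
    tx
      .initialLog(latest => latest.state == REFRESHING) // precondition on current state
      .transientLog(latest => latest.copy(state = UPDATING)) // intermediate state while running
      .finalLog(latest => latest.copy(state = ACTIVE)) // state persisted on success
      .commit(_ => {
        // Do the actual work here. If this block throws, the transaction rolls
        // back the transient log, and withTransaction logs and re-throws.
        true
      })
  }
```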
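The two error-handling changes combine so callers now see the actual failure instead of a generic wrapper: `DefaultOptimisticTransaction` reports which index state violated the precondition and chains the cause of a failed commit, while `withTransaction` re-throws the original exception rather than replacing it. A sketch of what a caller can now observe; the surrounding `try`/`catch` and the index name are illustrative, not from this change:

```scala
// Illustrative only: assumes a FlintSpark instance `flint` and a skipping
// index that is still in ACTIVE state (not logically deleted).
try {
  flint.vacuumIndex("flint_test_skipping_index") // hypothetical index name
} catch {
  case e: IllegalStateException =>
    // Before this change: generic "Failed to vacuum Flint index" with no cause.
    // After: "Index state [active] doesn't satisfy precondition"; for failures
    // inside commit(), e.getCause now carries the original exception.
    println(s"${e.getMessage}; cause = ${Option(e.getCause)}")
}
```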