From 976ff622bccbe9477063b7d02de9d3b8ea989c97 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 20 May 2024 16:11:25 -0700 Subject: [PATCH] Handle runtime and other exception in new withTx Signed-off-by: Chen Dai --- .../opensearch/flint/spark/FlintSpark.scala | 51 ++++++++++--------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 39a476491..696a66749 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -96,12 +96,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @param ignoreIfExists * Ignore existing index */ - def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = - withTransaction[Unit]( - index.name(), - s"Create Flint index $index with ignoreIfExists $ignoreIfExists", - forceInit = true) { tx => - val indexName = index.name() + def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { + val indexName = index.name() + val opName = s"Create Flint index $indexName with ignoreIfExists $ignoreIfExists" + withTransaction[Unit](indexName, opName, forceInit = true) { tx => if (flintClient.exists(indexName)) { if (!ignoreIfExists) { throw new IllegalStateException(s"Flint index $indexName already exists") @@ -121,6 +119,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { }) } } + } /** * Start refreshing index data according to the given mode. @@ -202,10 +201,9 @@ class FlintSpark(val spark: SparkSession) extends Logging { * @return * refreshing job ID (empty if no job) */ - def updateIndex(index: FlintSparkIndex): Option[String] = - withTransaction[Option[String]](index.name(), s"Update Flint index $index") { tx => - val indexName = index.name - + def updateIndex(index: FlintSparkIndex): Option[String] = { + val indexName = index.name() + withTransaction[Option[String]](indexName, s"Update Flint index $indexName") { tx => validateUpdateAllowed( describeIndex(indexName) .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) @@ -214,10 +212,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { // Relies on validation to forbid auto-to-auto and manual-to-manual updates index.options.autoRefresh() match { - case true => updateIndexManualToAuto(index) - case false => updateIndexAutoToManual(index) + case true => updateIndexManualToAuto(index, tx) + case false => updateIndexAutoToManual(index, tx) } } + } /** * Delete index and refreshing job associated. @@ -392,11 +391,12 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } - private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = { + private def updateIndexAutoToManual( + index: FlintSparkIndex, + tx: OptimisticTransaction[Option[String]]): Option[String] = { val indexName = index.name val indexLogEntry = index.latestLogEntry.get - flintClient - .startTransaction(indexName, dataSourceName) + tx .initialLog(latest => latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => latest.copy(state = UPDATING)) @@ -410,12 +410,13 @@ class FlintSpark(val spark: SparkSession) extends Logging { }) } - private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = { + private def updateIndexManualToAuto( + index: FlintSparkIndex, + tx: OptimisticTransaction[Option[String]]): Option[String] = { val indexName = index.name val indexLogEntry = index.latestLogEntry.get val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) - flintClient - .startTransaction(indexName, dataSourceName) + tx .initialLog(latest => latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm) .transientLog(latest => @@ -445,12 +446,16 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Index operation [$operationName] complete") result } catch { - case e: IllegalStateException => - logError(s"Failed to execute index operation [$operationName]", e) - throw e // TODO: throw inner illegal state exception directly? case e: Exception => - logError(s"Failed to execute index operation [$operationName]", e) - throw new IllegalStateException(e) + val detailedMessage = + s"Failed to execute index operation [$operationName] caused by ${e.getMessage}" + logError(detailedMessage, e) + + // Re-throw directly if runtime exception or wrap it + e match { + case re: RuntimeException => throw re + case _ => throw new IllegalStateException(detailedMessage, e) + } } } }