diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 9cd5f60a7..6c2c6f087 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -104,7 +104,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { } } else { val metadata = index.metadata() - try { + handleExceptions("Failed to create Flint index") { flintClient .startTransaction(indexName, dataSourceName, true) .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) @@ -118,10 +118,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) }) logInfo("Create index complete") - } catch { - case e: Exception => - logError("Failed to create Flint index", e) - throw new IllegalStateException("Failed to create Flint index") } } } @@ -140,7 +136,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { .getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist")) val indexRefresh = FlintSparkIndexRefresh.create(indexName, index) - try { + handleExceptions("Failed to refresh Flint index") { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == ACTIVE) @@ -158,10 +154,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { } }) .commit(_ => indexRefresh.start(spark, flintSparkConf)) - } catch { - case e: Exception => - logError("Failed to refresh Flint index", e) - throw new IllegalStateException("Failed to refresh Flint index") } } @@ -223,16 +215,12 @@ class FlintSpark(val spark: SparkSession) extends Logging { .options, index.options) - try { + handleExceptions("Failed to update Flint index") { // Relies on validation to forbid auto-to-auto and manual-to-manual updates index.options.autoRefresh() match { case true => updateIndexManualToAuto(index) case false => updateIndexAutoToManual(index) } - } catch { - case e: Exception => - logError("Failed to update Flint index", e) - throw new IllegalStateException("Failed to update Flint index") } } @@ -247,7 +235,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { def deleteIndex(indexName: String): Boolean = { logInfo(s"Deleting Flint index $indexName") if (flintClient.exists(indexName)) { - try { + handleExceptions("Failed to delete Flint index") { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING) @@ -259,10 +247,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { stopRefreshingJob(indexName) true }) - } catch { - case e: Exception => - logError("Failed to delete Flint index", e) - throw new IllegalStateException("Failed to delete Flint index") } } else { logInfo("Flint index to be deleted doesn't exist") @@ -281,7 +265,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { def vacuumIndex(indexName: String): Boolean = { logInfo(s"Vacuuming Flint index $indexName") if (flintClient.exists(indexName)) { - try { + handleExceptions("Failed to vacuum Flint index") { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => latest.state == DELETED) @@ -291,10 +275,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { flintClient.deleteIndex(indexName) true }) - } catch { - case e: Exception => - logError("Failed to vacuum Flint index", e) - throw new IllegalStateException("Failed to vacuum Flint index") } } else { logInfo("Flint index to vacuum doesn't exist") @@ -312,7 +292,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo(s"Recovering Flint index $indexName") val index = describeIndex(indexName) if (index.exists(_.options.autoRefresh())) { - try { + handleExceptions("Failed to vacuum Flint index") { flintClient .startTransaction(indexName, dataSourceName) .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) @@ -329,10 +309,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { logInfo("Recovery complete") true - } catch { - case e: Exception => - logError("Failed to recover Flint index", e) - throw new IllegalStateException("Failed to recover Flint index") } } else { logInfo("Index to be recovered either doesn't exist or not auto refreshed") @@ -470,4 +446,14 @@ class FlintSpark(val spark: SparkSession) extends Logging { indexRefresh.start(spark, flintSparkConf) }) } + + private def handleExceptions[T](message: String)(operation: => T): T = { + try { + operation + } catch { + case e: Exception => + logError(message, e) + throw new IllegalStateException(message, e) + } + } }