Skip to content

Commit

Permalink
Extract error handler
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed May 20, 2024
1 parent 9de4f28 commit 0f0720d
Showing 1 changed file with 16 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
} else {
val metadata = index.metadata()
try {
handleExceptions("Failed to create Flint index") {
flintClient
.startTransaction(indexName, dataSourceName, true)
.initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
Expand All @@ -118,10 +118,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
})
logInfo("Create index complete")
} catch {
case e: Exception =>
logError("Failed to create Flint index", e)
throw new IllegalStateException("Failed to create Flint index")
}
}
}
Expand All @@ -140,7 +136,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)

try {
handleExceptions("Failed to refresh Flint index") {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE)
Expand All @@ -158,10 +154,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
})
.commit(_ => indexRefresh.start(spark, flintSparkConf))
} catch {
case e: Exception =>
logError("Failed to refresh Flint index", e)
throw new IllegalStateException("Failed to refresh Flint index")
}
}

Expand Down Expand Up @@ -223,16 +215,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
.options,
index.options)

try {
handleExceptions("Failed to update Flint index") {
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
}
} catch {
case e: Exception =>
logError("Failed to update Flint index", e)
throw new IllegalStateException("Failed to update Flint index")
}
}

Expand All @@ -247,7 +235,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
def deleteIndex(indexName: String): Boolean = {
logInfo(s"Deleting Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
handleExceptions("Failed to delete Flint index") {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == ACTIVE || latest.state == REFRESHING)
Expand All @@ -259,10 +247,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
stopRefreshingJob(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to delete Flint index", e)
throw new IllegalStateException("Failed to delete Flint index")
}
} else {
logInfo("Flint index to be deleted doesn't exist")
Expand All @@ -281,7 +265,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
def vacuumIndex(indexName: String): Boolean = {
logInfo(s"Vacuuming Flint index $indexName")
if (flintClient.exists(indexName)) {
try {
handleExceptions("Failed to vacuum Flint index") {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == DELETED)
Expand All @@ -291,10 +275,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintClient.deleteIndex(indexName)
true
})
} catch {
case e: Exception =>
logError("Failed to vacuum Flint index", e)
throw new IllegalStateException("Failed to vacuum Flint index")
}
} else {
logInfo("Flint index to vacuum doesn't exist")
Expand All @@ -312,7 +292,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Recovering Flint index $indexName")
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
try {
handleExceptions("Failed to recover Flint index") {
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
Expand All @@ -329,10 +309,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {

logInfo("Recovery complete")
true
} catch {
case e: Exception =>
logError("Failed to recover Flint index", e)
throw new IllegalStateException("Failed to recover Flint index")
}
} else {
logInfo("Index to be recovered either doesn't exist or not auto refreshed")
Expand Down Expand Up @@ -470,4 +446,14 @@ class FlintSpark(val spark: SparkSession) extends Logging {
indexRefresh.start(spark, flintSparkConf)
})
}

private def handleExceptions[T](message: String)(operation: => T): T = {
try {
operation
} catch {
case e: Exception =>
logError(message, e)
throw new IllegalStateException(message, e)
}
}
}

0 comments on commit 0f0720d

Please sign in to comment.