Skip to content

Commit

Permalink
Handle runtime and other exception in new withTx
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed May 20, 2024
1 parent efe3046 commit 976ff62
Showing 1 changed file with 28 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
withTransaction[Unit](
index.name(),
s"Create Flint index $index with ignoreIfExists $ignoreIfExists",
forceInit = true) { tx =>
val indexName = index.name()
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
val opName = s"Create Flint index $indexName with ignoreIfExists $ignoreIfExists"
withTransaction[Unit](indexName, opName, forceInit = true) { tx =>
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
Expand All @@ -121,6 +119,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}
}
}

/**
* Start refreshing index data according to the given mode.
Expand Down Expand Up @@ -202,10 +201,9 @@ class FlintSpark(val spark: SparkSession) extends Logging {
* @return
* refreshing job ID (empty if no job)
*/
def updateIndex(index: FlintSparkIndex): Option[String] =
withTransaction[Option[String]](index.name(), s"Update Flint index $index") { tx =>
val indexName = index.name

def updateIndex(index: FlintSparkIndex): Option[String] = {
val indexName = index.name()
withTransaction[Option[String]](indexName, s"Update Flint index $indexName") { tx =>
validateUpdateAllowed(
describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
Expand All @@ -214,10 +212,11 @@ class FlintSpark(val spark: SparkSession) extends Logging {

// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index)
case false => updateIndexAutoToManual(index)
case true => updateIndexManualToAuto(index, tx)
case false => updateIndexAutoToManual(index, tx)
}
}
}

/**
* Delete index and refreshing job associated.
Expand Down Expand Up @@ -392,11 +391,12 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}
}

private def updateIndexAutoToManual(index: FlintSparkIndex): Option[String] = {
private def updateIndexAutoToManual(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == REFRESHING && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest => latest.copy(state = UPDATING))
Expand All @@ -410,12 +410,13 @@ class FlintSpark(val spark: SparkSession) extends Logging {
})
}

private def updateIndexManualToAuto(index: FlintSparkIndex): Option[String] = {
private def updateIndexManualToAuto(
index: FlintSparkIndex,
tx: OptimisticTransaction[Option[String]]): Option[String] = {
val indexName = index.name
val indexLogEntry = index.latestLogEntry.get
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
flintClient
.startTransaction(indexName, dataSourceName)
tx
.initialLog(latest =>
latest.state == ACTIVE && latest.seqNo == indexLogEntry.seqNo && latest.primaryTerm == indexLogEntry.primaryTerm)
.transientLog(latest =>
Expand Down Expand Up @@ -445,12 +446,16 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Index operation [$operationName] complete")
result
} catch {
case e: IllegalStateException =>
logError(s"Failed to execute index operation [$operationName]", e)
throw e // TODO: throw inner illegal state exception directly?
case e: Exception =>
logError(s"Failed to execute index operation [$operationName]", e)
throw new IllegalStateException(e)
val detailedMessage =
s"Failed to execute index operation [$operationName] caused by ${e.getMessage}"
logError(detailedMessage, e)

// Re-throw directly if runtime exception or wrap it
e match {
case re: RuntimeException => throw re
case _ => throw new IllegalStateException(detailedMessage, e)
}
}
}
}

0 comments on commit 976ff62

Please sign in to comment.