Skip to content

Commit

Permalink
Refactor index monitor transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 3, 2024
1 parent 5b482a7 commit e578afb
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
val opName = s"Create Flint index $indexName with ignoreIfExists $ignoreIfExists"
val opName = s"Create Flint index with ignoreIfExists $ignoreIfExists"
withTransaction[Unit](indexName, opName, forceInit = true) { tx =>
if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
Expand Down Expand Up @@ -131,7 +131,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
* refreshing job ID (empty if batch job for now)
*/
def refreshIndex(indexName: String): Option[String] =
withTransaction[Option[String]](indexName, s"Refresh Flint index $indexName") { tx =>
withTransaction[Option[String]](indexName, "Refresh Flint index") { tx =>
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val indexRefresh = FlintSparkIndexRefresh.create(indexName, index)
Expand Down Expand Up @@ -210,7 +210,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.options,
index.options)

withTransaction[Option[String]](indexName, s"Update Flint index $indexName") { tx =>
withTransaction[Option[String]](indexName, "Update Flint index") { tx =>
// Relies on validation to forbid auto-to-auto and manual-to-manual updates
index.options.autoRefresh() match {
case true => updateIndexManualToAuto(index, tx)
Expand All @@ -228,7 +228,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
* true if exist and deleted, otherwise false
*/
def deleteIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Delete Flint index $indexName") { tx =>
withTransaction[Boolean](indexName, "Delete Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest =>
Expand Down Expand Up @@ -256,7 +256,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
* true if exist and deleted, otherwise false
*/
def vacuumIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Vacuum Flint index $indexName") { tx =>
withTransaction[Boolean](indexName, "Vacuum Flint index") { tx =>
if (flintClient.exists(indexName)) {
tx
.initialLog(latest => latest.state == DELETED)
Expand All @@ -279,7 +279,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
* index name
*/
def recoverIndex(indexName: String): Boolean =
withTransaction[Boolean](indexName, s"Recover Flint index $indexName") { tx =>
withTransaction[Boolean](indexName, "Recover Flint index") { tx =>
val index = describeIndex(indexName)
if (index.exists(_.options.autoRefresh())) {
tx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
*/
class FlintSparkIndexMonitor(
spark: SparkSession,
flintClient: FlintClient,
dataSourceName: String)
extends Logging {
override val flintClient: FlintClient,
override val dataSourceName: String)
extends FlintSparkTransactionSupport
with Logging {

/** Task execution initial delay in seconds */
private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
Expand Down Expand Up @@ -130,11 +131,13 @@ class FlintSparkIndexMonitor(
*/
logError(s"Streaming job $name terminated with exception", e)
retry {
flintClient
.startTransaction(name, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
withTransaction[Unit](name, "Monitor index job") { tx =>
flintClient
.startTransaction(name, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.commit(_ => {})
}
}
}
} else {
Expand All @@ -155,15 +158,15 @@ class FlintSparkIndexMonitor(
private var errorCnt = 0

override def run(): Unit = {
logInfo(s"Scheduler trigger index monitor task for $indexName")
try {
if (isStreamingJobActive(indexName)) {
logInfo("Streaming job is still active")
flintClient
.startTransaction(indexName, dataSourceName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
withTransaction[Unit](indexName, "Monitor index job") { tx =>
tx
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest) // timestamp will update automatically
.commit(_ => {})
}
} else {
logError("Streaming job is not active. Cancelling monitor task")
stopMonitor(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ trait FlintSparkTransactionSupport { self: Logging =>
*/
def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
opBlock: OptimisticTransaction[T] => T): T = {
logInfo(s"Starting index operation [$opName] with forceInit=$forceInit")
logInfo(s"Starting index operation [$opName for $indexName] with forceInit=$forceInit")
try {
// Create transaction (only have side effect if forceInit is true)
val tx: OptimisticTransaction[T] =
Expand Down

0 comments on commit e578afb

Please sign in to comment.