Skip to content

Commit

Permalink
Rename vars
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Feb 14, 2024
1 parent 4a29076 commit a30da68
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ object FlintREPL extends Logging with FlintJobExecutor {
conf.getLong("spark.flint.job.queryWaitTimeoutMillis", DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS)

val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex.get)
val flintSessionContext = getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC)
val sessionTimerContext = getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC)

addShutdownHook(
flintSessionIndexUpdater,
osClient,
sessionIndex.get,
sessionId.get,
flintSessionContext)
sessionTimerContext)

// 1 thread for updating heart beat
val threadPool =
Expand Down Expand Up @@ -175,7 +175,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
exponentialBackoffRetry(maxRetries = 5, initialDelay = 2.seconds) {
queryLoop(commandContext)
}
recordSessionSuccess(flintSessionContext)
recordSessionSuccess(sessionTimerContext)
} catch {
case e: Exception =>
handleSessionError(
Expand All @@ -187,13 +187,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintSessionIndexUpdater,
osClient,
sessionIndex.get,
flintSessionContext)
sessionTimerContext)
} finally {
if (threadPool != null) {
heartBeatFuture.cancel(true) // Pass `true` to interrupt if running
threadPoolFactory.shutdownThreadPool(threadPool)
}
stopTimer(flintSessionContext)
stopTimer(sessionTimerContext)
spark.stop()

// Check for non-daemon threads that may prevent the driver from shutting down.
Expand Down Expand Up @@ -537,7 +537,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
} else if (!flintReader.hasNext) {
canProceed = false
} else {
val statementContext = getTimerContext(MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
val statementTimerContext = getTimerContext(
MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
val flintCommand = processCommandInitiation(flintReader, flintSessionIndexUpdater)

val (dataToWrite, returnedVerificationResult) = processStatementOnVerification(
Expand All @@ -559,7 +560,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
resultIndex,
flintSessionIndexUpdater,
osClient,
statementContext)
statementTimerContext)
// last query finish time is last activity time
lastActivityTime = currentTimeProvider.currentEpochMillis()
}
Expand Down

0 comments on commit a30da68

Please sign in to comment.