Skip to content

Commit

Permalink
Reduce Session Document Retrieval in FintREPL to Enhance Latency Metrics
Browse files Browse the repository at this point in the history
This PR reduces the frequency of 'getSessionDoc' calls in two places of FintREPL, addressing the correlation between request count and query latency metrics.

1. **Heartbeat Update Optimization**:
   - Prior to updating the heartbeat, the sequence number and primary term are now obtained for effective concurrency control.
   - This PR removes the get session doc call and directly updates the last update time and state.

2. **Session Document Retrieval before Statement Processing**:
   - Previously, in scenarios where a query takes 10 minutes, the 'getSessionDoc' call is limited to once per 10 minutes. However, in idle states with no running queries, the call frequency is run every 100 milliseconds.
    -  This PR reduced the frequency of 'getSessionDoc' calls by ensuring we make the call at least 1 minute after the previous call.

**Testing**:
- Verified consistent 1-minute intervals for heartbeat updates.
- Confirmed the 'getSessionDoc' call executes every 1 minute prior to picking up the next statement.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Nov 30, 2023
1 parent a352f67 commit c080731
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ case class CommandState(
recordedVerificationResult: VerificationResult,
flintReader: FlintReader,
futureMappingCheck: Future[Either[String, Unit]],
executionContext: ExecutionContextExecutor)
executionContext: ExecutionContextExecutor,
recordedLastCanPickCheckTime: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
package org.apache.spark.sql

import java.net.ConnectException
import java.time.Instant
import java.util.Map
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, TimeoutException}
import scala.concurrent.duration.{Duration, MINUTES, _}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.json4s.native.Serialization
import org.opensearch.action.get.GetResponse
import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -47,6 +47,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(30, MINUTES)
private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000
val INITIAL_DELAY_MILLIS = 3000L
val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L

def update(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
Expand Down Expand Up @@ -292,10 +293,11 @@ object FlintREPL extends Logging with FlintJobExecutor {
var lastActivityTime = currentTimeProvider.currentEpochMillis()
var verificationResult: VerificationResult = NotVerified
var canPickUpNextStatement = true
var lastCanPickCheckTime = 0L
while (currentTimeProvider
.currentEpochMillis() - lastActivityTime <= commandContext.inactivityLimitMillis && canPickUpNextStatement) {
logInfo(
s"""read from ${commandContext.sessionIndex}, sessionId: $commandContext.sessionId""")
s"""read from ${commandContext.sessionIndex}, sessionId: ${commandContext.sessionId}""")
val flintReader: FlintReader =
createQueryReader(
commandContext.osClient,
Expand All @@ -309,18 +311,21 @@ object FlintREPL extends Logging with FlintJobExecutor {
verificationResult,
flintReader,
futureMappingCheck,
executionContext)
val result: (Long, VerificationResult, Boolean) =
executionContext,
lastCanPickCheckTime)
val result: (Long, VerificationResult, Boolean, Long) =
processCommands(commandContext, commandState)

val (
updatedLastActivityTime,
updatedVerificationResult,
updatedCanPickUpNextStatement) = result
updatedCanPickUpNextStatement,
updatedLastCanPickCheckTime) = result

lastActivityTime = updatedLastActivityTime
verificationResult = updatedVerificationResult
canPickUpNextStatement = updatedCanPickUpNextStatement
lastCanPickCheckTime = updatedLastCanPickCheckTime
} finally {
flintReader.close()
}
Expand Down Expand Up @@ -481,18 +486,27 @@ object FlintREPL extends Logging with FlintJobExecutor {

private def processCommands(
context: CommandContext,
state: CommandState): (Long, VerificationResult, Boolean) = {
state: CommandState): (Long, VerificationResult, Boolean, Long) = {
import context._
import state._

var lastActivityTime = recordedLastActivityTime
var verificationResult = recordedVerificationResult
var canProceed = true
var canPickNextStatementResult = true // Add this line to keep track of canPickNextStatement
var lastCanPickCheckTime = recordedLastCanPickCheckTime

while (canProceed) {
if (!canPickNextStatement(sessionId, jobId, osClient, sessionIndex)) {
canPickNextStatementResult = false
val currentTime = currentTimeProvider.currentEpochMillis()

// Only call canPickNextStatement if EARLY_TERMIANTION_CHECK_FREQUENCY milliseconds have passed
if (currentTime - lastCanPickCheckTime > EARLY_TERMIANTION_CHECK_FREQUENCY) {
canPickNextStatementResult =
canPickNextStatement(sessionId, jobId, osClient, sessionIndex)
lastCanPickCheckTime = currentTime
}

if (!canPickNextStatementResult) {
canProceed = false
} else if (!flintReader.hasNext) {
canProceed = false
Expand Down Expand Up @@ -524,7 +538,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
}

// return tuple indicating if still active and mapping verification result
(lastActivityTime, verificationResult, canPickNextStatementResult)
(lastActivityTime, verificationResult, canPickNextStatementResult, lastCanPickCheckTime)
}

/**
Expand Down Expand Up @@ -888,20 +902,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
return // Exit the run method if the thread is interrupted
}

val getResponse = osClient.getDoc(sessionIndex, sessionId)
if (getResponse.isExists()) {
val source = getResponse.getSourceAsMap
val flintInstance = FlintInstance.deserializeFromMap(source)
flintInstance.state = "running"
flintSessionUpdater.updateIf(
sessionId,
FlintInstance.serializeWithoutJobId(
flintInstance,
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
}
// do nothing if the session doc does not exist
flintSessionUpdater.upsert(
sessionId,
Serialization.write(
Map(
"lastUpdateTime" -> currentTimeProvider.currentEpochMillis(),
"state" -> "running")))
} catch {
case ie: InterruptedException =>
// Preserve the interrupt status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,6 @@ class FlintREPLTest
val getResponse = mock[GetResponse]
val scheduledFutureRaw = mock[ScheduledFuture[_]]

// Mock behaviors
when(osClient.getDoc(*, *)).thenReturn(getResponse)
when(getResponse.isExists()).thenReturn(true)
when(getResponse.getSourceAsMap).thenReturn(
Map[String, Object](
"applicationId" -> "app1",
"jobId" -> "job1",
"sessionId" -> "session1",
"lastUpdateTime" -> java.lang.Long.valueOf(12345L),
"error" -> "someError",
"state" -> "running",
"jobStartTime" -> java.lang.Long.valueOf(0L)).asJava)
when(getResponse.getSeqNo).thenReturn(0L)
when(getResponse.getPrimaryTerm).thenReturn(0L)
// when scheduled task is scheduled, execute the runnable immediately only once and become no-op afterwards.
when(
threadPool.scheduleAtFixedRate(
Expand All @@ -85,8 +71,7 @@ class FlintREPLTest
0)

// Verifications
verify(osClient, atLeastOnce()).getDoc("sessionIndex", "session1")
verify(flintSessionUpdater, atLeastOnce()).updateIf(eqTo("session1"), *, eqTo(0L), eqTo(0L))
verify(flintSessionUpdater, atLeastOnce()).upsert(eqTo("session1"), *)
}

test("createShutdownHook add shutdown hook and update FlintInstance if conditions are met") {
Expand Down

0 comments on commit c080731

Please sign in to comment.