Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Session Document Retrieval in FlintREPL to Enhance Latency Metrics #179

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ case class CommandState(
recordedVerificationResult: VerificationResult,
flintReader: FlintReader,
futureMappingCheck: Future[Either[String, Unit]],
executionContext: ExecutionContextExecutor)
executionContext: ExecutionContextExecutor,
recordedLastCanPickCheckTime: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
package org.apache.spark.sql

import java.net.ConnectException
import java.time.Instant
import java.util.Map
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future, TimeoutException}
import scala.concurrent.duration.{Duration, MINUTES, _}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import org.json4s.native.Serialization
import org.opensearch.action.get.GetResponse
import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -47,6 +47,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
private val DEFAULT_QUERY_EXECUTION_TIMEOUT = Duration(30, MINUTES)
private val DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS = 10 * 60 * 1000
val INITIAL_DELAY_MILLIS = 3000L
val EARLY_TERMIANTION_CHECK_FREQUENCY = 60000L

def update(flintCommand: FlintCommand, updater: OpenSearchUpdater): Unit = {
updater.update(flintCommand.statementId, FlintCommand.serialize(flintCommand))
Expand Down Expand Up @@ -292,10 +293,11 @@ object FlintREPL extends Logging with FlintJobExecutor {
var lastActivityTime = currentTimeProvider.currentEpochMillis()
var verificationResult: VerificationResult = NotVerified
var canPickUpNextStatement = true
var lastCanPickCheckTime = 0L
while (currentTimeProvider
.currentEpochMillis() - lastActivityTime <= commandContext.inactivityLimitMillis && canPickUpNextStatement) {
logInfo(
s"""read from ${commandContext.sessionIndex}, sessionId: $commandContext.sessionId""")
s"""read from ${commandContext.sessionIndex}, sessionId: ${commandContext.sessionId}""")
val flintReader: FlintReader =
createQueryReader(
commandContext.osClient,
Expand All @@ -309,18 +311,21 @@ object FlintREPL extends Logging with FlintJobExecutor {
verificationResult,
flintReader,
futureMappingCheck,
executionContext)
val result: (Long, VerificationResult, Boolean) =
executionContext,
lastCanPickCheckTime)
val result: (Long, VerificationResult, Boolean, Long) =
processCommands(commandContext, commandState)

val (
updatedLastActivityTime,
updatedVerificationResult,
updatedCanPickUpNextStatement) = result
updatedCanPickUpNextStatement,
updatedLastCanPickCheckTime) = result

lastActivityTime = updatedLastActivityTime
verificationResult = updatedVerificationResult
canPickUpNextStatement = updatedCanPickUpNextStatement
lastCanPickCheckTime = updatedLastCanPickCheckTime
} finally {
flintReader.close()
}
Expand Down Expand Up @@ -481,18 +486,27 @@ object FlintREPL extends Logging with FlintJobExecutor {

private def processCommands(
context: CommandContext,
state: CommandState): (Long, VerificationResult, Boolean) = {
state: CommandState): (Long, VerificationResult, Boolean, Long) = {
import context._
import state._

var lastActivityTime = recordedLastActivityTime
var verificationResult = recordedVerificationResult
var canProceed = true
var canPickNextStatementResult = true // Add this line to keep track of canPickNextStatement
var lastCanPickCheckTime = recordedLastCanPickCheckTime

while (canProceed) {
if (!canPickNextStatement(sessionId, jobId, osClient, sessionIndex)) {
canPickNextStatementResult = false
val currentTime = currentTimeProvider.currentEpochMillis()

// Only call canPickNextStatement if EARLY_TERMIANTION_CHECK_FREQUENCY milliseconds have passed
if (currentTime - lastCanPickCheckTime > EARLY_TERMIANTION_CHECK_FREQUENCY) {
canPickNextStatementResult =
canPickNextStatement(sessionId, jobId, osClient, sessionIndex)
lastCanPickCheckTime = currentTime
}

if (!canPickNextStatementResult) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
canProceed = false
} else if (!flintReader.hasNext) {
canProceed = false
Expand Down Expand Up @@ -524,7 +538,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
}

// Return the updated loop state: last activity time, mapping verification result,
// whether the next statement can be picked up, and the time of the last can-pick check.
(lastActivityTime, verificationResult, canPickNextStatementResult)
(lastActivityTime, verificationResult, canPickNextStatementResult, lastCanPickCheckTime)
}

/**
Expand Down Expand Up @@ -888,20 +902,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
return // Exit the run method if the thread is interrupted
}

val getResponse = osClient.getDoc(sessionIndex, sessionId)
if (getResponse.isExists()) {
val source = getResponse.getSourceAsMap
val flintInstance = FlintInstance.deserializeFromMap(source)
flintInstance.state = "running"
flintSessionUpdater.updateIf(
sessionId,
FlintInstance.serializeWithoutJobId(
flintInstance,
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
}
// do nothing if the session doc does not exist
flintSessionUpdater.upsert(
sessionId,
Serialization.write(
Map(
"lastUpdateTime" -> currentTimeProvider.currentEpochMillis(),
"state" -> "running")))
} catch {
case ie: InterruptedException =>
// Preserve the interrupt status
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,6 @@ class FlintREPLTest
val getResponse = mock[GetResponse]
val scheduledFutureRaw = mock[ScheduledFuture[_]]

// Mock behaviors
when(osClient.getDoc(*, *)).thenReturn(getResponse)
when(getResponse.isExists()).thenReturn(true)
when(getResponse.getSourceAsMap).thenReturn(
Map[String, Object](
"applicationId" -> "app1",
"jobId" -> "job1",
"sessionId" -> "session1",
"lastUpdateTime" -> java.lang.Long.valueOf(12345L),
"error" -> "someError",
"state" -> "running",
"jobStartTime" -> java.lang.Long.valueOf(0L)).asJava)
when(getResponse.getSeqNo).thenReturn(0L)
when(getResponse.getPrimaryTerm).thenReturn(0L)
// when scheduled task is scheduled, execute the runnable immediately only once and become no-op afterwards.
when(
threadPool.scheduleAtFixedRate(
Expand All @@ -85,8 +71,7 @@ class FlintREPLTest
0)

// Verifications
verify(osClient, atLeastOnce()).getDoc("sessionIndex", "session1")
verify(flintSessionUpdater, atLeastOnce()).updateIf(eqTo("session1"), *, eqTo(0L), eqTo(0L))
verify(flintSessionUpdater, atLeastOnce()).upsert(eqTo("session1"), *)
}

test("createShutdownHook add shutdown hook and update FlintInstance if conditions are met") {
Expand Down
Loading