Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger committed Sep 10, 2024
1 parent 8cab459 commit e0de9a2
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
* all Spark conf required by Flint code underlying manually.
*/
spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
spark.conf.set(JOB_TYPE.key, "streaming")
spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)

/**
* FlintJob.main() is not called because we need to manually set these variables within a
Expand All @@ -107,7 +107,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
queryId,
dataSourceName,
resultIndex,
"streaming",
FlintJobType.STREAMING,
streamingRunningCount)
job.terminateJVM = false
job.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object FlintJob extends Logging with FlintJobExecutor {
val (queryOption, resultIndexOption) = parseArgs(args)

val conf = createSparkConf()
val jobType = conf.get("spark.flint.job.type", "batch")
val jobType = conf.get("spark.flint.job.type", FlintJobType.BATCH)
CustomLogging.logInfo(s"""Job type is: ${jobType}""")
conf.set(FlintSparkConf.JOB_TYPE.key, jobType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ object SparkConfConstants {
"org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions"
}

object FlintJobType {
val INTERACTIVE = "interactive"
val BATCH = "batch"
val STREAMING = "streaming"
}

trait FlintJobExecutor {
this: Logging =>

Expand Down Expand Up @@ -132,7 +138,7 @@ trait FlintJobExecutor {
* https://github.com/opensearch-project/opensearch-spark/issues/324
*/
def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
if (jobType.equalsIgnoreCase("streaming")) {
if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) {
conf.set(
"spark.dynamicAllocation.maxExecutors",
conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
val query = getQuery(queryOption, jobType, conf)
val queryId = conf.get(FlintSparkConf.QUERY_ID.key, "")

if (jobType.equalsIgnoreCase("streaming")) {
if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) {
if (resultIndexOption.isEmpty) {
logAndThrow("resultIndex is not set")
}
Expand Down Expand Up @@ -223,7 +223,7 @@ object FlintREPL extends Logging with FlintJobExecutor {

def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = {
queryOption.getOrElse {
if (jobType.equalsIgnoreCase("streaming")) {
if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) {
val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
if (defaultQuery.isEmpty) {
logAndThrow("Query undefined for the streaming job.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ case class JobOperator(
val futurePrepareQueryExecution = Future {
statementExecutionManager.prepareStatementExecution()
}

val data = statementExecutionManager.executeStatement(statement)
dataToWrite = Some(
ThreadUtils.awaitResult(futurePrepareQueryExecution, Duration(1, MINUTES)) match {
case Right(_) => statementExecutionManager.executeStatement(statement)
case Right(_) => data
case Left(err) =>
error = err
constructErrorDF(
Expand All @@ -101,7 +101,7 @@ case class JobOperator(
exceptionThrown = false
} catch {
case e: TimeoutException =>
error = s"Getting the mapping of index $resultIndex timed out"
error = s"Preparation for query execution timed out"
logError(error, e)
dataToWrite = Some(
constructErrorDF(
Expand Down Expand Up @@ -147,7 +147,7 @@ case class JobOperator(
}

def cleanUpResources(exceptionThrown: Boolean, threadPool: ThreadPoolExecutor): Unit = {
val isStreaming = jobType.equalsIgnoreCase("streaming")
val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING)
try {
// Wait for streaming job complete if no error
if (!exceptionThrown && isStreaming) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SingleStatementExecutionManager(

override def executeStatement(statement: FlintStatement): DataFrame = {
import commandContext._
val isStreaming = jobType.equalsIgnoreCase("streaming")
val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING)
executeQuery(
applicationId,
jobId,
Expand Down

0 comments on commit e0de9a2

Please sign in to comment.