From aea6dc9fa34fb69bc1af9e1bb541a4d8a9bcd107 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Mon, 9 Sep 2024 18:04:45 -0700 Subject: [PATCH] Resolve comments Signed-off-by: Louis Chu --- .../scala/org/apache/spark/sql/FlintJobITSuite.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/FlintJob.scala | 2 +- .../scala/org/apache/spark/sql/FlintJobExecutor.scala | 8 +++++++- .../src/main/scala/org/apache/spark/sql/FlintREPL.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/JobOperator.scala | 8 ++++---- .../spark/sql/SingleStatementExecutionManagerImpl.scala | 2 +- 6 files changed, 17 insertions(+), 11 deletions(-) diff --git a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala index fe7eefb57..57277440e 100644 --- a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala +++ b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala @@ -92,7 +92,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { * all Spark conf required by Flint code underlying manually. */ spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName) - spark.conf.set(JOB_TYPE.key, "streaming") + spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING) /** * FlintJob.main() is not called because we need to manually set these variables within a @@ -107,7 +107,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest { queryId, dataSourceName, resultIndex, - "streaming", + FlintJobType.STREAMING, streamingRunningCount) job.terminateJVM = false job.start() diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index 357f12ab9..04609cf3d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -30,7 +30,7 @@ object FlintJob extends Logging with FlintJobExecutor { val (queryOption, resultIndexOption) = parseArgs(args) val conf = createSparkConf() - val jobType = conf.get("spark.flint.job.type", "batch") + val jobType = conf.get("spark.flint.job.type", FlintJobType.BATCH) CustomLogging.logInfo(s"""Job type is: ${jobType}""") conf.set(FlintSparkConf.JOB_TYPE.key, jobType) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index 18c560809..24d68fd47 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -34,6 +34,12 @@ object SparkConfConstants { "org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions" } +object FlintJobType { + val INTERACTIVE = "interactive" + val BATCH = "batch" + val STREAMING = "streaming" +} + trait FlintJobExecutor { this: Logging => @@ -132,7 +138,7 @@ trait FlintJobExecutor { * https://github.com/opensearch-project/opensearch-spark/issues/324 */ def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = { - if (jobType.equalsIgnoreCase("streaming")) { + if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) { conf.set( "spark.dynamicAllocation.maxExecutors", conf diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 9c533a19e..635a5226e 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -89,7 +89,7 @@ object FlintREPL extends Logging with FlintJobExecutor { val query = getQuery(queryOption, jobType, conf) val queryId = conf.get(FlintSparkConf.QUERY_ID.key, "") - if (jobType.equalsIgnoreCase("streaming")) { + if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) { if (resultIndexOption.isEmpty) { logAndThrow("resultIndex is not set") } @@ -223,7 +223,7 @@ object FlintREPL extends Logging with FlintJobExecutor { def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = { queryOption.getOrElse { - if (jobType.equalsIgnoreCase("streaming")) { + if (jobType.equalsIgnoreCase(FlintJobType.STREAMING)) { val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "") if (defaultQuery.isEmpty) { logAndThrow("Query undefined for the streaming job.") diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index 7d6e4f4cb..cb4af86da 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -80,10 +80,10 @@ case class JobOperator( val futurePrepareQueryExecution = Future { statementExecutionManager.prepareStatementExecution() } - + val data = statementExecutionManager.executeStatement(statement) dataToWrite = Some( ThreadUtils.awaitResult(futurePrepareQueryExecution, Duration(1, MINUTES)) match { - case Right(_) => statementExecutionManager.executeStatement(statement) + case Right(_) => data case Left(err) => error = err constructErrorDF( @@ -101,7 +101,7 @@ case class JobOperator( exceptionThrown = false } catch { case e: TimeoutException => - error = s"Getting the mapping of index $resultIndex timed out" + error = s"Preparation for query execution timed out" logError(error, e) dataToWrite = Some( constructErrorDF( @@ -147,7 +147,7 @@ case class JobOperator( } def cleanUpResources(exceptionThrown: Boolean, threadPool: ThreadPoolExecutor): Unit = { - val isStreaming = jobType.equalsIgnoreCase("streaming") + val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING) try { // Wait for streaming job complete if no error if (!exceptionThrown && isStreaming) { diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/SingleStatementExecutionManagerImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/SingleStatementExecutionManagerImpl.scala index abf3ab80e..52b8edd1d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/SingleStatementExecutionManagerImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/SingleStatementExecutionManagerImpl.scala @@ -43,7 +43,7 @@ class SingleStatementExecutionManager( override def executeStatement(statement: FlintStatement): DataFrame = { import commandContext._ - val isStreaming = jobType.equalsIgnoreCase("streaming") + val isStreaming = jobType.equalsIgnoreCase(FlintJobType.STREAMING) executeQuery( applicationId, jobId,