From e7ee556c45a1d25446467b36a1707c1efcc50f25 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Wed, 1 May 2024 18:31:20 +0000
Subject: [PATCH] Add maxExecutors configuration for streaming queries (#326)

* Add maxExecutors configuration for streaming queries

Signed-off-by: Peng Huo

* scala fmt

Signed-off-by: Peng Huo

* update IT

Signed-off-by: Peng Huo

---------

Signed-off-by: Peng Huo
(cherry picked from commit 20b761cde815e5a5f9261225a213bc80a7ef1563)
Signed-off-by: github-actions[bot]
---
 .../main/scala/org/apache/spark/sql/FlintJob.scala   |  2 ++
 .../org/apache/spark/sql/FlintJobExecutor.scala      | 13 +++++++++++++
 .../main/scala/org/apache/spark/sql/FlintREPL.scala  |  1 +
 .../scala/org/apache/spark/sql/FlintJobTest.scala    | 13 +++++++++++++
 4 files changed, 29 insertions(+)

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index 6ee7cc68e..f582f9f45 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -60,6 +60,8 @@ object FlintJob extends Logging with FlintJobExecutor {
      * Without this setup, Spark would not recognize names in the format `my_glue1.default`.
      */
     conf.set("spark.sql.defaultCatalog", dataSource)
+    configDYNMaxExecutors(conf, jobType)
+
     val streamingRunningCount = new AtomicInteger(0)
     val jobOperator =
       JobOperator(

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index c1d2bf79b..099a9dc69 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -85,6 +85,19 @@ trait FlintJobExecutor {
       "org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions")
   }
 
+  /*
+   * Override dynamicAllocation.maxExecutors with streaming maxExecutors. More detail at
+   * https://github.com/opensearch-project/opensearch-spark/issues/324
+   */
+  def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
+    if (jobType.equalsIgnoreCase("streaming")) {
+      conf.set(
+        "spark.dynamicAllocation.maxExecutors",
+        conf
+          .get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10"))
+    }
+  }
+
   def createSparkSession(conf: SparkConf): SparkSession = {
     val builder = SparkSession.builder().config(conf)
     if (enableHiveSupport) {

diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 1016adaba..b96163693 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -91,6 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
 
     if (jobType.equalsIgnoreCase("streaming")) {
       logInfo(s"""streaming query ${query}""")
+      configDYNMaxExecutors(conf, jobType)
       val streamingRunningCount = new AtomicInteger(0)
       val jobOperator =
         JobOperator(

diff --git a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
index 352d140ce..aceb9468f 100644
--- a/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/apache/spark/sql/FlintJobTest.scala
@@ -97,4 +97,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
         |""".stripMargin
     assert(FlintJob.isSuperset(input, mapping))
   }
+
+  test("default streaming query maxExecutors is 10") {
+    val conf = spark.sparkContext.conf
+    FlintJob.configDYNMaxExecutors(conf, "streaming")
+    conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "10"
+  }
+
+  test("override streaming query maxExecutors") {
+    spark.sparkContext.conf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
+    FlintJob.configDYNMaxExecutors(spark.sparkContext.conf, "streaming")
+    spark.sparkContext.conf.get("spark.dynamicAllocation.maxExecutors") shouldBe "30"
+  }
+
 }
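
For anyone picking this change up: below is a minimal, self-contained sketch of the behavior configDYNMaxExecutors introduces. The object name MaxExecutorsExample and the hard-coded jobType value are illustrative only, not part of the patch; the conditional mirrors the patched FlintJobExecutor logic, where streaming jobs have spark.dynamicAllocation.maxExecutors overridden by spark.flint.streaming.dynamicAllocation.maxExecutors, falling back to "10".

import org.apache.spark.SparkConf

object MaxExecutorsExample {
  def main(args: Array[String]): Unit = {
    // Caller-supplied override; omit this setting to fall back to the default of "10".
    val conf = new SparkConf()
      .set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")

    // Same shape as the patched configDYNMaxExecutors: only streaming jobs
    // get their dynamic-allocation executor cap rewritten.
    val jobType = "streaming"
    if (jobType.equalsIgnoreCase("streaming")) {
      conf.set(
        "spark.dynamicAllocation.maxExecutors",
        conf.get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10"))
    }

    // Prints "30" here; would print "10" without the override above.
    println(conf.get("spark.dynamicAllocation.maxExecutors"))
  }
}

Note the precedence this implies: for streaming jobs the Flint-specific key always wins, so a spark.dynamicAllocation.maxExecutors value passed directly at submit time is silently replaced, which is worth remembering when debugging executor counts.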