Skip to content

Commit

Permalink
Add maxExecutors configuration for streaming queries (#326)
Browse files Browse the repository at this point in the history
* Add maxExecutors configuration for streaming queries

Signed-off-by: Peng Huo <[email protected]>

* scala fmt

Signed-off-by: Peng Huo <[email protected]>

* update IT

Signed-off-by: Peng Huo <[email protected]>

---------

Signed-off-by: Peng Huo <[email protected]>
  • Loading branch information
penghuo authored May 1, 2024
1 parent a8a376f commit 20b761c
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ object FlintJob extends Logging with FlintJobExecutor {
* Without this setup, Spark would not recognize names in the format `my_glue1.default`.
*/
conf.set("spark.sql.defaultCatalog", dataSource)
configDYNMaxExecutors(conf, jobType)

val streamingRunningCount = new AtomicInteger(0)
val jobOperator =
JobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ trait FlintJobExecutor {
"org.opensearch.flint.spark.FlintPPLSparkExtensions,org.opensearch.flint.spark.FlintSparkExtensions")
}

/**
 * For streaming jobs, overrides `spark.dynamicAllocation.maxExecutors` with the
 * streaming-specific setting `spark.flint.streaming.dynamicAllocation.maxExecutors`
 * (falling back to "10" when unset). Non-streaming job types are left untouched.
 * More detail at https://github.com/opensearch-project/opensearch-spark/issues/324
 */
def configDYNMaxExecutors(conf: SparkConf, jobType: String): Unit = {
  val isStreaming = jobType.equalsIgnoreCase("streaming")
  if (isStreaming) {
    // Read the Flint streaming override first; "10" is the documented default.
    val maxExecutors =
      conf.get("spark.flint.streaming.dynamicAllocation.maxExecutors", "10")
    conf.set("spark.dynamicAllocation.maxExecutors", maxExecutors)
  }
}

def createSparkSession(conf: SparkConf): SparkSession = {
val builder = SparkSession.builder().config(conf)
if (enableHiveSupport) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ object FlintREPL extends Logging with FlintJobExecutor {

if (jobType.equalsIgnoreCase("streaming")) {
logInfo(s"""streaming query ${query}""")
configDYNMaxExecutors(conf, jobType)
val streamingRunningCount = new AtomicInteger(0)
val jobOperator =
JobOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ class FlintJobTest extends SparkFunSuite with JobMatchers {
|""".stripMargin
assert(FlintJob.isSuperset(input, mapping))
}

test("default streaming query maxExecutors is 10") {
  // With no explicit flint streaming setting, the fallback default "10" must apply.
  val sparkConf = spark.sparkContext.conf
  FlintJob.configDYNMaxExecutors(sparkConf, "streaming")
  sparkConf.get("spark.dynamicAllocation.maxExecutors") shouldBe "10"
}

test("override streaming query maxExecutors") {
  // An explicit flint streaming setting must take precedence over the default "10".
  val sparkConf = spark.sparkContext.conf
  sparkConf.set("spark.flint.streaming.dynamicAllocation.maxExecutors", "30")
  FlintJob.configDYNMaxExecutors(sparkConf, "streaming")
  sparkConf.get("spark.dynamicAllocation.maxExecutors") shouldBe "30"
}

}

0 comments on commit 20b761c

Please sign in to comment.