From 3ff2ef23739bc900be3ea4fdac5b9b2615f40265 Mon Sep 17 00:00:00 2001
From: qianheng
Date: Fri, 22 Nov 2024 11:41:26 +0800
Subject: [PATCH] Enable parallelExecution for integration test suites (#934)

* Split integration test to multiple groups and enable parallelExecution

Signed-off-by: Heng Qian

* Fix spark-warehouse conflict

Signed-off-by: Heng Qian

* Test with 3 groups

Signed-off-by: Heng Qian

* Random shuffle tests before splitting groups

Signed-off-by: Heng Qian

* reset group number to 4

Signed-off-by: Heng Qian

* revert shuffle

Signed-off-by: Heng Qian

---------

Signed-off-by: Heng Qian
---
 build.sbt                                         | 28 ++++++++++++++++---
 .../scala/org/apache/spark/FlintSuite.scala       |  2 ++
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/build.sbt b/build.sbt
index 724d348ae..365b88aa3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,8 +2,7 @@
  * Copyright OpenSearch Contributors
  * SPDX-License-Identifier: Apache-2.0
  */
-import Dependencies._
-import sbtassembly.AssemblyPlugin.autoImport.ShadeRule
+import Dependencies.*
 
 lazy val scala212 = "2.12.14"
 lazy val sparkVersion = "3.5.1"
@@ -38,6 +37,11 @@ ThisBuild / scalastyleConfig := baseDirectory.value / "scalastyle-config.xml"
  */
 ThisBuild / Test / parallelExecution := false
 
+/**
+ * Set the parallelism of forked tests to 4 to accelerate integration test
+ */
+concurrentRestrictions in Global := Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+
 // Run as part of compile task.
 lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")
 
@@ -274,13 +278,29 @@ lazy val integtest = (project in file("integ-test"))
       IntegrationTest / javaSource := baseDirectory.value / "src/integration/java",
       IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
       IntegrationTest / resourceDirectory := baseDirectory.value / "src/integration/resources",
-      IntegrationTest / parallelExecution := false,
+      IntegrationTest / parallelExecution := true, // enable parallel execution
+      IntegrationTest / testForkedParallel := false, // disable forked parallel execution to avoid duplicate spark context in the same JVM
       IntegrationTest / fork := true,
+      IntegrationTest / testGrouping := {
+        val tests = (IntegrationTest / definedTests).value
+        val forkOptions = ForkOptions()
+        val groups = tests.grouped(tests.size / 4 + 1).zipWithIndex.map { case (group, index) =>
+          val groupName = s"group-${index + 1}"
+          new Tests.Group(
+            name = groupName,
+            tests = group,
+            runPolicy = Tests.SubProcess(
+              forkOptions.withRunJVMOptions(forkOptions.runJVMOptions ++
+                Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/integ-test/target/tmp/$groupName")))
+          )
+        }
+        groups.toSeq
+      }
     )),
     inConfig(AwsIntegrationTest)(Defaults.testSettings ++ Seq(
       AwsIntegrationTest / javaSource := baseDirectory.value / "src/aws-integration/java",
       AwsIntegrationTest / scalaSource := baseDirectory.value / "src/aws-integration/scala",
-      AwsIntegrationTest / parallelExecution := false,
+      AwsIntegrationTest / parallelExecution := true,
       AwsIntegrationTest / fork := true,
     )),
     libraryDependencies ++= Seq(
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
index b675265b7..1d301087f 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala
@@ -12,6 +12,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
 import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.test.SharedSparkSession
 
 trait FlintSuite extends SharedSparkSession {
@@ -30,6 +31,7 @@ trait FlintSuite extends SharedSparkSession {
       .set(
         FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key,
         "org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest")
+      .set(WAREHOUSE_PATH.key, s"spark-warehouse/${suiteName}")
     conf
   }
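
Note on the sbt grouping pattern in the build.sbt hunk: each Tests.Group returned by testGrouping runs in its own forked JVM (Tests.SubProcess), and Tags.limit(Tags.ForkedTestGroup, 4) caps how many of those JVMs run at once, so every suite still gets a single SparkContext per JVM while up to four groups execute in parallel. The following is a minimal, self-contained sketch of that pattern against the plain Test configuration; the group count, group naming, and per-group java.io.tmpdir mirror the patch, while the rest is illustrative and not the project's exact build definition.

// build.sbt (sketch): split tests into 4 forked groups, each with its own tmpdir
Test / fork := true
Test / testGrouping := {
  val tests = (Test / definedTests).value
  val forkOptions = ForkOptions()
  tests.grouped(tests.size / 4 + 1).zipWithIndex.map { case (group, index) =>
    val groupName = s"group-${index + 1}"
    Tests.Group(
      name = groupName,
      tests = group,
      runPolicy = Tests.SubProcess(forkOptions.withRunJVMOptions(
        forkOptions.runJVMOptions ++ Seq(s"-Djava.io.tmpdir=target/tmp/$groupName"))))
  }.toSeq
}
// Run at most 4 forked test groups concurrently.
Global / concurrentRestrictions := Seq(Tags.limit(Tags.ForkedTestGroup, 4))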
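
Note on the FlintSuite change: once suites run in parallel, every suite that creates managed tables would otherwise write into the same default ./spark-warehouse directory and could clobber another suite's files, so the patch keys the warehouse location by ScalaTest's suiteName. Below is a minimal sketch of the same idea for an arbitrary suite; the trait name IsolatedWarehouse is hypothetical, and only WAREHOUSE_PATH and suiteName come from the patch and the Spark/ScalaTest APIs.

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.scalatest.Suite

// Sketch: give each suite its own warehouse directory so parallel suites
// never share spark-warehouse/ state. The trait name is illustrative only.
trait IsolatedWarehouse { self: Suite =>
  protected def warehouseConf: SparkConf =
    new SparkConf().set(WAREHOUSE_PATH.key, s"spark-warehouse/${self.suiteName}")
}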