From 538dd54bb8dfe66504414f4d19cc6f4c0cd6f03f Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 25 Jul 2024 08:46:35 -0700 Subject: [PATCH] Add FlintJob integration test with EMR serverless (#449) Signed-off-by: Peng Huo --- DEVELOPER_GUIDE.md | 7 +- build.sbt | 27 +++-- .../aws/AWSEmrServerlessAccessTestSuite.scala | 102 ++++++++++++++++++ 3 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 153cf5ed6..619a33e24 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -15,8 +15,13 @@ sbt integtest/test ### AWS Integration Test The integration folder contains tests for cloud server providers. For instance, test against AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain. ``` -export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws +export AWS_OPENSEARCH_HOST=search-xxx.us-west-2.on.aws export AWS_REGION=us-west-2 +export AWS_EMRS_APPID=xxx +export AWS_EMRS_EXECUTION_ROLE=xxx +export AWS_S3_CODE_BUCKET=xxx +export AWS_S3_CODE_PREFIX=xxx +export AWS_OPENSEARCH_RESULT_INDEX=query_execution_result_glue ``` And run the ``` diff --git a/build.sbt b/build.sbt index 6ed0ec537..f9788c852 100644 --- a/build.sbt +++ b/build.sbt @@ -6,6 +6,9 @@ import Dependencies._ lazy val scala212 = "2.12.14" lazy val sparkVersion = "3.3.2" +// Spark jackson version. Spark jackson-module-scala strictly check the jackson-databind version hould compatbile +// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59 +lazy val jacksonVersion = "2.13.4" // The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version. // Issue: https://github.com/opensearch-project/opensearch-spark/issues/442 @@ -49,7 +52,11 @@ lazy val commonSettings = Seq( compileScalastyle := (Compile / scalastyle).toTask("").value, Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value, testScalastyle := (Test / scalastyle).toTask("").value, - Test / test := ((Test / test) dependsOn testScalastyle).value) + Test / test := ((Test / test) dependsOn testScalastyle).value, + dependencyOverrides ++= Seq( + "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion, + "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion + )) // running `scalafmtAll` includes all subprojects under root lazy val root = (project in file(".")) @@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test")) commonSettings, name := "integ-test", scalaVersion := scala212, - inConfig(IntegrationTest)(Defaults.testSettings), - IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala", - IntegrationTest / parallelExecution := false, + javaOptions ++= Seq( + s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}", + s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}", + s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}", + ), + inConfig(IntegrationTest)(Defaults.testSettings ++ Seq( + IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala", + IntegrationTest / parallelExecution := false, + IntegrationTest / fork := true, + )), libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided" exclude ("com.fasterxml.jackson.core", "jackson-databind"), @@ -229,10 +243,7 @@ lazy val integtest = (project in file("integ-test")) "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test", "org.testcontainers" % "testcontainers" % "1.18.0" % "test", "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test", - "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test", - // add opensearch-java client to get node stats - "org.opensearch.client" % "opensearch-java" % "2.6.0" % "test" - exclude ("com.fasterxml.jackson.core", "jackson-databind")), + "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"), libraryDependencies ++= deps(sparkVersion), Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value, (sparkSqlApplication / assembly).value diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala new file mode 100644 index 000000000..67e036d28 --- /dev/null +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala @@ -0,0 +1,102 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.aws + +import java.time.LocalDateTime + +import scala.concurrent.duration.DurationInt + +import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder +import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest} +import com.amazonaws.services.s3.AmazonS3ClientBuilder +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.internal.Logging + +class AWSEmrServerlessAccessTestSuite + extends AnyFlatSpec + with BeforeAndAfter + with Matchers + with Logging { + + lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST") + lazy val testPort: Int = -1 + lazy val testRegion: String = System.getenv("AWS_REGION") + lazy val testScheme: String = "https" + lazy val testAuth: String = "sigv4" + + lazy val testAppId: String = System.getenv("AWS_EMRS_APPID") + lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE") + lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET") + lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX") + lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX") + + "EMR Serverless job" should "run successfully" in { + val s3Client = AmazonS3ClientBuilder.standard().withRegion(testRegion).build() + val emrServerless = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build() + + val appJarPath = + sys.props.getOrElse("appJar", throw new IllegalArgumentException("appJar not set")) + val extensionJarPath = sys.props.getOrElse( + "extensionJar", + throw new IllegalArgumentException("extensionJar not set")) + val pplJarPath = + sys.props.getOrElse("pplJar", throw new IllegalArgumentException("pplJar not set")) + + s3Client.putObject( + testS3CodeBucket, + s"$testS3CodePrefix/sql-job.jar", + new java.io.File(appJarPath)) + s3Client.putObject( + testS3CodeBucket, + s"$testS3CodePrefix/extension.jar", + new java.io.File(extensionJarPath)) + s3Client.putObject( + testS3CodeBucket, + s"$testS3CodePrefix/ppl.jar", + new java.io.File(pplJarPath)) + + val jobRunRequest = new StartJobRunRequest() + .withApplicationId(testAppId) + .withExecutionRoleArn(testExecutionRole) + .withName(s"integration-${LocalDateTime.now()}") + .withJobDriver(new JobDriver() + .withSparkSubmit(new SparkSubmit() + .withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar") + .withEntryPointArguments(testResultIndex) + .withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " + + s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," + + s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " + + s"--conf spark.datasource.flint.host=$testHost " + + s"--conf spark.datasource.flint.port=-1 " + + s"--conf spark.datasource.flint.scheme=$testScheme " + + s"--conf spark.datasource.flint.auth=$testAuth " + + s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " + + s"--conf spark.flint.datasource.name=glue " + + s"""--conf spark.flint.job.query="SELECT 1" """ + + s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"))) + + val jobRunResponse = emrServerless.startJobRun(jobRunRequest) + + val startTime = System.currentTimeMillis() + val timeout = 5.minutes.toMillis + var jobState = "STARTING" + + while (System.currentTimeMillis() - startTime < timeout + && (jobState != "FAILED" && jobState != "SUCCESS")) { + Thread.sleep(30000) + val request = new GetJobRunRequest() + .withApplicationId(testAppId) + .withJobRunId(jobRunResponse.getJobRunId) + jobState = emrServerless.getJobRun(request).getJobRun.getState + logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}") + } + + jobState shouldBe "SUCCESS" + } +}