From 1ee72bd2ba54e0c852ce29403b5af57d42c4f3bc Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Wed, 24 Jul 2024 14:15:58 -0700
Subject: [PATCH 1/2] Add FlintJob integration test

Signed-off-by: Peng Huo
---
 build.sbt                                     | 27 +++--
 .../aws/AWSEmrServerlessAccessTestSuite.scala | 98 +++++++++++++++++++
 2 files changed, 117 insertions(+), 8 deletions(-)
 create mode 100644 integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala

diff --git a/build.sbt b/build.sbt
index 6ed0ec537..f9788c852 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,6 +6,9 @@ import Dependencies._
 
 lazy val scala212 = "2.12.14"
 lazy val sparkVersion = "3.3.2"
+// Spark jackson version. Spark's jackson-module-scala strictly checks that the jackson-databind version is compatible.
+// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
+lazy val jacksonVersion = "2.13.4"
 
 // The transitive opensearch jackson-databind dependency version should align with the Spark jackson-databind dependency version.
 // Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
@@ -49,7 +52,11 @@ lazy val commonSettings = Seq(
   compileScalastyle := (Compile / scalastyle).toTask("").value,
   Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
   testScalastyle := (Test / scalastyle).toTask("").value,
-  Test / test := ((Test / test) dependsOn testScalastyle).value)
+  Test / test := ((Test / test) dependsOn testScalastyle).value,
+  dependencyOverrides ++= Seq(
+    "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
+    "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
+  ))
 
 // running `scalafmtAll` includes all subprojects under root
 lazy val root = (project in file("."))
@@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test"))
     commonSettings,
     name := "integ-test",
     scalaVersion := scala212,
-    inConfig(IntegrationTest)(Defaults.testSettings),
-    IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
-    IntegrationTest / parallelExecution := false,
+    javaOptions ++= Seq(
+      s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}",
+      s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}",
+      s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}",
+    ),
+    inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
+      IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
+      IntegrationTest / parallelExecution := false,
+      IntegrationTest / fork := true,
+    )),
     libraryDependencies ++= Seq(
       "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
         exclude ("com.fasterxml.jackson.core", "jackson-databind"),
@@ -229,10 +243,7 @@ lazy val integtest = (project in file("integ-test"))
       "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
       "org.testcontainers" % "testcontainers" % "1.18.0" % "test",
       "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
-      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
-      // add opensearch-java client to get node stats
-      "org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
-        exclude ("com.fasterxml.jackson.core", "jackson-databind")),
+      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"),
     libraryDependencies ++= deps(sparkVersion),
     Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
       (sparkSqlApplication / assembly).value)
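Note on the `dependencyOverrides` above: jackson-module-scala validates at module registration that the jackson-databind version on the classpath is compatible (the JacksonModule.scala check linked in the comment), so both jackson artifacts are pinned to the 2.13.4 version that Spark 3.3.2 ships. A minimal runtime sanity check, illustrative only and not part of this patch (the object name is invented):

```scala
import com.fasterxml.jackson.databind.ObjectMapper

// Prints the jackson-databind version actually resolved onto the classpath;
// with the override in effect it should report 2.13.4.
object JacksonVersionCheck extends App {
  println(new ObjectMapper().version())
}
```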

diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
new file mode 100644
index 000000000..2b1daee1d
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.aws
+
+import java.time.LocalDateTime
+
+import scala.concurrent.duration.DurationInt
+
+import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder
+import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest}
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.internal.Logging
+
+class AWSEmrServerlessAccessTestSuite extends AnyFlatSpec with BeforeAndAfter with Matchers with Logging {
+
+  lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
+  lazy val testPort: Int = -1
+  lazy val testRegion: String = System.getenv("AWS_REGION")
+  lazy val testScheme: String = "https"
+  lazy val testAuth: String = "sigv4"
+
+  lazy val testAppId: String = System.getenv("AWS_EMRS_APPID")
+  lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE")
+  lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET")
+  lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX")
+  lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX")
+
+  "EMR Serverless job" should "run successfully" in {
+    val s3Client = AmazonS3ClientBuilder.standard().withRegion(testRegion).build()
+    val emrServerless = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build()
+
+    val appJarPath =
+      sys.props.getOrElse("appJar", throw new IllegalArgumentException("appJar not set"))
+    val extensionJarPath = sys.props.getOrElse(
+      "extensionJar",
+      throw new IllegalArgumentException("extensionJar not set"))
+    val pplJarPath =
+      sys.props.getOrElse("pplJar", throw new IllegalArgumentException("pplJar not set"))
+
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/sql-job.jar",
+      new java.io.File(appJarPath))
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/extension.jar",
+      new java.io.File(extensionJarPath))
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/ppl.jar",
+      new java.io.File(pplJarPath))
+
+    val jobRunRequest = new StartJobRunRequest()
+      .withApplicationId(testAppId)
+      .withExecutionRoleArn(testExecutionRole)
+      .withName(s"integration-${LocalDateTime.now()}")
+      .withJobDriver(new JobDriver()
+        .withSparkSubmit(new SparkSubmit()
+          .withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar")
+          .withEntryPointArguments(testResultIndex)
+          .withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " +
+            s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," +
+            s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " +
+            s"--conf spark.datasource.flint.host=$testHost " +
+            s"--conf spark.datasource.flint.port=-1 " +
+            s"--conf spark.datasource.flint.scheme=$testScheme " +
+            s"--conf spark.datasource.flint.auth=$testAuth " +
+            s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " +
+            s"--conf spark.flint.datasource.name=glue " +
+            s"""--conf spark.flint.job.query="SELECT 1" """ +
+            s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")))
+
+    val jobRunResponse = emrServerless.startJobRun(jobRunRequest)
+
+    val startTime = System.currentTimeMillis()
+    val timeout = 5.minutes.toMillis
+    var jobState = "STARTING"
+
+    while (System.currentTimeMillis() - startTime < timeout
+      && (jobState != "FAILED" && jobState != "SUCCESS")) {
+      Thread.sleep(30000)
+      val request = new GetJobRunRequest()
+        .withApplicationId(testAppId)
+        .withJobRunId(jobRunResponse.getJobRunId)
+      jobState = emrServerless.getJobRun(request).getJobRun.getState
+      logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}")
+    }
+
+    jobState shouldBe "SUCCESS"
+  }
+}
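The test above polls `getJobRun` every 30 seconds until the run reaches FAILED or SUCCESS, or 5 minutes elapse. As a design note, that wait-loop generalizes; a sketch of the same pattern factored into a reusable helper (hypothetical code, not part of this patch):

```scala
import scala.concurrent.duration._

object PollingSketch {
  // Polls fetchState until it returns a terminal state or the timeout
  // elapses, sleeping `interval` between attempts; returns the last state.
  def awaitTerminalState(
      fetchState: () => String,
      terminalStates: Set[String] = Set("SUCCESS", "FAILED"),
      timeout: FiniteDuration = 5.minutes,
      interval: FiniteDuration = 30.seconds): String = {
    val deadline = timeout.fromNow
    var state = fetchState()
    while (deadline.hasTimeLeft() && !terminalStates.contains(state)) {
      Thread.sleep(interval.toMillis)
      state = fetchState()
    }
    state
  }
}
```

With such a helper, the suite's assertion would reduce to `awaitTerminalState(() => emrServerless.getJobRun(request).getJobRun.getState) shouldBe "SUCCESS"`.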
From 2e83d2b0c564d48a1051f5207539ed62639d080f Mon Sep 17 00:00:00 2001
From: Peng Huo
Date: Wed, 24 Jul 2024 16:24:52 -0700
Subject: [PATCH 2/2] Fix fmt

Signed-off-by: Peng Huo
---
 DEVELOPER_GUIDE.md                                        | 7 ++++++-
 .../flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala | 6 +++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md
index 153cf5ed6..619a33e24 100644
--- a/DEVELOPER_GUIDE.md
+++ b/DEVELOPER_GUIDE.md
@@ -15,8 +15,13 @@ sbt integtest/test
 ### AWS Integration Test
 The integration folder contains tests for cloud service providers. For instance, to test against an AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
 ```
-export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws
+export AWS_OPENSEARCH_HOST=search-xxx.us-west-2.on.aws
 export AWS_REGION=us-west-2
+export AWS_EMRS_APPID=xxx
+export AWS_EMRS_EXECUTION_ROLE=xxx
+export AWS_S3_CODE_BUCKET=xxx
+export AWS_S3_CODE_PREFIX=xxx
+export AWS_OPENSEARCH_RESULT_INDEX=query_execution_result_glue
 ```
 And run the
 ```
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
index 2b1daee1d..67e036d28 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
@@ -18,7 +18,11 @@ import org.scalatest.matchers.should.Matchers
 
 import org.apache.spark.internal.Logging
 
-class AWSEmrServerlessAccessTestSuite extends AnyFlatSpec with BeforeAndAfter with Matchers with Logging {
+class AWSEmrServerlessAccessTestSuite
+    extends AnyFlatSpec
+    with BeforeAndAfter
+    with Matchers
+    with Logging {
 
   lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
   lazy val testPort: Int = -1
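One caveat worth noting: the suite reads its configuration through `System.getenv`, which silently returns null for an unset variable, so a misconfigured run only fails once an AWS client receives the null. A hypothetical fail-fast lookup (not part of this patch), mirroring how the suite already guards the `appJar`/`extensionJar`/`pplJar` system properties:

```scala
// Hypothetical helper, not in the patch: surface a missing environment
// variable by name instead of letting null propagate into the AWS clients.
def requiredEnv(name: String): String =
  Option(System.getenv(name)).getOrElse(
    throw new IllegalArgumentException(s"environment variable $name is not set"))

// e.g. lazy val testHost: String = requiredEnv("AWS_OPENSEARCH_HOST")
```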