diff --git a/build.sbt b/build.sbt
index 6ed0ec537..f9788c852 100644
--- a/build.sbt
+++ b/build.sbt
@@ -6,6 +6,9 @@ import Dependencies._
 
 lazy val scala212 = "2.12.14"
 lazy val sparkVersion = "3.3.2"
+// Spark jackson version. Spark's jackson-module-scala strictly checks that the jackson-databind version is compatible:
+// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
+lazy val jacksonVersion = "2.13.4"
 
 // The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
 // Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
@@ -49,7 +52,11 @@ lazy val commonSettings = Seq(
   compileScalastyle := (Compile / scalastyle).toTask("").value,
   Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
   testScalastyle := (Test / scalastyle).toTask("").value,
-  Test / test := ((Test / test) dependsOn testScalastyle).value)
+  Test / test := ((Test / test) dependsOn testScalastyle).value,
+  dependencyOverrides ++= Seq(
+    "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
+    "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
+  ))
 
 // running `scalafmtAll` includes all subprojects under root
 lazy val root = (project in file("."))
@@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test"))
     commonSettings,
     name := "integ-test",
     scalaVersion := scala212,
-    inConfig(IntegrationTest)(Defaults.testSettings),
-    IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
-    IntegrationTest / parallelExecution := false,
+    javaOptions ++= Seq(
+      s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}",
+      s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}",
+      s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}",
+    ),
+    inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
+      IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
+      IntegrationTest / parallelExecution := false,
+      IntegrationTest / fork := true,
+    )),
     libraryDependencies ++= Seq(
       "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
         exclude ("com.fasterxml.jackson.core", "jackson-databind"),
@@ -229,10 +243,7 @@ lazy val integtest = (project in file("integ-test"))
       "com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
       "org.testcontainers" % "testcontainers" % "1.18.0" % "test",
       "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
-      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
-      // add opensearch-java client to get node stats
-      "org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
-        exclude ("com.fasterxml.jackson.core", "jackson-databind")),
+      "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"),
     libraryDependencies ++= deps(sparkVersion),
     Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value,
       (pplSparkIntegration / assembly).value, (sparkSqlApplication / assembly).value
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
new file mode 100644
index 000000000..2b1daee1d
--- /dev/null
+++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark.aws
+
+import java.time.LocalDateTime
+
+import scala.concurrent.duration.DurationInt
+
+import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder
+import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest}
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import org.scalatest.BeforeAndAfter
+import org.scalatest.flatspec.AnyFlatSpec
+import org.scalatest.matchers.should.Matchers
+
+import org.apache.spark.internal.Logging
+
+class AWSEmrServerlessAccessTestSuite extends AnyFlatSpec with BeforeAndAfter with Matchers with Logging {
+
+  lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
+  lazy val testPort: Int = -1
+  lazy val testRegion: String = System.getenv("AWS_REGION")
+  lazy val testScheme: String = "https"
+  lazy val testAuth: String = "sigv4"
+
+  lazy val testAppId: String = System.getenv("AWS_EMRS_APPID")
+  lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE")
+  lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET")
+  lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX")
+  lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX")
+
+  "EMR Serverless job" should "run successfully" in {
+    val s3Client = AmazonS3ClientBuilder.standard().withRegion(testRegion).build()
+    val emrServerless = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build()
+
+    val appJarPath =
+      sys.props.getOrElse("appJar", throw new IllegalArgumentException("appJar not set"))
+    val extensionJarPath = sys.props.getOrElse(
+      "extensionJar",
+      throw new IllegalArgumentException("extensionJar not set"))
+    val pplJarPath =
+      sys.props.getOrElse("pplJar", throw new IllegalArgumentException("pplJar not set"))
+
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/sql-job.jar",
+      new java.io.File(appJarPath))
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/extension.jar",
+      new java.io.File(extensionJarPath))
+    s3Client.putObject(
+      testS3CodeBucket,
+      s"$testS3CodePrefix/ppl.jar",
+      new java.io.File(pplJarPath))
+
+    val jobRunRequest = new StartJobRunRequest()
+      .withApplicationId(testAppId)
+      .withExecutionRoleArn(testExecutionRole)
+      .withName(s"integration-${LocalDateTime.now()}")
+      .withJobDriver(new JobDriver()
+        .withSparkSubmit(new SparkSubmit()
+          .withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar")
+          .withEntryPointArguments(testResultIndex)
+          .withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " +
+            s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," +
+            s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " +
+            s"--conf spark.datasource.flint.host=$testHost " +
+            s"--conf spark.datasource.flint.port=-1 " +
+            s"--conf spark.datasource.flint.scheme=$testScheme " +
+            s"--conf spark.datasource.flint.auth=$testAuth " +
+            s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " +
+            s"--conf spark.flint.datasource.name=glue " +
+            s"""--conf spark.flint.job.query="SELECT 1" """ +
+            s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")))
+
+    val jobRunResponse = emrServerless.startJobRun(jobRunRequest)
+
+    val startTime = System.currentTimeMillis()
+    val timeout = 5.minutes.toMillis
+    var jobState = "STARTING"
+
+    while (System.currentTimeMillis() - startTime < timeout
+      && (jobState != "FAILED" && jobState != "SUCCESS")) {
+      Thread.sleep(30000)
+      val request = new GetJobRunRequest()
+        .withApplicationId(testAppId)
+        .withJobRunId(jobRunResponse.getJobRunId)
+      jobState = emrServerless.getJobRun(request).getJobRun.getState
+      logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}")
+    }
+
+    jobState shouldBe "SUCCESS"
+  }
+}
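
Why the dependencyOverrides block matters: it pins jackson-core and jackson-databind to the version the Spark 3.3.2 runtime ships, because jackson-module-scala (the JacksonModule line linked in the build.sbt comment) validates the jackson-databind version at module registration and rejects a mismatched one; that is how a newer transitive databind previously broke job startup. A minimal sketch of that failure mode, not part of this change (the object name JacksonCompatCheck is hypothetical):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JacksonCompatCheck extends App {
  val mapper = new ObjectMapper()
  // jackson-module-scala's setupModule checks the jackson-databind version on the
  // classpath and throws at registration on an incompatible one, so this line
  // fails fast when the dependencyOverrides pin is missing or wrong.
  mapper.registerModule(DefaultScalaModule)
  println(s"jackson-databind version on classpath: ${mapper.version()}")
}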
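
Note on running the new suite: build.sbt forks the integration-test JVM and injects the three assembly jar paths as -DappJar/-DextensionJar/-DpplJar system properties, while the AWS coordinates come from environment variables. A preflight sketch mirroring the suite's own getOrElse guards, assuming only the property and variable names visible above (the object name EmrServerlessTestPreflight is hypothetical):

object EmrServerlessTestPreflight extends App {
  // Jar paths are injected by sbt through javaOptions as -D system properties.
  val requiredProps = Seq("appJar", "extensionJar", "pplJar")
  // Cluster host, region, EMR-S application, role, code bucket, and result index
  // come from the environment.
  val requiredEnv = Seq(
    "AWS_OPENSEARCH_HOST", "AWS_REGION", "AWS_EMRS_APPID",
    "AWS_EMRS_EXECUTION_ROLE", "AWS_S3_CODE_BUCKET",
    "AWS_S3_CODE_PREFIX", "AWS_OPENSEARCH_RESULT_INDEX")

  val missingProps = requiredProps.filterNot(sys.props.contains)
  val missingEnv = requiredEnv.filter(name => Option(System.getenv(name)).forall(_.isEmpty))

  if (missingProps.nonEmpty || missingEnv.nonEmpty) {
    Console.err.println(s"Missing -D properties: $missingProps; missing env vars: $missingEnv")
    sys.exit(1)
  }
  println("All EMR Serverless integration test inputs are present.")
}

With IntegrationTest wired through Defaults.testSettings, something like sbt "integtest/IntegrationTest/testOnly org.opensearch.flint.spark.aws.AWSEmrServerlessAccessTestSuite" should select just this suite; the exact task path is an assumption about this sbt layout rather than something stated in the change.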