Add FlintJob integration test with EMR serverless (opensearch-project…#449)

Signed-off-by: Peng Huo <[email protected]>
penghuo authored Jul 25, 2024
1 parent 938f4be commit 538dd54
Showing 3 changed files with 127 additions and 9 deletions.
7 changes: 6 additions & 1 deletion DEVELOPER_GUIDE.md
@@ -15,8 +15,13 @@ sbt integtest/test
### AWS Integration Test
The integration folder contains tests for cloud server providers. For instance, to test against an AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
-export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws
+export AWS_OPENSEARCH_HOST=search-xxx.us-west-2.on.aws
export AWS_REGION=us-west-2
+export AWS_EMRS_APPID=xxx
+export AWS_EMRS_EXECUTION_ROLE=xxx
+export AWS_S3_CODE_BUCKET=xxx
+export AWS_S3_CODE_PREFIX=xxx
+export AWS_OPENSEARCH_RESULT_INDEX=query_execution_result_glue
```
And run the
```
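As an aside on the settings above (a sketch, not part of this commit or of the guide): a tiny pre-flight check can fail fast when one of the exports is missing, rather than failing minutes into an EMR Serverless run. The variable list simply mirrors the exports shown above; `CheckAwsTestEnv` is a hypothetical helper name.
```
// Illustrative pre-flight check (hypothetical, not in the repo): verify
// that the AWS integration test environment variables are all set.
object CheckAwsTestEnv extends App {
  val required = Seq(
    "AWS_OPENSEARCH_HOST",
    "AWS_REGION",
    "AWS_EMRS_APPID",
    "AWS_EMRS_EXECUTION_ROLE",
    "AWS_S3_CODE_BUCKET",
    "AWS_S3_CODE_PREFIX",
    "AWS_OPENSEARCH_RESULT_INDEX")
  val missing = required.filter(v => Option(System.getenv(v)).forall(_.trim.isEmpty))
  require(missing.isEmpty, s"Missing environment variables: ${missing.mkString(", ")}")
  println("AWS integration test environment looks complete.")
}
```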
27 changes: 19 additions & 8 deletions build.sbt
Expand Up @@ -6,6 +6,9 @@ import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
+// Spark jackson version. Spark's jackson-module-scala strictly checks that the jackson-databind version is compatible.
+// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
+lazy val jacksonVersion = "2.13.4"

// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
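For context on why both comments insist on version alignment (an illustration, not code from this commit): jackson-module-scala inspects the jackson-databind version when the Scala module is registered, per the JacksonModule.scala line linked above, and refuses to set up against an incompatible databind. A minimal reproduction sketch:
```
// Sketch only: registering the Scala module fails fast at setup time when
// jackson-databind on the classpath falls outside the range this release of
// jackson-module-scala supports (the version check linked above).
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object JacksonVersionProbe extends App {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule) // throws if versions are incompatible
  println(s"databind version in use: ${mapper.version()}")
}
```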
@@ -49,7 +52,11 @@ lazy val commonSettings = Seq(
  compileScalastyle := (Compile / scalastyle).toTask("").value,
  Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
  testScalastyle := (Test / scalastyle).toTask("").value,
-  Test / test := ((Test / test) dependsOn testScalastyle).value)
+  Test / test := ((Test / test) dependsOn testScalastyle).value,
+  dependencyOverrides ++= Seq(
+    "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
+    "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
+  ))

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
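A note on the `dependencyOverrides` added above: unlike `libraryDependencies`, an override only pins the version chosen during conflict resolution and does not introduce a new direct dependency, which makes it the right tool for forcing transitive jackson pulls onto the Spark-compatible 2.13.4. Whether the pin takes effect can be confirmed from the sbt shell with the built-in `evicted` task, or with `dependencyTree` on sbt 1.4+ (an assumption about the sbt version in use).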
@@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test"))
  commonSettings,
  name := "integ-test",
  scalaVersion := scala212,
-  inConfig(IntegrationTest)(Defaults.testSettings),
-  IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
-  IntegrationTest / parallelExecution := false,
+  javaOptions ++= Seq(
+    s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}",
+    s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}",
+    s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}",
+  ),
+  inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
+    IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
+    IntegrationTest / parallelExecution := false,
+    IntegrationTest / fork := true,
+  )),
  libraryDependencies ++= Seq(
    "com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
      exclude ("com.fasterxml.jackson.core", "jackson-databind"),
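Two details worth noting in the integtest changes above (based only on what this diff shows): the `-DappJar`/`-DextensionJar`/`-DpplJar` properties are computed from the sibling projects' `assembly` outputs, and `IntegrationTest / fork := true` runs the suite in a forked JVM, which is what lets those `javaOptions` reach the test process; the new suite below reads them back through `sys.props`.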
@@ -229,10 +243,7 @@ lazy val integtest = (project in file("integ-test"))
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"),
  libraryDependencies ++= deps(sparkVersion),
  Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
    (sparkSqlApplication / assembly).value
102 changes: 102 additions & 0 deletions integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
@@ -0,0 +1,102 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.flint.spark.aws

import java.time.LocalDateTime

import scala.concurrent.duration.DurationInt

import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder
import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.apache.spark.internal.Logging

class AWSEmrServerlessAccessTestSuite
    extends AnyFlatSpec
    with BeforeAndAfter
    with Matchers
    with Logging {

  lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
  lazy val testPort: Int = -1
  lazy val testRegion: String = System.getenv("AWS_REGION")
  lazy val testScheme: String = "https"
  lazy val testAuth: String = "sigv4"

  lazy val testAppId: String = System.getenv("AWS_EMRS_APPID")
  lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE")
  lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET")
  lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX")
  lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX")

"EMR Serverless job" should "run successfully" in {
val s3Client = AmazonS3ClientBuilder.standard().withRegion(testRegion).build()
val emrServerless = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build()

val appJarPath =
sys.props.getOrElse("appJar", throw new IllegalArgumentException("appJar not set"))
val extensionJarPath = sys.props.getOrElse(
"extensionJar",
throw new IllegalArgumentException("extensionJar not set"))
val pplJarPath =
sys.props.getOrElse("pplJar", throw new IllegalArgumentException("pplJar not set"))

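    // Upload the three locally built assembly jars (resolved from the
    // -DappJar / -DextensionJar / -DpplJar system properties set in build.sbt)
    // to the S3 code bucket that the EMR Serverless job reads from.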
    s3Client.putObject(
      testS3CodeBucket,
      s"$testS3CodePrefix/sql-job.jar",
      new java.io.File(appJarPath))
    s3Client.putObject(
      testS3CodeBucket,
      s"$testS3CodePrefix/extension.jar",
      new java.io.File(extensionJarPath))
    s3Client.putObject(
      testS3CodeBucket,
      s"$testS3CodePrefix/ppl.jar",
      new java.io.File(pplJarPath))

    val jobRunRequest = new StartJobRunRequest()
      .withApplicationId(testAppId)
      .withExecutionRoleArn(testExecutionRole)
      .withName(s"integration-${LocalDateTime.now()}")
      .withJobDriver(new JobDriver()
        .withSparkSubmit(new SparkSubmit()
          .withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar")
          .withEntryPointArguments(testResultIndex)
          .withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " +
            s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," +
            s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " +
            s"--conf spark.datasource.flint.host=$testHost " +
            s"--conf spark.datasource.flint.port=-1 " +
            s"--conf spark.datasource.flint.scheme=$testScheme " +
            s"--conf spark.datasource.flint.auth=$testAuth " +
            s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " +
            s"--conf spark.flint.datasource.name=glue " +
            s"""--conf spark.flint.job.query="SELECT 1" """ +
            s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")))

    val jobRunResponse = emrServerless.startJobRun(jobRunRequest)

    val startTime = System.currentTimeMillis()
    val timeout = 5.minutes.toMillis
    var jobState = "STARTING"

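    // Poll every 30 seconds until the job reaches a terminal state ("FAILED"
    // or "SUCCESS") or the five-minute timeout elapses; the initial "STARTING"
    // value is just a local placeholder that the first poll overwrites.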
    while (System.currentTimeMillis() - startTime < timeout
      && (jobState != "FAILED" && jobState != "SUCCESS")) {
      Thread.sleep(30000)
      val request = new GetJobRunRequest()
        .withApplicationId(testAppId)
        .withJobRunId(jobRunResponse.getJobRunId)
      jobState = emrServerless.getJobRun(request).getJobRun.getState
      logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}")
    }

    jobState shouldBe "SUCCESS"
  }
}
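Given the `inConfig(IntegrationTest)(Defaults.testSettings ++ ...)` block added in build.sbt, the suite should be runnable from the IntegrationTest configuration once the environment variables from DEVELOPER_GUIDE.md are exported, for example with `sbt "integtest/IntegrationTest/testOnly org.opensearch.flint.spark.aws.AWSEmrServerlessAccessTestSuite"` (the task path is inferred from this diff, not documented here).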
