Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FlintJob integration test with EMR serverless #449

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,13 @@ sbt integtest/test
### AWS Integration Test
The integration folder contains tests for cloud server providers. For instance, to test against an AWS OpenSearch domain, configure the following settings. The client will use the default credential provider to access the AWS OpenSearch domain.
```
export AWS_OPENSEARCH_HOST=search-xxx.aos.us-west-2.on.aws
export AWS_OPENSEARCH_HOST=search-xxx.us-west-2.on.aws
export AWS_REGION=us-west-2
export AWS_EMRS_APPID=xxx
export AWS_EMRS_EXECUTION_ROLE=xxx
export AWS_S3_CODE_BUCKET=xxx
export AWS_S3_CODE_PREFIX=xxx
export AWS_OPENSEARCH_RESULT_INDEX=query_execution_result_glue
```
And run the
```
Expand Down
27 changes: 19 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
// Spark jackson version. Spark jackson-module-scala strictly checks that the jackson-databind version is compatible.
// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
lazy val jacksonVersion = "2.13.4"

// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
Expand Down Expand Up @@ -49,7 +52,11 @@ lazy val commonSettings = Seq(
compileScalastyle := (Compile / scalastyle).toTask("").value,
Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)
Test / test := ((Test / test) dependsOn testScalastyle).value,
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
))

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
Expand Down Expand Up @@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test"))
commonSettings,
name := "integ-test",
scalaVersion := scala212,
inConfig(IntegrationTest)(Defaults.testSettings),
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / parallelExecution := false,
javaOptions ++= Seq(
s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}",
s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}",
s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}",
),
inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / parallelExecution := false,
IntegrationTest / fork := true,
)),
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
Expand All @@ -229,10 +243,7 @@ lazy val integtest = (project in file("integ-test"))
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
(sparkSqlApplication / assembly).value
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.aws

import java.time.LocalDateTime

import scala.concurrent.duration.DurationInt

import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder
import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.apache.spark.internal.Logging

/**
 * Integration test that exercises a real EMR Serverless application: it uploads the
 * locally-built FlintJob, Flint Spark extension, and PPL jars to S3, starts a job run
 * that executes `SELECT 1`, and polls until the run reaches a terminal state.
 *
 * All connection and account settings come from environment variables
 * (AWS_OPENSEARCH_HOST, AWS_REGION, AWS_EMRS_APPID, ...); the jar locations come from
 * the `appJar` / `extensionJar` / `pplJar` system properties set by the build.
 */
class AWSEmrServerlessAccessTestSuite
    extends AnyFlatSpec
    with BeforeAndAfter
    with Matchers
    with Logging {

  // OpenSearch connection settings passed through to the Flint data source.
  lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
  lazy val testPort: Int = -1
  lazy val testRegion: String = System.getenv("AWS_REGION")
  lazy val testScheme: String = "https"
  lazy val testAuth: String = "sigv4"

  // EMR Serverless application, execution role, and S3 location for uploaded job code.
  lazy val testAppId: String = System.getenv("AWS_EMRS_APPID")
  lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE")
  lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET")
  lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX")
  lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX")

  "EMR Serverless job" should "run successfully" in {
    val s3 = AmazonS3ClientBuilder.standard().withRegion(testRegion).build()
    val emr = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build()

    // Resolve a mandatory system property, failing fast with the property name.
    def requiredProp(key: String): String =
      sys.props.getOrElse(key, throw new IllegalArgumentException(s"$key not set"))

    // S3 object key (under the prefix) -> local jar path produced by sbt assembly.
    val jarUploads = Seq(
      "sql-job.jar" -> requiredProp("appJar"),
      "extension.jar" -> requiredProp("extensionJar"),
      "ppl.jar" -> requiredProp("pplJar"))

    jarUploads.foreach { case (objectKey, localPath) =>
      s3.putObject(testS3CodeBucket, s"$testS3CodePrefix/$objectKey", new java.io.File(localPath))
    }

    val runRequest = new StartJobRunRequest()
      .withApplicationId(testAppId)
      .withExecutionRoleArn(testExecutionRole)
      .withName(s"integration-${LocalDateTime.now()}")
      .withJobDriver(new JobDriver()
        .withSparkSubmit(new SparkSubmit()
          .withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar")
          .withEntryPointArguments(testResultIndex)
          .withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " +
            s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," +
            s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " +
            s"--conf spark.datasource.flint.host=$testHost " +
            s"--conf spark.datasource.flint.port=-1 " +
            s"--conf spark.datasource.flint.scheme=$testScheme " +
            s"--conf spark.datasource.flint.auth=$testAuth " +
            s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " +
            s"--conf spark.flint.datasource.name=glue " +
            s"""--conf spark.flint.job.query="SELECT 1" """ +
            s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")))

    val runResponse = emr.startJobRun(runRequest)

    // Poll every 30 seconds until the run is terminal (FAILED/SUCCESS) or 5 minutes pass.
    val deadline = System.currentTimeMillis() + 5.minutes.toMillis
    var jobState = "STARTING"

    while (System.currentTimeMillis() < deadline
      && (jobState != "FAILED" && jobState != "SUCCESS")) {
      Thread.sleep(30000)
      val statusRequest = new GetJobRunRequest()
        .withApplicationId(testAppId)
        .withJobRunId(runResponse.getJobRunId)
      jobState = emr.getJobRun(statusRequest).getJobRun.getState
      logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}")
    }

    jobState shouldBe "SUCCESS"
  }
}
Loading