Add FlintJob integration test
Signed-off-by: Peng Huo <[email protected]>
penghuo committed Jul 24, 2024
1 parent 938f4be commit 1ee72bd
Showing 2 changed files with 117 additions and 8 deletions.
27 changes: 19 additions & 8 deletions build.sbt
@@ -6,6 +6,9 @@ import Dependencies._

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
// Spark jackson version. Spark's jackson-module-scala strictly checks that the jackson-databind version is compatible.
// https://github.com/FasterXML/jackson-module-scala/blob/2.18/src/main/scala/com/fasterxml/jackson/module/scala/JacksonModule.scala#L59
lazy val jacksonVersion = "2.13.4"

// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
@@ -49,7 +52,11 @@ lazy val commonSettings = Seq(
compileScalastyle := (Compile / scalastyle).toTask("").value,
Compile / compile := ((Compile / compile) dependsOn compileScalastyle).value,
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)
Test / test := ((Test / test) dependsOn testScalastyle).value,
dependencyOverrides ++= Seq(
"com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion
))
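
The override above is needed because jackson-module-scala fails fast at module setup when the jackson-databind version on the classpath does not match the version it was built against (see the JacksonModule link in the comment). A minimal sketch of that style of check, for illustration only (not the library source):

import com.fasterxml.jackson.core.Version
import com.fasterxml.jackson.databind.cfg.PackageVersion

object JacksonCompatCheck {
  // Expected databind version; 2.13.4 mirrors the pinned jacksonVersion above.
  val expected = new Version(2, 13, 4, null, "com.fasterxml.jackson.core", "jackson-databind")

  def main(args: Array[String]): Unit = {
    // jackson-databind version actually resolved on the classpath.
    val actual: Version = PackageVersion.VERSION
    require(
      actual.getMajorVersion == expected.getMajorVersion &&
        actual.getMinorVersion == expected.getMinorVersion,
      s"Incompatible jackson-databind $actual, expected $expected")
    println(s"jackson-databind $actual is compatible")
  }
}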

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
@@ -218,9 +225,16 @@ lazy val integtest = (project in file("integ-test"))
commonSettings,
name := "integ-test",
scalaVersion := scala212,
inConfig(IntegrationTest)(Defaults.testSettings),
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / parallelExecution := false,
javaOptions ++= Seq(
s"-DappJar=${(sparkSqlApplication / assembly).value.getAbsolutePath}",
s"-DextensionJar=${(flintSparkIntegration / assembly).value.getAbsolutePath}",
s"-DpplJar=${(pplSparkIntegration / assembly).value.getAbsolutePath}",
),
inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / parallelExecution := false,
IntegrationTest / fork := true,
)),
libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
@@ -229,10 +243,7 @@
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test"),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
(sparkSqlApplication / assembly).value
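With fork enabled, the assembled jar paths reach the forked test JVM as the appJar, extensionJar, and pplJar system properties, while the cluster coordinates come from environment variables (AWS_OPENSEARCH_HOST, AWS_REGION, AWS_EMRS_APPID, AWS_EMRS_EXECUTION_ROLE, AWS_S3_CODE_BUCKET, AWS_S3_CODE_PREFIX, AWS_OPENSEARCH_RESULT_INDEX). Assuming the default sbt task layout, the suite below can then be run with something like:

sbt 'integtest/IntegrationTest/testOnly org.opensearch.flint.spark.aws.AWSEmrServerlessAccessTestSuite'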
98 changes: 98 additions & 0 deletions integ-test/src/integration/scala/org/opensearch/flint/spark/aws/AWSEmrServerlessAccessTestSuite.scala
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.aws

import java.time.LocalDateTime

import scala.concurrent.duration.DurationInt

import com.amazonaws.services.emrserverless.AWSEMRServerlessClientBuilder
import com.amazonaws.services.emrserverless.model.{GetJobRunRequest, JobDriver, SparkSubmit, StartJobRunRequest}
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.apache.spark.internal.Logging

class AWSEmrServerlessAccessTestSuite extends AnyFlatSpec with BeforeAndAfter with Matchers with Logging {

lazy val testHost: String = System.getenv("AWS_OPENSEARCH_HOST")
lazy val testPort: Int = -1
lazy val testRegion: String = System.getenv("AWS_REGION")
lazy val testScheme: String = "https"
lazy val testAuth: String = "sigv4"

lazy val testAppId: String = System.getenv("AWS_EMRS_APPID")
lazy val testExecutionRole: String = System.getenv("AWS_EMRS_EXECUTION_ROLE")
lazy val testS3CodeBucket: String = System.getenv("AWS_S3_CODE_BUCKET")
lazy val testS3CodePrefix: String = System.getenv("AWS_S3_CODE_PREFIX")
lazy val testResultIndex: String = System.getenv("AWS_OPENSEARCH_RESULT_INDEX")

"EMR Serverless job" should "run successfully" in {
val s3Client = AmazonS3ClientBuilder.standard().withRegion(testRegion).build()
val emrServerless = AWSEMRServerlessClientBuilder.standard().withRegion(testRegion).build()

val appJarPath =
sys.props.getOrElse("appJar", throw new IllegalArgumentException("appJar not set"))
val extensionJarPath = sys.props.getOrElse(
"extensionJar",
throw new IllegalArgumentException("extensionJar not set"))
val pplJarPath =
sys.props.getOrElse("pplJar", throw new IllegalArgumentException("pplJar not set"))

s3Client.putObject(
testS3CodeBucket,
s"$testS3CodePrefix/sql-job.jar",
new java.io.File(appJarPath))
s3Client.putObject(
testS3CodeBucket,
s"$testS3CodePrefix/extension.jar",
new java.io.File(extensionJarPath))
s3Client.putObject(
testS3CodeBucket,
s"$testS3CodePrefix/ppl.jar",
new java.io.File(pplJarPath))

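// Assemble the EMR Serverless job: FlintJob is the entry point, the extension
// and PPL jars are attached via --jars, and the OpenSearch connection, Glue
// catalog, and test query are supplied as spark-submit --conf options.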
val jobRunRequest = new StartJobRunRequest()
.withApplicationId(testAppId)
.withExecutionRoleArn(testExecutionRole)
.withName(s"integration-${LocalDateTime.now()}")
.withJobDriver(new JobDriver()
.withSparkSubmit(new SparkSubmit()
.withEntryPoint(s"s3://$testS3CodeBucket/$testS3CodePrefix/sql-job.jar")
.withEntryPointArguments(testResultIndex)
.withSparkSubmitParameters(s"--class org.apache.spark.sql.FlintJob --jars " +
s"s3://$testS3CodeBucket/$testS3CodePrefix/extension.jar," +
s"s3://$testS3CodeBucket/$testS3CodePrefix/ppl.jar " +
s"--conf spark.datasource.flint.host=$testHost " +
s"--conf spark.datasource.flint.port=-1 " +
s"--conf spark.datasource.flint.scheme=$testScheme " +
s"--conf spark.datasource.flint.auth=$testAuth " +
s"--conf spark.sql.catalog.glue=org.opensearch.sql.FlintDelegatingSessionCatalog " +
s"--conf spark.flint.datasource.name=glue " +
s"""--conf spark.flint.job.query="SELECT 1" """ +
s"--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")))

val jobRunResponse = emrServerless.startJobRun(jobRunRequest)

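// Poll the job run every 30 seconds until it reaches a terminal state
// (SUCCESS or FAILED) or the 5-minute timeout expires.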
val startTime = System.currentTimeMillis()
val timeout = 5.minutes.toMillis
var jobState = "STARTING"

while (System.currentTimeMillis() - startTime < timeout
&& (jobState != "FAILED" && jobState != "SUCCESS")) {
Thread.sleep(30000)
val request = new GetJobRunRequest()
.withApplicationId(testAppId)
.withJobRunId(jobRunResponse.getJobRunId)
jobState = emrServerless.getJobRun(request).getJobRun.getState
logInfo(s"Current job state: $jobState at ${System.currentTimeMillis()}")
}

jobState shouldBe "SUCCESS"
}
}
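
The polling loop above is inlined in the test; a reusable helper built from the same SDK calls could look like this sketch (a hypothetical refactoring, not part of this commit):

import scala.concurrent.duration._

import com.amazonaws.services.emrserverless.AWSEMRServerless
import com.amazonaws.services.emrserverless.model.GetJobRunRequest

object EmrServerlessJobPoller {
  // Returns the last observed state: a terminal state, or whatever the job
  // reported when the timeout expired. Terminal states mirror the test above.
  def awaitTerminalState(
      client: AWSEMRServerless,
      applicationId: String,
      jobRunId: String,
      timeout: FiniteDuration = 5.minutes,
      interval: FiniteDuration = 30.seconds,
      terminalStates: Set[String] = Set("SUCCESS", "FAILED")): String = {
    val deadline = timeout.fromNow
    var state = "STARTING"
    while (deadline.hasTimeLeft() && !terminalStates.contains(state)) {
      Thread.sleep(interval.toMillis)
      val request = new GetJobRunRequest()
        .withApplicationId(applicationId)
        .withJobRunId(jobRunId)
      state = client.getJobRun(request).getJobRun.getState
    }
    state
  }
}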
