Implement FlintJob Logic for EMR-S #52

Merged
merged 2 commits on Oct 4, 2023
1 change: 1 addition & 0 deletions .scalafmt.conf
@@ -0,0 +1 @@
version = 2.7.5
29 changes: 27 additions & 2 deletions build.sbt
@@ -165,12 +165,37 @@ lazy val standaloneCosmetic = project
Compile / packageBin := (flintSparkIntegration / assembly).value)

lazy val sparkSqlApplication = (project in file("spark-sql-application"))
// dependency will be provided at runtime, so it doesn't need to be included in the assembled JAR
.dependsOn(flintSparkIntegration % "provided")
.settings(
commonSettings,
name := "sql-job",
scalaVersion := scala212,
libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion))
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
// Assembly settings
// The sbt assembly plugin found multiple copies of the module-info.class file with
// different contents in the jars it was merging for the flintCore dependencies.
// This can happen if you have multiple dependencies that include the same library,
// but with different versions.
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / test := (Test / test).value
)

lazy val sparkSqlApplicationCosmetic = project
.settings(
86 changes: 81 additions & 5 deletions spark-sql-application/README.md
@@ -1,13 +1,25 @@
# Spark SQL Application

This application execute sql query and store the result in OpenSearch index in following format
We have two applications: SQLJob and FlintJob.

SQLJob is designed for EMR Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:
```
"stepId":"<emr-step-id>",
"applicationId":"<spark-application-id>"
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob"
```

FlintJob is designed for EMR Serverless Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:

```
"jobRunId":"<emrs-job-id>",
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob",
"dataSourceName":"<opensearch-data-source-name>"
```
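
For orientation, here is a minimal Scala sketch of a container for the fields listed above. The `FlintJobResult` case class and `toDocument` helper are hypothetical names used only for illustration; they are not code from this PR.

```scala
// Hypothetical container for the documented FlintJob result fields
// (field names follow the format shown above).
case class FlintJobResult(
    jobRunId: String,         // EMR Serverless job run id
    applicationId: String,    // Spark application id
    schema: Seq[String],      // one JSON blob per column
    result: Seq[String],      // one JSON blob per row
    dataSourceName: String) { // OpenSearch data source name

  // Flatten into the document body that would be indexed into the result index.
  def toDocument: Map[String, Any] = Map(
    "jobRunId"       -> jobRunId,
    "applicationId"  -> applicationId,
    "schema"         -> schema,
    "result"         -> result,
    "dataSourceName" -> dataSourceName)
}
```

SQLJob's document has the same shape, with `stepId` in place of `jobRunId` and no `dataSourceName`.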

## Prerequisites

+ Spark 3.3.1
@@ -16,8 +28,9 @@ This application execute sql query and store the result in OpenSearch index in f

## Usage

To use this application, you can run Spark with Flint extension:
To use these applications, you can run Spark with Flint extension:

SQLJob
```
./bin/spark-submit \
--class org.opensearch.sql.SQLJob \
@@ -32,11 +45,41 @@ To use this application, you can run Spark with Flint extension:
<opensearch-region> \
```

FlintJob
```
aws emr-serverless start-job-run \
--region <region-name> \
--application-id <application-id> \
--execution-role-arn <execution-role> \
--job-driver '{"sparkSubmit": {"entryPoint": "<flint-job-s3-path>", \
"entryPointArguments":["'<sql-query>'", "<result-index>", "<data-source-name>"], \
"sparkSubmitParameters":"--class org.opensearch.sql.FlintJob \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory \
--conf spark.hive.metastore.glue.role.arn=<role-to-access-s3-and-opensearch> \
--conf spark.jars=<path-to-AWSGlueDataCatalogHiveMetaStoreAuth-jar> \
--conf spark.jars.packages=<flint-spark-integration-jar-name> \
--conf spark.jars.repositories=<path-to-download_spark-integration-jar> \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.executorEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.datasource.flint.host=<opensearch-url> \
--conf spark.datasource.flint.port=<opensearch-port> \
--conf spark.datasource.flint.scheme=<http-or-https> \
--conf spark.datasource.flint.auth=<auth-type> \
--conf spark.datasource.flint.region=<region-name> \
--conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions \
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory "}}'
<data-source-name>
```
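
The `entryPointArguments` above pass the SQL query, the result index, and the data source name positionally. Below is a minimal sketch of an entry point that reads them in that order; it is illustrative only, and the actual FlintJob main class in this PR may differ.

```scala
import org.apache.spark.sql.SparkSession

object FlintJobSketch {
  // Positional arguments, matching entryPointArguments above:
  //   args(0) = SQL query, args(1) = result index, args(2) = data source name
  def main(args: Array[String]): Unit = {
    require(args.length == 3,
      "Usage: FlintJobSketch <sql-query> <result-index> <data-source-name>")
    val Array(query, resultIndex, dataSourceName) = args

    val spark = SparkSession.builder().appName("FlintJobSketch").getOrCreate()
    try {
      val df = spark.sql(query) // run the query
      df.show()                 // placeholder: the real job writes the result to resultIndex
    } finally {
      spark.stop()
    }
  }
}
```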

## Result Specifications

Following example shows how the result is written to OpenSearch index after query execution.

Let's assume sql query result is
Let's assume SQL query result is
```
+------+------+
|Letter|Number|
@@ -46,7 +89,7 @@ Let's assume sql query result is
|C |3 |
+------+------+
```
OpenSearch index document will look like
For SQLJob, OpenSearch index document will look like
```json
{
"_index" : ".query_execution_result",
@@ -68,6 +111,31 @@ OpenSearch index document will look like
}
```

For FlintJob, OpenSearch index document will look like
```json
{
  "_index" : ".query_execution_result",
  "_id" : "A2WOsYgBMUoqCqlDJHrn",
  "_score" : 1.0,
  "_source" : {
    "result" : [
      "{'Letter':'A','Number':1}",
      "{'Letter':'B','Number':2}",
      "{'Letter':'C','Number':3}"
    ],
    "schema" : [
      "{'column_name':'Letter','data_type':'string'}",
      "{'column_name':'Number','data_type':'integer'}"
    ],
    "jobRunId" : "s-JZSB1139WIVU",
    "applicationId" : "application_1687726870985_0003",
    "dataSourceName": "myS3Glue",
    "status": "SUCCESS",
    "error": ""
  }
}
```

Inline review thread on this example:

Collaborator: missing status field.

Collaborator (author): added
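
The review thread above concerns the `status` field, which the final document includes. Here is a hypothetical sketch of how `status` and `error` could be derived from the query outcome; this is assumed behavior, not code taken from the PR.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.{Failure, Success, Try}

object ResultStatusSketch {
  // Run the query, collect each row as a JSON string, and derive the
  // documented status/error fields. "SUCCESS" matches the example above;
  // the "FAILED" label and error message are assumptions.
  def runAndTag(spark: SparkSession, query: String): (Seq[String], String, String) =
    Try(spark.sql(query).toJSON.collect().toSeq) match {
      case Success(rows) => (rows, "SUCCESS", "")
      case Failure(e)    => (Seq.empty, "FAILED", e.getMessage)
    }
}
```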

## Build

To build and run this application with Spark, you can run:
@@ -76,6 +144,8 @@ To build and run this application with Spark, you can run:
sbt clean sparkSqlApplicationCosmetic/publishM2
```

The jar file is located in the `spark-sql-application/target/scala-2.12` folder.

## Test

To run tests, you can use:
@@ -92,6 +162,12 @@ To check code with scalastyle, you can run:
sbt scalastyle
```

To check test code with scalastyle, you can run:

```
sbt testScalastyle
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).