Merge branch 'main' into replace_static_extensions_with_params
YANG-DB committed Oct 4, 2023
2 parents d39f682 + 71d67a0 commit 3b63b12
Showing 5 changed files with 547 additions and 7 deletions.
1 change: 1 addition & 0 deletions .scalafmt.conf
@@ -0,0 +1 @@
version = 2.7.5
29 changes: 27 additions & 2 deletions build.sbt
@@ -165,12 +165,37 @@ lazy val standaloneCosmetic = project
Compile / packageBin := (flintSparkIntegration / assembly).value)

lazy val sparkSqlApplication = (project in file("spark-sql-application"))
// dependency will be provided at runtime, so it doesn't need to be included in the assembled JAR
.dependsOn(flintSparkIntegration % "provided")
.settings(
commonSettings,
name := "sql-job",
scalaVersion := scala212,
libraryDependencies ++= Seq("org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion))
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
// Assembly settings
// The sbt-assembly plugin found multiple copies of module-info.class with
// different contents in the jars it merged for the flintCore dependencies.
// This can happen when multiple dependencies pull in the same library,
// but with different versions.
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
},
assembly / assemblyMergeStrategy := {
case PathList(ps@_*) if ps.last endsWith ("module-info.class") =>
MergeStrategy.discard
case PathList("module-info.class") => MergeStrategy.discard
case PathList("META-INF", "versions", xs@_, "module-info.class") =>
MergeStrategy.discard
case x =>
val oldStrategy = (assembly / assemblyMergeStrategy).value
oldStrategy(x)
},
assembly / test := (Test / test).value
)
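
// Usage sketch (an assumption, not part of the original build file): with the
// sbt-assembly plugin enabled for this project, as the settings above imply,
// the FlintJob fat jar can be built with: sbt sparkSqlApplication/assembly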

lazy val sparkSqlApplicationCosmetic = project
.settings(
86 changes: 81 additions & 5 deletions spark-sql-application/README.md
@@ -1,13 +1,25 @@
# Spark SQL Application

This application execute sql query and store the result in OpenSearch index in following format
We have two applications: SQLJob and FlintJob.

SQLJob is designed for EMR Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:
```
"stepId":"<emr-step-id>",
"applicationId":"<spark-application-id>"
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob"
```

FlintJob is designed for EMR Serverless Spark, executing SQL queries and storing the results in the OpenSearch index in the following format:

```
"jobRunId":"<emrs-job-id>",
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob",
"dataSourceName":"<opensearch-data-source-name>"
```
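
For illustration only, a document in this shape could be assembled with play-json, which the application depends on; every value below is a placeholder rather than output captured from a real job run:

```scala
import play.api.libs.json.{JsObject, Json}

// Hypothetical sketch of a FlintJob-style result document; all field values are placeholders.
val resultDoc: JsObject = Json.obj(
  "jobRunId"       -> "<emrs-job-id>",
  "applicationId"  -> "<spark-application-id>",
  "schema"         -> "json blob",
  "result"         -> "json blob",
  "dataSourceName" -> "<opensearch-data-source-name>"
)

println(Json.prettyPrint(resultDoc))
```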

## Prerequisites

+ Spark 3.3.1
@@ -16,8 +28,9 @@ This application execute sql query and store the result in OpenSearch index in f

## Usage

To use this application, you can run Spark with Flint extension:
To use these applications, you can run Spark with the Flint extension:

SQLJob
```
./bin/spark-submit \
--class org.opensearch.sql.SQLJob \
@@ -32,11 +45,41 @@ To use this application, you can run Spark with Flint extension:
<opensearch-region> \
```

FlintJob
```
aws emr-serverless start-job-run \
--region <region-name> \
--application-id <application-id> \
--execution-role-arn <execution-role> \
--job-driver '{"sparkSubmit": {"entryPoint": "<flint-job-s3-path>", \
"entryPointArguments":["'<sql-query>'", "<result-index>", "<data-source-name>"], \
"sparkSubmitParameters":"--class org.opensearch.sql.FlintJob \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.emr-serverless.driverEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=<role-to-access-s3-and-opensearch> \
--conf spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory \
--conf spark.hive.metastore.glue.role.arn=<role-to-access-s3-and-opensearch> \
--conf spark.jars=<path-to-AWSGlueDataCatalogHiveMetaStoreAuth-jar> \
--conf spark.jars.packages=<flint-spark-integration-jar-name> \
--conf spark.jars.repositories=<path-to-download_spark-integration-jar> \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.executorEnv.JAVA_HOME=<java-home-in-emr-serverless-host> \
--conf spark.datasource.flint.host=<opensearch-url> \
--conf spark.datasource.flint.port=<opensearch-port> \
--conf spark.datasource.flint.scheme=<http-or-https> \
--conf spark.datasource.flint.auth=<auth-type> \
--conf spark.datasource.flint.region=<region-name> \
--conf spark.datasource.flint.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider \
--conf spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions \
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory "}}'
<data-source-name>
```
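
After the job is submitted, its state can be checked with the AWS CLI; this is an optional follow-up for convenience, not part of FlintJob itself:

```
aws emr-serverless get-job-run \
  --region <region-name> \
  --application-id <application-id> \
  --job-run-id <job-run-id>
```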

## Result Specifications

The following example shows how the result is written to the OpenSearch index after query execution.

Let's assume sql query result is
Let's assume the SQL query result is
```
+------+------+
|Letter|Number|
@@ -46,7 +89,7 @@ Let's assume sql query result is
|C |3 |
+------+------+
```
OpenSearch index document will look like
For SQLJob, the OpenSearch index document will look like
```json
{
"_index" : ".query_execution_result",
@@ -68,6 +111,31 @@ OpenSearch index document will look like
}
```

For FlintJob, the OpenSearch index document will look like
```json
{
"_index" : ".query_execution_result",
"_id" : "A2WOsYgBMUoqCqlDJHrn",
"_score" : 1.0,
"_source" : {
"result" : [
"{'Letter':'A','Number':1}",
"{'Letter':'B','Number':2}",
"{'Letter':'C','Number':3}"
],
"schema" : [
"{'column_name':'Letter','data_type':'string'}",
"{'column_name':'Number','data_type':'integer'}"
],
"jobRunId" : "s-JZSB1139WIVU",
"applicationId" : "application_1687726870985_0003",
"dataSourceName": "myS3Glue",
"status": "SUCCESS",
"error": ""
}
}
```
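
As a rough sketch of how a document's `result` and `schema` arrays relate to the query output, the snippet below derives both from a Spark DataFrame; the function name and exact JSON formatting are illustrative assumptions, not the application's actual code:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative only: turn a DataFrame into the "result" and "schema" arrays
// shown above, one JSON string per row and one JSON string per column.
def toResultArrays(df: DataFrame): (Seq[String], Seq[String]) = {
  val result = df.toJSON.collect().toSeq          // e.g. {"Letter":"A","Number":1}
  val schema = df.schema.fields.toSeq.map { field =>
    s"""{"column_name":"${field.name}","data_type":"${field.dataType.typeName}"}"""
  }
  (result, schema)
}
```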

## Build

To build and run this application with Spark, you can run:
@@ -76,6 +144,8 @@ To build and run this application with Spark, you can run:
sbt clean sparkSqlApplicationCosmetic/publishM2
```

The jar file is located in the `spark-sql-application/target/scala-2.12` folder.

## Test

To run tests, you can use:
@@ -92,6 +162,12 @@ To check code with scalastyle, you can run:
sbt scalastyle
```

To check test code with scalastyle, you can run:

```
sbt testScalastyle
```

## Code of Conduct

This project has adopted an [Open Source Code of Conduct](../CODE_OF_CONDUCT.md).
