Add some instructions to use Spark EMR docker image #965

Merged (3 commits) on Dec 6, 2024. Changes shown are from 1 commit.
8 changes: 8 additions & 0 deletions docker/spark-emr-sample/example-app/build.sbt
@@ -0,0 +1,8 @@
name := "MyApp"
Member:
Let's create a simple but somewhat more realistic use-case application that runs multiple PPL queries and produces a report, similar to the HTML report referenced here.

Contributor Author:

This PR is intended to provide instructions so that a developer can run queries against docker images of Spark.

I think your comment is asking for an HTML report of the integration test results. See my comment below; I consider integration tests out of scope for this PR.

I can add more to the app if it will help a developer better understand how to run their queries, perhaps by loading a simple table.
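
(As an illustration of that idea, here is a minimal, hypothetical sketch of an app that loads a small table and queries it with both SQL and PPL. The table name, columns, and rows are made up for illustration and are not part of this PR; the PPL query assumes the `FlintPPLSparkExtensions` configuration described in the docs below.)

```
import org.apache.spark.sql.SparkSession

// Hypothetical sketch (not part of this PR): load a small table and query it with SQL and PPL.
object TableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("TableDemo")
      .getOrCreate()
    import spark.implicits._

    // Register a tiny in-memory dataset as a table.
    Seq((1, "alice", 30), (2, "bob", 25), (3, "carol", 35))
      .toDF("id", "name", "age")
      .write.mode("overwrite").saveAsTable("people")

    // SQL query against the table.
    spark.sql("SELECT name, age FROM people WHERE age > 28").show()

    // PPL query against the same table.
    spark.sql("source=people | where age > 28 | fields name, age").show()

    spark.stop()
  }
}
```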


version := "1.0"

scalaVersion := "2.12.20"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3"

27 changes: 27 additions & 0 deletions docker/spark-emr-sample/example-app/src/main/scala/MyApp.scala
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

import org.apache.spark.sql.SparkSession

object MyApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("MyApp")
      .getOrCreate()

    println("APP Name :" + spark.sparkContext.appName)
    println("Deploy Mode :" + spark.sparkContext.deployMode)
    println("Master :" + spark.sparkContext.master)

    spark.sql("CREATE table foo (id int, name varchar(100))").show()
Member:

We need a more generic mechanism for creating and loading table data and for running a list of dedicated PPL queries based on the table's fields (a sketch is shown below).

  • We can use a simpler version of our IT tests for the first iteration.
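
(For illustration, a minimal sketch of such a query-list harness follows. The table and the specific PPL queries are hypothetical and not part of this PR or its planned follow-ups.)

```
import org.apache.spark.sql.SparkSession

// Hypothetical sketch (not part of this PR): run a list of PPL queries and print each result.
object PplQueryRunner {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("PplQueryRunner")
      .getOrCreate()

    // Assumes a table named "foo" with columns id and name already exists,
    // such as the one created by MyApp above.
    val pplQueries = Seq(
      "source=foo | fields id, name",
      "source=foo | where id > 0 | fields name",
      "source=foo | stats count() by name"
    )

    pplQueries.foreach { query =>
      println(s">>> PPL query: $query")
      spark.sql(query).show(truncate = false)
    }

    spark.stop()
  }
}
```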

Contributor Author:

You are correct, but I consider that out of scope for this PR; it is planned for follow-up PRs. This PR is intended to provide instructions and minimal "code" so that a developer is able to run queries against docker images of Spark.

This PR is not intended to run the integration tests.

The follow-up PRs would be:

  1. Create docker files that are intended to be used by the integration tests. This would include containers for S3 and the dashboard server. Only focus on testing against the Apache Spark release (not EMR).
  2. Update the integration tests to run against the docker containers. This includes loading data, running queries, and verifying the output. It also includes running the integration tests from the SBT build.
  3. Create docker files for running the integration tests against Spark EMR. This includes changes so that the user can choose whether the integration tests run against Apache Spark or Spark EMR.

Member:

OK, sounds good! Thanks!

println(">>> Table created")
spark.sql("SELECT * FROM foo").show()
println(">>> SQL query of table completed")

spark.sql("source=foo | fields id").show()
println(">>> PPL query of table completed")
}
}
3 changes: 3 additions & 0 deletions docker/spark-emr-sample/logging-conf/run-adot-collector.sh
@@ -0,0 +1,3 @@
#!/bin/bash

# Do nothing as default logging is sufficient
3 changes: 3 additions & 0 deletions docker/spark-emr-sample/logging-conf/run-fluentd-spark.sh
@@ -0,0 +1,3 @@
#!/bin/bash

# Do nothing as default logging is sufficient
25 changes: 25 additions & 0 deletions docker/spark-emr-sample/spark-conf/hive-site.xml
@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Licensed to the Apache Software Foundation (ASF) under one or more -->
<!-- contributor license agreements. See the NOTICE file distributed with -->
<!-- this work for additional information regarding copyright ownership. -->
<!-- The ASF licenses this file to You under the Apache License, Version 2.0 -->
<!-- (the "License"); you may not use this file except in compliance with -->
<!-- the License. You may obtain a copy of the License at -->
<!-- -->
<!-- http://www.apache.org/licenses/LICENSE-2.0 -->
<!-- -->
<!-- Unless required by applicable law or agreed to in writing, software -->
<!-- distributed under the License is distributed on an "AS IS" BASIS, -->
<!-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -->
<!-- See the License for the specific language governing permissions and -->
<!-- limitations under the License. -->

<configuration>

<property>
<name>hive.metastore.connect.retries</name>
<value>15</value>
</property>
</configuration>
74 changes: 74 additions & 0 deletions docker/spark-emr-sample/spark-conf/log4j2.properties
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This property will be overridden for JVMs running inside YARN containers.
# Other log4j configurations may reference the property, for example, in order to
# cause a log file to appear in the usual log directory for the YARN container,
# so that LogPusher will upload it to S3. The following provides a default value
# to be used for this property such that logs are still written to a valid location
# even for Spark processes run *outside* of a YARN container (e.g., a Spark
# driver run in client deploy-mode).
spark.yarn.app.container.log.dir=/var/log/spark/user/${user.name}

# Set everything to be logged to the console
rootLogger.level = info
rootLogger.appenderRef.stdout.ref = console

appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error
logger.hudi.name = org.apache.hudi
logger.hudi.level = warn

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
Empty file.
65 changes: 65 additions & 0 deletions docker/spark-emr-sample/spark-conf/spark-defaults.conf
@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

spark.driver.extraClassPath /usr/lib/livy/rsc-jars/*:/usr/lib/livy/repl_2.12-jars/*:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/goodies/lib/emr-serverless-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar:/usr/share/aws/redshift/spark-redshift/lib/*:/usr/share/aws/iceberg/lib/iceberg-emr-common.jar:/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
spark.executor.extraClassPath /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/goodies/lib/emr-serverless-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar:/usr/share/aws/redshift/spark-redshift/lib/*:/usr/share/aws/iceberg/lib/iceberg-emr-common.jar:/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
spark.eventLog.enabled true
spark.eventLog.dir file:///var/log/spark/apps
spark.history.fs.logDirectory file:///var/log/spark/apps
spark.history.ui.port 18080
spark.blacklist.decommissioning.enabled true
spark.blacklist.decommissioning.timeout 1h
spark.resourceManager.cleanupExpiredHost true
spark.stage.attempt.ignoreOnDecommissionFetchFailure true
spark.decommissioning.timeout.threshold 20
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds 2000
spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
spark.executor.memory 14G
spark.executor.cores 4
spark.driver.memory 14G
spark.driver.cores 4
spark.executor.defaultJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
spark.driver.defaultJavaOptions -XX:OnOutOfMemoryError='kill -9 %p'
spark.hadoop.mapreduce.output.fs.optimized.committer.enabled true

spark.master custom:emr-serverless
spark.submit.deployMode client
spark.submit.customResourceManager.submit.class org.apache.spark.deploy.emrserverless.submit.EmrServerlessClientApplication
spark.hadoop.fs.defaultFS file:///
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.shuffleTracking.enabled true
spark.hadoop.fs.s3.customAWSCredentialsProvider com.amazonaws.auth.DefaultAWSCredentialsProviderChain
spark.authenticate true
spark.ui.enabled false
spark.ui.custom.executor.log.url /logs/{{CONTAINER_ID}}/{{FILE_NAME}}.gz

spark.emr-serverless.client.create.batch.size 100
spark.emr-serverless.client.describe.batch.size 100
spark.emr-serverless.client.release.batch.size 100
spark.dynamicAllocation.initialExecutors 3
spark.dynamicAllocation.minExecutors 0
spark.executor.instances 3
spark.hadoop.fs.s3a.aws.credentials.provider software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
spark.sql.hive.metastore.sharedPrefixes software.amazon.awssdk.services.dynamodb
spark.sql.legacy.createHiveTableByDefault false
spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
39 changes: 39 additions & 0 deletions docker/spark-emr-sample/spark-conf/spark-env.sh
@@ -0,0 +1,39 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
export SPARK_LOG_DIR=${SPARK_LOG_DIR:-/var/log/spark}
export HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
export HIVE_CONF_DIR=${HIVE_CONF_DIR:-/etc/hive/conf}

export SPARK_MASTER_PORT=7077
export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
export SPARK_MASTER_WEBUI_PORT=8080

export SPARK_WORKER_DIR=${SPARK_WORKER_DIR:-/var/run/spark/work}
export SPARK_WORKER_PORT=7078
export SPARK_WORKER_WEBUI_PORT=8081

export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
export HIVE_SERVER2_THRIFT_PORT=10001


export SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -XX:OnOutOfMemoryError='kill -9 %p'"
export PYSPARK_PYTHON=${PYSPARK_PYTHON:-/usr/bin/python3}
export PYSPARK_DRIVER_PYTHON=${PYSPARK_DRIVER_PYTHON:-/usr/bin/python3}

export AWS_STS_REGIONAL_ENDPOINTS=regional
101 changes: 101 additions & 0 deletions docs/spark-emr-docker.md
@@ -0,0 +1,101 @@
# Running Queries with Spark EMR in Docker

Spark EMR images are available on the Amazon ECR Public Gallery. They can be modified to
include the OpenSearch Spark PPL extension, which allows the docker image to be used to
test PPL commands.

The Spark EMR image will run an Apache Spark app if one is specified and then shut down.

## Setup

Two directories need to be created. These directories will be bound to directories in the
image.

Look in `docker/spark-emr-sample` in this repository for samples of the directories
described below.

### logging-conf
Contains two shell scripts that are run during startup to configure logging.
* `run-adot-collector.sh`
* `run-fluentd-spark.sh`

Unless you need to make changes to the logging in the docker image, these can both be
empty shell scripts.
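
For example, a minimal `logging-conf` directory matching the samples in this repository can be
created as follows (a sketch; the scripts simply do nothing):

```
mkdir -p logging-conf
printf '#!/bin/bash\n\n# Do nothing as default logging is sufficient\n' > logging-conf/run-adot-collector.sh
printf '#!/bin/bash\n\n# Do nothing as default logging is sufficient\n' > logging-conf/run-fluentd-spark.sh
chmod +x logging-conf/run-adot-collector.sh logging-conf/run-fluentd-spark.sh
```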

### spark-conf

Contains the Apache Spark configuration. Add the following three lines to the
`spark-defaults.conf` file:
```
spark.sql.legacy.createHiveTableByDefault false
spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
```

## Create a Spark App

An Apache Spark app is needed to provide queries to be run on the Spark EMR instance.
The image has been tested with an app written in Scala.

An example app is available in this repository in `docker/spark-emr-sample/example-app`.

### Build the Example App

The example app can be built using [SBT](https://www.scala-sbt.org/).
```
cd docker/spark-emr-sample
sbt clean package
```

This will produce a Jar file in `docker/spark-emr-sample/example-app/target/scala-2.12`
that can be used with the Spark EMR image.
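
Given the `name`, `version`, and `scalaVersion` in `build.sbt`, the Jar file is named
`myapp_2.12-1.0.jar`. For example, to confirm the build output:

```
ls docker/spark-emr-sample/example-app/target/scala-2.12/
# myapp_2.12-1.0.jar should be listed
```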

## Prepare OpenSearch Spark PPL Extension

Create a local build or copy of the OpenSearch Spark PPL extension. Make a note of the
location of the Jar file as well as the name of the Jar file.
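
It can help to record them as shell variables. The values below are hypothetical placeholders;
they correspond to the `<PATH_TO_SPARK_PPL_JAR_FILE>` and `<SPARK_PPL_JAR_FILE>` placeholders in
the run command below.

```
# Hypothetical values -- substitute the directory and filename of your local build.
PPL_JAR_DIR=/path/to/opensearch-spark-ppl-build
PPL_JAR_FILE=opensearch-spark-ppl-extension.jar
```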

## Run the Spark EMR Image
Member (@YANG-DB, Dec 3, 2024):

Can you also please add a folder for an EMR-based docker-compose setup?

Contributor Author:

Added a docker-compose.yml for EMR.


The Spark EMR image can be run with the following command:
```
docker run \
--name spark-emr \
-v ./docker/spark-emr-sample/logging-conf:/var/loggingConfiguration/spark \
-v ./docker/spark-emr-sample/example-app/target/scala-2.12:/app \
-v ./docker/spark-emr-sample/spark-conf:/etc/spark/conf \
-v <PATH_TO_SPARK_PPL_JAR_FILE>/<SPARK_PPL_JAR_FILE>:/usr/lib/spark/jars/<SPARK_PPL_JAR_FILE> \
public.ecr.aws/emr-serverless/spark/emr-7.5.0:20241125 \
driver \
--class MyApp \
/app/myapp_2.12-1.0.jar
```

* `--name spark-emr`
Name the docker container `spark-emr`
* `-v ./docker/spark-emr-sample/logging-conf:/var/loggingConfiguration/spark`

Bind the directory containing logging shell scripts to the docker image. Needs to bind
to `/var/loggingConfiguration/spark` in the image.
* `-v ./docker/spark-emr-sample/example-app/target/scala-2.12:/app`

Bind the directory containing the Apache Spark app Jar file to a location in the
docker image. The directory in the docker image must match the path used in the final
argument.
* `-v ./docker/spark-emr-sample/spark-conf:/etc/spark/conf`

Bind the directory containing the Apache Spark configuration. Needs to bind to
`/etc/spark/conf` in the image.
* `<PATH_TO_SPARK_PPL_JAR_FILE>`
Replace with the path to the directory containing the OpenSearch Spark PPL extension
Jar file.
* `<SPARK_PPL_JAR_FILE>`
Replace with the filename of the OpenSearch Spark PPL extension Jar file.
* `driver`
Start the Spark EMR container as a driver. This will run `spark-submit` to run an
app.
* `--class MyApp`
The main class of the Spark App to run.
* `/app/myapp_2.12-1.0.jar`
The full path within the docker container where the Jar file of the Spark app is
located.
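
Since the container stops once the app finishes, its output (including the driver logs) can be
reviewed afterwards with standard Docker commands, for example:

```
# Review the container output, then remove the stopped container.
docker logs spark-emr
docker rm spark-emr
```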