diff --git a/docker/apache-spark-sample/.env b/docker/apache-spark-sample/.env
new file mode 100644
index 000000000..a047df5ba
--- /dev/null
+++ b/docker/apache-spark-sample/.env
@@ -0,0 +1,4 @@
+MASTER_UI_PORT=8080
+MASTER_PORT=7077
+UI_PORT=4040
+PPL_JAR=../../ppl-spark-integration/target/scala-2.12/ppl-spark-integration-assembly-0.7.0-SNAPSHOT.jar
diff --git a/docker/apache-spark-sample/docker-compose.yml b/docker/apache-spark-sample/docker-compose.yml
new file mode 100644
index 000000000..df2da6d52
--- /dev/null
+++ b/docker/apache-spark-sample/docker-compose.yml
@@ -0,0 +1,41 @@
+services:
+ spark:
+ image: bitnami/spark:3.5.3
+ ports:
+ - "${MASTER_UI_PORT:-8080}:8080"
+ - "${MASTER_PORT:-7077}:7077"
+ - "${UI_PORT:-4040}:4040"
+ environment:
+ - SPARK_MODE=master
+ - SPARK_RPC_AUTHENTICATION_ENABLED=no
+ - SPARK_RPC_ENCRYPTION_ENABLED=no
+ - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
+ - SPARK_SSL_ENABLED=no
+ - SPARK_PUBLIC_DNS=localhost
+ volumes:
+ - type: bind
+ source: ./spark-defaults.conf
+ target: /opt/bitnami/spark/conf/spark-defaults.conf
+ - type: bind
+ source: $PPL_JAR
+ target: /opt/bitnami/spark/jars/ppl-spark-integration.jar
+
+ spark-worker:
+ image: bitnami/spark:3.5.3
+ environment:
+ - SPARK_MODE=worker
+ - SPARK_MASTER_URL=spark://spark:7077
+ - SPARK_WORKER_MEMORY=${WORKER_MEMORY:-1G}
+ - SPARK_WORKER_CORES=${WORKER_CORES:-1}
+ - SPARK_RPC_AUTHENTICATION_ENABLED=no
+ - SPARK_RPC_ENCRYPTION_ENABLED=no
+ - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
+ - SPARK_SSL_ENABLED=no
+ - SPARK_PUBLIC_DNS=localhost
+ volumes:
+ - type: bind
+ source: ./spark-defaults.conf
+ target: /opt/bitnami/spark/conf/spark-defaults.conf
+ - type: bind
+ source: $PPL_JAR
+ target: /opt/bitnami/spark/jars/ppl-spark-integration.jar
diff --git a/docker/apache-spark-sample/spark-defaults.conf b/docker/apache-spark-sample/spark-defaults.conf
new file mode 100644
index 000000000..47fdaae03
--- /dev/null
+++ b/docker/apache-spark-sample/spark-defaults.conf
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Default system properties included when running spark-submit.
+# This is useful for setting default environmental settings.
+
+# Example:
+# spark.master spark://master:7077
+# spark.eventLog.enabled true
+# spark.eventLog.dir hdfs://namenode:8021/directory
+# spark.serializer org.apache.spark.serializer.KryoSerializer
+# spark.driver.memory 5g
+# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
+spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
+spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
diff --git a/docker/spark-emr-sample/.env b/docker/spark-emr-sample/.env
new file mode 100644
index 000000000..a717532a4
--- /dev/null
+++ b/docker/spark-emr-sample/.env
@@ -0,0 +1 @@
+PPL_JAR=../../ppl-spark-integration/target/scala-2.12/ppl-spark-integration-assembly-0.7.0-SNAPSHOT.jar
diff --git a/docker/spark-emr-sample/docker-compose.yml b/docker/spark-emr-sample/docker-compose.yml
new file mode 100644
index 000000000..d0da9f166
--- /dev/null
+++ b/docker/spark-emr-sample/docker-compose.yml
@@ -0,0 +1,17 @@
+services:
+ spark-emr:
+ image: public.ecr.aws/emr-serverless/spark/emr-7.5.0:20241125
+ volumes:
+ - type: bind
+ source: ./logging-conf
+ target: /var/loggingConfiguration/spark
+ - type: bind
+ source: ../spark-sample-app/target/scala-2.12
+ target: /app
+ - type: bind
+ source: ./spark-conf
+ target: /etc/spark/conf
+ - type: bind
+ source: ${PPL_JAR}
+ target: /usr/lib/spark/jars/ppl-spark-integration.jar
+ command: driver --class MyApp /app/myapp_2.12-1.0.jar
diff --git a/docker/spark-emr-sample/logging-conf/run-adot-collector.sh b/docker/spark-emr-sample/logging-conf/run-adot-collector.sh
new file mode 100644
index 000000000..0873413aa
--- /dev/null
+++ b/docker/spark-emr-sample/logging-conf/run-adot-collector.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+# Do nothing as default logging is sufficient
diff --git a/docker/spark-emr-sample/logging-conf/run-fluentd-spark.sh b/docker/spark-emr-sample/logging-conf/run-fluentd-spark.sh
new file mode 100644
index 000000000..0873413aa
--- /dev/null
+++ b/docker/spark-emr-sample/logging-conf/run-fluentd-spark.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+# Do nothing as default logging is sufficient
diff --git a/docker/spark-emr-sample/spark-conf/hive-site.xml b/docker/spark-emr-sample/spark-conf/hive-site.xml
new file mode 100644
index 000000000..f0dc50e1e
--- /dev/null
+++ b/docker/spark-emr-sample/spark-conf/hive-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+  <property>
+    <name>hive.metastore.connect.retries</name>
+    <value>15</value>
+  </property>
+</configuration>
\ No newline at end of file
diff --git a/docker/spark-emr-sample/spark-conf/log4j2.properties b/docker/spark-emr-sample/spark-conf/log4j2.properties
new file mode 100644
index 000000000..27ff7047f
--- /dev/null
+++ b/docker/spark-emr-sample/spark-conf/log4j2.properties
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This property will be overridden for JVMs running inside YARN containers.
+# Other log4j configurations may reference the property, for example, in order to
+# cause a log file to appear in the usual log directory for the YARN container,
+# so that LogPusher will upload it to S3. The following provides a default value
+# to be used for this property such that logs are still written to a valid location
+# even for Spark processes run *outside* of a YARN container (e.g., a Spark
+# driver run in client deploy-mode).
+spark.yarn.app.container.log.dir=/var/log/spark/user/${user.name}
+
+# Set everything to be logged to the console
+rootLogger.level = info
+rootLogger.appenderRef.stdout.ref = console
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.target = SYSTEM_ERR
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+# Set the default spark-shell/spark-sql log level to WARN. When running the
+# spark-shell/spark-sql, the log level for these classes is used to overwrite
+# the root logger's log level, so that the user can have different defaults
+# for the shell and regular Spark apps.
+logger.repl.name = org.apache.spark.repl.Main
+logger.repl.level = warn
+
+logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
+logger.thriftserver.level = warn
+
+# Settings to quiet third party logs that are too verbose
+logger.jetty1.name = org.sparkproject.jetty
+logger.jetty1.level = warn
+logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
+logger.jetty2.level = error
+logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
+logger.replexprTyper.level = info
+logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
+logger.replSparkILoopInterpreter.level = info
+logger.parquet1.name = org.apache.parquet
+logger.parquet1.level = error
+logger.parquet2.name = parquet
+logger.parquet2.level = error
+logger.hudi.name = org.apache.hudi
+logger.hudi.level = warn
+
+# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
+logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
+logger.RetryingHMSHandler.level = fatal
+logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
+logger.FunctionRegistry.level = error
+
+# For deploying Spark ThriftServer
+# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
+appender.console.filter.1.type = RegexFilter
+appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
+appender.console.filter.1.onMatch = deny
+appender.console.filter.1.onMismatch = neutral
\ No newline at end of file
diff --git a/docker/spark-emr-sample/spark-conf/metrics.properties b/docker/spark-emr-sample/spark-conf/metrics.properties
new file mode 100644
index 000000000..e69de29bb
diff --git a/docker/spark-emr-sample/spark-conf/spark-defaults.conf b/docker/spark-emr-sample/spark-conf/spark-defaults.conf
new file mode 100644
index 000000000..0a5dabe7d
--- /dev/null
+++ b/docker/spark-emr-sample/spark-conf/spark-defaults.conf
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+spark.driver.extraClassPath /usr/lib/livy/rsc-jars/*:/usr/lib/livy/repl_2.12-jars/*:/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/goodies/lib/emr-serverless-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar:/usr/share/aws/redshift/spark-redshift/lib/*:/usr/share/aws/iceberg/lib/iceberg-emr-common.jar:/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
+spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
+spark.executor.extraClassPath /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/goodies/lib/emr-serverless-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-java-sdk/*:/docker/usr/share/aws/emr/emrfs/conf:/docker/usr/share/aws/emr/emrfs/lib/*:/docker/usr/share/aws/emr/emrfs/auxlib/*:/docker/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/docker/usr/share/aws/emr/security/conf:/docker/usr/share/aws/emr/security/lib/*:/docker/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/docker/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/docker/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/docker/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/usr/share/aws/redshift/jdbc/RedshiftJDBC.jar:/usr/share/aws/redshift/spark-redshift/lib/*:/usr/share/aws/iceberg/lib/iceberg-emr-common.jar:/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
+spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
+spark.eventLog.enabled true
+spark.eventLog.dir file:///var/log/spark/apps
+spark.history.fs.logDirectory file:///var/log/spark/apps
+spark.history.ui.port 18080
+spark.blacklist.decommissioning.enabled true
+spark.blacklist.decommissioning.timeout 1h
+spark.resourceManager.cleanupExpiredHost true
+spark.stage.attempt.ignoreOnDecommissionFetchFailure true
+spark.decommissioning.timeout.threshold 20
+spark.files.fetchFailure.unRegisterOutputOnHost true
+spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
+spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
+spark.hadoop.fs.s3.getObject.initialSocketTimeoutMilliseconds 2000
+spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
+spark.sql.parquet.fs.optimized.committer.optimization-enabled true
+spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
+spark.executor.memory 14G
+spark.executor.cores 4
+spark.driver.memory 14G
+spark.driver.cores 4
+spark.executor.defaultJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
+spark.driver.defaultJavaOptions -XX:OnOutOfMemoryError='kill -9 %p'
+spark.hadoop.mapreduce.output.fs.optimized.committer.enabled true
+
+spark.master custom:emr-serverless
+spark.submit.deployMode client
+spark.submit.customResourceManager.submit.class org.apache.spark.deploy.emrserverless.submit.EmrServerlessClientApplication
+spark.hadoop.fs.defaultFS file:///
+spark.dynamicAllocation.enabled true
+spark.dynamicAllocation.shuffleTracking.enabled true
+spark.hadoop.fs.s3.customAWSCredentialsProvider com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+spark.authenticate true
+spark.ui.enabled false
+spark.ui.custom.executor.log.url /logs/{{CONTAINER_ID}}/{{FILE_NAME}}.gz
+
+spark.emr-serverless.client.create.batch.size 100
+spark.emr-serverless.client.describe.batch.size 100
+spark.emr-serverless.client.release.batch.size 100
+spark.dynamicAllocation.initialExecutors 3
+spark.dynamicAllocation.minExecutors 0
+spark.executor.instances 3
+spark.hadoop.fs.s3a.aws.credentials.provider software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+spark.sql.hive.metastore.sharedPrefixes software.amazon.awssdk.services.dynamodb
+spark.sql.legacy.createHiveTableByDefault false
+spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
+spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
diff --git a/docker/spark-emr-sample/spark-conf/spark-env.sh b/docker/spark-emr-sample/spark-conf/spark-env.sh
new file mode 100644
index 000000000..a40f294b6
--- /dev/null
+++ b/docker/spark-emr-sample/spark-conf/spark-env.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
+export SPARK_LOG_DIR=${SPARK_LOG_DIR:-/var/log/spark}
+export HADOOP_HOME=${HADOOP_HOME:-/usr/lib/hadoop}
+export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
+export HIVE_CONF_DIR=${HIVE_CONF_DIR:-/etc/hive/conf}
+
+export SPARK_MASTER_PORT=7077
+export SPARK_MASTER_IP=$STANDALONE_SPARK_MASTER_HOST
+export SPARK_MASTER_WEBUI_PORT=8080
+
+export SPARK_WORKER_DIR=${SPARK_WORKER_DIR:-/var/run/spark/work}
+export SPARK_WORKER_PORT=7078
+export SPARK_WORKER_WEBUI_PORT=8081
+
+export HIVE_SERVER2_THRIFT_BIND_HOST=0.0.0.0
+export HIVE_SERVER2_THRIFT_PORT=10001
+
+
+export SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -XX:OnOutOfMemoryError='kill -9 %p'"
+export PYSPARK_PYTHON=${PYSPARK_PYTHON:-/usr/bin/python3}
+export PYSPARK_DRIVER_PYTHON=${PYSPARK_DRIVER_PYTHON:-/usr/bin/python3}
+
+export AWS_STS_REGIONAL_ENDPOINTS=regional
diff --git a/docker/spark-sample-app/build.sbt b/docker/spark-sample-app/build.sbt
new file mode 100644
index 000000000..ea49bfd20
--- /dev/null
+++ b/docker/spark-sample-app/build.sbt
@@ -0,0 +1,8 @@
+name := "MyApp"
+
+version := "1.0"
+
+scalaVersion := "2.12.20"
+
+libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.3"
+
diff --git a/docker/spark-sample-app/src/main/scala/MyApp.scala b/docker/spark-sample-app/src/main/scala/MyApp.scala
new file mode 100644
index 000000000..6e2171c41
--- /dev/null
+++ b/docker/spark-sample-app/src/main/scala/MyApp.scala
@@ -0,0 +1,27 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+import org.apache.spark.sql.SparkSession
+
+object MyApp {
+ def main(args: Array[String]): Unit = {
+ val spark = SparkSession.builder()
+ .master("local[1]")
+ .appName("MyApp")
+ .getOrCreate()
+
+ println("APP Name :" + spark.sparkContext.appName)
+ println("Deploy Mode :" + spark.sparkContext.deployMode)
+ println("Master :" + spark.sparkContext.master)
+
+ spark.sql("CREATE TABLE foo (id int, name varchar(100))").show()
+ println(">>> Table created")
+ spark.sql("SELECT * FROM foo").show()
+ println(">>> SQL query of table completed")
+
+ spark.sql("source=foo | fields id").show()
+ println(">>> PPL query of table completed")
+ }
+}
diff --git a/docs/index.md b/docs/index.md
index e0211d8fa..abc801bde 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -546,6 +546,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.index.checkpointLocation.rootDir`: default is None. Flint will create a default checkpoint location in format of '//' to isolate checkpoint data.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: default value is 60000.
+- `spark.datasource.flint.request.completionDelayMillis`: Time to wait in milliseconds after a request completes. Currently only applied after index creation. Default value is 2000 when using the aoss service, otherwise 0 (see the sketch below).
- `spark.flint.monitor.initialDelaySeconds`: Initial delay in seconds before starting the monitoring task. Default value is 15.
- `spark.flint.monitor.intervalSeconds`: Interval in seconds for scheduling the monitoring task. Default value is 60.
- `spark.flint.monitor.maxErrorCount`: Maximum number of consecutive errors allowed before stopping the monitoring task. Default value is 5.
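
A minimal sketch (illustrative only, not part of this change set) of setting the new completion delay option when building a session. The application name is hypothetical; the key is the documented `spark.datasource.flint.request.completionDelayMillis`:

```scala
import org.apache.spark.sql.SparkSession

// Force a 2 second pause after Flint index creation requests complete.
val spark = SparkSession.builder()
  .appName("flint-completion-delay-example") // hypothetical app name
  .config("spark.datasource.flint.request.completionDelayMillis", "2000")
  .getOrCreate()
```
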
diff --git a/docs/spark-docker.md b/docs/spark-docker.md
new file mode 100644
index 000000000..d1200e2b3
--- /dev/null
+++ b/docs/spark-docker.md
@@ -0,0 +1,164 @@
+# Running Queries with Apache Spark in Docker
+
+Bitnami publishes [Apache Spark docker images](https://hub.docker.com/r/bitnami/spark). These
+can be modified to include the OpenSearch Spark PPL extension, and the resulting image can then
+be used to test PPL commands.
+
+The Bitnami Apache Spark image can be used to run a Spark cluster and also to run
+`spark-shell` for running queries.
+
+## Prepare OpenSearch Spark PPL Extension
+
+Create a local build or copy of the OpenSearch Spark PPL extension. Make a note of the
+location of the Jar file as well as the name of the Jar file.
+
+From the root of this repository, build the OpenSearch Spark PPL extension with:
+
+```
+sbt clean
+sbt assembly
+```
+
+Refer to the [Developer Guide](../DEVELOPER_GUIDE.md) for more information.
+
+## Using Docker Compose
+
+There are sample files in this repository at `docker/apache-spark-sample`. They can be used to
+start up both nodes with the command:
+
+```
+docker compose up -d
+```
+
+The cluster can be stopped with:
+
+```
+docker compose down
+```
+
+### Configuration
+
+There is a file `docker/apache-spark-sample/.env` that can be edited to change some settings.
+
+| Variable Name | Description |
+|----------------|---------------------------------------------------|
+| MASTER_UI_PORT | Host port to bind to port 8080 of the master node |
+| MASTER_PORT | Host port to bind to port 7077 of the master node |
+| UI_PORT | Host port to bind to port 4040 of the master node |
+| PPL_JAR | Path to the PPL Jar file |
+
+## Running Spark Shell
+
+You can run `spark-shell` on the master node.
+
+```
+docker exec -it apache-spark-sample-spark-1 /opt/bitnami/spark/bin/spark-shell
+```
+
+Within the Spark Shell, you can submit queries, including PPL queries. For example, a sample
+table can be created, populated, and finally queried using PPL.
+
+```
+spark.sql("CREATE TABLE test_table(id int, name varchar(100))")
+spark.sql("INSERT INTO test_table (id, name) VALUES(1, 'Foo')")
+spark.sql("INSERT INTO test_table (id, name) VALUES(2, 'Bar')")
+spark.sql("source=test_table | eval x = id + 5 | fields x, name").show()
+```
+
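+As a further illustrative sketch (assuming the `test_table` created above; exact command coverage
+depends on the extension version), filtering and aggregation commands can also be tried:
+
+```scala
+// Hedged examples, not an exhaustive reference of supported PPL commands.
+spark.sql("source=test_table | where id > 1 | fields id, name").show()
+spark.sql("source=test_table | stats count() as cnt").show()
+```
+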
+For further information, see the [Spark PPL Test Instructions](ppl-lang/local-spark-ppl-test-instruction.md).
+
+## Manual Setup
+
+### spark-conf
+
+Contains the Apache Spark configuration. Add the following three lines to the
+`spark-defaults.conf` file:
+```
+spark.sql.legacy.createHiveTableByDefault false
+spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
+spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
+```
+
+An example file is available in this repository at `docker/apache-spark-sample/spark-defaults.conf`.
+
+## Prepare OpenSearch Spark PPL Extension
+
+Create a local build or copy of the OpenSearch Spark PPL extension. Make a note of the
+location of the Jar file as well as the name of the Jar file.
+
+## Run the Spark Cluster
+
+You need to run a master node and a worker node. For these to communicate, first create a network
+for them to use.
+
+```
+docker network create spark-network
+```
+
+### Master Node
+
+The master node can be run with the following command:
+```
+docker run \
+ -d \
+ --name spark \
+ --network spark-network \
+ -p 8080:8080 \
+ -p 7077:7077 \
+ -p 4040:4040 \
+ -e SPARK_MODE=master \
+ -e SPARK_RPC_AUTHENTICATION_ENABLED=no \
+ -e SPARK_RPC_ENCRYPTION_ENABLED=no \
+ -e SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no \
+ -e SPARK_SSL_ENABLED=no \
+ -e SPARK_PUBLIC_DNS=localhost \
+ -v <path to spark-defaults.conf>:/opt/bitnami/spark/conf/spark-defaults.conf \
+ -v <PPL Jar path>/<PPL Jar filename>:/opt/bitnami/spark/jars/<PPL Jar filename> \
+ bitnami/spark:3.5.3
+```
+
+* `-d`
+ Run the container in the background and return to the shell
+* `--name spark`
+ Name the docker container `spark`
+* `<path to spark-defaults.conf>`
+ Replace with the path to the Spark configuration file.
+* `<PPL Jar path>`
+ Replace with the path to the directory containing the OpenSearch Spark PPL extension
+ Jar file.
+* `<PPL Jar filename>`
+ Replace with the filename of the OpenSearch Spark PPL extension Jar file.
+
+### Worker Node
+
+The worker node can be run with the following command:
+```
+docker run \
+ -d \
+ --name spark-worker \
+ --network spark-network \
+ -e SPARK_MODE=worker \
+ -e SPARK_MASTER_URL=spark://spark:7077 \
+ -e SPARK_WORKER_MEMORY=1G \
+ -e SPARK_WORKER_CORES=1 \
+ -e SPARK_RPC_AUTHENTICATION_ENABLED=no \
+ -e SPARK_RPC_ENCRYPTION_ENABLED=no \
+ -e SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no \
+ -e SPARK_SSL_ENABLED=no \
+ -e SPARK_PUBLIC_DNS=localhost \
+ -v <path to spark-defaults.conf>:/opt/bitnami/spark/conf/spark-defaults.conf \
+ -v <PPL Jar path>/<PPL Jar filename>:/opt/bitnami/spark/jars/<PPL Jar filename> \
+ bitnami/spark:3.5.3
+```
+
+* `-d`
+ Run the container in the background and return to the shell
+* `--name spark-worker`
+ Name the docker container `spark-worker`
+* `<path to spark-defaults.conf>`
+ Replace with the path to the Spark configuration file.
+* `<PPL Jar path>`
+ Replace with the path to the directory containing the OpenSearch Spark PPL extension
+ Jar file.
+* `<PPL Jar filename>`
+ Replace with the filename of the OpenSearch Spark PPL extension Jar file.
diff --git a/docs/spark-emr-docker.md b/docs/spark-emr-docker.md
new file mode 100644
index 000000000..7eef4d250
--- /dev/null
+++ b/docs/spark-emr-docker.md
@@ -0,0 +1,147 @@
+# Running Queries with Spark EMR in Docker
+
+Spark EMR images are available on the Amazon ECR Public Gallery. These can be modified to
+include the OpenSearch Spark PPL extension, and the resulting image can then be used to test
+PPL commands.
+
+The Spark EMR image will run an Apache Spark app if one is specified and then shut down.
+
+## Prepare OpenSearch Spark PPL Extension
+
+Create a local build or copy of the OpenSearch Spark PPL extension. Make a note of the
+location of the Jar file as well as the name of the Jar file.
+
+From the root of this repository, build the OpenSearch Spark PPL extension with:
+
+```
+sbt clean
+sbt assembly
+```
+
+Refer to the [Developer Guide](../DEVELOPER_GUIDE.md) for more information.
+
+## Using Docker Compose
+
+There are sample files in this repository at `docker/spark-emr-sample`. They can be used to
+run the Spark EMR container:
+
+```
+docker compose up
+```
+
+Remove the docker resources afterwards with:
+
+```
+docker compose down
+```
+
+### Configuration
+
+There is a file `docker/spark-emr-sample/.env` that can be edited to change some settings.
+
+| Variable Name | Description |
+|----------------|---------------------------------------------------|
+| PPL_JAR | Path to the PPL Jar file |
+
+## Logs
+
+The logs are available in `/var/log/spark` in the docker container.
+
+STDERR for the app run is available in `/var/log/spark/user/stderr`.
+
+STDOUT for the app run is available in `/var/log/spark/user/stdout`.
+
+## Manual Setup
+
+You need to create two directories. These directories will be bind mounted over directories in the
+image.
+
+Look in `docker/spark-emr-sample` in this repository for samples of the directories
+described below.
+
+### logging-conf
+Contains two shell scripts that are run during startup to configure logging.
+* `run-adot-collector.sh`
+* `run-fluentd-spark.sh`
+
+Unless you need to make changes to the logging in the docker image, these can both be
+empty shell scripts.
+
+### spark-conf
+
+Contains the Apache Spark configuration. Add the following three lines to the
+`spark-defaults.conf` file:
+```
+spark.sql.legacy.createHiveTableByDefault false
+spark.sql.extensions org.opensearch.flint.spark.FlintPPLSparkExtensions
+spark.sql.catalog.dev org.apache.spark.opensearch.catalog.OpenSearchCatalog
+```
+
+## Create a Spark App
+
+An Apache Spark app is needed to provide queries to be run on the Spark EMR instance.
+The image has been tested with an app written in Scala.
+
+An example app is available in this repository in `docker/spark-sample-app`.
+
+### Build the Example App
+
+The example app can be built using [SBT](https://www.scala-sbt.org/).
+```
+cd docker/spark-sample-app
+sbt clean package
+```
+
+This will produce a Jar file in `docker/spark-sample-app/target/scala-2.12`
+that can be used with the Spark EMR image.
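+
+For reference, a minimal app along the lines of the bundled `MyApp` only needs a `SparkSession`
+and whatever SQL or PPL statements you want to exercise. This is an illustrative sketch, not a
+prescribed template:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+object MyApp {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder().appName("MyApp").getOrCreate()
+
+    // Queries to run on the EMR image go here.
+    spark.sql("CREATE TABLE foo (id INT, name VARCHAR(100))").show()
+    spark.sql("source=foo | fields id").show()
+  }
+}
+```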
+
+## Prepare OpenSearch Spark PPL Extension
+
+Create a local build or copy of the OpenSearch Spark PPL extension. Make a note of the
+location of the Jar file as well as the name of the Jar file.
+
+## Run the Spark EMR Image
+
+The Spark EMR image can be run with the following command from the root of this repository:
+```
+docker run \
+ --name spark-emr \
+ -v ./docker/spark-emr-sample/logging-conf:/var/loggingConfiguration/spark \
+ -v ./docker/spark-sample-app/target/scala-2.12:/app \
+ -v ./docker/spark-emr-sample/spark-conf:/etc/spark/conf \
+ -v <PPL Jar path>/<PPL Jar filename>:/usr/lib/spark/jars/<PPL Jar filename> \
+ public.ecr.aws/emr-serverless/spark/emr-7.5.0:20241125 \
+ driver \
+ --class MyApp \
+ /app/myapp_2.12-1.0.jar
+```
+
+* `--name spark-emr`
+ Name the docker container `spark-emr`
+* `-v ./docker/spark-emr-sample/logging-conf:/var/loggingConfiguration/spark`
+
+ Bind the directory containing logging shell scripts to the docker image. Needs to bind
+ to `/var/loggingConfiguration/spark` in the image.
+* `-v ./docker/spark-sample-app/target/scala-2.12:/app`
+
+ Bind the directory containing the Apache Spark app Jar file to a location in the
+ docker image. The directory in the docker image must match the path used in the final
+ argument.
+* `-v ./docker/spark-emr-sample/spark-conf:/etc/spark/conf`
+
+ Bind the directory containing the Apache Spark configuration. Needs to bind to
+ `/etc/spark/conf` in the image.
+* `<PPL Jar path>`
+ Replace with the path to the directory containing the OpenSearch Spark PPL extension
+ Jar file.
+* `<PPL Jar filename>`
+ Replace with the filename of the OpenSearch Spark PPL extension Jar file.
+* `driver`
+ Start the Spark EMR container as a driver. This will run `spark-submit` to run an
+ app.
+* `--class MyApp`
+ The main class of the Spark App to run.
+* `/app/myapp_2.12-1.0.jar`
+ The full path within the docker container where the Jar file of the Spark app is
+ located.
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
index 6ddc6ae9c..f9d181b70 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java
@@ -88,7 +88,11 @@ public class FlintOptions implements Serializable {
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;
public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 3 * 60 * 1000;
-
+
+ public static final String REQUEST_COMPLETION_DELAY_MILLIS = "request.completionDelayMillis";
+ public static final int DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS = 0;
+ public static final int DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS = 2000;
+
public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";
public static final String BATCH_BYTES = "write.batch_bytes";
@@ -178,6 +182,13 @@ public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}
+ public int getRequestCompletionDelayMillis() {
+ int defaultValue = SERVICE_NAME_AOSS.equals(getServiceName())
+ ? DEFAULT_AOSS_REQUEST_COMPLETION_DELAY_MILLIS
+ : DEFAULT_REQUEST_COMPLETION_DELAY_MILLIS;
+ return Integer.parseInt(options.getOrDefault(REQUEST_COMPLETION_DELAY_MILLIS, String.valueOf(defaultValue)));
+ }
+
public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}
diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
index 2bc097bba..5861ccf22 100644
--- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
+++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java
@@ -44,6 +44,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {
LOG.info("Creating Flint index " + indexName + " with metadata " + metadata);
try {
createIndex(indexName, FlintOpenSearchIndexMetadataService.serialize(metadata, false), metadata.indexSettings());
+ waitRequestComplete(); // Delay to ensure create is complete before making other requests for the index
emitIndexCreationSuccessMetric(metadata.kind());
} catch (IllegalStateException ex) {
emitIndexCreationFailureMetric(metadata.kind());
@@ -131,6 +132,14 @@ private String sanitizeIndexName(String indexName) {
return OpenSearchClientUtils.sanitizeIndexName(indexName);
}
+ private void waitRequestComplete() {
+ try {
+ Thread.sleep(options.getRequestCompletionDelayMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
private void emitIndexCreationSuccessMetric(String indexKind) {
emitIndexCreationMetric(indexKind, "success");
}
diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index bdcc120c0..364a8a1de 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -201,6 +201,11 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))
+ val REQUEST_COMPLETION_DELAY_MILLIS =
+ FlintConfig(s"spark.datasource.flint.${FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS}")
+ .datasourceOption()
+ .doc("delay in milliseconds after index creation is completed")
+ .createOptional()
val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
@@ -356,7 +361,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
EXCLUDE_JOB_IDS,
- SCROLL_SIZE)
+ SCROLL_SIZE,
+ REQUEST_COMPLETION_DELAY_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
index 0cde6ab0f..594322bae 100644
--- a/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
+++ b/flint-spark-integration/src/test/scala/org/apache/spark/sql/flint/config/FlintSparkConfSuite.scala
@@ -114,6 +114,21 @@ class FlintSparkConfSuite extends FlintSuite {
}
}
+ test("test request completionDelayMillis default value") {
+ FlintSparkConf().flintOptions().getRequestCompletionDelayMillis shouldBe 0
+ }
+
+ test("test request completionDelayMillis default value for aoss") {
+ val options = FlintSparkConf(Map("auth.servicename" -> "aoss").asJava).flintOptions()
+ options.getRequestCompletionDelayMillis shouldBe 2000
+ }
+
+ test("test specified request completionDelayMillis") {
+ val options =
+ FlintSparkConf(Map("request.completionDelayMillis" -> "1000").asJava).flintOptions()
+ options.getRequestCompletionDelayMillis shouldBe 1000
+ }
+
test("externalSchedulerIntervalThreshold should return default value when empty") {
val options = FlintSparkConf(Map("spark.flint.job.externalScheduler.interval" -> "").asJava)
assert(options
diff --git a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
index a2c2d26f6..fe3cefef8 100644
--- a/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
+++ b/integ-test/src/integration/scala/org/opensearch/flint/core/FlintOpenSearchClientSuite.scala
@@ -65,6 +65,27 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
(settings \ "index.number_of_replicas").extract[String] shouldBe "2"
}
+ it should "create index with request completion delay config" in {
+ val metadata = FlintOpenSearchIndexMetadataService.deserialize("{}")
+ // Create a dummy index to avoid timing the initial overhead
+ flintClient.createIndex("dummy", metadata)
+
+ val indexName = "flint_test_without_request_completion_delay"
+ val elapsedTimeWithoutDelay = timer {
+ flintClient.createIndex(indexName, metadata)
+ }
+
+ val delayIndexName = "flint_test_with_request_completion_delay"
+ val delayOptions =
+ openSearchOptions + (FlintOptions.REQUEST_COMPLETION_DELAY_MILLIS -> "2000")
+ val delayFlintOptions = new FlintOptions(delayOptions.asJava)
+ val delayFlintClient = new FlintOpenSearchClient(delayFlintOptions)
+ val elapsedTimeWithDelay = timer {
+ delayFlintClient.createIndex(delayIndexName, metadata)
+ }
+ elapsedTimeWithDelay - elapsedTimeWithoutDelay should be >= 1800L // allowing 200ms of wiggle room
+ }
+
it should "get all index names with the given index name pattern" in {
val metadata = FlintOpenSearchIndexMetadataService.deserialize(
"""{"properties": {"test": { "type": "integer" } } }""")
@@ -220,4 +241,11 @@ class FlintOpenSearchClientSuite extends AnyFlatSpec with OpenSearchSuite with M
def createTable(indexName: String, options: FlintOptions): Table = {
OpenSearchCluster.apply(indexName, options).asScala.head
}
+
+ def timer(block: => Unit): Long = {
+ val start = System.currentTimeMillis()
+ block
+ val end = System.currentTimeMillis()
+ end - start
+ }
}
diff --git a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4 b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
index 003fb7840..12a503b05 100644
--- a/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
+++ b/ppl-spark-integration/src/main/antlr4/OpenSearchPPLParser.g4
@@ -266,21 +266,16 @@ fillnullCommand
| fillNullWithFieldVariousValues)
;
- fillNullWithTheSameValue
- : WITH nullReplacement IN nullableField (COMMA nullableField)*
- ;
-
- fillNullWithFieldVariousValues
- : USING nullableField EQUAL nullReplacement (COMMA nullableField EQUAL nullReplacement)*
- ;
-
+fillNullWithTheSameValue
+ : WITH nullReplacement = valueExpression IN nullableFieldList = fieldList
+ ;
- nullableField
- : fieldExpression
+fillNullWithFieldVariousValues
+ : USING nullableReplacementExpression (COMMA nullableReplacementExpression)*
;
- nullReplacement
- : expression
+nullableReplacementExpression
+ : nullableField = fieldExpression EQUAL nullableReplacement = valueExpression
;
expandCommand
diff --git a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
index 143a8c97e..7b6b7c863 100644
--- a/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
+++ b/ppl-spark-integration/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
@@ -584,19 +584,18 @@ public UnresolvedPlan visitFillnullCommand(OpenSearchPPLParser.FillnullCommandCo
FillNullWithFieldVariousValuesContext variousValuesContext = ctx.fillNullWithFieldVariousValues();
if (sameValueContext != null) {
// todo consider using expression instead of Literal
- UnresolvedExpression replaceNullWithMe = internalVisitExpression(sameValueContext.nullReplacement().expression());
- List<Field> fieldsToReplace = sameValueContext.nullableField()
+ UnresolvedExpression replaceNullWithMe = internalVisitExpression(sameValueContext.nullReplacement);
+ List<Field> fieldsToReplace = sameValueContext.nullableFieldList.fieldExpression()
.stream()
.map(this::internalVisitExpression)
.map(Field.class::cast)
.collect(Collectors.toList());
return new FillNull(ofSameValue(replaceNullWithMe, fieldsToReplace));
} else if (variousValuesContext != null) {
- List<NullableFieldFill> nullableFieldFills = IntStream.range(0, variousValuesContext.nullableField().size())
+ List<NullableFieldFill> nullableFieldFills = IntStream.range(0, variousValuesContext.nullableReplacementExpression().size())
.mapToObj(index -> {
- variousValuesContext.nullableField(index);
- UnresolvedExpression replaceNullWithMe = internalVisitExpression(variousValuesContext.nullReplacement(index).expression());
- Field nullableFieldReference = (Field) internalVisitExpression(variousValuesContext.nullableField(index));
+ UnresolvedExpression replaceNullWithMe = internalVisitExpression(variousValuesContext.nullableReplacementExpression(index).nullableReplacement);
+ Field nullableFieldReference = (Field) internalVisitExpression(variousValuesContext.nullableReplacementExpression(index).nullableField);
return new NullableFieldFill(nullableFieldReference, replaceNullWithMe);
})
.collect(Collectors.toList());
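
For illustration, the reworked `fillnull` grammar and AST builder above accept two forms. A hedged sketch of both in PPL, assuming a table `t` with nullable columns `a` and `b` (the names are placeholders):

```scala
// Same replacement value applied to several fields.
spark.sql("source = t | fillnull with 0 in a, b").show()

// A different replacement value per field.
spark.sql("source = t | fillnull using a = 0, b = -1").show()
```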