From 3397e4a0787bf3376886f83ae7e50eadd181e6d8 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 4 Oct 2023 17:24:39 -0700 Subject: [PATCH 1/2] align with latest spark-sql application Signed-off-by: Peng Huo --- .../spark/asyncquery/model/SparkSubmitParameters.java | 7 ++++--- .../sql/spark/data/constants/SparkConstants.java | 9 ++++++--- .../spark/response/JobExecutionResponseReader.java | 3 +-- .../spark/dispatcher/SparkQueryDispatcherTest.java | 11 ++++++----- 4 files changed, 17 insertions(+), 13 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index f5ef155c9c..eea298bcf0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -18,7 +18,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER; import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; @@ -45,6 +44,7 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE; @@ -80,8 +80,8 @@ private Builder() { config.put( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); - config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); + config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR); + config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE); config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); @@ -115,6 +115,7 @@ public Builder dataSource(DataSourceMetadata metadata) { () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME), () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD), () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION)); + config.put("spark.flint.datasource.name", metadata.getName()); return this; } throw new UnsupportedOperationException( diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index a0318a9478..111fac7478 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -10,7 +10,7 @@ public class SparkConstants { public static final String STEP_ID_FIELD = "stepId.keyword"; // TODO should be replaced with mvn jar. public static final String SPARK_SQL_APPLICATION_JAR = - "s3://flint-data-dp-eu-west-1-beta/code/flint/sql-job.jar"; + "s3://flint-data-dp-eu-west-1-beta/code/flint/opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result"; // TODO should be replaced with mvn jar. public static final String FLINT_INTEGRATION_JAR = @@ -26,7 +26,7 @@ public class SparkConstants { public static final String FLINT_DEFAULT_SCHEME = "http"; public static final String FLINT_DEFAULT_AUTH = "noauth"; public static final String FLINT_DEFAULT_REGION = "us-west-2"; - public static final String DEFAULT_CLASS_NAME = "org.opensearch.sql.FlintJob"; + public static final String DEFAULT_CLASS_NAME = "org.apache.spark.sql.FlintJob"; public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY = "spark.hadoop.fs.s3.customAWSCredentialsProvider"; public static final String DRIVER_ENV_ASSUME_ROLE_ARN_KEY = @@ -62,11 +62,14 @@ public class SparkConstants { "com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"; public static final String SPARK_STANDALONE_PACKAGE = "org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT"; + public static final String SPARK_LAUNCHER_PACKAGE = + "org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT"; public static final String AWS_SNAPSHOT_REPOSITORY = "https://aws.oss.sonatype.org/content/repositories/snapshots"; public static final String GLUE_HIVE_CATALOG_FACTORY_CLASS = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"; - public static final String FLINT_DELEGATE_CATALOG = "org.opensearch.sql.FlintDelegateCatalog"; + public static final String FLINT_DELEGATE_CATALOG = + "org.opensearch.sql.FlintDelegatingSessionCatalog"; public static final String FLINT_SQL_EXTENSION = "org.opensearch.flint.spark.FlintSparkExtensions"; public static final String EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER = diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 8abb7cd11f..4ce136691a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -6,7 +6,6 @@ package org.opensearch.sql.spark.response; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +33,7 @@ public JobExecutionResponseReader(Client client) { } public JSONObject getResultFromOpensearchIndex(String jobId) { - return searchInSparkIndex(QueryBuilders.termQuery(STEP_ID_FIELD, jobId)); + return searchInSparkIndex(QueryBuilders.termQuery("jobRunId", jobId)); } private JSONObject searchInSparkIndex(QueryBuilder query) { diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 2a8c21d342..9d82e8eea7 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -597,14 +597,14 @@ private String constructExpectedSparkSubmitParameterString( authParamConfigBuilder.append(authParams.get(key)); authParamConfigBuilder.append(" "); } - return " --class org.opensearch.sql.FlintJob --conf" + return " --class org.apache.spark.sql.FlintJob --conf" + " spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider" + " --conf" + " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory" + " --conf" - + " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar,s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar" + + " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar" + " --conf" - + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT" + + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT" + " --conf" + " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots" + " --conf" @@ -625,8 +625,9 @@ private String constructExpectedSparkSubmitParameterString( + " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" + " --conf" + " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole" - + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog " - + authParamConfigBuilder; + + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog " + + authParamConfigBuilder + + " --conf spark.flint.datasource.name=my_glue "; } private String withStructuredStreaming(String parameters) { From 2880611be50ae8a1892a2107c53f4b42cf94b8c9 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Wed, 4 Oct 2023 18:37:17 -0700 Subject: [PATCH 2/2] remove s3 depedency Signed-off-by: Peng Huo --- .../spark/asyncquery/model/SparkSubmitParameters.java | 3 --- .../sql/spark/data/constants/SparkConstants.java | 10 ++-------- .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 2 -- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index eea298bcf0..016e0ea9ff 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -32,7 +32,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR; import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; @@ -41,7 +40,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE; @@ -80,7 +78,6 @@ private Builder() { config.put( HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR); config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE); config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 111fac7478..b1f28fa7c8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -8,19 +8,13 @@ public class SparkConstants { public static final String EMR = "emr"; public static final String STEP_ID_FIELD = "stepId.keyword"; - // TODO should be replaced with mvn jar. + // EMR-S will download JAR to local maven public static final String SPARK_SQL_APPLICATION_JAR = - "s3://flint-data-dp-eu-west-1-beta/code/flint/opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; + "file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result"; // TODO should be replaced with mvn jar. public static final String FLINT_INTEGRATION_JAR = "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar"; - // TODO should be replaced with mvn jar. - public static final String GLUE_CATALOG_HIVE_JAR = - "s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar"; - // TODO should be replaced with mvn jar. - public static final String FLINT_CATALOG_JAR = - "s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar"; public static final String FLINT_DEFAULT_HOST = "localhost"; public static final String FLINT_DEFAULT_PORT = "9200"; public static final String FLINT_DEFAULT_SCHEME = "http"; diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 9d82e8eea7..08907e0700 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -602,8 +602,6 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory" + " --conf" - + " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar" - + " --conf" + " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT" + " --conf" + " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots"