Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align with latest spark-sql application #2216

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER;
import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
Expand All @@ -33,7 +32,6 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR;
import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
Expand All @@ -42,9 +40,9 @@
import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_LAUNCHER_PACKAGE;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;
import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE;

Expand Down Expand Up @@ -80,8 +78,7 @@ private Builder() {
config.put(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR);
config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE);
config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
Expand Down Expand Up @@ -115,6 +112,7 @@ public Builder dataSource(DataSourceMetadata metadata) {
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
config.put("spark.flint.datasource.name", metadata.getName());
return this;
}
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,19 @@
public class SparkConstants {
public static final String EMR = "emr";
public static final String STEP_ID_FIELD = "stepId.keyword";
// TODO should be replaced with mvn jar.
// EMR-S will download JAR to local maven
public static final String SPARK_SQL_APPLICATION_JAR =
"s3://flint-data-dp-eu-west-1-beta/code/flint/sql-job.jar";
"file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar";
public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result";
// TODO should be replaced with mvn jar.
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String GLUE_CATALOG_HIVE_JAR =
"s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_CATALOG_JAR =
"s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
public static final String FLINT_DEFAULT_AUTH = "noauth";
public static final String FLINT_DEFAULT_REGION = "us-west-2";
public static final String DEFAULT_CLASS_NAME = "org.opensearch.sql.FlintJob";
public static final String DEFAULT_CLASS_NAME = "org.apache.spark.sql.FlintJob";
public static final String S3_AWS_CREDENTIALS_PROVIDER_KEY =
"spark.hadoop.fs.s3.customAWSCredentialsProvider";
public static final String DRIVER_ENV_ASSUME_ROLE_ARN_KEY =
Expand Down Expand Up @@ -62,11 +56,14 @@ public class SparkConstants {
"com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory";
public static final String SPARK_STANDALONE_PACKAGE =
"org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT";
public static final String SPARK_LAUNCHER_PACKAGE =
"org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT";
public static final String AWS_SNAPSHOT_REPOSITORY =
"https://aws.oss.sonatype.org/content/repositories/snapshots";
public static final String GLUE_HIVE_CATALOG_FACTORY_CLASS =
"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory";
public static final String FLINT_DELEGATE_CATALOG = "org.opensearch.sql.FlintDelegateCatalog";
public static final String FLINT_DELEGATE_CATALOG =
"org.opensearch.sql.FlintDelegatingSessionCatalog";
public static final String FLINT_SQL_EXTENSION =
"org.opensearch.flint.spark.FlintSparkExtensions";
public static final String EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.sql.spark.response;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.STEP_ID_FIELD;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -34,7 +33,7 @@ public JobExecutionResponseReader(Client client) {
}

public JSONObject getResultFromOpensearchIndex(String jobId) {
return searchInSparkIndex(QueryBuilders.termQuery(STEP_ID_FIELD, jobId));
return searchInSparkIndex(QueryBuilders.termQuery("jobRunId", jobId));
}

private JSONObject searchInSparkIndex(QueryBuilder query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,14 +597,12 @@ private String constructExpectedSparkSubmitParameterString(
authParamConfigBuilder.append(authParams.get(key));
authParamConfigBuilder.append(" ");
}
return " --class org.opensearch.sql.FlintJob --conf"
return " --class org.apache.spark.sql.FlintJob --conf"
+ " spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.emr.AssumeRoleAWSCredentialsProvider"
+ " --conf"
+ " spark.hadoop.aws.catalog.credentials.provider.factory.class=com.amazonaws.glue.catalog.metastore.STSAssumeRoleSessionCredentialsProviderFactory"
+ " --conf"
+ " spark.jars=s3://flint-data-dp-eu-west-1-beta/code/flint/AWSGlueDataCatalogHiveMetaStoreAuth-1.0.jar,s3://flint-data-dp-eu-west-1-beta/code/flint/flint-catalog.jar"
+ " --conf"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT"
+ " spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.1.0-SNAPSHOT,org.opensearch:opensearch-spark-sql-application_2.12:0.1.0-SNAPSHOT"
+ " --conf"
+ " spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots"
+ " --conf"
Expand All @@ -625,8 +623,9 @@ private String constructExpectedSparkSubmitParameterString(
+ " spark.executorEnv.ASSUME_ROLE_CREDENTIALS_ROLE_ARN=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf"
+ " spark.hive.metastore.glue.role.arn=arn:aws:iam::924196221507:role/FlintOpensearchServiceRole"
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog "
+ authParamConfigBuilder;
+ " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegatingSessionCatalog "
+ authParamConfigBuilder
+ " --conf spark.flint.datasource.name=my_glue ";
}

private String withStructuredStreaming(String parameters) {
Expand Down