From b1da712adfc84ea6ba9ae235409fb6f3335baaa5 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 10:16:05 -0700 Subject: [PATCH 1/6] Add conf for spark structured streaming job Signed-off-by: Peng Huo --- .../model/S3GlueSparkSubmitParameters.java | 97 ---------- .../model/SparkSubmitParameters.java | 166 ++++++++++++++++++ .../client/EmrServerlessClientImplEMR.java | 1 + .../sql/spark/client/StartJobRequest.java | 9 + .../dispatcher/SparkQueryDispatcher.java | 79 ++------- .../client/EmrServerlessClientImplTest.java | 2 +- .../sql/spark/client/StartJobRequestTest.java | 28 +++ .../dispatcher/SparkQueryDispatcherTest.java | 16 +- 8 files changed, 230 insertions(+), 168 deletions(-) delete mode 100644 spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java deleted file mode 100644 index fadb8a67a9..0000000000 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * - * * Copyright OpenSearch Contributors - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.opensearch.sql.spark.asyncquery.model; - -import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR; -import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; -import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE; - -import java.util.LinkedHashMap; -import java.util.Map; -import lombok.Getter; -import lombok.Setter; - -@Getter -@Setter -public class S3GlueSparkSubmitParameters { - - private String className; - private Map config; - public static final String SPACE = " "; - public static final String EQUALS = "="; - - public S3GlueSparkSubmitParameters() { - this.className = DEFAULT_CLASS_NAME; - this.config = new LinkedHashMap<>(); - this.config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE); - this.config.put( - HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, - DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); - this.config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); - this.config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); - this.config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); - this.config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); - this.config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); - this.config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); - this.config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); - this.config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); - this.config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); - this.config.put(FLINT_INDEX_STORE_AWSREGION_KEY, FLINT_DEFAULT_REGION); - this.config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); - this.config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION); - this.config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); - } - - public void addParameter(String key, String value) { - this.config.put(key, value); - } - - @Override - public String toString() { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append(" --class "); - stringBuilder.append(this.className); - stringBuilder.append(SPACE); - for (String key : config.keySet()) { - stringBuilder.append(" --conf "); - stringBuilder.append(key); - stringBuilder.append(EQUALS); - stringBuilder.append(config.get(key)); - stringBuilder.append(SPACE); - } - return stringBuilder.toString(); - } -} diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java new file mode 100644 index 0000000000..b7c6188c51 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -0,0 +1,166 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.asyncquery.model; + +import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE; +import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER; +import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR; +import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION; +import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY; +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedHashMap; +import java.util.Map; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.datasource.model.DataSourceType; + +/** + * Define Spark Submit Parameters. + */ +@RequiredArgsConstructor +public class SparkSubmitParameters { + public static final String SPACE = " "; + public static final String EQUALS = "="; + + private final String className; + private final Map config; + + public static class Builder { + + private final String className; + private final Map config; + + private Builder() { + className = DEFAULT_CLASS_NAME; + config = new LinkedHashMap<>(); + + config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE); + config.put( + HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY, + DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY); + config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR); + config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE); + config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); + config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); + config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); + config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); + config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH); + config.put(FLINT_INDEX_STORE_AWSREGION_KEY, FLINT_DEFAULT_REGION); + config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER); + config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION); + config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS); + } + + public static Builder builder() { + return new Builder(); + } + + public Builder dataSource(DataSourceMetadata metadata) { + if (DataSourceType.S3GLUE.equals(metadata.getConnector())) { + String roleArn = metadata.getProperties().get("glue.auth.role_arn"); + + config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); + config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); + config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn); + + URI uri = parseUri(metadata.getProperties().get("glue.indexstore.opensearch.uri"), + metadata.getName()); + flintConfig( + uri.getHost(), + String.valueOf(uri.getPort()), + uri.getScheme(), + metadata.getProperties().get("glue.indexstore.opensearch.auth"), + metadata.getProperties().get("glue.indexstore.opensearch.region")); + + config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); + + return this; + } + throw new UnsupportedOperationException( + String.format( + "UnSupported datasource type for async queries:: %s", + metadata.getConnector())); + } + + private void flintConfig(String host, String port, String scheme, String auth, String region) { + config.put(FLINT_INDEX_STORE_HOST_KEY, host); + config.put(FLINT_INDEX_STORE_PORT_KEY, port); + config.put(FLINT_INDEX_STORE_SCHEME_KEY, scheme); + config.put(FLINT_INDEX_STORE_AUTH_KEY, auth); + config.put(FLINT_INDEX_STORE_AWSREGION_KEY, region); + } + + private URI parseUri(String opensearchUri, String datasourceName) { + try { + return new URI(opensearchUri); + } catch (URISyntaxException e) { + throw new IllegalArgumentException( + String.format( + "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName)); + } + } + + public Builder structuredStreaming() { + config.put("spark.flint.job.type", "wait"); + + return this; + } + + public SparkSubmitParameters build() { + return new SparkSubmitParameters(className, config); + } + } + + @Override + public String toString() { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(" --class "); + stringBuilder.append(this.className); + stringBuilder.append(SPACE); + for (String key : config.keySet()) { + stringBuilder.append(" --conf "); + stringBuilder.append(key); + stringBuilder.append(EQUALS); + stringBuilder.append(config.get(key)); + stringBuilder.append(SPACE); + } + return stringBuilder.toString(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java index 83e570ece2..1a8e3203b8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java @@ -40,6 +40,7 @@ public String startJobRun(StartJobRequest startJobRequest) { .withApplicationId(startJobRequest.getApplicationId()) .withExecutionRoleArn(startJobRequest.getExecutionRoleArn()) .withTags(startJobRequest.getTags()) + .withExecutionTimeoutMinutes(startJobRequest.executionTimeout()) .withJobDriver( new JobDriver() .withSparkSubmit( diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index 94689c7030..2c551c306e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -20,4 +20,13 @@ public class StartJobRequest { private final String executionRoleArn; private final String sparkSubmitParams; private final Map tags; + + /** + * true if it is Spark Structured Streaming job. + */ + private final boolean isStructuredStreaming; + + public Long executionTimeout() { + return isStructuredStreaming ? 0L : 60L; + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 904d199663..cb48d8718d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -5,30 +5,16 @@ package org.opensearch.sql.spark.dispatcher; -import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY; -import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY; - import com.amazonaws.services.emrserverless.model.CancelJobRunResult; import com.amazonaws.services.emrserverless.model.GetJobRunResult; import com.amazonaws.services.emrserverless.model.JobRunState; -import java.net.URI; -import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import lombok.AllArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; -import org.opensearch.sql.spark.asyncquery.model.S3GlueSparkSubmitParameters; +import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.StartJobRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; @@ -38,7 +24,9 @@ import org.opensearch.sql.spark.rest.model.LangType; import org.opensearch.sql.spark.utils.SQLQueryUtils; -/** This class takes care of understanding query and dispatching job query to emr serverless. */ +/** + * This class takes care of understanding query and dispatching job query to emr serverless. + */ @AllArgsConstructor public class SparkQueryDispatcher { @@ -78,9 +66,9 @@ public String cancelJob(String applicationId, String jobId) { private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryRequest) { if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) { - if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) + if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) { return getStartJobRequestForIndexRequest(dispatchQueryRequest); - else { + } else { return getStartJobRequestForNonIndexQueries(dispatchQueryRequest); } } @@ -88,48 +76,6 @@ private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryReq String.format("UnSupported Lang type:: %s", dispatchQueryRequest.getLangType())); } - private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) { - if (DataSourceType.S3GLUE.equals(dataSourceMetadata.getConnector())) { - return dataSourceMetadata.getProperties().get("glue.auth.role_arn"); - } - throw new UnsupportedOperationException( - String.format( - "UnSupported datasource type for async queries:: %s", - dataSourceMetadata.getConnector())); - } - - private String constructSparkParameters(String datasourceName) { - DataSourceMetadata dataSourceMetadata = - dataSourceService.getRawDataSourceMetadata(datasourceName); - S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters = new S3GlueSparkSubmitParameters(); - s3GlueSparkSubmitParameters.addParameter( - DRIVER_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - s3GlueSparkSubmitParameters.addParameter( - EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - s3GlueSparkSubmitParameters.addParameter( - HIVE_METASTORE_GLUE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata)); - String opensearchuri = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.uri"); - URI uri; - try { - uri = new URI(opensearchuri); - } catch (URISyntaxException e) { - throw new IllegalArgumentException( - String.format( - "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName)); - } - String auth = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.auth"); - String region = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.region"); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_HOST_KEY, uri.getHost()); - s3GlueSparkSubmitParameters.addParameter( - FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort())); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme()); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, auth); - s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region); - s3GlueSparkSubmitParameters.addParameter( - "spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG); - return s3GlueSparkSubmitParameters.toString(); - } - private StartJobRequest getStartJobRequestForNonIndexQueries( DispatchQueryRequest dispatchQueryRequest) { StartJobRequest startJobRequest; @@ -153,8 +99,10 @@ private StartJobRequest getStartJobRequestForNonIndexQueries( jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), - constructSparkParameters(fullyQualifiedTableName.getDatasourceName()), - tags); + SparkSubmitParameters.Builder.builder() + .dataSource(dataSourceService.getRawDataSourceMetadata(fullyQualifiedTableName.getDatasourceName())) + .build().toString(), + tags, false); return startJobRequest; } @@ -180,8 +128,11 @@ private StartJobRequest getStartJobRequestForIndexRequest( jobName, dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), - constructSparkParameters(fullyQualifiedTableName.getDatasourceName()), - tags); + SparkSubmitParameters.Builder.builder() + .dataSource(dataSourceService.getRawDataSourceMetadata(fullyQualifiedTableName.getDatasourceName())) + .structuredStreaming() + .build().toString(), + tags, true); return startJobRequest; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 0765b90534..229fb8814f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -44,7 +44,7 @@ void testStartJobRun() { EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, SPARK_SUBMIT_PARAMETERS, - new HashMap<>())); + new HashMap<>(), false)); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java new file mode 100644 index 0000000000..29c9b5d3e9 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.client; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.Map; +import org.junit.jupiter.api.Test; + +class StartJobRequestTest { + + @Test + void executionTimeout() { + assertEquals(60L, onDemandJob().executionTimeout()); + assertEquals(0L, streamingJob().executionTimeout()); + } + + private StartJobRequest onDemandJob() { + return new StartJobRequest("","","","","", Map.of(), false); + } + + private StartJobRequest streamingJob() { + return new StartJobRequest("","","","","", Map.of(), true); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index d83505fde0..7275947730 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -68,7 +68,7 @@ void testDispatchSelectQuery() { EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, constructExpectedSparkSubmitParameterString(), - tags))) + tags, false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -85,7 +85,7 @@ void testDispatchSelectQuery() { EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, constructExpectedSparkSubmitParameterString(), - tags)); + tags, false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -112,8 +112,8 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:my_glue.default.http_logs.elb_and_requestUri", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), - tags))) + withStructuredStreaming(constructExpectedSparkSubmitParameterString()), + tags, true))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -129,8 +129,8 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:my_glue.default.http_logs.elb_and_requestUri", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - constructExpectedSparkSubmitParameterString(), - tags)); + withStructuredStreaming(constructExpectedSparkSubmitParameterString()), + tags, true)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -389,6 +389,10 @@ private String constructExpectedSparkSubmitParameterString() { + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog "; } + private String withStructuredStreaming(String parameters) { + return parameters + " --conf spark.flint.job.type=wait "; + } + private DataSourceMetadata constructMyGlueDataSourceMetadata() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata(); dataSourceMetadata.setName("my_glue"); From 06f7a741a8d3bc2263ba43cdd6b6f574b442603d Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 10:30:27 -0700 Subject: [PATCH 2/6] update Signed-off-by: Peng Huo --- .../sql/spark/asyncquery/model/SparkSubmitParameters.java | 2 +- .../org/opensearch/sql/spark/client/StartJobRequest.java | 5 ++++- .../opensearch/sql/spark/client/StartJobRequestTest.java | 7 ++++--- .../sql/spark/dispatcher/SparkQueryDispatcherTest.java | 2 +- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index b7c6188c51..f586f0386c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -138,7 +138,7 @@ private URI parseUri(String opensearchUri, String datasourceName) { } public Builder structuredStreaming() { - config.put("spark.flint.job.type", "wait"); + config.put("spark.flint.job.type", "streaming"); return this; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index 2c551c306e..0fad27bf61 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -14,6 +14,9 @@ */ @Data public class StartJobRequest { + + public static final Long DEFAULT_JOB_TIMEOUT = 120L; + private final String query; private final String jobName; private final String applicationId; @@ -27,6 +30,6 @@ public class StartJobRequest { private final boolean isStructuredStreaming; public Long executionTimeout() { - return isStructuredStreaming ? 0L : 60L; + return isStructuredStreaming ? 0L : DEFAULT_JOB_TIMEOUT; } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java index 29c9b5d3e9..783ce8466e 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.spark.client; import static org.junit.jupiter.api.Assertions.*; +import static org.opensearch.sql.spark.client.StartJobRequest.DEFAULT_JOB_TIMEOUT; import java.util.Map; import org.junit.jupiter.api.Test; @@ -14,15 +15,15 @@ class StartJobRequestTest { @Test void executionTimeout() { - assertEquals(60L, onDemandJob().executionTimeout()); + assertEquals(DEFAULT_JOB_TIMEOUT, onDemandJob().executionTimeout()); assertEquals(0L, streamingJob().executionTimeout()); } private StartJobRequest onDemandJob() { - return new StartJobRequest("","","","","", Map.of(), false); + return new StartJobRequest("", "", "", "", "", Map.of(), false); } private StartJobRequest streamingJob() { - return new StartJobRequest("","","","","", Map.of(), true); + return new StartJobRequest("", "", "", "", "", Map.of(), true); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 7275947730..0980c72472 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -390,7 +390,7 @@ private String constructExpectedSparkSubmitParameterString() { } private String withStructuredStreaming(String parameters) { - return parameters + " --conf spark.flint.job.type=wait "; + return parameters + " --conf spark.flint.job.type=streaming "; } private DataSourceMetadata constructMyGlueDataSourceMetadata() { From e8d32de9fba3e315059f89de26f3e570e1cf8d08 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 16:54:09 -0700 Subject: [PATCH 3/6] fix format Signed-off-by: Peng Huo --- .../model/SparkSubmitParameters.java | 20 ++-- .../sql/spark/client/StartJobRequest.java | 4 +- .../dispatcher/SparkQueryDispatcher.java | 20 ++-- .../client/EmrServerlessClientImplTest.java | 3 +- .../dispatcher/SparkQueryDispatcherTest.java | 102 ++++++++++-------- 5 files changed, 85 insertions(+), 64 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 32ad425ab7..fa43b98de2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -21,7 +21,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT; -import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY; @@ -56,9 +55,7 @@ import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.AuthenticationType; -/** - * Define Spark Submit Parameters. - */ +/** Define Spark Submit Parameters. */ @RequiredArgsConstructor public class SparkSubmitParameters { public static final String SPACE = " "; @@ -107,8 +104,9 @@ public Builder dataSource(DataSourceMetadata metadata) { config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn); config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); - URI uri = parseUri(metadata.getProperties().get("glue.indexstore.opensearch.uri"), - metadata.getName()); + URI uri = + parseUri( + metadata.getProperties().get("glue.indexstore.opensearch.uri"), metadata.getName()); flintConfig( metadata, uri.getHost(), @@ -119,12 +117,11 @@ public Builder dataSource(DataSourceMetadata metadata) { } throw new UnsupportedOperationException( String.format( - "UnSupported datasource type for async queries:: %s", - metadata.getConnector())); + "UnSupported datasource type for async queries:: %s", metadata.getConnector())); } - private void flintConfig(DataSourceMetadata metadata, String host, String port, String scheme, - String auth) { + private void flintConfig( + DataSourceMetadata metadata, String host, String port, String scheme, String auth) { config.put(FLINT_INDEX_STORE_HOST_KEY, host); config.put(FLINT_INDEX_STORE_PORT_KEY, port); config.put(FLINT_INDEX_STORE_SCHEME_KEY, scheme); @@ -132,8 +129,7 @@ private void flintConfig(DataSourceMetadata metadata, String host, String port, } private void setFlintIndexStoreAuthProperties( - DataSourceMetadata dataSourceMetadata, - String authType) { + DataSourceMetadata dataSourceMetadata, String authType) { if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); String username = diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java index 0fad27bf61..df8f9f61b1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java @@ -24,9 +24,7 @@ public class StartJobRequest { private final String sparkSubmitParams; private final Map tags; - /** - * true if it is Spark Structured Streaming job. - */ + /** true if it is Spark Structured Streaming job. */ private final boolean isStructuredStreaming; public Long executionTimeout() { diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 32dc6e0139..21e7ac2866 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -103,9 +103,13 @@ private StartJobRequest getStartJobRequestForNonIndexQueries( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() - .dataSource(dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource())) - .build().toString(), - tags, false); + .dataSource( + dataSourceService.getRawDataSourceMetadata( + dispatchQueryRequest.getDatasource())) + .build() + .toString(), + tags, + false); return startJobRequest; } @@ -128,10 +132,14 @@ private StartJobRequest getStartJobRequestForIndexRequest( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() - .dataSource(dataSourceService.getRawDataSourceMetadata(dispatchQueryRequest.getDatasource())) + .dataSource( + dataSourceService.getRawDataSourceMetadata( + dispatchQueryRequest.getDatasource())) .structuredStreaming() - .build().toString(), - tags, true); + .build() + .toString(), + tags, + true); return startJobRequest; } diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java index 229fb8814f..17d4fe55c0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java @@ -44,7 +44,8 @@ void testStartJobRun() { EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, SPARK_SUBMIT_PARAMETERS, - new HashMap<>(), false)); + new HashMap<>(), + false)); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index df97ee9a68..2a8c21d342 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -75,7 +75,8 @@ void testDispatchSelectQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -103,7 +104,8 @@ void testDispatchSelectQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -133,7 +135,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); } }), - tags, false))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithBasicAuth(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -162,7 +165,8 @@ void testDispatchSelectQueryWithBasicAuthIndexStoreDatasource() { put(FLINT_INDEX_STORE_AUTH_PASSWORD, "password"); } }), - tags, false)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -190,7 +194,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { { } }), - tags, false))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadataWithNoAuth(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -217,7 +222,8 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { { } }), - tags, false)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -244,14 +250,16 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - withStructuredStreaming(constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })), - tags, true))) + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -272,14 +280,16 @@ void testDispatchIndexQuery() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - withStructuredStreaming(constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })), - tags, true)); + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -308,7 +318,8 @@ void testDispatchWithPPLQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -336,7 +347,8 @@ void testDispatchWithPPLQuery() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -365,7 +377,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false))) + tags, + false))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -393,7 +406,8 @@ void testDispatchQueryWithoutATableAndDataSourceName() { put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); } }), - tags, false)); + tags, + false)); Assertions.assertEquals(EMR_JOB_ID, jobId); } @@ -420,14 +434,16 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - withStructuredStreaming(constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })), - tags, true))) + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true))) .thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata); @@ -448,14 +464,16 @@ void testDispatchIndexQueryWithoutADatasourceName() { "TEST_CLUSTER:index-query", EMRS_APPLICATION_ID, EMRS_EXECUTION_ROLE, - withStructuredStreaming(constructExpectedSparkSubmitParameterString( - "sigv4", - new HashMap<>() { - { - put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); - } - })), - tags, true)); + withStructuredStreaming( + constructExpectedSparkSubmitParameterString( + "sigv4", + new HashMap<>() { + { + put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1"); + } + })), + tags, + true)); Assertions.assertEquals(EMR_JOB_ID, jobId); } From 1bc189ff93e1e36bd646e316fe3c578c3dd8db08 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 17:15:02 -0700 Subject: [PATCH 4/6] fix format Signed-off-by: Peng Huo --- .../model/SparkSubmitParameters.java | 46 +++++++++---------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index fa43b98de2..627d6cfcc5 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -9,6 +9,8 @@ import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME; import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_REGION; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_INDEX_STORE_OPENSEARCH_URI; +import static org.opensearch.sql.datasources.glue.GlueDataSourceFactory.GLUE_ROLE_ARN; import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY; @@ -50,6 +52,7 @@ import java.net.URISyntaxException; import java.util.LinkedHashMap; import java.util.Map; +import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -97,22 +100,21 @@ public static Builder builder() { public Builder dataSource(DataSourceMetadata metadata) { if (DataSourceType.S3GLUE.equals(metadata.getConnector())) { - String roleArn = metadata.getProperties().get("glue.auth.role_arn"); + String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn); config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn); config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn); config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG); - URI uri = + setFlintIndexStoreHost( parseUri( - metadata.getProperties().get("glue.indexstore.opensearch.uri"), metadata.getName()); - flintConfig( - metadata, - uri.getHost(), - String.valueOf(uri.getPort()), - uri.getScheme(), - metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH)); + metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName())); + setFlintIndexStoreAuthProperties( + metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD), + () -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION)); return this; } throw new UnsupportedOperationException( @@ -120,28 +122,24 @@ public Builder dataSource(DataSourceMetadata metadata) { "UnSupported datasource type for async queries:: %s", metadata.getConnector())); } - private void flintConfig( - DataSourceMetadata metadata, String host, String port, String scheme, String auth) { - config.put(FLINT_INDEX_STORE_HOST_KEY, host); - config.put(FLINT_INDEX_STORE_PORT_KEY, port); - config.put(FLINT_INDEX_STORE_SCHEME_KEY, scheme); - setFlintIndexStoreAuthProperties(metadata, auth); + private void setFlintIndexStoreHost(URI uri) { + config.put(FLINT_INDEX_STORE_HOST_KEY, uri.getHost()); + config.put(FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort())); + config.put(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme()); } private void setFlintIndexStoreAuthProperties( - DataSourceMetadata dataSourceMetadata, String authType) { + String authType, + Supplier userName, + Supplier password, + Supplier region) { if (AuthenticationType.get(authType).equals(AuthenticationType.BASICAUTH)) { config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); - String username = - dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME); - String password = - dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD); - config.put(FLINT_INDEX_STORE_AUTH_USERNAME, username); - config.put(FLINT_INDEX_STORE_AUTH_PASSWORD, password); + config.put(FLINT_INDEX_STORE_AUTH_USERNAME, userName.get()); + config.put(FLINT_INDEX_STORE_AUTH_PASSWORD, password.get()); } else if (AuthenticationType.get(authType).equals(AuthenticationType.AWSSIGV4AUTH)) { - String region = dataSourceMetadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION); config.put(FLINT_INDEX_STORE_AUTH_KEY, "sigv4"); - config.put(FLINT_INDEX_STORE_AWSREGION_KEY, region); + config.put(FLINT_INDEX_STORE_AWSREGION_KEY, region.get()); } else { config.put(FLINT_INDEX_STORE_AUTH_KEY, authType); } From 888f3e60af9a77d82de8394f5d7caea2af52c095 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 17:20:19 -0700 Subject: [PATCH 5/6] remove unused code Signed-off-by: Peng Huo --- .../sql/spark/dispatcher/SparkQueryDispatcher.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index 21e7ac2866..b1d2184bb8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -79,16 +79,6 @@ private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryReq } } - private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) { - if (DataSourceType.S3GLUE.equals(dataSourceMetadata.getConnector())) { - return dataSourceMetadata.getProperties().get("glue.auth.role_arn"); - } - throw new UnsupportedOperationException( - String.format( - "UnSupported datasource type for async queries:: %s", - dataSourceMetadata.getConnector())); - } - private StartJobRequest getStartJobRequestForNonIndexQueries( DispatchQueryRequest dispatchQueryRequest) { StartJobRequest startJobRequest; From 7c1c1002516b1b675b3c1697ca81d491b51a927e Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 3 Oct 2023 17:38:26 -0700 Subject: [PATCH 6/6] fix format Signed-off-by: Peng Huo --- .../opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java index b1d2184bb8..1fdc391c85 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java @@ -13,8 +13,6 @@ import lombok.AllArgsConstructor; import org.json.JSONObject; import org.opensearch.sql.datasource.DataSourceService; -import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.datasource.model.DataSourceType; import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl; import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters; import org.opensearch.sql.spark.client.EMRServerlessClient;