diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java
deleted file mode 100644
index fadb8a67a9..0000000000
--- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/S3GlueSparkSubmitParameters.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- *
- * * Copyright OpenSearch Contributors
- * * SPDX-License-Identifier: Apache-2.0
- *
- */
-
-package org.opensearch.sql.spark.asyncquery.model;
-
-import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import lombok.Getter;
-import lombok.Setter;
-
-@Getter
-@Setter
-public class S3GlueSparkSubmitParameters {
-
-  private String className;
-  private Map<String, String> config;
-  public static final String SPACE = " ";
-  public static final String EQUALS = "=";
-
-  public S3GlueSparkSubmitParameters() {
-    this.className = DEFAULT_CLASS_NAME;
-    this.config = new LinkedHashMap<>();
-    this.config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
-    this.config.put(
-        HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
-        DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
-    this.config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR);
-    this.config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE);
-    this.config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
-    this.config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
-    this.config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
-    this.config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
-    this.config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
-    this.config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
-    this.config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
-    this.config.put(FLINT_INDEX_STORE_AWSREGION_KEY, FLINT_DEFAULT_REGION);
-    this.config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
-    this.config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION);
-    this.config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
-  }
-
-  public void addParameter(String key, String value) {
-    this.config.put(key, value);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder stringBuilder = new StringBuilder();
-    stringBuilder.append(" --class ");
-    stringBuilder.append(this.className);
-    stringBuilder.append(SPACE);
-    for (String key : config.keySet()) {
-      stringBuilder.append(" --conf ");
-      stringBuilder.append(key);
-      stringBuilder.append(EQUALS);
-      stringBuilder.append(config.get(key));
-      stringBuilder.append(SPACE);
-    }
-    return stringBuilder.toString();
-  }
-}
diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java
new file mode 100644
index 0000000000..b7c6188c51
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.asyncquery.model;
+
+import static org.opensearch.sql.spark.data.constants.SparkConstants.AWS_SNAPSHOT_REPOSITORY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CATALOG_JAR;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_CREDENTIALS_PROVIDER_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_AUTH;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_HOST;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_PORT;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_REGION;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DEFAULT_SCHEME;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SQL_EXTENSION;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_CATALOG_HIVE_JAR;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.GLUE_HIVE_CATALOG_FACTORY_CLASS;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_CLASS_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.JAVA_HOME_LOCATION;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.S3_AWS_CREDENTIALS_PROVIDER_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_DRIVER_ENV_JAVA_HOME_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_EXECUTOR_ENV_JAVA_HOME_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JARS_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_PACKAGES_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_JAR_REPOSITORIES_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_EXTENSIONS_KEY;
+import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_STANDALONE_PACKAGE;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import lombok.RequiredArgsConstructor;
+import org.opensearch.sql.datasource.model.DataSourceMetadata;
+import org.opensearch.sql.datasource.model.DataSourceType;
+
+/**
+ * Define Spark Submit Parameters.
+ */
+@RequiredArgsConstructor
+public class SparkSubmitParameters {
+  public static final String SPACE = " ";
+  public static final String EQUALS = "=";
+
+  private final String className;
+  private final Map<String, String> config;
+
+  public static class Builder {
+
+    private final String className;
+    private final Map<String, String> config;
+
+    private Builder() {
+      className = DEFAULT_CLASS_NAME;
+      config = new LinkedHashMap<>();
+
+      config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
+      config.put(
+          HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
+          DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
+      config.put(SPARK_JARS_KEY, GLUE_CATALOG_HIVE_JAR + "," + FLINT_CATALOG_JAR);
+      config.put(SPARK_JAR_PACKAGES_KEY, SPARK_STANDALONE_PACKAGE);
+      config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
+      config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
+      config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
+      config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
+      config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
+      config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
+      config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
+      config.put(FLINT_INDEX_STORE_AWSREGION_KEY, FLINT_DEFAULT_REGION);
+      config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
+      config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION);
+      config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
+    }
+
+    public static Builder builder() {
+      return new Builder();
+    }
+
+    public Builder dataSource(DataSourceMetadata metadata) {
+      if (DataSourceType.S3GLUE.equals(metadata.getConnector())) {
+        String roleArn = metadata.getProperties().get("glue.auth.role_arn");
+
+        config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
+        config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
+        config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
+
+        URI uri = parseUri(metadata.getProperties().get("glue.indexstore.opensearch.uri"),
+            metadata.getName());
+        flintConfig(
+            uri.getHost(),
+            String.valueOf(uri.getPort()),
+            uri.getScheme(),
+            metadata.getProperties().get("glue.indexstore.opensearch.auth"),
+            metadata.getProperties().get("glue.indexstore.opensearch.region"));
+
+        config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
+
+        return this;
+      }
+      throw new UnsupportedOperationException(
+          String.format(
+              "UnSupported datasource type for async queries:: %s",
+              metadata.getConnector()));
+    }
+
+    private void flintConfig(String host, String port, String scheme, String auth, String region) {
+      config.put(FLINT_INDEX_STORE_HOST_KEY, host);
+      config.put(FLINT_INDEX_STORE_PORT_KEY, port);
+      config.put(FLINT_INDEX_STORE_SCHEME_KEY, scheme);
+      config.put(FLINT_INDEX_STORE_AUTH_KEY, auth);
+      config.put(FLINT_INDEX_STORE_AWSREGION_KEY, region);
+    }
+
+    private URI parseUri(String opensearchUri, String datasourceName) {
+      try {
+        return new URI(opensearchUri);
+      } catch (URISyntaxException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName));
+      }
+    }
+
+    public Builder structuredStreaming() {
+      config.put("spark.flint.job.type", "wait");
+
+      return this;
+    }
+
+    public SparkSubmitParameters build() {
+      return new SparkSubmitParameters(className, config);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    stringBuilder.append(" --class ");
+    stringBuilder.append(this.className);
+    stringBuilder.append(SPACE);
+    for (String key : config.keySet()) {
+      stringBuilder.append(" --conf ");
+      stringBuilder.append(key);
+      stringBuilder.append(EQUALS);
+      stringBuilder.append(config.get(key));
+      stringBuilder.append(SPACE);
+    }
+    return stringBuilder.toString();
+  }
+}
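
Not part of the patch: an illustrative sketch of how the new builder is meant to be used. The
DataSourceMetadata values below are placeholders, and the setConnector/setProperties setters are
assumed counterparts of the getConnector/getProperties/getName accessors this change already
relies on.

    // Hypothetical S3GLUE metadata; the dispatcher normally obtains this via
    // dataSourceService.getRawDataSourceMetadata(datasourceName).
    DataSourceMetadata metadata = new DataSourceMetadata();
    metadata.setName("my_glue");
    metadata.setConnector(DataSourceType.S3GLUE);
    metadata.setProperties(
        Map.of(
            "glue.auth.role_arn", "arn:aws:iam::123456789012:role/example-glue-role",
            "glue.indexstore.opensearch.uri", "http://localhost:9200",
            "glue.indexstore.opensearch.auth", "noauth",
            "glue.indexstore.opensearch.region", "us-east-1"));

    // Produces the spark-submit argument string passed to EMR Serverless, of the form
    // " --class <DEFAULT_CLASS_NAME>  --conf key1=value1  --conf key2=value2 ...".
    String params =
        SparkSubmitParameters.Builder.builder().dataSource(metadata).build().toString();
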
diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java
index 83e570ece2..1a8e3203b8 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImplEMR.java
@@ -40,6 +40,7 @@ public String startJobRun(StartJobRequest startJobRequest) {
             .withApplicationId(startJobRequest.getApplicationId())
             .withExecutionRoleArn(startJobRequest.getExecutionRoleArn())
             .withTags(startJobRequest.getTags())
+            .withExecutionTimeoutMinutes(startJobRequest.executionTimeout())
             .withJobDriver(
                 new JobDriver()
                     .withSparkSubmit(
diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java
index 94689c7030..2c551c306e 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/client/StartJobRequest.java
@@ -20,4 +20,13 @@ public class StartJobRequest {
   private final String executionRoleArn;
   private final String sparkSubmitParams;
   private final Map<String, String> tags;
+
+  /**
+   * True if it is a Spark Structured Streaming job.
+   */
+  private final boolean isStructuredStreaming;
+
+  public Long executionTimeout() {
+    return isStructuredStreaming ? 0L : 60L;
+  }
 }
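
Not part of the patch: a hedged sketch of what the new flag means for callers. The leading
constructor arguments (query, job name, application id, execution role, spark-submit parameters)
are inferred from the call sites in this diff and from StartJobRequestTest below; only the trailing
boolean is new. The 0-minute value assumes EMR Serverless treats executionTimeoutMinutes = 0 as
"no execution time limit", which is why streaming jobs are exempted from the 60-minute cap.

    Map<String, String> tags = Map.of("cluster", "TEST_CLUSTER");
    String sparkSubmitParams = " --class ... ";  // output of SparkSubmitParameters#toString()

    // Argument order assumed: query, jobName, applicationId, executionRoleArn,
    // sparkSubmitParams, tags, isStructuredStreaming.
    StartJobRequest batch =
        new StartJobRequest("select 1", "batch-job", "app-id", "role-arn",
            sparkSubmitParams, tags, false);
    StartJobRequest streaming =
        new StartJobRequest("create index elb_and_requestUri ...", "stream-job", "app-id",
            "role-arn", sparkSubmitParams, tags, true);

    batch.executionTimeout();      // 60L -> EMR Serverless cancels the run after 60 minutes
    streaming.executionTimeout();  // 0L  -> presumably no timeout, so the stream keeps running
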
diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
index 904d199663..cb48d8718d 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
@@ -5,30 +5,16 @@

 package org.opensearch.sql.spark.dispatcher;

-import static org.opensearch.sql.spark.data.constants.SparkConstants.DRIVER_ENV_ASSUME_ROLE_ARN_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_DELEGATE_CATALOG;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AUTH_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_AWSREGION_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_HOST_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_PORT_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_INDEX_STORE_SCHEME_KEY;
-import static org.opensearch.sql.spark.data.constants.SparkConstants.HIVE_METASTORE_GLUE_ARN_KEY;
-
 import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import com.amazonaws.services.emrserverless.model.JobRunState;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 import lombok.AllArgsConstructor;
 import org.json.JSONObject;
 import org.opensearch.sql.datasource.DataSourceService;
-import org.opensearch.sql.datasource.model.DataSourceMetadata;
-import org.opensearch.sql.datasource.model.DataSourceType;
 import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
-import org.opensearch.sql.spark.asyncquery.model.S3GlueSparkSubmitParameters;
+import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
 import org.opensearch.sql.spark.client.EMRServerlessClient;
 import org.opensearch.sql.spark.client.StartJobRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
@@ -38,7 +24,9 @@
 import org.opensearch.sql.spark.rest.model.LangType;
 import org.opensearch.sql.spark.utils.SQLQueryUtils;

-/** This class takes care of understanding query and dispatching job query to emr serverless. */
+/**
+ * This class takes care of understanding the query and dispatching the job to EMR Serverless.
+ */
 @AllArgsConstructor
 public class SparkQueryDispatcher {

@@ -78,9 +66,9 @@ public String cancelJob(String applicationId, String jobId) {

   private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryRequest) {
     if (LangType.SQL.equals(dispatchQueryRequest.getLangType())) {
-      if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery()))
+      if (SQLQueryUtils.isIndexQuery(dispatchQueryRequest.getQuery())) {
         return getStartJobRequestForIndexRequest(dispatchQueryRequest);
-      else {
+      } else {
         return getStartJobRequestForNonIndexQueries(dispatchQueryRequest);
       }
     }
@@ -88,48 +76,6 @@ private StartJobRequest getStartJobRequest(DispatchQueryRequest dispatchQueryReq
         String.format("UnSupported Lang type:: %s", dispatchQueryRequest.getLangType()));
   }

-  private String getDataSourceRoleARN(DataSourceMetadata dataSourceMetadata) {
-    if (DataSourceType.S3GLUE.equals(dataSourceMetadata.getConnector())) {
-      return dataSourceMetadata.getProperties().get("glue.auth.role_arn");
-    }
-    throw new UnsupportedOperationException(
-        String.format(
-            "UnSupported datasource type for async queries:: %s",
-            dataSourceMetadata.getConnector()));
-  }
-
-  private String constructSparkParameters(String datasourceName) {
-    DataSourceMetadata dataSourceMetadata =
-        dataSourceService.getRawDataSourceMetadata(datasourceName);
-    S3GlueSparkSubmitParameters s3GlueSparkSubmitParameters = new S3GlueSparkSubmitParameters();
-    s3GlueSparkSubmitParameters.addParameter(
-        DRIVER_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata));
-    s3GlueSparkSubmitParameters.addParameter(
-        EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata));
-    s3GlueSparkSubmitParameters.addParameter(
-        HIVE_METASTORE_GLUE_ARN_KEY, getDataSourceRoleARN(dataSourceMetadata));
-    String opensearchuri = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.uri");
-    URI uri;
-    try {
-      uri = new URI(opensearchuri);
-    } catch (URISyntaxException e) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Bad URI in indexstore configuration of the : %s datasoure.", datasourceName));
-    }
-    String auth = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.auth");
-    String region = dataSourceMetadata.getProperties().get("glue.indexstore.opensearch.region");
-    s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_HOST_KEY, uri.getHost());
-    s3GlueSparkSubmitParameters.addParameter(
-        FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort()));
-    s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme());
-    s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AUTH_KEY, auth);
-    s3GlueSparkSubmitParameters.addParameter(FLINT_INDEX_STORE_AWSREGION_KEY, region);
-    s3GlueSparkSubmitParameters.addParameter(
-        "spark.sql.catalog." + datasourceName, FLINT_DELEGATE_CATALOG);
-    return s3GlueSparkSubmitParameters.toString();
-  }
-
   private StartJobRequest getStartJobRequestForNonIndexQueries(
       DispatchQueryRequest dispatchQueryRequest) {
     StartJobRequest startJobRequest;
@@ -153,8 +99,10 @@ private StartJobRequest getStartJobRequestForNonIndexQueries(
             jobName,
             dispatchQueryRequest.getApplicationId(),
             dispatchQueryRequest.getExecutionRoleARN(),
-            constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
-            tags);
+            SparkSubmitParameters.Builder.builder()
+                .dataSource(dataSourceService.getRawDataSourceMetadata(fullyQualifiedTableName.getDatasourceName()))
+                .build().toString(),
+            tags, false);
     return startJobRequest;
   }

@@ -180,8 +128,11 @@ private StartJobRequest getStartJobRequestForIndexRequest(
             jobName,
             dispatchQueryRequest.getApplicationId(),
             dispatchQueryRequest.getExecutionRoleARN(),
-            constructSparkParameters(fullyQualifiedTableName.getDatasourceName()),
-            tags);
+            SparkSubmitParameters.Builder.builder()
+                .dataSource(dataSourceService.getRawDataSourceMetadata(fullyQualifiedTableName.getDatasourceName()))
+                .structuredStreaming()
+                .build().toString(),
+            tags, true);
     return startJobRequest;
   }

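
For context (illustrative, not part of the patch): both dispatcher call sites now delegate
parameter construction to SparkSubmitParameters.Builder, and the only differences between them are
the structuredStreaming() call and the boolean passed to StartJobRequest. A hypothetical helper
capturing the shared logic:

    // "metadata" stands for the DataSourceMetadata resolved through
    // dataSourceService.getRawDataSourceMetadata(fullyQualifiedTableName.getDatasourceName()).
    private static String submitParams(DataSourceMetadata metadata, boolean structuredStreaming) {
      SparkSubmitParameters.Builder builder =
          SparkSubmitParameters.Builder.builder().dataSource(metadata);
      if (structuredStreaming) {
        builder.structuredStreaming();  // appends --conf spark.flint.job.type=wait
      }
      return builder.build().toString();
    }

Index queries take the streaming path and are additionally submitted with isStructuredStreaming =
true, which zeroes the EMR Serverless execution timeout (see StartJobRequest above).
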
diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
index 0765b90534..229fb8814f 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/client/EmrServerlessClientImplTest.java
@@ -44,7 +44,7 @@ void testStartJobRun() {
             EMRS_APPLICATION_ID,
             EMRS_EXECUTION_ROLE,
             SPARK_SUBMIT_PARAMETERS,
-            new HashMap<>()));
+            new HashMap<>(), false));
   }

   @Test
diff --git a/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java
new file mode 100644
index 0000000000..29c9b5d3e9
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/client/StartJobRequestTest.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.client;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class StartJobRequestTest {
+
+  @Test
+  void executionTimeout() {
+    assertEquals(60L, onDemandJob().executionTimeout());
+    assertEquals(0L, streamingJob().executionTimeout());
+  }
+
+  private StartJobRequest onDemandJob() {
+    return new StartJobRequest("","","","","", Map.of(), false);
+  }
+
+  private StartJobRequest streamingJob() {
+    return new StartJobRequest("","","","","", Map.of(), true);
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
index d83505fde0..7275947730 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java
@@ -68,7 +68,7 @@ void testDispatchSelectQuery() {
                 EMRS_APPLICATION_ID,
                 EMRS_EXECUTION_ROLE,
                 constructExpectedSparkSubmitParameterString(),
-                tags)))
+                tags, false)))
         .thenReturn(EMR_JOB_ID);
     DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
     when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
@@ -85,7 +85,7 @@ void testDispatchSelectQuery() {
                 EMRS_APPLICATION_ID,
                 EMRS_EXECUTION_ROLE,
                 constructExpectedSparkSubmitParameterString(),
-                tags));
+                tags, false));
     Assertions.assertEquals(EMR_JOB_ID, jobId);
   }

@@ -112,8 +112,8 @@ void testDispatchIndexQuery() {
             "TEST_CLUSTER:my_glue.default.http_logs.elb_and_requestUri",
             EMRS_APPLICATION_ID,
             EMRS_EXECUTION_ROLE,
-            constructExpectedSparkSubmitParameterString(),
-            tags)))
+            withStructuredStreaming(constructExpectedSparkSubmitParameterString()),
+            tags, true)))
         .thenReturn(EMR_JOB_ID);
     DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata();
     when(dataSourceService.getRawDataSourceMetadata("my_glue")).thenReturn(dataSourceMetadata);
@@ -129,8 +129,8 @@ void testDispatchIndexQuery() {
             "TEST_CLUSTER:my_glue.default.http_logs.elb_and_requestUri",
             EMRS_APPLICATION_ID,
             EMRS_EXECUTION_ROLE,
-            constructExpectedSparkSubmitParameterString(),
-            tags));
+            withStructuredStreaming(constructExpectedSparkSubmitParameterString()),
+            tags, true));
     Assertions.assertEquals(EMR_JOB_ID, jobId);
   }

@@ -389,6 +389,10 @@ private String constructExpectedSparkSubmitParameterString() {
         + " --conf spark.sql.catalog.my_glue=org.opensearch.sql.FlintDelegateCatalog ";
   }

+  private String withStructuredStreaming(String parameters) {
+    return parameters + " --conf spark.flint.job.type=wait ";
+  }
+
   private DataSourceMetadata constructMyGlueDataSourceMetadata() {
     DataSourceMetadata dataSourceMetadata = new DataSourceMetadata();
     dataSourceMetadata.setName("my_glue");