diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java index 9a73b0f364..7ddb92900d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/model/SparkSubmitParameters.java @@ -59,6 +59,8 @@ private Builder() { config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY); config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION); + config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME); + config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME); config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST); config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT); config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME); @@ -77,6 +79,12 @@ public Builder className(String className) { return this; } + public Builder clusterName(String clusterName) { + config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, clusterName); + config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, clusterName); + return this; + } + public Builder dataSource(DataSourceMetadata metadata) { if (DataSourceType.S3GLUE.equals(metadata.getConnector())) { String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN); diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index 3a243cb5b3..95b3c25b99 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -25,6 +25,7 @@ public class SparkConstants { public static final String FLINT_INTEGRATION_JAR = "s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar"; // TODO should be replaced with mvn jar. + public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster"; public static final String FLINT_DEFAULT_HOST = "localhost"; public static final String FLINT_DEFAULT_PORT = "9200"; public static final String FLINT_DEFAULT_SCHEME = "http"; @@ -45,6 +46,10 @@ public class SparkConstants { public static final String SPARK_DRIVER_ENV_JAVA_HOME_KEY = "spark.emr-serverless.driverEnv.JAVA_HOME"; public static final String SPARK_EXECUTOR_ENV_JAVA_HOME_KEY = "spark.executorEnv.JAVA_HOME"; + public static final String SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY = + "spark.emr-serverless.driverEnv.FLINT_CLUSTER_NAME"; + public static final String SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY = + "spark.executorEnv.FLINT_CLUSTER_NAME"; public static final String FLINT_INDEX_STORE_HOST_KEY = "spark.datasource.flint.host"; public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port"; public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme"; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index de25f1188c..46dec38038 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -67,7 +67,8 @@ public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -79,6 +80,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() + .clusterName(clusterName) .dataSource(context.getDataSourceMetadata()) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) .build() diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java index 1da38f03a7..1afba22db7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/InteractiveQueryHandler.java @@ -70,7 +70,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { public DispatchQueryResponse submit( DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) { Session session = null; - String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "non-index-query"; Map tags = context.getTags(); DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata(); @@ -98,6 +99,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() .className(FLINT_SESSION_CLASS_NAME) + .clusterName(clusterName) .dataSource(dataSourceMetadata) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()), tags, diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 6a4045b85a..75337a3dad 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -43,7 +43,8 @@ public DispatchQueryResponse submit( leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource())); - String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query"; + String clusterName = dispatchQueryRequest.getClusterName(); + String jobName = clusterName + ":" + "index-query"; IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails(); Map tags = context.getTags(); tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName()); @@ -56,6 +57,7 @@ public DispatchQueryResponse submit( dispatchQueryRequest.getApplicationId(), dispatchQueryRequest.getExecutionRoleARN(), SparkSubmitParameters.Builder.builder() + .clusterName(clusterName) .dataSource(dataSourceMetadata) .structuredStreaming(true) .extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()) diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 4787058db3..2a499e7d30 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -1055,7 +1055,8 @@ private String constructExpectedSparkSubmitParameterString( + " --conf" + " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/" + " --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/" - + " --conf" + + " --conf spark.emr-serverless.driverEnv.FLINT_CLUSTER_NAME=TEST_CLUSTER --conf" + + " spark.executorEnv.FLINT_CLUSTER_NAME=TEST_CLUSTER --conf" + " spark.datasource.flint.host=search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com" + " --conf spark.datasource.flint.port=-1 --conf" + " spark.datasource.flint.scheme=https --conf spark.datasource.flint.auth="