Skip to content

Commit

Permalink
Add cluster name in spark submit params
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jan 3, 2024
1 parent 797276d commit 5bd122f
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private Builder() {
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(FLINT_INDEX_STORE_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
Expand All @@ -77,6 +78,11 @@ public Builder className(String className) {
return this;
}

public Builder clusterName(String clusterName) {
config.put(FLINT_INDEX_STORE_CLUSTER_NAME_KEY, clusterName);
return this;
}

public Builder dataSource(DataSourceMetadata metadata) {
if (DataSourceType.S3GLUE.equals(metadata.getConnector())) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class SparkConstants {
public static final String FLINT_INTEGRATION_JAR =
"s3://spark-datasource/flint-spark-integration-assembly-0.1.0-SNAPSHOT.jar";
// TODO should be replaced with mvn jar.
public static final String FLINT_DEFAULT_CLUSTER_NAME = "opensearch-cluster";
public static final String FLINT_DEFAULT_HOST = "localhost";
public static final String FLINT_DEFAULT_PORT = "9200";
public static final String FLINT_DEFAULT_SCHEME = "http";
Expand All @@ -45,6 +46,7 @@ public class SparkConstants {
public static final String SPARK_DRIVER_ENV_JAVA_HOME_KEY =
"spark.emr-serverless.driverEnv.JAVA_HOME";
public static final String SPARK_EXECUTOR_ENV_JAVA_HOME_KEY = "spark.executorEnv.JAVA_HOME";
public static final String FLINT_INDEX_STORE_CLUSTER_NAME_KEY = "spark.flint.clusterName";
public static final String FLINT_INDEX_STORE_HOST_KEY = "spark.datasource.flint.host";
public static final String FLINT_INDEX_STORE_PORT_KEY = "spark.datasource.flint.port";
public static final String FLINT_INDEX_STORE_SCHEME_KEY = "spark.datasource.flint.scheme";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

Expand All @@ -79,6 +80,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(context.getDataSourceMetadata())
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
Session session = null;
String jobName = dispatchQueryRequest.getClusterName() + ":" + "non-index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "non-index-query";
Map<String, String> tags = context.getTags();
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();

Expand Down Expand Up @@ -98,6 +99,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.className(FLINT_SESSION_CLASS_NAME)
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams()),
tags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public DispatchQueryResponse submit(

leaseManager.borrow(new LeaseRequest(JobType.STREAMING, dispatchQueryRequest.getDatasource()));

String jobName = dispatchQueryRequest.getClusterName() + ":" + "index-query";
String clusterName = dispatchQueryRequest.getClusterName();
String jobName = clusterName + ":" + "index-query";
IndexQueryDetails indexQueryDetails = context.getIndexQueryDetails();
Map<String, String> tags = context.getTags();
tags.put(INDEX_TAG_KEY, indexQueryDetails.openSearchIndexName());
Expand All @@ -56,6 +57,7 @@ public DispatchQueryResponse submit(
dispatchQueryRequest.getApplicationId(),
dispatchQueryRequest.getExecutionRoleARN(),
SparkSubmitParameters.Builder.builder()
.clusterName(clusterName)
.dataSource(dataSourceMetadata)
.structuredStreaming(true)
.extraParameters(dispatchQueryRequest.getExtraSparkSubmitParams())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,7 @@ private String constructExpectedSparkSubmitParameterString(
+ " --conf"
+ " spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64/"
+ " --conf spark.flint.clusterName=TEST_CLUSTER"
+ " --conf"
+ " spark.datasource.flint.host=search-flint-dp-benchmark-cf5crj5mj2kfzvgwdeynkxnefy.eu-west-1.es.amazonaws.com"
+ " --conf spark.datasource.flint.port=-1 --conf"
Expand Down

0 comments on commit 5bd122f

Please sign in to comment.