Skip to content

Commit

Permalink
Reduce scope of changes
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Mar 12, 2024
1 parent f110ecc commit 7c86b3a
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.datasources.auth.AuthenticationType;

/** Defines the parameters required for Spark submit command construction. */
/** Define Spark Submit Parameters. */
@AllArgsConstructor
@RequiredArgsConstructor
public class SparkSubmitParameters {
private static final String SPACE = " ";
private static final String EQUALS = "=";
private static final String FLINT_BASIC_AUTH = "basic";
public static final String SPACE = " ";
public static final String EQUALS = "=";
public static final String FLINT_BASIC_AUTH = "basic";

private final String className;
private final Map<String, String> config;
Expand All @@ -40,12 +40,34 @@ public class SparkSubmitParameters {
private String extraParameters;

public static class Builder {

private String className;
private final Map<String, String> config = new LinkedHashMap<>();
private final Map<String, String> config;
private String extraParameters;

private Builder() {
initializeDefaultConfigurations();
className = DEFAULT_CLASS_NAME;
config = new LinkedHashMap<>();

config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
config.put(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
}

public static Builder builder() {
Expand All @@ -63,102 +85,41 @@ public Builder clusterName(String clusterName) {
return this;
}

public Builder dataSource(DataSourceMetadata metadata) {
if (!DataSourceType.S3GLUE.equals(metadata.getConnector())) {
throw new UnsupportedOperationException(
String.format(
"Unsupported datasource type for async queries: %s", metadata.getConnector()));
}

configureDataSource(metadata);
return this;
}

public Builder extraParameters(String params) {
this.extraParameters = params;
return this;
}

public Builder query(String query) {
config.put(FLINT_JOB_QUERY, query);
return this;
}

public Builder sessionExecution(String sessionId, String datasourceName) {
config.put(FLINT_JOB_REQUEST_INDEX, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
config.put(FLINT_JOB_SESSION_ID, sessionId);
return this;
}

public Builder structuredStreaming(Boolean isStructuredStreaming) {
if (Boolean.TRUE.equals(isStructuredStreaming)) {
config.put("spark.flint.job.type", "streaming");
public Builder dataSource(DataSourceMetadata metadata) {
if (DataSourceType.S3GLUE.equals(metadata.getConnector())) {
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);

config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());

setFlintIndexStoreHost(
parseUri(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName()));
setFlintIndexStoreAuthProperties(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
config.put("spark.flint.datasource.name", metadata.getName());
return this;
}
return this;
throw new UnsupportedOperationException(
String.format(
"UnSupported datasource type for async queries:: %s", metadata.getConnector()));
}

public SparkSubmitParameters build() {
return new SparkSubmitParameters(className, config, extraParameters);
}

private void configureDataSource(DataSourceMetadata metadata) {
// DataSource specific configuration
String roleArn = metadata.getProperties().get(GLUE_ROLE_ARN);

config.put(DRIVER_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(EXECUTOR_ENV_ASSUME_ROLE_ARN_KEY, roleArn);
config.put(HIVE_METASTORE_GLUE_ARN_KEY, roleArn);
config.put("spark.sql.catalog." + metadata.getName(), FLINT_DELEGATE_CATALOG);
config.put(FLINT_DATA_SOURCE_KEY, metadata.getName());

URI uri =
parseUri(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_URI), metadata.getName());
private void setFlintIndexStoreHost(URI uri) {
config.put(FLINT_INDEX_STORE_HOST_KEY, uri.getHost());
config.put(FLINT_INDEX_STORE_PORT_KEY, String.valueOf(uri.getPort()));
config.put(FLINT_INDEX_STORE_SCHEME_KEY, uri.getScheme());

setFlintIndexStoreAuthProperties(
metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_USERNAME),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_AUTH_PASSWORD),
() -> metadata.getProperties().get(GLUE_INDEX_STORE_OPENSEARCH_REGION));
config.put("spark.flint.datasource.name", metadata.getName());
}

private void initializeDefaultConfigurations() {
className = DEFAULT_CLASS_NAME;
// Default configurations initialization
config.put(S3_AWS_CREDENTIALS_PROVIDER_KEY, DEFAULT_S3_AWS_CREDENTIALS_PROVIDER_VALUE);
config.put(
HADOOP_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY,
DEFAULT_GLUE_CATALOG_CREDENTIALS_PROVIDER_FACTORY_KEY);
config.put(
SPARK_JAR_PACKAGES_KEY,
SPARK_STANDALONE_PACKAGE + "," + SPARK_LAUNCHER_PACKAGE + "," + PPL_STANDALONE_PACKAGE);
config.put(SPARK_JAR_REPOSITORIES_KEY, AWS_SNAPSHOT_REPOSITORY);
config.put(SPARK_DRIVER_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_EXECUTOR_ENV_JAVA_HOME_KEY, JAVA_HOME_LOCATION);
config.put(SPARK_DRIVER_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(SPARK_EXECUTOR_ENV_FLINT_CLUSTER_NAME_KEY, FLINT_DEFAULT_CLUSTER_NAME);
config.put(FLINT_INDEX_STORE_HOST_KEY, FLINT_DEFAULT_HOST);
config.put(FLINT_INDEX_STORE_PORT_KEY, FLINT_DEFAULT_PORT);
config.put(FLINT_INDEX_STORE_SCHEME_KEY, FLINT_DEFAULT_SCHEME);
config.put(FLINT_INDEX_STORE_AUTH_KEY, FLINT_DEFAULT_AUTH);
config.put(FLINT_CREDENTIALS_PROVIDER_KEY, EMR_ASSUME_ROLE_CREDENTIALS_PROVIDER);
config.put(SPARK_SQL_EXTENSIONS_KEY, FLINT_SQL_EXTENSION + "," + FLINT_PPL_EXTENSION);
config.put(HIVE_METASTORE_CLASS_KEY, GLUE_HIVE_CATALOG_FACTORY_CLASS);
}

private URI parseUri(String opensearchUri, String datasourceName) {
try {
return new URI(opensearchUri);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
String.format(
"Bad URI in indexstore configuration for datasource: %s.", datasourceName),
e);
}
}

private void setFlintIndexStoreAuthProperties(
Expand All @@ -177,20 +138,57 @@ private void setFlintIndexStoreAuthProperties(
config.put(FLINT_INDEX_STORE_AUTH_KEY, authType);
}
}

private URI parseUri(String opensearchUri, String datasourceName) {
try {
return new URI(opensearchUri);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
String.format(
"Bad URI in indexstore configuration of the : %s datasoure.", datasourceName));
}
}

public Builder structuredStreaming(Boolean isStructuredStreaming) {
if (isStructuredStreaming) {
config.put("spark.flint.job.type", "streaming");
}
return this;
}

public Builder extraParameters(String params) {
extraParameters = params;
return this;
}

public Builder sessionExecution(String sessionId, String datasourceName) {
config.put(FLINT_JOB_REQUEST_INDEX, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName));
config.put(FLINT_JOB_SESSION_ID, sessionId);
return this;
}

public SparkSubmitParameters build() {
return new SparkSubmitParameters(className, config, extraParameters);
}
}

@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder(" --class ").append(className).append(SPACE);
config.forEach(
(key, value) ->
stringBuilder
.append(" --conf ")
.append(key)
.append(EQUALS)
.append(value)
.append(SPACE));
if (extraParameters != null) stringBuilder.append(extraParameters);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(" --class ");
stringBuilder.append(this.className);
stringBuilder.append(SPACE);
for (String key : config.keySet()) {
stringBuilder.append(" --conf ");
stringBuilder.append(key);
stringBuilder.append(EQUALS);
stringBuilder.append(config.get(key));
stringBuilder.append(SPACE);
}

if (extraParameters != null) {
stringBuilder.append(extraParameters);
}
return stringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ void testDispatchWithWrongURI() {
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME)));
Assertions.assertEquals(
"Bad URI in indexstore configuration for datasource: my_glue.",
"Bad URI in indexstore configuration of the : my_glue datasoure.",
illegalArgumentException.getMessage());
}

Expand All @@ -723,7 +723,7 @@ void testDispatchWithUnSupportedDataSourceType() {
EMRS_EXECUTION_ROLE,
TEST_CLUSTER_NAME)));
Assertions.assertEquals(
"Unsupported datasource type for async queries: PROMETHEUS",
"UnSupported datasource type for async queries:: PROMETHEUS",
unsupportedOperationException.getMessage());
}

Expand Down

0 comments on commit 7c86b3a

Please sign in to comment.