Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed May 24, 2024
1 parent efa36ac commit a657a27
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@ public class SparkExecutionEngineConfigSupplierImpl implements SparkExecutionEng

@Override
public SparkExecutionEngineConfig getSparkExecutionEngineConfig(RequestContext requestContext) {
ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME);
return getBuilderFromSettingsIfAvailable().clusterName(clusterName.value()).build();
}

private SparkExecutionEngineConfig.SparkExecutionEngineConfigBuilder
getBuilderFromSettingsIfAvailable() {
String sparkExecutionEngineConfigSettingString =
this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG);
SparkExecutionEngineConfig.SparkExecutionEngineConfigBuilder builder =
SparkExecutionEngineConfig.builder();
if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) {
SparkExecutionEngineConfigClusterSetting setting =
AccessController.doPrivileged(
(PrivilegedAction<SparkExecutionEngineConfigClusterSetting>)
() ->
SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(
sparkExecutionEngineConfigSettingString));
builder.applicationId(setting.getApplicationId());
builder.executionRoleARN(setting.getExecutionRoleARN());
builder.sparkSubmitParameterModifier(
new OpenSearchSparkSubmitParameterModifier(setting.getSparkSubmitParameters()));
builder.region(setting.getRegion());
return SparkExecutionEngineConfig.builder()
.applicationId(setting.getApplicationId())
.executionRoleARN(setting.getExecutionRoleARN())
.sparkSubmitParameterModifier(
new OpenSearchSparkSubmitParameterModifier(setting.getSparkSubmitParameters()))
.region(setting.getRegion());
} else {
return SparkExecutionEngineConfig.builder();
}
ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME);
return builder.clusterName(clusterName.value()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;

/**
* Interface for extension point to allow modification of spark submit parameter.
* modifyParameter method is called after the default spark submit parameter is build.
* Interface for extension point to allow modification of spark submit parameter. modifyParameter
* method is called after the default spark submit parameter is build.
*/
public interface SparkSubmitParameterModifier {
void modifyParameters(SparkSubmitParameters parameters);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
package org.opensearch.sql.spark.config;

import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_APPLICATION_ID;
import static org.opensearch.sql.spark.constants.TestConstants.EMRS_EXECUTION_ROLE;
import static org.opensearch.sql.spark.constants.TestConstants.SPARK_SUBMIT_PARAMETERS;
import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.US_WEST_REGION;

import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -24,29 +29,29 @@ void testGetSparkExecutionEngineConfig() {
SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier =
new SparkExecutionEngineConfigSupplierImpl(settings);
when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG))
.thenReturn(
"{"
+ "\"applicationId\": \"00fd775baqpu4g0p\","
+ "\"executionRoleARN\": \"arn:aws:iam::270824043731:role/emr-job-execution-role\","
+ "\"region\": \"eu-west-1\","
+ "\"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\""
+ "}");
.thenReturn(getConfigJson());
when(settings.getSettingValue(Settings.Key.CLUSTER_NAME))
.thenReturn(new ClusterName(TEST_CLUSTER_NAME));

SparkExecutionEngineConfig sparkExecutionEngineConfig =
sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(requestContext);

Assertions.assertEquals("00fd775baqpu4g0p", sparkExecutionEngineConfig.getApplicationId());
Assertions.assertEquals(
"arn:aws:iam::270824043731:role/emr-job-execution-role",
sparkExecutionEngineConfig.getExecutionRoleARN());
Assertions.assertEquals("eu-west-1", sparkExecutionEngineConfig.getRegion());
Assertions.assertEquals(TEST_CLUSTER_NAME, sparkExecutionEngineConfig.getClusterName());
SparkSubmitParameters parameters = SparkSubmitParameters.builder().build();
sparkExecutionEngineConfig.getSparkSubmitParameterModifier().modifyParameters(parameters);
Assertions.assertTrue(
parameters.toString().contains("--conf spark.dynamicAllocation.enabled=false"));

Assertions.assertEquals(EMRS_APPLICATION_ID, sparkExecutionEngineConfig.getApplicationId());
Assertions.assertEquals(EMRS_EXECUTION_ROLE, sparkExecutionEngineConfig.getExecutionRoleARN());
Assertions.assertEquals(US_WEST_REGION, sparkExecutionEngineConfig.getRegion());
Assertions.assertEquals(TEST_CLUSTER_NAME, sparkExecutionEngineConfig.getClusterName());
Assertions.assertTrue(parameters.toString().contains(SPARK_SUBMIT_PARAMETERS));
}

String getConfigJson() {
return new JSONObject()
.put("applicationId", EMRS_APPLICATION_ID)
.put("executionRoleARN", EMRS_EXECUTION_ROLE)
.put("region", US_WEST_REGION)
.put("sparkSubmitParameters", SPARK_SUBMIT_PARAMETERS)
.toString();
}

@Test
Expand Down

0 comments on commit a657a27

Please sign in to comment.