From b654e43df21837b2cd7eb34ef88e81c64c8df026 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Mon, 9 Oct 2023 12:44:04 -0700 Subject: [PATCH 1/6] Spark Execution Engine Config Refactor (#2249) Signed-off-by: Vamsi Manohar --- .../org/opensearch/sql/plugin/SQLPlugin.java | 34 +++++++---- .../AsyncQueryExecutorServiceImpl.java | 23 ++----- .../config/SparkExecutionEngineConfig.java | 24 +++----- ...rkExecutionEngineConfigClusterSetting.java | 30 +++++++++ .../SparkExecutionEngineConfigSupplier.java | 12 ++++ ...parkExecutionEngineConfigSupplierImpl.java | 42 +++++++++++++ .../AsyncQueryExecutorServiceImplTest.java | 52 ++++++++-------- ...cutionEngineConfigClusterSettingTest.java} | 10 +-- ...ExecutionEngineConfigSupplierImplTest.java | 61 +++++++++++++++++++ 9 files changed, 214 insertions(+), 74 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSetting.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImpl.java rename spark/src/test/java/org/opensearch/sql/spark/config/{SparkExecutionEngineConfigTest.java => SparkExecutionEngineConfigClusterSettingTest.java} (79%) create mode 100644 spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImplTest.java diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index d2c8c6ebb7..f3fd043b63 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -96,6 +96,8 @@ import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EmrServerlessClientImpl; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.flint.FlintIndexMetadataReaderImpl; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -216,15 +218,21 @@ public Collection createComponents( dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); - if (StringUtils.isEmpty(this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG))) { + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier = + new SparkExecutionEngineConfigSupplierImpl(pluginSettings); + SparkExecutionEngineConfig sparkExecutionEngineConfig = + sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(); + if (StringUtils.isEmpty(sparkExecutionEngineConfig.getRegion())) { LOGGER.warn( String.format( - "Async Query APIs are disabled as %s is not configured in cluster settings. " + "Async Query APIs are disabled as %s is not configured properly in cluster settings. 
" + "Please configure and restart the domain to enable Async Query APIs", SPARK_EXECUTION_ENGINE_CONFIG.getKeyValue())); this.asyncQueryExecutorService = new AsyncQueryExecutorServiceImpl(); } else { - this.asyncQueryExecutorService = createAsyncQueryExecutorService(); + this.asyncQueryExecutorService = + createAsyncQueryExecutorService( + sparkExecutionEngineConfigSupplier, sparkExecutionEngineConfig); } ModulesBuilder modules = new ModulesBuilder(); @@ -295,10 +303,13 @@ private DataSourceServiceImpl createDataSourceService() { dataSourceUserAuthorizationHelper); } - private AsyncQueryExecutorService createAsyncQueryExecutorService() { + private AsyncQueryExecutorService createAsyncQueryExecutorService( + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier, + SparkExecutionEngineConfig sparkExecutionEngineConfig) { AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService = new OpensearchAsyncQueryJobMetadataStorageService(client, clusterService); - EMRServerlessClient emrServerlessClient = createEMRServerlessClient(); + EMRServerlessClient emrServerlessClient = + createEMRServerlessClient(sparkExecutionEngineConfig.getRegion()); JobExecutionResponseReader jobExecutionResponseReader = new JobExecutionResponseReader(client); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( @@ -309,21 +320,18 @@ private AsyncQueryExecutorService createAsyncQueryExecutorService() { new FlintIndexMetadataReaderImpl(client), client); return new AsyncQueryExecutorServiceImpl( - asyncQueryJobMetadataStorageService, sparkQueryDispatcher, pluginSettings); + asyncQueryJobMetadataStorageService, + sparkQueryDispatcher, + sparkExecutionEngineConfigSupplier); } - private EMRServerlessClient createEMRServerlessClient() { - String sparkExecutionEngineConfigString = - this.pluginSettings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG); + private EMRServerlessClient createEMRServerlessClient(String region) { return AccessController.doPrivileged( (PrivilegedAction) () -> { - SparkExecutionEngineConfig sparkExecutionEngineConfig = - SparkExecutionEngineConfig.toSparkExecutionEngineConfig( - sparkExecutionEngineConfigString); AWSEMRServerless awsemrServerless = AWSEMRServerlessClientBuilder.standard() - .withRegion(sparkExecutionEngineConfig.getRegion()) + .withRegion(region) .withCredentials(new DefaultAWSCredentialsProviderChain()) .build(); return new EmrServerlessClientImpl(awsemrServerless); diff --git a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java index 55346bc289..13db103f4b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImpl.java @@ -5,26 +5,22 @@ package org.opensearch.sql.spark.asyncquery; -import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME; import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG; import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; import com.amazonaws.services.emrserverless.model.JobRunState; -import java.security.AccessController; -import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Optional; import lombok.AllArgsConstructor; import 
org.json.JSONObject; -import org.opensearch.cluster.ClusterName; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; @@ -37,7 +33,7 @@ public class AsyncQueryExecutorServiceImpl implements AsyncQueryExecutorService { private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService; private SparkQueryDispatcher sparkQueryDispatcher; - private Settings settings; + private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; private Boolean isSparkJobExecutionEnabled; public AsyncQueryExecutorServiceImpl() { @@ -47,26 +43,19 @@ public AsyncQueryExecutorServiceImpl() { public AsyncQueryExecutorServiceImpl( AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService, SparkQueryDispatcher sparkQueryDispatcher, - Settings settings) { + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier) { this.isSparkJobExecutionEnabled = Boolean.TRUE; this.asyncQueryJobMetadataStorageService = asyncQueryJobMetadataStorageService; this.sparkQueryDispatcher = sparkQueryDispatcher; - this.settings = settings; + this.sparkExecutionEngineConfigSupplier = sparkExecutionEngineConfigSupplier; } @Override public CreateAsyncQueryResponse createAsyncQuery( CreateAsyncQueryRequest createAsyncQueryRequest) { validateSparkExecutionEngineSettings(); - String sparkExecutionEngineConfigString = - settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG); SparkExecutionEngineConfig sparkExecutionEngineConfig = - AccessController.doPrivileged( - (PrivilegedAction) - () -> - SparkExecutionEngineConfig.toSparkExecutionEngineConfig( - sparkExecutionEngineConfigString)); - ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME); + sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(); DispatchQueryResponse dispatchQueryResponse = sparkQueryDispatcher.dispatch( new DispatchQueryRequest( @@ -75,7 +64,7 @@ public CreateAsyncQueryResponse createAsyncQuery( createAsyncQueryRequest.getDatasource(), createAsyncQueryRequest.getLang(), sparkExecutionEngineConfig.getExecutionRoleARN(), - clusterName.value(), + sparkExecutionEngineConfig.getClusterName(), sparkExecutionEngineConfig.getSparkSubmitParameters())); asyncQueryJobMetadataStorageService.storeJobMetadata( new AsyncQueryJobMetadata( diff --git a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java index 23e5907b5c..537a635150 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java +++ b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfig.java @@ -1,25 +1,21 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - package org.opensearch.sql.spark.config; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.google.gson.Gson; +import 
lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +/** + * POJO for spark Execution Engine Config. Interface between {@link + * org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService} and {@link + * SparkExecutionEngineConfigSupplier} + */ @Data -@JsonIgnoreProperties(ignoreUnknown = true) +@NoArgsConstructor +@AllArgsConstructor public class SparkExecutionEngineConfig { private String applicationId; private String region; private String executionRoleARN; - - /** Additional Spark submit parameters to append to request. */ private String sparkSubmitParameters; - - public static SparkExecutionEngineConfig toSparkExecutionEngineConfig(String jsonString) { - return new Gson().fromJson(jsonString, SparkExecutionEngineConfig.class); - } + private String clusterName; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSetting.java b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSetting.java new file mode 100644 index 0000000000..b3f1295faa --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSetting.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.config; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.google.gson.Gson; +import lombok.Data; + +/** + * This POJO is just for reading stringified json in `plugins.query.executionengine.spark.config` + * setting. + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class SparkExecutionEngineConfigClusterSetting { + private String applicationId; + private String region; + private String executionRoleARN; + + /** Additional Spark submit parameters to append to request. */ + private String sparkSubmitParameters; + + public static SparkExecutionEngineConfigClusterSetting toSparkExecutionEngineConfig( + String jsonString) { + return new Gson().fromJson(jsonString, SparkExecutionEngineConfigClusterSetting.class); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java new file mode 100644 index 0000000000..108cb07daf --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplier.java @@ -0,0 +1,12 @@ +package org.opensearch.sql.spark.config; + +/** Interface for extracting and providing SparkExecutionEngineConfig */ +public interface SparkExecutionEngineConfigSupplier { + + /** + * Get SparkExecutionEngineConfig + * + * @return {@link SparkExecutionEngineConfig}. 
+ */ + SparkExecutionEngineConfig getSparkExecutionEngineConfig(); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImpl.java b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImpl.java new file mode 100644 index 0000000000..f4c32f24eb --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImpl.java @@ -0,0 +1,42 @@ +package org.opensearch.sql.spark.config; + +import static org.opensearch.sql.common.setting.Settings.Key.CLUSTER_NAME; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG; + +import java.security.AccessController; +import java.security.PrivilegedAction; +import lombok.AllArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.cluster.ClusterName; +import org.opensearch.sql.common.setting.Settings; + +@AllArgsConstructor +public class SparkExecutionEngineConfigSupplierImpl implements SparkExecutionEngineConfigSupplier { + + private Settings settings; + + @Override + public SparkExecutionEngineConfig getSparkExecutionEngineConfig() { + String sparkExecutionEngineConfigSettingString = + this.settings.getSettingValue(SPARK_EXECUTION_ENGINE_CONFIG); + SparkExecutionEngineConfig sparkExecutionEngineConfig = new SparkExecutionEngineConfig(); + if (!StringUtils.isBlank(sparkExecutionEngineConfigSettingString)) { + SparkExecutionEngineConfigClusterSetting sparkExecutionEngineConfigClusterSetting = + AccessController.doPrivileged( + (PrivilegedAction) + () -> + SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig( + sparkExecutionEngineConfigSettingString)); + sparkExecutionEngineConfig.setApplicationId( + sparkExecutionEngineConfigClusterSetting.getApplicationId()); + sparkExecutionEngineConfig.setExecutionRoleARN( + sparkExecutionEngineConfigClusterSetting.getExecutionRoleARN()); + sparkExecutionEngineConfig.setSparkSubmitParameters( + sparkExecutionEngineConfigClusterSetting.getSparkSubmitParameters()); + sparkExecutionEngineConfig.setRegion(sparkExecutionEngineConfigClusterSetting.getRegion()); + } + ClusterName clusterName = settings.getSettingValue(CLUSTER_NAME); + sparkExecutionEngineConfig.setClusterName(clusterName.value()); + return sparkExecutionEngineConfig; + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java index e16dd89639..01bccd9030 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java @@ -27,11 +27,11 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.cluster.ClusterName; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.asyncquery.exceptions.AsyncQueryNotFoundException; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; +import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import 
org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; @@ -44,15 +44,17 @@ public class AsyncQueryExecutorServiceImplTest { @Mock private SparkQueryDispatcher sparkQueryDispatcher; @Mock private AsyncQueryJobMetadataStorageService asyncQueryJobMetadataStorageService; - @Mock private Settings settings; - private AsyncQueryExecutorService jobExecutorService; + @Mock private SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier; + @BeforeEach void setUp() { jobExecutorService = new AsyncQueryExecutorServiceImpl( - asyncQueryJobMetadataStorageService, sparkQueryDispatcher, settings); + asyncQueryJobMetadataStorageService, + sparkQueryDispatcher, + sparkExecutionEngineConfigSupplier); } @Test @@ -60,11 +62,14 @@ void testCreateAsyncQuery() { CreateAsyncQueryRequest createAsyncQueryRequest = new CreateAsyncQueryRequest( "select * from my_glue.default.http_logs", "my_glue", LangType.SQL); - when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG)) + when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig()) .thenReturn( - "{\"applicationId\":\"00fd775baqpu4g0p\",\"executionRoleARN\":\"arn:aws:iam::270824043731:role/emr-job-execution-role\",\"region\":\"eu-west-1\"}"); - when(settings.getSettingValue(Settings.Key.CLUSTER_NAME)) - .thenReturn(new ClusterName(TEST_CLUSTER_NAME)); + new SparkExecutionEngineConfig( + "00fd775baqpu4g0p", + "eu-west-1", + "arn:aws:iam::270824043731:role/emr-job-execution-role", + null, + TEST_CLUSTER_NAME)); when(sparkQueryDispatcher.dispatch( new DispatchQueryRequest( "00fd775baqpu4g0p", @@ -78,8 +83,7 @@ void testCreateAsyncQuery() { jobExecutorService.createAsyncQuery(createAsyncQueryRequest); verify(asyncQueryJobMetadataStorageService, times(1)) .storeJobMetadata(new AsyncQueryJobMetadata("00fd775baqpu4g0p", EMR_JOB_ID, null)); - verify(settings, times(1)).getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG); - verify(settings, times(1)).getSettingValue(Settings.Key.CLUSTER_NAME); + verify(sparkExecutionEngineConfigSupplier, times(1)).getSparkExecutionEngineConfig(); verify(sparkQueryDispatcher, times(1)) .dispatch( new DispatchQueryRequest( @@ -94,16 +98,14 @@ void testCreateAsyncQuery() { @Test void testCreateAsyncQueryWithExtraSparkSubmitParameter() { - when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG)) + when(sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig()) .thenReturn( - "{" - + "\"applicationId\": \"00fd775baqpu4g0p\"," - + "\"executionRoleARN\": \"arn:aws:iam::270824043731:role/emr-job-execution-role\"," - + "\"region\": \"eu-west-1\"," - + "\"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\"" - + "}"); - when(settings.getSettingValue(Settings.Key.CLUSTER_NAME)) - .thenReturn(new ClusterName(TEST_CLUSTER_NAME)); + new SparkExecutionEngineConfig( + "00fd775baqpu4g0p", + "eu-west-1", + "arn:aws:iam::270824043731:role/emr-job-execution-role", + "--conf spark.dynamicAllocation.enabled=false", + TEST_CLUSTER_NAME)); when(sparkQueryDispatcher.dispatch(any())) .thenReturn(new DispatchQueryResponse(EMR_JOB_ID, false, null)); @@ -131,7 +133,7 @@ void testGetAsyncQueryResultsWithJobNotFoundException() { Assertions.assertEquals( "QueryId: " + EMR_JOB_ID + " not found", asyncQueryNotFoundException.getMessage()); verifyNoInteractions(sparkQueryDispatcher); - verifyNoInteractions(settings); + verifyNoInteractions(sparkExecutionEngineConfigSupplier); } @Test @@ -149,7 +151,7 @@ void testGetAsyncQueryResultsWithInProgressJob() { 
Assertions.assertNull(asyncQueryExecutionResponse.getResults()); Assertions.assertNull(asyncQueryExecutionResponse.getSchema()); Assertions.assertEquals("PENDING", asyncQueryExecutionResponse.getStatus()); - verifyNoInteractions(settings); + verifyNoInteractions(sparkExecutionEngineConfigSupplier); } @Test @@ -173,7 +175,7 @@ void testGetAsyncQueryResultsWithSuccessJob() throws IOException { 1, ((HashMap) asyncQueryExecutionResponse.getResults().get(0).value()) .get("1")); - verifyNoInteractions(settings); + verifyNoInteractions(sparkExecutionEngineConfigSupplier); } @Test @@ -200,7 +202,7 @@ void testCancelJobWithJobNotFound() { Assertions.assertEquals( "QueryId: " + EMR_JOB_ID + " not found", asyncQueryNotFoundException.getMessage()); verifyNoInteractions(sparkQueryDispatcher); - verifyNoInteractions(settings); + verifyNoInteractions(sparkExecutionEngineConfigSupplier); } @Test @@ -212,6 +214,6 @@ void testCancelJob() { .thenReturn(EMR_JOB_ID); String jobId = jobExecutorService.cancelQuery(EMR_JOB_ID); Assertions.assertEquals(EMR_JOB_ID, jobId); - verifyNoInteractions(settings); + verifyNoInteractions(sparkExecutionEngineConfigSupplier); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigTest.java b/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingTest.java similarity index 79% rename from spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigTest.java rename to spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingTest.java index 29b69ea830..c6be37567d 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigClusterSettingTest.java @@ -10,7 +10,7 @@ import org.junit.jupiter.api.Test; -public class SparkExecutionEngineConfigTest { +public class SparkExecutionEngineConfigClusterSettingTest { @Test public void testToSparkExecutionEngineConfigWithoutAllFields() { @@ -20,8 +20,8 @@ public void testToSparkExecutionEngineConfigWithoutAllFields() { + "\"executionRoleARN\": \"role-1\"," + "\"region\": \"us-west-1\"" + "}"; - SparkExecutionEngineConfig config = - SparkExecutionEngineConfig.toSparkExecutionEngineConfig(json); + SparkExecutionEngineConfigClusterSetting config = + SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(json); assertEquals("app-1", config.getApplicationId()); assertEquals("role-1", config.getExecutionRoleARN()); @@ -38,8 +38,8 @@ public void testToSparkExecutionEngineConfigWithAllFields() { + "\"region\": \"us-west-1\"," + "\"sparkSubmitParameters\": \"--conf A=1\"" + "}"; - SparkExecutionEngineConfig config = - SparkExecutionEngineConfig.toSparkExecutionEngineConfig(json); + SparkExecutionEngineConfigClusterSetting config = + SparkExecutionEngineConfigClusterSetting.toSparkExecutionEngineConfig(json); assertEquals("app-1", config.getApplicationId()); assertEquals("role-1", config.getExecutionRoleARN()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImplTest.java new file mode 100644 index 0000000000..298a56b17a --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/config/SparkExecutionEngineConfigSupplierImplTest.java @@ -0,0 +1,61 @@ +package org.opensearch.sql.spark.config; + +import static org.mockito.Mockito.when; 
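For orientation, here is a minimal, self-contained sketch of how the new supplier is consumed at plugin start-up. It mirrors the `SQLPlugin.createComponents` wiring earlier in this patch, but the class and method names (`SupplierWiringSketch`, `asyncQueryEnabled`) are illustrative only:

```java
import org.apache.commons.lang3.StringUtils;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;

final class SupplierWiringSketch {
  /**
   * Mirrors the branch in SQLPlugin.createComponents: the config is resolved
   * once through the supplier, and the region field gates the feature.
   */
  static boolean asyncQueryEnabled(Settings pluginSettings) {
    SparkExecutionEngineConfigSupplier supplier =
        new SparkExecutionEngineConfigSupplierImpl(pluginSettings);
    SparkExecutionEngineConfig config = supplier.getSparkExecutionEngineConfig();
    // An empty region means the cluster setting is absent or incomplete,
    // so the Async Query APIs remain disabled.
    return !StringUtils.isEmpty(config.getRegion());
  }
}
```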
+import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.cluster.ClusterName; +import org.opensearch.sql.common.setting.Settings; + +@ExtendWith(MockitoExtension.class) +public class SparkExecutionEngineConfigSupplierImplTest { + + @Mock private Settings settings; + + @Test + void testGetSparkExecutionEngineConfig() { + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier = + new SparkExecutionEngineConfigSupplierImpl(settings); + when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG)) + .thenReturn( + "{" + + "\"applicationId\": \"00fd775baqpu4g0p\"," + + "\"executionRoleARN\": \"arn:aws:iam::270824043731:role/emr-job-execution-role\"," + + "\"region\": \"eu-west-1\"," + + "\"sparkSubmitParameters\": \"--conf spark.dynamicAllocation.enabled=false\"" + + "}"); + when(settings.getSettingValue(Settings.Key.CLUSTER_NAME)) + .thenReturn(new ClusterName(TEST_CLUSTER_NAME)); + SparkExecutionEngineConfig sparkExecutionEngineConfig = + sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(); + Assertions.assertEquals("00fd775baqpu4g0p", sparkExecutionEngineConfig.getApplicationId()); + Assertions.assertEquals( + "arn:aws:iam::270824043731:role/emr-job-execution-role", + sparkExecutionEngineConfig.getExecutionRoleARN()); + Assertions.assertEquals("eu-west-1", sparkExecutionEngineConfig.getRegion()); + Assertions.assertEquals( + "--conf spark.dynamicAllocation.enabled=false", + sparkExecutionEngineConfig.getSparkSubmitParameters()); + Assertions.assertEquals(TEST_CLUSTER_NAME, sparkExecutionEngineConfig.getClusterName()); + } + + @Test + void testGetSparkExecutionEngineConfigWithNullSetting() { + SparkExecutionEngineConfigSupplier sparkExecutionEngineConfigSupplier = + new SparkExecutionEngineConfigSupplierImpl(settings); + when(settings.getSettingValue(Settings.Key.SPARK_EXECUTION_ENGINE_CONFIG)).thenReturn(null); + when(settings.getSettingValue(Settings.Key.CLUSTER_NAME)) + .thenReturn(new ClusterName(TEST_CLUSTER_NAME)); + SparkExecutionEngineConfig sparkExecutionEngineConfig = + sparkExecutionEngineConfigSupplier.getSparkExecutionEngineConfig(); + Assertions.assertNull(sparkExecutionEngineConfig.getApplicationId()); + Assertions.assertNull(sparkExecutionEngineConfig.getExecutionRoleARN()); + Assertions.assertNull(sparkExecutionEngineConfig.getRegion()); + Assertions.assertNull(sparkExecutionEngineConfig.getSparkSubmitParameters()); + Assertions.assertEquals(TEST_CLUSTER_NAME, sparkExecutionEngineConfig.getClusterName()); + } +} From 9f17c4e3034d41245e6206974bbb909ca55100ba Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Wed, 11 Oct 2023 12:28:25 -0700 Subject: [PATCH 2/6] Provide auth.type and auth.role_arn paramters to end user (#2276) Signed-off-by: Vamsi Manohar --- .../service/DataSourceServiceImpl.java | 9 ++- .../service/DataSourceServiceImplTest.java | 67 ++++++++++++++++++- .../sql/datasource/DataSourceAPIsIT.java | 8 +++ 3 files changed, 80 insertions(+), 4 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java index d6c1907f84..25e8006d66 100644 --- 
a/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/service/DataSourceServiceImpl.java @@ -34,6 +34,8 @@ public class DataSourceServiceImpl implements DataSourceService { private static String DATASOURCE_NAME_REGEX = "[@*A-Za-z]+?[*a-zA-Z_\\-0-9]*"; + public static final Set CONFIDENTIAL_AUTH_KEYS = + Set.of("auth.username", "auth.password", "auth.access_key", "auth.secret_key"); private final DataSourceLoaderCache dataSourceLoaderCache; @@ -159,7 +161,12 @@ private void removeAuthInfo(Set dataSourceMetadataSet) { private void removeAuthInfo(DataSourceMetadata dataSourceMetadata) { HashMap safeProperties = new HashMap<>(dataSourceMetadata.getProperties()); - safeProperties.entrySet().removeIf(entry -> entry.getKey().contains("auth")); + safeProperties + .entrySet() + .removeIf( + entry -> + CONFIDENTIAL_AUTH_KEYS.stream() + .anyMatch(confidentialKey -> entry.getKey().endsWith(confidentialKey))); dataSourceMetadata.setProperties(safeProperties); } } diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java index c8312e6013..6164d8b73f 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java @@ -233,7 +233,7 @@ void testGetDataSourceMetadataSet() { assertEquals(1, dataSourceMetadataSet.size()); DataSourceMetadata dataSourceMetadata = dataSourceMetadataSet.iterator().next(); assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri")); - assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); + assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username")); assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password")); assertFalse( @@ -352,11 +352,72 @@ void testRemovalOfAuthorizationInfo() { DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testDS"); assertEquals("testDS", dataSourceMetadata1.getName()); assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector()); - assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type")); + assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type")); assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.username")); assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.password")); } + @Test + void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() { + HashMap properties = new HashMap<>(); + properties.put("prometheus.uri", "https://localhost:9090"); + properties.put("prometheus.auth.type", "awssigv4"); + properties.put("prometheus.auth.access_key", "access_key"); + properties.put("prometheus.auth.secret_key", "secret_key"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata( + "testDS", + DataSourceType.PROMETHEUS, + Collections.singletonList("prometheus_access"), + properties, + null); + when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")) + .thenReturn(Optional.of(dataSourceMetadata)); + + DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testDS"); + 
assertEquals("testDS", dataSourceMetadata1.getName()); + assertEquals(DataSourceType.PROMETHEUS, dataSourceMetadata1.getConnector()); + assertTrue(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.type")); + assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.access_key")); + assertFalse(dataSourceMetadata1.getProperties().containsKey("prometheus.auth.secret_key")); + } + + @Test + void testRemovalOfAuthorizationInfoForGlueWithRoleARN() { + HashMap properties = new HashMap<>(); + properties.put("glue.auth.type", "iam_role"); + properties.put("glue.auth.role_arn", "role_arn"); + properties.put("glue.indexstore.opensearch.uri", "http://localhost:9200"); + properties.put("glue.indexstore.opensearch.auth", "basicauth"); + properties.put("glue.indexstore.opensearch.auth.username", "username"); + properties.put("glue.indexstore.opensearch.auth.password", "password"); + DataSourceMetadata dataSourceMetadata = + new DataSourceMetadata( + "testGlue", + DataSourceType.S3GLUE, + Collections.singletonList("glue_access"), + properties, + null); + when(dataSourceMetadataStorage.getDataSourceMetadata("testGlue")) + .thenReturn(Optional.of(dataSourceMetadata)); + + DataSourceMetadata dataSourceMetadata1 = dataSourceService.getDataSourceMetadata("testGlue"); + assertEquals("testGlue", dataSourceMetadata1.getName()); + assertEquals(DataSourceType.S3GLUE, dataSourceMetadata1.getConnector()); + assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.auth.type")); + assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.auth.role_arn")); + assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.indexstore.opensearch.uri")); + assertTrue(dataSourceMetadata1.getProperties().containsKey("glue.indexstore.opensearch.auth")); + assertFalse( + dataSourceMetadata1 + .getProperties() + .containsKey("glue.indexstore.opensearch.auth.username")); + assertFalse( + dataSourceMetadata1 + .getProperties() + .containsKey("glue.indexstore.opensearch.auth.password")); + } + @Test void testGetDataSourceMetadataForNonExistingDataSource() { when(dataSourceMetadataStorage.getDataSourceMetadata("testDS")).thenReturn(Optional.empty()); @@ -381,7 +442,7 @@ void testGetDataSourceMetadataForSpecificDataSourceName() { "testDS", DataSourceType.PROMETHEUS, Collections.emptyList(), properties))); DataSourceMetadata dataSourceMetadata = this.dataSourceService.getDataSourceMetadata("testDS"); assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.uri")); - assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); + assertTrue(dataSourceMetadata.getProperties().containsKey("prometheus.auth.type")); assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.username")); assertFalse(dataSourceMetadata.getProperties().containsKey("prometheus.auth.password")); verify(dataSourceMetadataStorage, times(1)).getDataSourceMetadata("testDS"); diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index 087629a1f1..8623b9fa6f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -85,6 +85,10 @@ public void createDataSourceAPITest() { new Gson().fromJson(getResponseString, DataSourceMetadata.class); Assert.assertEquals( "https://localhost:9090", 
dataSourceMetadata.getProperties().get("prometheus.uri")); + Assert.assertEquals( + "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password")); Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription()); } @@ -239,6 +243,10 @@ public void issue2196() { new Gson().fromJson(getResponseString, DataSourceMetadata.class); Assert.assertEquals( "https://localhost:9090", dataSourceMetadata.getProperties().get("prometheus.uri")); + Assert.assertEquals( + "basicauth", dataSourceMetadata.getProperties().get("prometheus.auth.type")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.username")); + Assert.assertNull(dataSourceMetadata.getProperties().get("prometheus.auth.password")); Assert.assertEquals("Prometheus Creation for Integ test", dataSourceMetadata.getDescription()); } } From b86cf2f9373307a57063edc2d96eafb4e37bbc8b Mon Sep 17 00:00:00 2001 From: YANGDB Date: Thu, 12 Oct 2023 09:12:51 -0700 Subject: [PATCH 3/6] release-notes-2.11 (#2284) * add 2.11 release notes --------- Signed-off-by: YANGDB --- .../opensearch-sql.release-notes-2.11.0.0.md | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 release-notes/opensearch-sql.release-notes-2.11.0.0.md diff --git a/release-notes/opensearch-sql.release-notes-2.11.0.0.md b/release-notes/opensearch-sql.release-notes-2.11.0.0.md new file mode 100644 index 0000000000..a560d5c8dd --- /dev/null +++ b/release-notes/opensearch-sql.release-notes-2.11.0.0.md @@ -0,0 +1,55 @@ +Compatible with OpenSearch and OpenSearch Dashboards Version 2.11.0 + +### Features + +### Enhancements +* Enable PPL lang and add datasource to async query API in https://github.com/opensearch-project/sql/pull/2195 +* Refactor Flint Auth in https://github.com/opensearch-project/sql/pull/2201 +* Add conf for spark structured streaming job in https://github.com/opensearch-project/sql/pull/2203 +* Submit long running job only when auto_refresh = false in https://github.com/opensearch-project/sql/pull/2209 +* Bug Fix, handle DESC TABLE response in https://github.com/opensearch-project/sql/pull/2213 +* Drop Index Implementation in https://github.com/opensearch-project/sql/pull/2217 +* Enable PPL Queries in https://github.com/opensearch-project/sql/pull/2223 +* Read extra Spark submit parameters from cluster settings in https://github.com/opensearch-project/sql/pull/2236 +* Spark Execution Engine Config Refactor in https://github.com/opensearch-project/sql/pull/2266 +* Provide auth.type and auth.role_arn paramters in GET Datasource API response. in https://github.com/opensearch-project/sql/pull/2283 +* Add support for `date_nanos` and tests. 
(#337) in https://github.com/opensearch-project/sql/pull/2020 +* Applied formatting improvements to Antlr files based on spotless changes (#2017) by @MitchellGale in https://github.com/opensearch-project/sql/pull/2023 +* Revert "Guarantee datasource read api is strong consistent read (#1815)" in https://github.com/opensearch-project/sql/pull/2031 +* Add _primary preference only for segment replication enabled indices in https://github.com/opensearch-project/sql/pull/2045 +* Changed allowlist config to denylist ip config for datasource uri hosts in https://github.com/opensearch-project/sql/pull/2058 + +### Bug Fixes +* fix broken link for connectors doc in https://github.com/opensearch-project/sql/pull/2199 +* Fix response codes returned by JSON formatting them in https://github.com/opensearch-project/sql/pull/2200 +* Bug fix, datasource API should be case sensitive in https://github.com/opensearch-project/sql/pull/2202 +* Minor fix in dropping covering index in https://github.com/opensearch-project/sql/pull/2240 +* Fix Unit tests for FlintIndexReader in https://github.com/opensearch-project/sql/pull/2242 +* Bug Fix , delete OpenSearch index when DROP INDEX in https://github.com/opensearch-project/sql/pull/2252 +* Correctly Set query status in https://github.com/opensearch-project/sql/pull/2232 +* Exclude generated files from spotless in https://github.com/opensearch-project/sql/pull/2024 +* Fix mockito core conflict. in https://github.com/opensearch-project/sql/pull/2131 +* Fix `ASCII` function and groom UT for text functions. (#301) in https://github.com/opensearch-project/sql/pull/2029 +* Fixed response codes For Requests With security exception. in https://github.com/opensearch-project/sql/pull/2036 + +### Documentation +* Datasource description in https://github.com/opensearch-project/sql/pull/2138 +* Add documentation for S3GlueConnector. in https://github.com/opensearch-project/sql/pull/2234 + +### Infrastructure +* bump aws-encryption-sdk-java to 1.71 in https://github.com/opensearch-project/sql/pull/2057 +* Run IT tests with security plugin (#335) #1986 by @MitchellGale in https://github.com/opensearch-project/sql/pull/2022 + +### Refactoring +* Merging Async Query APIs feature branch into main. 
in https://github.com/opensearch-project/sql/pull/2163 +* Removed Domain Validation in https://github.com/opensearch-project/sql/pull/2136 +* Check for existence of security plugin in https://github.com/opensearch-project/sql/pull/2069 +* Always use snapshot version for security plugin download in https://github.com/opensearch-project/sql/pull/2061 +* Add customized result index in data source etc in https://github.com/opensearch-project/sql/pull/2220 + +### Security +* bump okhttp to 4.10.0 (#2043) by @joshuali925 in https://github.com/opensearch-project/sql/pull/2044 +* bump okio to 3.4.0 by @joshuali925 in https://github.com/opensearch-project/sql/pull/2047 + +--- +**Full Changelog**: https://github.com/opensearch-project/sql/compare/2.3.0.0...v.2.11.0.0 \ No newline at end of file From f856cb3f15eb079e554fd1301a4bcfd7d5fefc0d Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Fri, 13 Oct 2023 14:34:12 -0700 Subject: [PATCH 4/6] add InteractiveSession and SessionManager (#2290) * add InteractiveSession and SessionManager Signed-off-by: Peng Huo * address comments Signed-off-by: Peng Huo --------- Signed-off-by: Peng Huo --- spark/build.gradle | 39 +++- .../session/CreateSessionRequest.java | 15 ++ .../execution/session/InteractiveSession.java | 61 +++++ .../sql/spark/execution/session/Session.java | 19 ++ .../spark/execution/session/SessionId.java | 23 ++ .../execution/session/SessionManager.java | 50 ++++ .../spark/execution/session/SessionModel.java | 143 ++++++++++++ .../spark/execution/session/SessionState.java | 36 +++ .../spark/execution/session/SessionType.java | 33 +++ .../statestore/SessionStateStore.java | 87 +++++++ .../session/InteractiveSessionTest.java | 213 ++++++++++++++++++ .../execution/session/SessionManagerTest.java | 38 ++++ .../execution/session/SessionStateTest.java | 20 ++ .../execution/session/SessionTypeTest.java | 20 ++ .../statestore/SessionStateStoreTest.java | 42 ++++ 15 files changed, 834 insertions(+), 5 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java diff --git a/spark/build.gradle b/spark/build.gradle index c06b5b6ecf..c2c925ecaf 100644 --- a/spark/build.gradle +++ 
b/spark/build.gradle @@ -52,15 +52,38 @@ dependencies { api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545' implementation group: 'commons-io', name: 'commons-io', version: '2.8.0' - testImplementation('org.junit.jupiter:junit-jupiter:5.6.2') + testImplementation(platform("org.junit:junit-bom:5.6.2")) + + testImplementation('org.junit.jupiter:junit-jupiter') testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.2.0' testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.2.0' - testImplementation 'junit:junit:4.13.1' - testImplementation "org.opensearch.test:framework:${opensearch_version}" + + testCompileOnly('junit:junit:4.13.1') { + exclude group: 'org.hamcrest', module: 'hamcrest-core' + } + testRuntimeOnly("org.junit.vintage:junit-vintage-engine") { + exclude group: 'org.hamcrest', module: 'hamcrest-core' + } + testRuntimeOnly("org.junit.platform:junit-platform-launcher") { + because 'allows tests to run from IDEs that bundle older version of launcher' + } + testImplementation("org.opensearch.test:framework:${opensearch_version}") } test { - useJUnitPlatform() + useJUnitPlatform { + includeEngines("junit-jupiter") + } + testLogging { + events "failed" + exceptionFormat "full" + } +} +task junit4(type: Test) { + useJUnitPlatform { + includeEngines("junit-vintage") + } + systemProperty 'tests.security.manager', 'false' testLogging { events "failed" exceptionFormat "full" @@ -68,6 +91,8 @@ test { } jacocoTestReport { + dependsOn test, junit4 + executionData test, junit4 reports { html.enabled true xml.enabled true @@ -78,9 +103,10 @@ jacocoTestReport { })) } } -test.finalizedBy(project.tasks.jacocoTestReport) jacocoTestCoverageVerification { + dependsOn test, junit4 + executionData test, junit4 violationRules { rule { element = 'CLASS' @@ -92,6 +118,9 @@ jacocoTestCoverageVerification { 'org.opensearch.sql.spark.asyncquery.exceptions.*', 'org.opensearch.sql.spark.dispatcher.model.*', 'org.opensearch.sql.spark.flint.FlintIndexType', + // ignore because XContext IOException + 'org.opensearch.sql.spark.execution.statestore.SessionStateStore', + 'org.opensearch.sql.spark.execution.session.SessionModel' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java new file mode 100644 index 0000000000..17e3346248 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/CreateSessionRequest.java @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import lombok.Data; +import org.opensearch.sql.spark.client.StartJobRequest; + +@Data +public class CreateSessionRequest { + private final StartJobRequest startJobRequest; + private final String datasourceName; +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java new file mode 100644 index 0000000000..620e46b9be --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/InteractiveSession.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import static 
org.opensearch.sql.spark.execution.session.SessionModel.initInteractiveSession; + +import java.util.Optional; +import lombok.Builder; +import lombok.Getter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.execution.statestore.SessionStateStore; + +/** + * Interactive session. + * + *
ENTRY_STATE: not_started + */ +@Getter +@Builder +public class InteractiveSession implements Session { + private static final Logger LOG = LogManager.getLogger(); + + private final SessionId sessionId; + private final SessionStateStore sessionStateStore; + private final EMRServerlessClient serverlessClient; + + private SessionModel sessionModel; + + @Override + public void open(CreateSessionRequest createSessionRequest) { + try { + String jobID = serverlessClient.startJobRun(createSessionRequest.getStartJobRequest()); + String applicationId = createSessionRequest.getStartJobRequest().getApplicationId(); + + sessionModel = + initInteractiveSession( + applicationId, jobID, sessionId, createSessionRequest.getDatasourceName()); + sessionStateStore.create(sessionModel); + } catch (VersionConflictEngineException e) { + String errorMsg = "session already exist. " + sessionId; + LOG.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + } + + @Override + public void close() { + Optional model = sessionStateStore.get(sessionModel.getSessionId()); + if (model.isEmpty()) { + throw new IllegalStateException("session not exist. " + sessionModel.getSessionId()); + } else { + serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId()); + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java new file mode 100644 index 0000000000..ec9775e60a --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/Session.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +/** Session define the statement execution context. Each session is binding to one Spark Job. */ +public interface Session { + /** open session. */ + void open(CreateSessionRequest createSessionRequest); + + /** close session. 
*/ + void close(); + + SessionModel getSessionModel(); + + SessionId getSessionId(); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java new file mode 100644 index 0000000000..a2847cde18 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionId.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import lombok.Data; +import org.apache.commons.lang3.RandomStringUtils; + +@Data +public class SessionId { + private final String sessionId; + + public static SessionId newSessionId() { + return new SessionId(RandomStringUtils.random(10, true, true)); + } + + @Override + public String toString() { + return "sessionId=" + sessionId; + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java new file mode 100644 index 0000000000..3d0916bac8 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; + +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.execution.statestore.SessionStateStore; + +/** + * Singleton Class + * + *
todo. add Session cache and Session sweeper. + */ +@RequiredArgsConstructor +public class SessionManager { + private final SessionStateStore stateStore; + private final EMRServerlessClient emrServerlessClient; + + public Session createSession(CreateSessionRequest request) { + InteractiveSession session = + InteractiveSession.builder() + .sessionId(newSessionId()) + .sessionStateStore(stateStore) + .serverlessClient(emrServerlessClient) + .build(); + session.open(request); + return session; + } + + public Optional getSession(SessionId sid) { + Optional model = stateStore.get(sid); + if (model.isPresent()) { + InteractiveSession session = + InteractiveSession.builder() + .sessionId(sid) + .sessionStateStore(stateStore) + .serverlessClient(emrServerlessClient) + .sessionModel(model.get()) + .build(); + return Optional.ofNullable(session); + } + return Optional.empty(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java new file mode 100644 index 0000000000..656f0ec8ce --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionModel.java @@ -0,0 +1,143 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; +import static org.opensearch.sql.spark.execution.session.SessionType.INTERACTIVE; + +import java.io.IOException; +import lombok.Builder; +import lombok.Data; +import lombok.SneakyThrows; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.core.xcontent.XContentParserUtils; +import org.opensearch.index.seqno.SequenceNumbers; + +/** Session data in flint.ql.sessions index. 
*/ +@Data +@Builder +public class SessionModel implements ToXContentObject { + public static final String VERSION = "version"; + public static final String TYPE = "type"; + public static final String SESSION_TYPE = "sessionType"; + public static final String SESSION_ID = "sessionId"; + public static final String SESSION_STATE = "state"; + public static final String DATASOURCE_NAME = "dataSourceName"; + public static final String LAST_UPDATE_TIME = "lastUpdateTime"; + public static final String APPLICATION_ID = "applicationId"; + public static final String JOB_ID = "jobId"; + public static final String ERROR = "error"; + public static final String UNKNOWN = "unknown"; + public static final String SESSION_DOC_TYPE = "session"; + + private final String version; + private final SessionType sessionType; + private final SessionId sessionId; + private final SessionState sessionState; + private final String applicationId; + private final String jobId; + private final String datasourceName; + private final String error; + private final long lastUpdateTime; + + private final long seqNo; + private final long primaryTerm; + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder + .startObject() + .field(VERSION, version) + .field(TYPE, SESSION_DOC_TYPE) + .field(SESSION_TYPE, sessionType.getSessionType()) + .field(SESSION_ID, sessionId.getSessionId()) + .field(SESSION_STATE, sessionState.getSessionState()) + .field(DATASOURCE_NAME, datasourceName) + .field(APPLICATION_ID, applicationId) + .field(JOB_ID, jobId) + .field(LAST_UPDATE_TIME, lastUpdateTime) + .field(ERROR, error) + .endObject(); + return builder; + } + + public static SessionModel of(SessionModel copy, long seqNo, long primaryTerm) { + return builder() + .version(copy.version) + .sessionType(copy.sessionType) + .sessionId(new SessionId(copy.sessionId.getSessionId())) + .sessionState(copy.sessionState) + .datasourceName(copy.datasourceName) + .seqNo(seqNo) + .primaryTerm(primaryTerm) + .build(); + } + + @SneakyThrows + public static SessionModel fromXContent(XContentParser parser, long seqNo, long primaryTerm) { + SessionModelBuilder builder = new SessionModelBuilder(); + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case VERSION: + builder.version(parser.text()); + break; + case SESSION_TYPE: + builder.sessionType(SessionType.fromString(parser.text())); + break; + case SESSION_ID: + builder.sessionId(new SessionId(parser.text())); + break; + case SESSION_STATE: + builder.sessionState(SessionState.fromString(parser.text())); + break; + case DATASOURCE_NAME: + builder.datasourceName(parser.text()); + break; + case ERROR: + builder.error(parser.text()); + break; + case APPLICATION_ID: + builder.applicationId(parser.text()); + break; + case JOB_ID: + builder.jobId(parser.text()); + break; + case LAST_UPDATE_TIME: + builder.lastUpdateTime(parser.longValue()); + break; + case TYPE: + // do nothing. 
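+          // The "type" field is the fixed doc-type marker ("session") written by
+          // toXContent above; it carries no session state, so the parser skips it.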
+ break; + } + } + builder.seqNo(seqNo); + builder.primaryTerm(primaryTerm); + return builder.build(); + } + + public static SessionModel initInteractiveSession( + String applicationId, String jobId, SessionId sid, String datasourceName) { + return builder() + .version("1.0") + .sessionType(INTERACTIVE) + .sessionId(sid) + .sessionState(NOT_STARTED) + .datasourceName(datasourceName) + .applicationId(applicationId) + .jobId(jobId) + .error(UNKNOWN) + .lastUpdateTime(System.currentTimeMillis()) + .seqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .primaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .build(); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java new file mode 100644 index 0000000000..509d5105e9 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionState.java @@ -0,0 +1,36 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; + +@Getter +public enum SessionState { + NOT_STARTED("not_started"), + RUNNING("running"), + DEAD("dead"), + FAIL("fail"); + + private final String sessionState; + + SessionState(String sessionState) { + this.sessionState = sessionState; + } + + private static Map STATES = + Arrays.stream(SessionState.values()) + .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); + + public static SessionState fromString(String key) { + if (STATES.containsKey(key)) { + return STATES.get(key); + } + throw new IllegalArgumentException("Invalid session state: " + key); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java new file mode 100644 index 0000000000..dd179a1dc5 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionType.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import lombok.Getter; + +@Getter +public enum SessionType { + INTERACTIVE("interactive"); + + private final String sessionType; + + SessionType(String sessionType) { + this.sessionType = sessionType; + } + + private static Map TYPES = + Arrays.stream(SessionType.values()) + .collect(Collectors.toMap(t -> t.name().toLowerCase(), t -> t)); + + public static SessionType fromString(String key) { + if (TYPES.containsKey(key)) { + return TYPES.get(key); + } + throw new IllegalArgumentException("Invalid session type: " + key); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java new file mode 100644 index 0000000000..6ddce55360 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java @@ -0,0 +1,87 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +import java.io.IOException; +import java.util.Locale; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import 
diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java
new file mode 100644
index 0000000000..6ddce55360
--- /dev/null
+++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/SessionStateStore.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.statestore;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.get.GetRequest;
+import org.opensearch.action.get.GetResponse;
+import org.opensearch.action.index.IndexRequest;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.action.support.WriteRequest;
+import org.opensearch.client.Client;
+import org.opensearch.common.xcontent.LoggingDeprecationHandler;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.ToXContent;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.sql.spark.execution.session.SessionId;
+import org.opensearch.sql.spark.execution.session.SessionModel;
+
+@RequiredArgsConstructor
+public class SessionStateStore {
+  private static final Logger LOG = LogManager.getLogger();
+
+  private final String indexName;
+  private final Client client;
+
+  public SessionModel create(SessionModel session) {
+    try {
+      IndexRequest indexRequest =
+          new IndexRequest(indexName)
+              .id(session.getSessionId().getSessionId())
+              .source(session.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
+              .setIfSeqNo(session.getSeqNo())
+              .setIfPrimaryTerm(session.getPrimaryTerm())
+              .create(true)
+              .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+      IndexResponse indexResponse = client.index(indexRequest).actionGet();
+      if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
+        LOG.debug("Successfully created doc. id: {}", session.getSessionId());
+        return SessionModel.of(session, indexResponse.getSeqNo(), indexResponse.getPrimaryTerm());
+      } else {
+        throw new RuntimeException(
+            String.format(
+                Locale.ROOT,
+                "Failed to create doc. id: %s, error: %s",
+                session.getSessionId(),
+                indexResponse.getResult().getLowercase()));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public Optional<SessionModel> get(SessionId sid) {
+    try {
+      GetRequest getRequest = new GetRequest().index(indexName).id(sid.getSessionId());
+      GetResponse getResponse = client.get(getRequest).actionGet();
+      if (getResponse.isExists()) {
+        XContentParser parser =
+            XContentType.JSON
+                .xContent()
+                .createParser(
+                    NamedXContentRegistry.EMPTY,
+                    LoggingDeprecationHandler.INSTANCE,
+                    getResponse.getSourceAsString());
+        parser.nextToken();
+        return Optional.of(
+            SessionModel.fromXContent(
+                parser, getResponse.getSeqNo(), getResponse.getPrimaryTerm()));
+      } else {
+        return Optional.empty();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
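A hedged usage sketch of the store above, not part of the patch: create(true) makes a duplicate session id fail with a version conflict rather than overwrite, and the returned copy carries the seqNo/primaryTerm the index actually assigned, ready for later conditional writes. The index name, datasource, and Client wiring are placeholders.

import java.util.Optional;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.statestore.SessionStateStore;

class SessionStoreUsage {
  static void demo(Client client) {
    SessionStateStore store = new SessionStateStore("mockindex", client);
    SessionModel created =
        store.create(
            SessionModel.initInteractiveSession("appId", "jobId", SessionId.newSessionId(), "ds"));
    // created now holds the seqNo/primaryTerm from the index response, not UNASSIGNED.
    Optional<SessionModel> fetched = store.get(created.getSessionId());
    fetched.ifPresent(model -> System.out.println(model.getSessionState()));
  }
}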
diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java
new file mode 100644
index 0000000000..53dc211ded
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.session;
+
+import static org.opensearch.sql.spark.execution.session.InteractiveSessionTest.TestSession.testSession;
+import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
+
+import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
+import com.amazonaws.services.emrserverless.model.GetJobRunResult;
+import java.util.HashMap;
+import java.util.Optional;
+import lombok.RequiredArgsConstructor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.action.delete.DeleteRequest;
+import org.opensearch.sql.spark.client.EMRServerlessClient;
+import org.opensearch.sql.spark.client.StartJobRequest;
+import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;
+
+/** mock-maker-inline does not work with OpenSearchTestCase. */
+public class InteractiveSessionTest extends OpenSearchSingleNodeTestCase {
+
+  private static final String indexName = "mockindex";
+
+  private TestEMRServerlessClient emrsClient;
+  private StartJobRequest startJobRequest;
+  private SessionStateStore stateStore;
+
+  @Before
+  public void setup() {
+    emrsClient = new TestEMRServerlessClient();
+    startJobRequest = new StartJobRequest("", "", "appId", "", "", new HashMap<>(), false, "");
+    stateStore = new SessionStateStore(indexName, client());
+    createIndex(indexName);
+  }
+
+  @After
+  public void clean() {
+    client().admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet();
+  }
+
+  @Test
+  public void openCloseSession() {
+    InteractiveSession session =
+        InteractiveSession.builder()
+            .sessionId(SessionId.newSessionId())
+            .sessionStateStore(stateStore)
+            .serverlessClient(emrsClient)
+            .build();
+
+    // open session
+    TestSession testSession = testSession(session, stateStore);
+    testSession
+        .open(new CreateSessionRequest(startJobRequest, "datasource"))
+        .assertSessionState(NOT_STARTED)
+        .assertAppId("appId")
+        .assertJobId("jobId");
+    emrsClient.startJobRunCalled(1);
+
+    // close session
+    testSession.close();
+    emrsClient.cancelJobRunCalled(1);
+  }
+
+  @Test
+  public void openSessionFailedConflict() {
+    SessionId sessionId = new SessionId("duplicate-session-id");
+    InteractiveSession session =
+        InteractiveSession.builder()
+            .sessionId(sessionId)
+            .sessionStateStore(stateStore)
+            .serverlessClient(emrsClient)
+            .build();
+    session.open(new CreateSessionRequest(startJobRequest, "datasource"));
+
+    InteractiveSession duplicateSession =
+        InteractiveSession.builder()
+            .sessionId(sessionId)
+            .sessionStateStore(stateStore)
+            .serverlessClient(emrsClient)
+            .build();
+    IllegalStateException exception =
+        assertThrows(
+            IllegalStateException.class,
+            () -> duplicateSession.open(new CreateSessionRequest(startJobRequest, "datasource")));
+    assertEquals("session already exist. sessionId=duplicate-session-id", exception.getMessage());
+  }
+
+  @Test
+  public void closeNotExistSession() {
+    SessionId sessionId = SessionId.newSessionId();
+    InteractiveSession session =
+        InteractiveSession.builder()
+            .sessionId(sessionId)
+            .sessionStateStore(stateStore)
+            .serverlessClient(emrsClient)
+            .build();
+    session.open(new CreateSessionRequest(startJobRequest, "datasource"));
+
+    client().delete(new DeleteRequest(indexName, sessionId.getSessionId()));
+
+    IllegalStateException exception = assertThrows(IllegalStateException.class, session::close);
+    assertEquals("session not exist. " + sessionId, exception.getMessage());
+    emrsClient.cancelJobRunCalled(0);
+  }
+
+  @Test
+  public void sessionManagerCreateSession() {
+    Session session =
+        new SessionManager(stateStore, emrsClient)
+            .createSession(new CreateSessionRequest(startJobRequest, "datasource"));
+
+    TestSession testSession = testSession(session, stateStore);
+    testSession.assertSessionState(NOT_STARTED).assertAppId("appId").assertJobId("jobId");
+  }
+
+  @Test
+  public void sessionManagerGetSession() {
+    SessionManager sessionManager = new SessionManager(stateStore, emrsClient);
+    Session session =
+        sessionManager.createSession(new CreateSessionRequest(startJobRequest, "datasource"));
+
+    Optional<Session> managerSession = sessionManager.getSession(session.getSessionId());
+    assertTrue(managerSession.isPresent());
+    assertEquals(session.getSessionId(), managerSession.get().getSessionId());
+  }
+
+  @Test
+  public void sessionManagerGetSessionNotExist() {
+    SessionManager sessionManager = new SessionManager(stateStore, emrsClient);
+
+    Optional<Session> managerSession = sessionManager.getSession(new SessionId("no-exist"));
+    assertTrue(managerSession.isEmpty());
+  }
+
+  @RequiredArgsConstructor
+  static class TestSession {
+    private final Session session;
+    private final SessionStateStore stateStore;
+
+    public static TestSession testSession(Session session, SessionStateStore stateStore) {
+      return new TestSession(session, stateStore);
+    }
+
+    public TestSession assertSessionState(SessionState expected) {
+      assertEquals(expected, session.getSessionModel().getSessionState());
+
+      Optional<SessionModel> sessionStoreState =
+          stateStore.get(session.getSessionModel().getSessionId());
+      assertTrue(sessionStoreState.isPresent());
+      assertEquals(expected, sessionStoreState.get().getSessionState());
+
+      return this;
+    }
+
+    public TestSession assertAppId(String expected) {
+      assertEquals(expected, session.getSessionModel().getApplicationId());
+      return this;
+    }
+
+    public TestSession assertJobId(String expected) {
+      assertEquals(expected, session.getSessionModel().getJobId());
+      return this;
+    }
+
+    public TestSession open(CreateSessionRequest req) {
+      session.open(req);
+      return this;
+    }
+
+    public TestSession close() {
+      session.close();
+      return this;
+    }
+  }
+
+  static class TestEMRServerlessClient implements EMRServerlessClient {
+
+    private int startJobRunCalled = 0;
+    private int cancelJobRunCalled = 0;
+
+    @Override
+    public String startJobRun(StartJobRequest startJobRequest) {
+      startJobRunCalled++;
+      return "jobId";
+    }
+
+    @Override
+    public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
+      return null;
+    }
+
+    @Override
+    public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
+      cancelJobRunCalled++;
+      return null;
+    }
+
+    public void startJobRunCalled(int expectedTimes) {
+      assertEquals(expectedTimes, startJobRunCalled);
+    }
+
+    public void cancelJobRunCalled(int expectedTimes) {
+      assertEquals(expectedTimes, cancelJobRunCalled);
+    }
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java
new file mode 100644
index 0000000000..d35105f787
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.session;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.mockito.MockMakers;
+import org.mockito.MockSettings;
+import org.mockito.Mockito;
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.opensearch.sql.spark.execution.statestore.SessionStateStore;
+import org.opensearch.test.OpenSearchSingleNodeTestCase;
+
+class SessionManagerTest extends OpenSearchSingleNodeTestCase {
+  private static final String indexName = "mockindex";
+
+  // mock-maker-inline does not work with OpenSearchTestCase; make sure to use mockSettings when
+  // creating mocks.
+  private static final MockSettings mockSettings =
+      Mockito.withSettings().mockMaker(MockMakers.SUBCLASS);
+
+  private SessionStateStore stateStore;
+
+  @Before
+  public void setup() {
+    stateStore = new SessionStateStore(indexName, client());
+    createIndex(indexName);
+  }
+
+  @After
+  public void clean() {
+    client().admin().indices().delete(new DeleteIndexRequest(indexName)).actionGet();
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java
new file mode 100644
index 0000000000..a987c80d59
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionStateTest.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+class SessionStateTest {
+  @Test
+  public void invalidSessionState() {
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> SessionState.fromString("invalid"));
+    assertEquals("Invalid session state: invalid", exception.getMessage());
+  }
+}
diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java
new file mode 100644
index 0000000000..a2ab43e709
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionTypeTest.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.session;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.junit.jupiter.api.Test;
+
+class SessionTypeTest {
+  @Test
+  public void invalidSessionType() {
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> SessionType.fromString("invalid"));
+    assertEquals("Invalid session type: invalid", exception.getMessage());
+  }
+}
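SessionManagerTest above pins Mockito to the SUBCLASS mock maker because mock-maker-inline cannot be used under OpenSearchTestCase. A test added to that class would create its mocks roughly like this sketch, which assumes the Mockito 4.8+ mockMaker API and uses the mockSettings field defined above; the mocked collaborator is illustrative:

// Hypothetical test body for SessionManagerTest.
EMRServerlessClient emrsClient = Mockito.mock(EMRServerlessClient.class, mockSettings);
SessionManager sessionManager = new SessionManager(stateStore, emrsClient);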
diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java
new file mode 100644
index 0000000000..9c779555d7
--- /dev/null
+++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/SessionStateStoreTest.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.execution.statestore;
+
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Answers.RETURNS_DEEP_STUBS;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.action.DocWriteResponse;
+import org.opensearch.action.index.IndexResponse;
+import org.opensearch.client.Client;
+import org.opensearch.sql.spark.execution.session.SessionId;
+import org.opensearch.sql.spark.execution.session.SessionModel;
+
+@ExtendWith(MockitoExtension.class)
+class SessionStateStoreTest {
+  @Mock(answer = RETURNS_DEEP_STUBS)
+  private Client client;
+
+  @Mock private IndexResponse indexResponse;
+
+  @Test
+  public void createWithException() {
+    when(client.index(any()).actionGet()).thenReturn(indexResponse);
+    doReturn(DocWriteResponse.Result.NOT_FOUND).when(indexResponse).getResult();
+    SessionModel sessionModel =
+        SessionModel.initInteractiveSession(
+            "appId", "jobId", SessionId.newSessionId(), "datasource");
+    SessionStateStore sessionStateStore = new SessionStateStore("indexName", client);
+
+    assertThrows(RuntimeException.class, () -> sessionStateStore.create(sessionModel));
+  }
+}
From b76a15e9db35f259f4a5a3e4567ba8c7b84bc962 Mon Sep 17 00:00:00 2001
From: Derek Ho
Date: Mon, 16 Oct 2023 15:28:51 -0400
Subject: [PATCH 5/6] Bump bwc version to 2.12 (#2292)

Signed-off-by: Derek Ho
---
 integ-test/build.gradle | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integ-test/build.gradle b/integ-test/build.gradle
index 6925cb9101..f2e70d9908 100644
--- a/integ-test/build.gradle
+++ b/integ-test/build.gradle
@@ -42,7 +42,7 @@ apply plugin: 'java'
 apply plugin: 'io.freefair.lombok'
 apply plugin: 'com.wiredforcode.spawn'
 
-String baseVersion = "2.11.0"
+String baseVersion = "2.12.0"
 String bwcVersion = baseVersion + ".0";
 String baseName = "sqlBwcCluster"
 String bwcFilePath = "src/test/resources/bwc/"
From 501cf915d16215fcf4c5df451ea64b5e31abe3c4 Mon Sep 17 00:00:00 2001
From: Derek Ho
Date: Tue, 17 Oct 2023 11:00:07 -0400
Subject: [PATCH 6/6] Upgrade json (#2307)

Signed-off-by: Derek Ho
---
 legacy/build.gradle     | 2 +-
 opensearch/build.gradle | 2 +-
 ppl/build.gradle        | 2 +-
 prometheus/build.gradle | 2 +-
 spark/build.gradle      | 2 +-
 sql/build.gradle        | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/legacy/build.gradle b/legacy/build.gradle
index fc985989e5..ca20476610 100644
--- a/legacy/build.gradle
+++ b/legacy/build.gradle
@@ -108,7 +108,7 @@ dependencies {
     }
   }
   implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
-  implementation group: 'org.json', name: 'json', version:'20230227'
+  implementation group: 'org.json', name: 'json', version:'20231013'
   implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.12.0'
   implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
   // add geo module as dependency. https://github.com/opensearch-project/OpenSearch/pull/4180/.
diff --git a/opensearch/build.gradle b/opensearch/build.gradle
index 34b5c3f452..c9087bca49 100644
--- a/opensearch/build.gradle
+++ b/opensearch/build.gradle
@@ -37,7 +37,7 @@ dependencies {
 
   implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}"
   implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}"
   implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}"
-  implementation group: 'org.json', name: 'json', version:'20230227'
+  implementation group: 'org.json', name: 'json', version:'20231013'
   compileOnly group: 'org.opensearch.client', name: 'opensearch-rest-high-level-client', version: "${opensearch_version}"
   implementation group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
diff --git a/ppl/build.gradle b/ppl/build.gradle
index a798b3f4b0..04ad71ced6 100644
--- a/ppl/build.gradle
+++ b/ppl/build.gradle
@@ -49,7 +49,7 @@ dependencies {
 
   implementation "org.antlr:antlr4-runtime:4.7.1"
   implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
-  api group: 'org.json', name: 'json', version: '20230227'
+  api group: 'org.json', name: 'json', version: '20231013'
   implementation group: 'org.apache.logging.log4j', name: 'log4j-core', version:'2.20.0'
   api project(':common')
   api project(':core')
diff --git a/prometheus/build.gradle b/prometheus/build.gradle
index f8c10c7f6b..c2878ab1b4 100644
--- a/prometheus/build.gradle
+++ b/prometheus/build.gradle
@@ -22,7 +22,7 @@ dependencies {
   implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: "${versions.jackson}"
   implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "${versions.jackson_databind}"
   implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: "${versions.jackson}"
-  implementation group: 'org.json', name: 'json', version: '20230227'
+  implementation group: 'org.json', name: 'json', version: '20231013'
 
   testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
   testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
diff --git a/spark/build.gradle b/spark/build.gradle
index c2c925ecaf..49ff96bec5 100644
--- a/spark/build.gradle
+++ b/spark/build.gradle
@@ -47,7 +47,7 @@ dependencies {
   implementation project(':datasources')
 
   implementation group: 'org.opensearch', name: 'opensearch', version: "${opensearch_version}"
-  implementation group: 'org.json', name: 'json', version: '20230227'
+  implementation group: 'org.json', name: 'json', version: '20231013'
   api group: 'com.amazonaws', name: 'aws-java-sdk-emr', version: '1.12.545'
   api group: 'com.amazonaws', name: 'aws-java-sdk-emrserverless', version: '1.12.545'
   implementation group: 'commons-io', name: 'commons-io', version: '2.8.0'
diff --git a/sql/build.gradle b/sql/build.gradle
index 2984158e57..c9b46d38f1 100644
--- a/sql/build.gradle
+++ b/sql/build.gradle
@@ -47,7 +47,7 @@ dependencies {
 
   implementation "org.antlr:antlr4-runtime:4.7.1"
   implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
-  implementation group: 'org.json', name: 'json', version:'20230227'
+  implementation group: 'org.json', name: 'json', version:'20231013'
   implementation project(':common')
   implementation project(':core')
   api project(':protocol')
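For context on PATCH 6/6: the bump from org.json 20230227 to 20231013 appears to pick up the upstream fix for CVE-2023-5072, a parser denial-of-service in earlier json-java releases; the API surface is unchanged across the bump. An illustrative smoke check:

import org.json.JSONObject;

public class JsonSmoke {
  public static void main(String[] args) {
    // Same behavior on 20230227 and 20231013; only the parser hardening changed.
    JSONObject doc = new JSONObject("{\"state\":\"running\"}");
    System.out.println(doc.getString("state")); // running
  }
}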