From 65e88c21531d1b16fb0288b532e16467a5794a79 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Tue, 4 Jun 2024 15:28:42 -0700 Subject: [PATCH] Introduce SessionConfigSupplier to abstract settings (#2707) Signed-off-by: Tomoyuki Morita --- .../OpenSearchSessionConfigSupplier.java | 19 +++++++++++++++++++ .../session/SessionConfigSupplier.java | 11 +++++++++++ .../execution/session/SessionManager.java | 6 ++---- .../config/AsyncExecutorServiceModule.java | 14 ++++++++++++-- .../AsyncQueryExecutorServiceImplTest.java | 2 ++ .../AsyncQueryExecutorServiceSpec.java | 8 ++++++-- .../session/InteractiveSessionTest.java | 5 +++-- .../execution/session/SessionManagerTest.java | 3 ++- .../execution/statement/StatementTest.java | 5 +++-- 9 files changed, 60 insertions(+), 13 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/OpenSearchSessionConfigSupplier.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/OpenSearchSessionConfigSupplier.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/OpenSearchSessionConfigSupplier.java new file mode 100644 index 0000000000..7bad399df8 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/OpenSearchSessionConfigSupplier.java @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.common.setting.Settings; + +@RequiredArgsConstructor +public class OpenSearchSessionConfigSupplier implements SessionConfigSupplier { + private final Settings settings; + + @Override + public Long getSessionInactivityTimeoutMillis() { + return settings.getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java new file mode 100644 index 0000000000..4084e0f091 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionConfigSupplier.java @@ -0,0 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.session; + +/** Interface to abstract session config */ +public interface SessionConfigSupplier { + Long getSessionInactivityTimeoutMillis(); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index f8d429dd38..685fbdf5fa 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -5,12 +5,10 @@ package org.opensearch.sql.spark.execution.session; -import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; import java.util.Optional; import lombok.RequiredArgsConstructor; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.SessionStorageService; import org.opensearch.sql.spark.execution.statestore.StatementStorageService; @@ -26,7 +24,7 @@ public class SessionManager { private final SessionStorageService sessionStorageService; private final StatementStorageService statementStorageService; private final EMRServerlessClientFactory emrServerlessClientFactory; - private final Settings settings; + private final SessionConfigSupplier sessionConfigSupplier; public Session createSession(CreateSessionRequest request) { InteractiveSession session = @@ -70,7 +68,7 @@ public Optional getSession(SessionId sid, String dataSourceName) { .serverlessClient(emrServerlessClientFactory.getClient()) .sessionModel(model.get()) .sessionInactivityTimeoutMilli( - settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS)) + sessionConfigSupplier.getSessionInactivityTimeoutMillis()) .timeProvider(new RealTimeProvider()) .build(); return Optional.ofNullable(session); diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index ca252f48c6..5323c00288 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -29,6 +29,8 @@ import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.QueryIdProvider; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; +import org.opensearch.sql.spark.execution.session.OpenSearchSessionConfigSupplier; +import org.opensearch.sql.spark.execution.session.SessionConfigSupplier; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.OpenSearchSessionStorageService; import org.opensearch.sql.spark.execution.statestore.OpenSearchStatementStorageService; @@ -141,9 +143,12 @@ public SessionManager sessionManager( SessionStorageService sessionStorageService, StatementStorageService statementStorageService, EMRServerlessClientFactory emrServerlessClientFactory, - Settings settings) { + SessionConfigSupplier sessionConfigSupplier) { return new SessionManager( - sessionStorageService, statementStorageService, emrServerlessClientFactory, settings); + sessionStorageService, + statementStorageService, + emrServerlessClientFactory, + sessionConfigSupplier); } @Provides @@ -185,6 +190,11 @@ public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client) return new OpenSearchJobExecutionResponseReader(client); } + @Provides + public SessionConfigSupplier sessionConfigSupplier(Settings settings) { + return new OpenSearchSessionConfigSupplier(settings); + } + private void registerStateStoreMetrics(StateStore stateStore) { GaugeMetric activeSessionMetric = new GaugeMetric<>( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java index 43dd4880e7..96ed18e897 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplTest.java @@ -102,6 +102,8 @@ void testCreateAsyncQuery() { .storeJobMetadata(getAsyncQueryJobMetadata()); verify(sparkExecutionEngineConfigSupplier, times(1)) .getSparkExecutionEngineConfig(requestContext); + verify(sparkExecutionEngineConfigSupplier, times(1)) + .getSparkExecutionEngineConfig(requestContext); verify(sparkQueryDispatcher, times(1)).dispatch(expectedDispatchQueryRequest); Assertions.assertEquals(QUERY_ID, createAsyncQueryResponse.getQueryId()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 90a06edb19..9c378b9274 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -61,6 +61,8 @@ import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider; import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; +import org.opensearch.sql.spark.execution.session.OpenSearchSessionConfigSupplier; +import org.opensearch.sql.spark.execution.session.SessionConfigSupplier; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; @@ -93,6 +95,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected ClusterService clusterService; protected org.opensearch.sql.common.setting.Settings pluginSettings; + protected SessionConfigSupplier sessionConfigSupplier; protected NodeClient client; protected DataSourceServiceImpl dataSourceService; protected ClusterSettings clusterSettings; @@ -123,6 +126,7 @@ public void setup() { pluginSettings = new OpenSearchSettings(clusterSettings); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); + sessionConfigSupplier = new OpenSearchSessionConfigSupplier(pluginSettings); Metrics.getInstance().registerDefaultMetrics(); client = (NodeClient) cluster().client(); client @@ -246,7 +250,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( sessionStorageService, statementStorageService, emrServerlessClientFactory, - pluginSettings), + sessionConfigSupplier), new DefaultLeaseManager(pluginSettings, stateStore), new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore), new FlintIndexOpFactory( @@ -262,7 +266,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( sessionStorageService, statementStorageService, emrServerlessClientFactory, - pluginSettings), + sessionConfigSupplier), queryHandlerFactory, new DatasourceEmbeddedQueryIdProvider()); return new AsyncQueryExecutorServiceImpl( diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java index a2cf202c1f..0c606cc5df 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/InteractiveSessionTest.java @@ -7,7 +7,6 @@ import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME; import static org.opensearch.sql.spark.constants.TestConstants.TEST_DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting; import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED; import static org.opensearch.sql.spark.execution.session.SessionTestUtil.createSessionRequest; @@ -42,6 +41,7 @@ public class InteractiveSessionTest extends OpenSearchIntegTestCase { private StartJobRequest startJobRequest; private SessionStorageService sessionStorageService; private StatementStorageService statementStorageService; + private SessionConfigSupplier sessionConfigSupplier = () -> 600000L; private SessionManager sessionManager; @Before @@ -54,12 +54,13 @@ public void setup() { statementStorageService = new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + sessionManager = new SessionManager( sessionStorageService, statementStorageService, emrServerlessClientFactory, - sessionSetting()); + sessionConfigSupplier); } @After diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 360018c5b0..7b341d2a75 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -23,6 +23,7 @@ public class SessionManagerTest { @Mock private SessionStorageService sessionStorageService; @Mock private StatementStorageService statementStorageService; @Mock private EMRServerlessClientFactory emrServerlessClientFactory; + @Mock private SessionConfigSupplier sessionConfigSupplier; @Test public void sessionEnable() { @@ -31,7 +32,7 @@ public void sessionEnable() { sessionStorageService, statementStorageService, emrServerlessClientFactory, - sessionSetting()); + sessionConfigSupplier); Assertions.assertTrue(sessionManager.isEnabled()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java index 357a09c3ee..9650e5a73c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statement/StatementTest.java @@ -6,7 +6,6 @@ package org.opensearch.sql.spark.execution.statement; import static org.opensearch.sql.spark.constants.TestConstants.TEST_DATASOURCE_NAME; -import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting; import static org.opensearch.sql.spark.execution.session.SessionTestUtil.createSessionRequest; import static org.opensearch.sql.spark.execution.statement.StatementState.CANCELLED; import static org.opensearch.sql.spark.execution.statement.StatementState.RUNNING; @@ -23,6 +22,7 @@ import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.session.Session; +import org.opensearch.sql.spark.execution.session.SessionConfigSupplier; import org.opensearch.sql.spark.execution.session.SessionId; import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.session.SessionState; @@ -45,6 +45,7 @@ public class StatementTest extends OpenSearchIntegTestCase { private StatementStorageService statementStorageService; private SessionStorageService sessionStorageService; private TestEMRServerlessClient emrsClient = new TestEMRServerlessClient(); + private SessionConfigSupplier sessionConfigSupplier = () -> 600000L; private SessionManager sessionManager; @@ -62,7 +63,7 @@ public void setup() { sessionStorageService, statementStorageService, emrServerlessClientFactory, - sessionSetting()); + sessionConfigSupplier); } @After