Skip to content

Commit

Permalink
Introduce SessionConfigSupplier to abstract settings (#2707)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 authored Jun 4, 2024
1 parent 03a5e4d commit 65e88c2
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;

@RequiredArgsConstructor
public class OpenSearchSessionConfigSupplier implements SessionConfigSupplier {
private final Settings settings;

@Override
public Long getSessionInactivityTimeoutMillis() {
return settings.getSettingValue(Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.session;

/** Interface to abstract session config */
public interface SessionConfigSupplier {
Long getSessionInactivityTimeoutMillis();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@

package org.opensearch.sql.spark.execution.session;

import static org.opensearch.sql.common.setting.Settings.Key.SESSION_INACTIVITY_TIMEOUT_MILLIS;
import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.SessionStorageService;
import org.opensearch.sql.spark.execution.statestore.StatementStorageService;
Expand All @@ -26,7 +24,7 @@ public class SessionManager {
private final SessionStorageService sessionStorageService;
private final StatementStorageService statementStorageService;
private final EMRServerlessClientFactory emrServerlessClientFactory;
private final Settings settings;
private final SessionConfigSupplier sessionConfigSupplier;

public Session createSession(CreateSessionRequest request) {
InteractiveSession session =
Expand Down Expand Up @@ -70,7 +68,7 @@ public Optional<Session> getSession(SessionId sid, String dataSourceName) {
.serverlessClient(emrServerlessClientFactory.getClient())
.sessionModel(model.get())
.sessionInactivityTimeoutMilli(
settings.getSettingValue(SESSION_INACTIVITY_TIMEOUT_MILLIS))
sessionConfigSupplier.getSessionInactivityTimeoutMillis())
.timeProvider(new RealTimeProvider())
.build();
return Optional.ofNullable(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory;
import org.opensearch.sql.spark.dispatcher.QueryIdProvider;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.execution.session.OpenSearchSessionConfigSupplier;
import org.opensearch.sql.spark.execution.session.SessionConfigSupplier;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.OpenSearchSessionStorageService;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStatementStorageService;
Expand Down Expand Up @@ -141,9 +143,12 @@ public SessionManager sessionManager(
SessionStorageService sessionStorageService,
StatementStorageService statementStorageService,
EMRServerlessClientFactory emrServerlessClientFactory,
Settings settings) {
SessionConfigSupplier sessionConfigSupplier) {
return new SessionManager(
sessionStorageService, statementStorageService, emrServerlessClientFactory, settings);
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
sessionConfigSupplier);
}

@Provides
Expand Down Expand Up @@ -185,6 +190,11 @@ public JobExecutionResponseReader jobExecutionResponseReader(NodeClient client)
return new OpenSearchJobExecutionResponseReader(client);
}

@Provides
public SessionConfigSupplier sessionConfigSupplier(Settings settings) {
return new OpenSearchSessionConfigSupplier(settings);
}

private void registerStateStoreMetrics(StateStore stateStore) {
GaugeMetric<Long> activeSessionMetric =
new GaugeMetric<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ void testCreateAsyncQuery() {
.storeJobMetadata(getAsyncQueryJobMetadata());
verify(sparkExecutionEngineConfigSupplier, times(1))
.getSparkExecutionEngineConfig(requestContext);
verify(sparkExecutionEngineConfigSupplier, times(1))
.getSparkExecutionEngineConfig(requestContext);
verify(sparkQueryDispatcher, times(1)).dispatch(expectedDispatchQueryRequest);
Assertions.assertEquals(QUERY_ID, createAsyncQueryResponse.getQueryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.sql.spark.dispatcher.DatasourceEmbeddedQueryIdProvider;
import org.opensearch.sql.spark.dispatcher.QueryHandlerFactory;
import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher;
import org.opensearch.sql.spark.execution.session.OpenSearchSessionConfigSupplier;
import org.opensearch.sql.spark.execution.session.SessionConfigSupplier;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.session.SessionModel;
import org.opensearch.sql.spark.execution.session.SessionState;
Expand Down Expand Up @@ -93,6 +95,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase {

protected ClusterService clusterService;
protected org.opensearch.sql.common.setting.Settings pluginSettings;
protected SessionConfigSupplier sessionConfigSupplier;
protected NodeClient client;
protected DataSourceServiceImpl dataSourceService;
protected ClusterSettings clusterSettings;
Expand Down Expand Up @@ -123,6 +126,7 @@ public void setup() {
pluginSettings = new OpenSearchSettings(clusterSettings);
LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
sessionConfigSupplier = new OpenSearchSessionConfigSupplier(pluginSettings);
Metrics.getInstance().registerDefaultMetrics();
client = (NodeClient) cluster().client();
client
Expand Down Expand Up @@ -246,7 +250,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
pluginSettings),
sessionConfigSupplier),
new DefaultLeaseManager(pluginSettings, stateStore),
new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore),
new FlintIndexOpFactory(
Expand All @@ -262,7 +266,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
pluginSettings),
sessionConfigSupplier),
queryHandlerFactory,
new DatasourceEmbeddedQueryIdProvider());
return new AsyncQueryExecutorServiceImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.opensearch.sql.spark.constants.TestConstants.TEST_CLUSTER_NAME;
import static org.opensearch.sql.spark.constants.TestConstants.TEST_DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting;
import static org.opensearch.sql.spark.execution.session.SessionState.NOT_STARTED;
import static org.opensearch.sql.spark.execution.session.SessionTestUtil.createSessionRequest;

Expand Down Expand Up @@ -42,6 +41,7 @@ public class InteractiveSessionTest extends OpenSearchIntegTestCase {
private StartJobRequest startJobRequest;
private SessionStorageService sessionStorageService;
private StatementStorageService statementStorageService;
private SessionConfigSupplier sessionConfigSupplier = () -> 600000L;
private SessionManager sessionManager;

@Before
Expand All @@ -54,12 +54,13 @@ public void setup() {
statementStorageService =
new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer());
EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient;

sessionManager =
new SessionManager(
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
sessionSetting());
sessionConfigSupplier);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class SessionManagerTest {
@Mock private SessionStorageService sessionStorageService;
@Mock private StatementStorageService statementStorageService;
@Mock private EMRServerlessClientFactory emrServerlessClientFactory;
@Mock private SessionConfigSupplier sessionConfigSupplier;

@Test
public void sessionEnable() {
Expand All @@ -31,7 +32,7 @@ public void sessionEnable() {
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
sessionSetting());
sessionConfigSupplier);

Assertions.assertTrue(sessionManager.isEnabled());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.sql.spark.execution.statement;

import static org.opensearch.sql.spark.constants.TestConstants.TEST_DATASOURCE_NAME;
import static org.opensearch.sql.spark.execution.session.SessionManagerTest.sessionSetting;
import static org.opensearch.sql.spark.execution.session.SessionTestUtil.createSessionRequest;
import static org.opensearch.sql.spark.execution.statement.StatementState.CANCELLED;
import static org.opensearch.sql.spark.execution.statement.StatementState.RUNNING;
Expand All @@ -23,6 +22,7 @@
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.session.Session;
import org.opensearch.sql.spark.execution.session.SessionConfigSupplier;
import org.opensearch.sql.spark.execution.session.SessionId;
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.session.SessionState;
Expand All @@ -45,6 +45,7 @@ public class StatementTest extends OpenSearchIntegTestCase {
private StatementStorageService statementStorageService;
private SessionStorageService sessionStorageService;
private TestEMRServerlessClient emrsClient = new TestEMRServerlessClient();
private SessionConfigSupplier sessionConfigSupplier = () -> 600000L;

private SessionManager sessionManager;

Expand All @@ -62,7 +63,7 @@ public void setup() {
sessionStorageService,
statementStorageService,
emrServerlessClientFactory,
sessionSetting());
sessionConfigSupplier);
}

@After
Expand Down

0 comments on commit 65e88c2

Please sign in to comment.