diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index 89d046b3d9..ae1950d81c 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -39,7 +39,8 @@ public enum Key { METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), - SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"); + SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), + SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"); @Getter private final String keyValue; diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index cd56e76491..686116636a 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -347,3 +347,39 @@ SQL query:: } } +plugins.query.executionengine.spark.session.limit +=================================================== + +Description +----------- + +Each datasource can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. + +1. The default value is 100. +2. This setting is node scope. +3. This setting can be updated dynamically. + +You can update the setting with a new value like this. + +SQL query:: + + sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ + ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' + { + "acknowledged": true, + "persistent": {}, + "transient": { + "plugins": { + "query": { + "executionengine": { + "spark": { + "session": { + "limit": "200" + } + } + } + } + } + } + } + diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index ecb35afafa..f80b576fe0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -142,6 +142,13 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting<?> SPARK_EXECUTION_SESSION_LIMIT_SETTING = + Setting.intSetting( + Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(), + 100, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -218,6 +225,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SPARK_EXECUTION_SESSION_ENABLED, SPARK_EXECUTION_SESSION_ENABLED_SETTING, new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED)); + register( + settingBuilder, + clusterSettings, + Key.SPARK_EXECUTION_SESSION_LIMIT, + SPARK_EXECUTION_SESSION_LIMIT_SETTING, + new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); registerNonDynamicSettings( settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING); defaultSettings = settingBuilder.build(); @@ -284,6 +297,7 @@ public static List<Setting<?>> pluginSettings() { .add(DATASOURCE_URI_HOSTS_DENY_LIST) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) + .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) .build(); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index c0f7bbcde8..81b9fdaee0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -6,8 +6,11 @@ package org.opensearch.sql.spark.execution.session; import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; +import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_LIMIT; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; +import static org.opensearch.sql.spark.execution.statestore.StateStore.activeSessionsCount; +import java.util.Locale; import java.util.Optional; import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.setting.Settings; @@ -26,6 +29,15 @@ public class SessionManager { private final Settings settings; public Session createSession(CreateSessionRequest request) { + int sessionMaxLimit = sessionMaxLimit(); + if (activeSessionsCount(stateStore, request.getDatasourceName()).get() >= sessionMaxLimit) { + String errorMsg = + String.format( + Locale.ROOT, + "The maximum number of active sessions can be " + "supported is %d", + sessionMaxLimit); + throw new IllegalArgumentException(errorMsg); + } InteractiveSession session = InteractiveSession.builder() .sessionId(newSessionId(request.getDatasourceName())) @@ -55,4 +67,8 @@ public Optional<Session> getSession(SessionId sid) { public boolean isEnabled() { return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); } + + public int sessionMaxLimit() { + return settings.getSettingValue(SPARK_EXECUTION_SESSION_LIMIT); + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index 6546d303fb..e6bad9fc26 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -14,6 +14,7 @@ import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; @@ -25,6 +26,8 @@ import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; @@ -38,9 +41,13 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; +import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; @@ -182,6 +189,35 @@ private void createIndex(String indexName) { } } + private long count(String indexName, QueryBuilder query) { + if (!this.clusterService.state().routingTable().hasIndex(indexName)) { + return 0; + } + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(query); + searchSourceBuilder.size(0); + + // https://github.com/opensearch-project/sql/issues/1801. + SearchRequest searchRequest = + new SearchRequest() + .indices(indexName) + .preference("_primary_first") + .source(searchSourceBuilder); + + ActionFuture<SearchResponse> searchResponseActionFuture; + try (ThreadContext.StoredContext ignored = + client.threadPool().getThreadContext().stashContext()) { + searchResponseActionFuture = client.search(searchRequest); + } + SearchResponse searchResponse = searchResponseActionFuture.actionGet(); + if (searchResponse.status().getStatus() != 200) { + throw new RuntimeException( + "Fetching job metadata information failed with status : " + searchResponse.status()); + } else { + return searchResponse.getHits().getTotalHits().value; + } + } + private String loadConfigFromResource(String fileName) throws IOException { InputStream fileStream = StateStore.class.getClassLoader().getResourceAsStream(fileName); return IOUtils.toString(fileStream, StandardCharsets.UTF_8); @@ -253,4 +289,19 @@ public static Function<String, Optional<AsyncQueryJobMetadata>> getJobMetaData( AsyncQueryJobMetadata::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } + + public static Supplier<Long> activeSessionsCount(StateStore stateStore, String datasourceName) { + return () -> + stateStore.count( + DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), + QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(SessionModel.TYPE, SessionModel.SESSION_DOC_TYPE)) + .must( + QueryBuilders.termQuery( + SessionModel.SESSION_TYPE, SessionType.INTERACTIVE.getSessionType())) + .must(QueryBuilders.termQuery(SessionModel.DATASOURCE_NAME, datasourceName)) + .must( + QueryBuilders.termQuery( + SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); + } } diff --git a/spark/src/main/resources/query_execution_request_mapping.yml b/spark/src/main/resources/query_execution_request_mapping.yml index fbe90a1cba..682534d338 100644 --- a/spark/src/main/resources/query_execution_request_mapping.yml +++ b/spark/src/main/resources/query_execution_request_mapping.yml @@ -40,3 +40,5 @@ properties: format: strict_date_time||epoch_millis queryId: type: keyword + excludeJobIds: + type: keyword diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index 19edd53eae..f65049a7d9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -6,6 +6,7 @@ package org.opensearch.sql.spark.asyncquery; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; +import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -16,7 +17,9 @@ import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; +import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; +import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; @@ -61,6 +64,8 @@ import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.execution.session.SessionManager; +import org.opensearch.sql.spark.execution.session.SessionModel; +import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -129,6 +134,13 @@ public void clean() { .setTransientSettings( Settings.builder().putNull(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey()).build()) .get(); + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) + .get(); } @Test @@ -378,6 +390,35 @@ public void withSessionCreateAsyncQueryFailed() { assertEquals("mock error", asyncQueryResults.getError()); } + @Test + public void createSessionMoreThanLimitFailed() { + LocalEMRSClient emrsClient = new LocalEMRSClient(); + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrsClient); + + // enable session + enableSession(true); + // only allow one session in domain. + setSessionLimit(1); + + // 1. create async query. + CreateAsyncQueryResponse first = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); + assertNotNull(first.getSessionId()); + setSessionState(first.getSessionId(), SessionState.RUNNING); + + // 2. create async query without session. + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null))); + assertEquals( + "The maximum number of active sessions can be supported is 1", exception.getMessage()); + } + private DataSourceServiceImpl createDataSourceService() { String masterKey = "a57d991d9b573f75b9bba1df"; DataSourceMetadataStorage dataSourceMetadataStorage = @@ -470,6 +511,16 @@ public void enableSession(boolean enabled) { .get(); } + public void setSessionLimit(long limit) { + client + .admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) + .get(); + } + int search(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); @@ -480,4 +531,11 @@ int search(QueryBuilder query) { return searchResponse.getHits().getHits().length; } + + void setSessionState(String sessionId, SessionState sessionState) { + Optional<SessionModel> model = getSession(stateStore, DATASOURCE).apply(sessionId); + SessionModel updated = + updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); + assertEquals(SessionState.RUNNING, updated.getSessionState()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 4374bd4f11..3546a874d9 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -33,6 +33,7 @@ public void sessionEnable() { public static Settings sessionSetting(boolean enabled) { Map<Settings.Key, Object> settings = new HashMap<>(); settings.put(Settings.Key.SPARK_EXECUTION_SESSION_ENABLED, enabled); + settings.put(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT, 100); return settings(settings); }