diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index ae1950d81c..89d046b3d9 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -39,8 +39,7 @@ public enum Key { METRICS_ROLLING_INTERVAL("plugins.query.metrics.rolling_interval"), SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), - SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), - SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"); + SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"); @Getter private final String keyValue; diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 686116636a..cd56e76491 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -347,39 +347,3 @@ SQL query:: } } -plugins.query.executionengine.spark.session.limit -=================================================== - -Description ------------ - -Each datasource can have maximum 100 sessions running in parallel by default. You can increase limit by this setting. - -1. The default value is 100. -2. This setting is node scope. -3. This setting can be updated dynamically. - -You can update the setting with a new value like this. - -SQL query:: - - sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings \ - ... -d '{"transient":{"plugins.query.executionengine.spark.session.limit":200}}' - { - "acknowledged": true, - "persistent": {}, - "transient": { - "plugins": { - "query": { - "executionengine": { - "spark": { - "session": { - "limit": "200" - } - } - } - } - } - } - } - diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index f80b576fe0..ecb35afafa 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -142,13 +142,6 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SPARK_EXECUTION_SESSION_LIMIT_SETTING = - Setting.intSetting( - Key.SPARK_EXECUTION_SESSION_LIMIT.getKeyValue(), - 100, - Setting.Property.NodeScope, - Setting.Property.Dynamic); - /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -225,12 +218,6 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SPARK_EXECUTION_SESSION_ENABLED, SPARK_EXECUTION_SESSION_ENABLED_SETTING, new Updater(Key.SPARK_EXECUTION_SESSION_ENABLED)); - register( - settingBuilder, - clusterSettings, - Key.SPARK_EXECUTION_SESSION_LIMIT, - SPARK_EXECUTION_SESSION_LIMIT_SETTING, - new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); registerNonDynamicSettings( settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING); defaultSettings = settingBuilder.build(); @@ -297,7 +284,6 @@ public static List> pluginSettings() { .add(DATASOURCE_URI_HOSTS_DENY_LIST) .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) - .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) .build(); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java index 81b9fdaee0..c0f7bbcde8 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/session/SessionManager.java @@ -6,11 +6,8 @@ package org.opensearch.sql.spark.execution.session; import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_ENABLED; -import static org.opensearch.sql.common.setting.Settings.Key.SPARK_EXECUTION_SESSION_LIMIT; import static org.opensearch.sql.spark.execution.session.SessionId.newSessionId; -import static org.opensearch.sql.spark.execution.statestore.StateStore.activeSessionsCount; -import java.util.Locale; import java.util.Optional; import lombok.RequiredArgsConstructor; import org.opensearch.sql.common.setting.Settings; @@ -29,15 +26,6 @@ public class SessionManager { private final Settings settings; public Session createSession(CreateSessionRequest request) { - int sessionMaxLimit = sessionMaxLimit(); - if (activeSessionsCount(stateStore, request.getDatasourceName()).get() >= sessionMaxLimit) { - String errorMsg = - String.format( - Locale.ROOT, - "The maximum number of active sessions can be " + "supported is %d", - sessionMaxLimit); - throw new IllegalArgumentException(errorMsg); - } InteractiveSession session = InteractiveSession.builder() .sessionId(newSessionId(request.getDatasourceName())) @@ -67,8 +55,4 @@ public Optional getSession(SessionId sid) { public boolean isEnabled() { return settings.getSettingValue(SPARK_EXECUTION_SESSION_ENABLED); } - - public int sessionMaxLimit() { - return settings.getSettingValue(SPARK_EXECUTION_SESSION_LIMIT); - } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java index e6bad9fc26..6546d303fb 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/StateStore.java @@ -14,7 +14,6 @@ import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; import lombok.RequiredArgsConstructor; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; @@ -26,8 +25,6 @@ import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.SearchRequest; -import org.opensearch.action.search.SearchResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; @@ -41,13 +38,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.index.query.QueryBuilder; -import org.opensearch.index.query.QueryBuilders; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; -import org.opensearch.sql.spark.execution.session.SessionType; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; @@ -189,35 +182,6 @@ private void createIndex(String indexName) { } } - private long count(String indexName, QueryBuilder query) { - if (!this.clusterService.state().routingTable().hasIndex(indexName)) { - return 0; - } - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(query); - searchSourceBuilder.size(0); - - // https://github.com/opensearch-project/sql/issues/1801. - SearchRequest searchRequest = - new SearchRequest() - .indices(indexName) - .preference("_primary_first") - .source(searchSourceBuilder); - - ActionFuture searchResponseActionFuture; - try (ThreadContext.StoredContext ignored = - client.threadPool().getThreadContext().stashContext()) { - searchResponseActionFuture = client.search(searchRequest); - } - SearchResponse searchResponse = searchResponseActionFuture.actionGet(); - if (searchResponse.status().getStatus() != 200) { - throw new RuntimeException( - "Fetching job metadata information failed with status : " + searchResponse.status()); - } else { - return searchResponse.getHits().getTotalHits().value; - } - } - private String loadConfigFromResource(String fileName) throws IOException { InputStream fileStream = StateStore.class.getClassLoader().getResourceAsStream(fileName); return IOUtils.toString(fileStream, StandardCharsets.UTF_8); @@ -289,19 +253,4 @@ public static Function> getJobMetaData( AsyncQueryJobMetadata::fromXContent, DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName)); } - - public static Supplier activeSessionsCount(StateStore stateStore, String datasourceName) { - return () -> - stateStore.count( - DATASOURCE_TO_REQUEST_INDEX.apply(datasourceName), - QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(SessionModel.TYPE, SessionModel.SESSION_DOC_TYPE)) - .must( - QueryBuilders.termQuery( - SessionModel.SESSION_TYPE, SessionType.INTERACTIVE.getSessionType())) - .must(QueryBuilders.termQuery(SessionModel.DATASOURCE_NAME, datasourceName)) - .must( - QueryBuilders.termQuery( - SessionModel.SESSION_STATE, SessionState.RUNNING.getSessionState()))); - } } diff --git a/spark/src/main/resources/query_execution_request_mapping.yml b/spark/src/main/resources/query_execution_request_mapping.yml index 682534d338..fbe90a1cba 100644 --- a/spark/src/main/resources/query_execution_request_mapping.yml +++ b/spark/src/main/resources/query_execution_request_mapping.yml @@ -40,5 +40,3 @@ properties: format: strict_date_time||epoch_millis queryId: type: keyword - excludeJobIds: - type: keyword diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index f65049a7d9..19edd53eae 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -6,7 +6,6 @@ package org.opensearch.sql.spark.asyncquery; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; -import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; @@ -17,9 +16,7 @@ import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getSession; import static org.opensearch.sql.spark.execution.statestore.StateStore.getStatement; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateSessionState; import static org.opensearch.sql.spark.execution.statestore.StateStore.updateStatementState; import com.amazonaws.services.emrserverless.model.CancelJobRunResult; @@ -64,8 +61,6 @@ import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.dispatcher.SparkQueryDispatcher; import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.execution.session.SessionModel; -import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statement.StatementModel; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -134,13 +129,6 @@ public void clean() { .setTransientSettings( Settings.builder().putNull(SPARK_EXECUTION_SESSION_ENABLED_SETTING.getKey()).build()) .get(); - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().putNull(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey()).build()) - .get(); } @Test @@ -390,35 +378,6 @@ public void withSessionCreateAsyncQueryFailed() { assertEquals("mock error", asyncQueryResults.getError()); } - @Test - public void createSessionMoreThanLimitFailed() { - LocalEMRSClient emrsClient = new LocalEMRSClient(); - AsyncQueryExecutorService asyncQueryExecutorService = - createAsyncQueryExecutorService(emrsClient); - - // enable session - enableSession(true); - // only allow one session in domain. - setSessionLimit(1); - - // 1. create async query. - CreateAsyncQueryResponse first = - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null)); - assertNotNull(first.getSessionId()); - setSessionState(first.getSessionId(), SessionState.RUNNING); - - // 2. create async query without session. - IllegalArgumentException exception = - assertThrows( - IllegalArgumentException.class, - () -> - asyncQueryExecutorService.createAsyncQuery( - new CreateAsyncQueryRequest("select 1", DATASOURCE, LangType.SQL, null))); - assertEquals( - "The maximum number of active sessions can be supported is 1", exception.getMessage()); - } - private DataSourceServiceImpl createDataSourceService() { String masterKey = "a57d991d9b573f75b9bba1df"; DataSourceMetadataStorage dataSourceMetadataStorage = @@ -511,16 +470,6 @@ public void enableSession(boolean enabled) { .get(); } - public void setSessionLimit(long limit) { - client - .admin() - .cluster() - .prepareUpdateSettings() - .setTransientSettings( - Settings.builder().put(SPARK_EXECUTION_SESSION_LIMIT_SETTING.getKey(), limit).build()) - .get(); - } - int search(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices(DATASOURCE_TO_REQUEST_INDEX.apply(DATASOURCE)); @@ -531,11 +480,4 @@ int search(QueryBuilder query) { return searchResponse.getHits().getHits().length; } - - void setSessionState(String sessionId, SessionState sessionState) { - Optional model = getSession(stateStore, DATASOURCE).apply(sessionId); - SessionModel updated = - updateSessionState(stateStore, DATASOURCE).apply(model.get(), sessionState); - assertEquals(SessionState.RUNNING, updated.getSessionState()); - } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java index 3546a874d9..4374bd4f11 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/session/SessionManagerTest.java @@ -33,7 +33,6 @@ public void sessionEnable() { public static Settings sessionSetting(boolean enabled) { Map settings = new HashMap<>(); settings.put(Settings.Key.SPARK_EXECUTION_SESSION_ENABLED, enabled); - settings.put(Settings.Key.SPARK_EXECUTION_SESSION_LIMIT, 100); return settings(settings); }