From 289400ebe579388a9216f0f153a5771cb3d1d152 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Wed, 25 Oct 2023 18:40:43 -0700 Subject: [PATCH] Add Flint Index Purging Logic - Introduce dynamic settings for enabling/disabling purging and controlling index TTL. - Reuse default result index name as a common prefix for all result indices. - Change result index to a non-hidden index for better user experience. - Allow custom result index specification in the data source. - Move default result index name from spark to core package to avoid cross-package references. - Add validation for provided result index name in the data source. - Use pattern prefix + data source name for default result index naming. Testing: - Verified old documents are purged in a cluster setup. - Checked result index naming with and without custom names, ensuring validation is applied. Note: Tests will be added in a subsequent PR. Signed-off-by: Kaituo Li --- .../sql/common/setting/Settings.java | 6 +- .../datasource/model/DataSourceMetadata.java | 52 +++++- .../sql/analysis/AnalyzerTestBase.java | 2 + .../datasource/DataSourceTableScanTest.java | 2 + .../service/DataSourceServiceImplTest.java | 5 + .../utils/XContentParserUtilsTest.java | 1 + .../sql/datasource/DataSourceAPIsIT.java | 5 + .../sql/ppl/InformationSchemaCommandIT.java | 2 + .../ppl/PrometheusDataSourceCommandsIT.java | 1 + .../sql/ppl/ShowDataSourcesCommandIT.java | 2 + .../setting/OpenSearchSettings.java | 44 ++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 15 +- spark/build.gradle | 6 +- .../sql/spark/client/EmrClientImpl.java | 4 +- .../spark/client/EmrServerlessClientImpl.java | 4 +- .../cluster/ClusterManagerEventListener.java | 148 ++++++++++++++++++ .../spark/cluster/FlintIndexRetention.java | 148 ++++++++++++++++++ .../sql/spark/cluster/IndexCleanup.java | 125 +++++++++++++++ .../spark/data/constants/SparkConstants.java | 1 - .../response/JobExecutionResponseReader.java | 4 +- .../sql/spark/response/SparkResponse.java | 8 +- ...AsyncQueryExecutorServiceImplSpecTest.java | 13 +- ...AsyncQueryExecutionResponseReaderTest.java | 4 +- .../sql/spark/response/SparkResponseTest.java | 4 +- 24 files changed, 581 insertions(+), 25 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index ae1950d81c..6ef3921b39 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -40,7 +40,11 @@ public enum Key { SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"), CLUSTER_NAME("cluster.name"), SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"), - SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"); + SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"), + SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"), + RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"), + AUTO_INDEX_MANAGEMENT_ENABLED( + "plugins.query.executionengine.spark.auto_index_management.enabled"); @Getter private final String keyValue; diff --git a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java index 866e9cadef..85f4bd137a 100644 --- a/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java +++ b/core/src/main/java/org/opensearch/sql/datasource/model/DataSourceMetadata.java @@ -16,7 +16,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import lombok.AllArgsConstructor; +import java.util.function.Function; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; @@ -25,11 +25,24 @@ @Getter @Setter -@AllArgsConstructor @EqualsAndHashCode @JsonIgnoreProperties(ignoreUnknown = true) public class DataSourceMetadata { + public static final String DEFAULT_RESULT_INDEX = "query_execution_result"; + public static final int MAX_RESULT_INDEX_NAME_SIZE = 255; + // OS doesn’t allow uppercase: https://tinyurl.com/yse2xdbx + public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+"; + public static String INVALID_RESULT_INDEX_NAME_SIZE = + "Result index name size must contains less than " + + MAX_RESULT_INDEX_NAME_SIZE + + " characters"; + public static String INVALID_CHAR_IN_RESULT_INDEX_NAME = + "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and" + + " _(underscore)"; + public static String INVALID_RESULT_INDEX_PREFIX = + "Result index must start with " + DEFAULT_RESULT_INDEX; + @JsonProperty private String name; @JsonProperty private String description; @@ -44,18 +57,32 @@ public class DataSourceMetadata { @JsonProperty private String resultIndex; + public static Function DATASOURCE_TO_RESULT_INDEX = + datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName); + public DataSourceMetadata( String name, + String description, DataSourceType connector, List allowedRoles, Map properties, String resultIndex) { + String errorMessage = validateCustomResultIndex(resultIndex); + this.name = name; this.connector = connector; - this.description = StringUtils.EMPTY; + this.description = description; this.properties = properties; this.allowedRoles = allowedRoles; - this.resultIndex = resultIndex; + + if (errorMessage != null) { + throw new IllegalArgumentException(errorMessage); + } + if (resultIndex == null) { + this.resultIndex = DATASOURCE_TO_RESULT_INDEX.apply(name); + } else { + this.resultIndex = resultIndex; + } } public DataSourceMetadata() { @@ -71,9 +98,26 @@ public DataSourceMetadata() { public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() { return new DataSourceMetadata( DEFAULT_DATASOURCE_NAME, + StringUtils.EMPTY, DataSourceType.OPENSEARCH, Collections.emptyList(), ImmutableMap.of(), null); } + + public String validateCustomResultIndex(String resultIndex) { + if (resultIndex == null) { + return null; + } + if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) { + return INVALID_RESULT_INDEX_NAME_SIZE; + } + if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) { + return INVALID_CHAR_IN_RESULT_INDEX_NAME; + } + if (resultIndex != null && !resultIndex.startsWith(DEFAULT_RESULT_INDEX)) { + return INVALID_RESULT_INDEX_PREFIX; + } + return null; + } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java index 569cdd96f8..bfd68ee53a 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTestBase.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.DataSourceSchemaName; import org.opensearch.sql.analysis.symbol.Namespace; @@ -197,6 +198,7 @@ public Set getDataSourceMetadata(boolean isDefaultDataSource ds -> new DataSourceMetadata( ds.getName(), + StringUtils.EMPTY, ds.getConnectorType(), Collections.emptyList(), ImmutableMap.of(), diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java index 4aefc5521d..0c9449e824 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/datasource/DataSourceTableScanTest.java @@ -18,6 +18,7 @@ import java.util.LinkedHashMap; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -62,6 +63,7 @@ void testIterator() { dataSource -> new DataSourceMetadata( dataSource.getName(), + StringUtils.EMPTY, dataSource.getConnectorType(), Collections.emptyList(), ImmutableMap.of(), diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java index c62e586dae..bf88302833 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/service/DataSourceServiceImplTest.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -382,6 +383,7 @@ void testRemovalOfAuthorizationInfo() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, @@ -407,6 +409,7 @@ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, @@ -434,6 +437,7 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testGlue", + StringUtils.EMPTY, DataSourceType.S3GLUE, Collections.singletonList("glue_access"), properties, @@ -498,6 +502,7 @@ void testGetRawDataSourceMetadata() { DataSourceMetadata dataSourceMetadata = new DataSourceMetadata( "testDS", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, Collections.singletonList("prometheus_access"), properties, diff --git a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java index e1e442d12b..5a1f5e155f 100644 --- a/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java +++ b/datasources/src/test/java/org/opensearch/sql/datasources/utils/XContentParserUtilsTest.java @@ -44,6 +44,7 @@ public void testToDataSourceMetadataFromJson() { dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS); dataSourceMetadata.setAllowedRoles(List.of("prometheus_access")); dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090")); + dataSourceMetadata.setResultIndex("query_execution_result2"); Gson gson = new Gson(); String json = gson.toJson(dataSourceMetadata); diff --git a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java index ff36d2a887..92c1a4df16 100644 --- a/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/datasource/DataSourceAPIsIT.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Test; @@ -103,6 +104,7 @@ public void updateDataSourceAPITest() { DataSourceMetadata createDSM = new DataSourceMetadata( "update_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), @@ -116,6 +118,7 @@ public void updateDataSourceAPITest() { DataSourceMetadata updateDSM = new DataSourceMetadata( "update_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"), @@ -175,6 +178,7 @@ public void deleteDataSourceTest() { DataSourceMetadata createDSM = new DataSourceMetadata( "delete_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), @@ -214,6 +218,7 @@ public void getAllDataSourceTest() { DataSourceMetadata createDSM = new DataSourceMetadata( "get_all_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "https://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java index 7b694ce222..d916bfc4db 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/InformationSchemaCommandIT.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new DataSourceMetadata( "my_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java index b81b7f9517..10fe13a8db 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusDataSourceCommandsIT.java @@ -56,6 +56,7 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new DataSourceMetadata( "my_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java index c3d2bf5912..b6a34d5c41 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ShowDataSourcesCommandIT.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import org.apache.commons.lang3.StringUtils; import org.json.JSONObject; import org.junit.After; import org.junit.Assert; @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException { DataSourceMetadata createDSM = new DataSourceMetadata( "my_prometheus", + StringUtils.EMPTY, DataSourceType.PROMETHEUS, ImmutableList.of(), ImmutableMap.of("prometheus.uri", "http://localhost:9090"), diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index f80b576fe0..19660af1b6 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -6,6 +6,7 @@ package org.opensearch.sql.opensearch.setting; import static org.opensearch.common.settings.Settings.EMPTY; +import static org.opensearch.common.unit.TimeValue.timeValueDays; import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY; import com.google.common.annotations.VisibleForTesting; @@ -25,6 +26,7 @@ import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.MemorySizeValue; +import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.common.setting.LegacySettings; import org.opensearch.sql.common.setting.Settings; @@ -149,6 +151,27 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting SESSION_INDEX_TTL_SETTING = + Setting.positiveTimeSetting( + Key.SESSION_INDEX_TTL.getKeyValue(), + timeValueDays(14), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting RESULT_INDEX_TTL_SETTING = + Setting.positiveTimeSetting( + Key.RESULT_INDEX_TTL.getKeyValue(), + timeValueDays(60), + Setting.Property.NodeScope, + Setting.Property.Dynamic); + + public static final Setting AUTO_INDEX_MANAGEMENT_ENABLED_SETTING = + Setting.boolSetting( + Key.AUTO_INDEX_MANAGEMENT_ENABLED.getKeyValue(), + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic); + /** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */ @SuppressWarnings("unchecked") public OpenSearchSettings(ClusterSettings clusterSettings) { @@ -231,6 +254,24 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SPARK_EXECUTION_SESSION_LIMIT, SPARK_EXECUTION_SESSION_LIMIT_SETTING, new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT)); + register( + settingBuilder, + clusterSettings, + Key.SESSION_INDEX_TTL, + SESSION_INDEX_TTL_SETTING, + new Updater(Key.SESSION_INDEX_TTL)); + register( + settingBuilder, + clusterSettings, + Key.RESULT_INDEX_TTL, + RESULT_INDEX_TTL_SETTING, + new Updater(Key.RESULT_INDEX_TTL)); + register( + settingBuilder, + clusterSettings, + Key.AUTO_INDEX_MANAGEMENT_ENABLED, + AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, + new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED)); registerNonDynamicSettings( settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING); defaultSettings = settingBuilder.build(); @@ -298,6 +339,9 @@ public static List> pluginSettings() { .add(SPARK_EXECUTION_ENGINE_CONFIG) .add(SPARK_EXECUTION_SESSION_ENABLED_SETTING) .add(SPARK_EXECUTION_SESSION_LIMIT_SETTING) + .add(SESSION_INDEX_TTL_SETTING) + .add(RESULT_INDEX_TTL_SETTING) + .add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING) .build(); } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 3d9740d84c..905c697e5b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import java.security.AccessController; import java.security.PrivilegedAction; +import java.time.Clock; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -89,6 +90,7 @@ import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EmrServerlessClientImpl; +import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; import org.opensearch.sql.spark.config.SparkExecutionEngineConfig; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier; import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl; @@ -245,7 +247,18 @@ public Collection createComponents( }); injector = modules.createInjector(); - return ImmutableList.of(dataSourceService, asyncQueryExecutorService); + ClusterManagerEventListener clusterManagerEventListener = + new ClusterManagerEventListener( + clusterService, + threadPool, + client, + Clock.systemUTC(), + OpenSearchSettings.SESSION_INDEX_TTL_SETTING, + OpenSearchSettings.RESULT_INDEX_TTL_SETTING, + OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING, + environment.settings()); + return ImmutableList.of( + dataSourceService, asyncQueryExecutorService, clusterManagerEventListener); } @Override diff --git a/spark/build.gradle b/spark/build.gradle index 8f4388495e..6d3bb17c62 100644 --- a/spark/build.gradle +++ b/spark/build.gradle @@ -122,7 +122,11 @@ jacocoTestCoverageVerification { // ignore because XContext IOException 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.execution.session.SessionModel', - 'org.opensearch.sql.spark.execution.statement.StatementModel' + 'org.opensearch.sql.spark.execution.statement.StatementModel', + // TODO: add tests for purging flint indices + 'org.opensearch.sql.spark.cluster.ClusterManagerEventListener', + 'org.opensearch.sql.spark.cluster.FlintIndexRetention', + 'org.opensearch.sql.spark.cluster.IndexCleanup' ] limit { counter = 'LINE' diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java index 4e66cd9a00..87f35bbc1e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrClientImpl.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.client; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; @@ -74,7 +74,7 @@ void runEmrApplication(String query) { flint.getFlintIntegrationJar(), sparkApplicationJar, query, - SPARK_RESPONSE_BUFFER_INDEX_NAME, + DEFAULT_RESULT_INDEX, flint.getFlintHost(), flint.getFlintPort(), flint.getFlintScheme(), diff --git a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java index 335f3b6fc8..0da5ae7211 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java +++ b/spark/src/main/java/org/opensearch/sql/spark/client/EmrServerlessClientImpl.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.client; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_SQL_APPLICATION_JAR; import com.amazonaws.services.emrserverless.AWSEMRServerless; @@ -36,7 +36,7 @@ public EmrServerlessClientImpl(AWSEMRServerless emrServerless) { public String startJobRun(StartJobRequest startJobRequest) { String resultIndex = startJobRequest.getResultIndex() == null - ? SPARK_RESPONSE_BUFFER_INDEX_NAME + ? DEFAULT_RESULT_INDEX : startJobRequest.getResultIndex(); StartJobRunRequest request = new StartJobRunRequest() diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java new file mode 100644 index 0000000000..3d004b548f --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Clock; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import org.opensearch.client.Client; +import org.opensearch.cluster.LocalNodeClusterManagerListener; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lifecycle.LifecycleListener; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.threadpool.Scheduler.Cancellable; +import org.opensearch.threadpool.ThreadPool; + +public class ClusterManagerEventListener implements LocalNodeClusterManagerListener { + + private Cancellable flintIndexRetentionCron; + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + private Clock clock; + private Duration sessionTtlDuration; + private Duration resultTtlDuration; + private boolean isAutoIndexManagementEnabled; + + public ClusterManagerEventListener( + ClusterService clusterService, + ThreadPool threadPool, + Client client, + Clock clock, + Setting sessionTtl, + Setting resultTtl, + Setting isAutoIndexManagementEnabledSetting, + Settings settings) { + this.clusterService = clusterService; + this.threadPool = threadPool; + this.client = client; + this.clusterService.addLocalNodeClusterManagerListener(this); + this.clock = clock; + + this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); + this.resultTtlDuration = toDuration(resultTtl.get(settings)); + + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer( + sessionTtl, + it -> { + this.sessionTtlDuration = toDuration(it); + cancel(flintIndexRetentionCron); + reInitializeFlintIndexRetention(); + }); + + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer( + resultTtl, + it -> { + this.resultTtlDuration = toDuration(it); + cancel(flintIndexRetentionCron); + reInitializeFlintIndexRetention(); + }); + + isAutoIndexManagementEnabled = isAutoIndexManagementEnabledSetting.get(settings); + clusterService + .getClusterSettings() + .addSettingsUpdateConsumer( + isAutoIndexManagementEnabledSetting, + it -> { + if (isAutoIndexManagementEnabled != it) { + this.isAutoIndexManagementEnabled = it; + if (it) { + onClusterManager(); + } else { + offClusterManager(); + } + } + }); + } + + @Override + public void onClusterManager() { + + if (isAutoIndexManagementEnabled && flintIndexRetentionCron == null) { + reInitializeFlintIndexRetention(); + + clusterService.addLifecycleListener( + new LifecycleListener() { + @Override + public void beforeStop() { + cancel(flintIndexRetentionCron); + flintIndexRetentionCron = null; + } + }); + } + } + + private void reInitializeFlintIndexRetention() { + IndexCleanup indexCleanup = new IndexCleanup(client, clusterService); + flintIndexRetentionCron = + threadPool.scheduleWithFixedDelay( + new FlintIndexRetention( + sessionTtlDuration, + resultTtlDuration, + clock, + indexCleanup, + SPARK_REQUEST_BUFFER_INDEX_NAME + "*", + DataSourceMetadata.DEFAULT_RESULT_INDEX + "*"), + TimeValue.timeValueHours(24), + executorName()); + } + + @Override + public void offClusterManager() { + cancel(flintIndexRetentionCron); + flintIndexRetentionCron = null; + } + + private void cancel(Cancellable cron) { + if (cron != null) { + cron.cancel(); + } + } + + @VisibleForTesting + public List getFlintIndexRetentionCron() { + return Arrays.asList(flintIndexRetentionCron); + } + + private String executorName() { + return ThreadPool.Names.GENERIC; + } + + public static Duration toDuration(TimeValue timeValue) { + return Duration.ofMillis(timeValue.millis()); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java new file mode 100644 index 0000000000..3ca56ca173 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintIndexRetention.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import static org.opensearch.sql.spark.execution.session.SessionModel.LAST_UPDATE_TIME; +import static org.opensearch.sql.spark.execution.statement.StatementModel.SUBMIT_TIME; + +import java.time.Clock; +import java.time.Duration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.CheckedConsumer; +import org.opensearch.common.time.FormatNames; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilders; + +public class FlintIndexRetention implements Runnable { + private static final Logger LOG = LogManager.getLogger(FlintIndexRetention.class); + + static final String SESSION_INDEX_NOT_EXIST_MSG = "Checkpoint index does not exist."; + + static final String RESULT_INDEX_NOT_EXIST_MSG = "Result index does not exist."; + + // timestamp field in result index + static final String UPDATE_TIME_FIELD = "updateTime"; + + private final Duration defaultSessionTtl; + private final Duration defaultResultTtl; + private final Clock clock; + private final IndexCleanup indexCleanup; + private final String sessionIndexNameRegex; + private final String resultIndexNameRegex; + + public FlintIndexRetention( + Duration defaultSessionTtl, + Duration defaultResultTtl, + Clock clock, + IndexCleanup indexCleanup, + String sessionIndexNameRegex, + String resultIndexNameRegex) { + this.defaultSessionTtl = defaultSessionTtl; + this.defaultResultTtl = defaultResultTtl; + this.clock = clock; + this.indexCleanup = indexCleanup; + this.sessionIndexNameRegex = sessionIndexNameRegex; + this.resultIndexNameRegex = resultIndexNameRegex; + } + + @Override + public void run() { + purgeSessionIndex(); + } + + private void purgeSessionIndex() { + purgeIndex( + sessionIndexNameRegex, + defaultSessionTtl, + LAST_UPDATE_TIME, + this::handleSessionPurgeResponse, + this::handleSessionPurgeError); + } + + private void handleSessionPurgeResponse(Long response) { + purgeStatementIndex(); + } + + private void handleSessionPurgeError(Exception exception) { + handlePurgeError(SESSION_INDEX_NOT_EXIST_MSG, "session index", exception); + purgeStatementIndex(); + } + + private void purgeStatementIndex() { + purgeIndex( + sessionIndexNameRegex, + defaultSessionTtl, + SUBMIT_TIME, + this::handleStatementPurgeResponse, + this::handleStatementPurgeError); + } + + private void handleStatementPurgeResponse(Long response) { + purgeResultIndex(); + } + + private void handleStatementPurgeError(Exception exception) { + handlePurgeError(SESSION_INDEX_NOT_EXIST_MSG, "session index", exception); + purgeResultIndex(); + } + + private void purgeResultIndex() { + purgeIndex( + resultIndexNameRegex, + defaultResultTtl, + UPDATE_TIME_FIELD, + this::handleResultPurgeResponse, + this::handleResultPurgeError); + } + + private void handleResultPurgeResponse(Long response) { + LOG.debug("purge result index done"); + } + + private void handleResultPurgeError(Exception exception) { + handlePurgeError(RESULT_INDEX_NOT_EXIST_MSG, "result index", exception); + } + + private void handlePurgeError(String notExistMsg, String indexType, Exception exception) { + if (exception instanceof IndexNotFoundException) { + LOG.debug(notExistMsg); + } else { + LOG.error("delete docs by query fails for " + indexType, exception); + } + } + + private void purgeIndex( + String indexName, + Duration ttl, + String timeStampField, + CheckedConsumer successHandler, + CheckedConsumer errorHandler) { + indexCleanup.deleteDocsByQuery( + indexName, + QueryBuilders.boolQuery() + .filter( + QueryBuilders.rangeQuery(timeStampField) + .lte(clock.millis() - ttl.toMillis()) + .format(FormatNames.EPOCH_MILLIS.getSnakeCaseName())), + ActionListener.wrap( + response -> { + try { + successHandler.accept(response); + } catch (Exception e) { + LOG.error("Error handling response for index " + indexName, e); + } + }, + ex -> { + try { + errorHandler.accept(ex); + } catch (Exception e) { + LOG.error("Error handling error for index " + indexName, e); + } + })); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java new file mode 100644 index 0000000000..75cad4fed9 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java @@ -0,0 +1,125 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.cluster; + +import java.util.Arrays; +import java.util.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.reindex.DeleteByQueryAction; +import org.opensearch.index.reindex.DeleteByQueryRequest; +import org.opensearch.index.store.StoreStats; + +/** Clean up the old docs for indices. */ +public class IndexCleanup { + private static final Logger LOG = LogManager.getLogger(IndexCleanup.class); + + private final Client client; + private final ClusterService clusterService; + + public IndexCleanup(Client client, ClusterService clusterService) { + this.client = client; + this.clusterService = clusterService; + } + + /** + * delete docs when shard size is bigger than max limitation. + * + * @param indexName index name + * @param maxShardSize max shard size + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsBasedOnShardSize( + String indexName, + long maxShardSize, + QueryBuilder queryForDeleteByQueryRequest, + ActionListener listener) { + + if (!clusterService.state().getRoutingTable().hasIndex(indexName)) { + LOG.debug("skip as the index:{} doesn't exist", indexName); + return; + } + + ActionListener indicesStatsResponseListener = + ActionListener.wrap( + indicesStatsResponse -> { + // Check if any shard size is bigger than maxShardSize + boolean cleanupNeeded = + Arrays.stream(indicesStatsResponse.getShards()) + .map(ShardStats::getStats) + .filter(Objects::nonNull) + .map(CommonStats::getStore) + .filter(Objects::nonNull) + .map(StoreStats::getSizeInBytes) + .anyMatch(size -> size > maxShardSize); + + if (cleanupNeeded) { + deleteDocsByQuery( + indexName, + queryForDeleteByQueryRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); + } else { + listener.onResponse(false); + } + }, + listener::onFailure); + + getCheckpointShardStoreStats(indexName, indicesStatsResponseListener); + } + + private void getCheckpointShardStoreStats( + String indexName, ActionListener listener) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.store(); + indicesStatsRequest.indices(indexName); + client.admin().indices().stats(indicesStatsRequest, listener); + } + + /** + * Delete docs based on query request + * + * @param indexName index name + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsByQuery( + String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener listener) { + DeleteByQueryRequest deleteRequest = + new DeleteByQueryRequest(indexName) + .setQuery(queryForDeleteByQueryRequest) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setRefresh(true); + + try (ThreadContext.StoredContext context = + client.threadPool().getThreadContext().stashContext()) { + client.execute( + DeleteByQueryAction.INSTANCE, + deleteRequest, + ActionListener.wrap( + response -> { + long deleted = response.getDeleted(); + if (deleted > 0) { + // if 0 docs get deleted, it means our query cannot find any matching doc + // or the index does not exist at all + LOG.info("{} docs are deleted for index:{}", deleted, indexName); + } + listener.onResponse(response.getDeleted()); + }, + listener::onFailure)); + } + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java index e8659c680c..3a243cb5b3 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java +++ b/spark/src/main/java/org/opensearch/sql/spark/data/constants/SparkConstants.java @@ -20,7 +20,6 @@ public class SparkConstants { // EMR-S will download JAR to local maven public static final String SPARK_SQL_APPLICATION_JAR = "file:///home/hadoop/.ivy2/jars/org.opensearch_opensearch-spark-sql-application_2.12-0.1.0-SNAPSHOT.jar"; - public static final String SPARK_RESPONSE_BUFFER_INDEX_NAME = ".query_execution_result"; public static final String SPARK_REQUEST_BUFFER_INDEX_NAME = ".query_execution_request"; // TODO should be replaced with mvn jar. public static final String FLINT_INTEGRATION_JAR = diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java index 2614992463..e4773310f0 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/JobExecutionResponseReader.java @@ -5,9 +5,9 @@ package org.opensearch.sql.spark.response; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.data.constants.SparkConstants.DATA_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.JOB_ID_FIELD; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -45,7 +45,7 @@ public JSONObject getResultWithQueryId(String queryId, String resultIndex) { private JSONObject searchInSparkIndex(QueryBuilder query, String resultIndex) { SearchRequest searchRequest = new SearchRequest(); - String searchResultIndex = resultIndex == null ? SPARK_RESPONSE_BUFFER_INDEX_NAME : resultIndex; + String searchResultIndex = resultIndex == null ? DEFAULT_RESULT_INDEX : resultIndex; searchRequest.indices(searchResultIndex); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); diff --git a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java b/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java index 496caba2c9..e225804043 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java +++ b/spark/src/main/java/org/opensearch/sql/spark/response/SparkResponse.java @@ -5,7 +5,7 @@ package org.opensearch.sql.spark.response; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import com.google.common.annotations.VisibleForTesting; import lombok.Data; @@ -51,7 +51,7 @@ public JSONObject getResultFromOpensearchIndex() { private JSONObject searchInSparkIndex(QueryBuilder query) { SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(SPARK_RESPONSE_BUFFER_INDEX_NAME); + searchRequest.indices(DEFAULT_RESULT_INDEX); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(query); searchRequest.source(searchSourceBuilder); @@ -65,7 +65,7 @@ private JSONObject searchInSparkIndex(QueryBuilder query) { if (searchResponse.status().getStatus() != 200) { throw new RuntimeException( "Fetching result from " - + SPARK_RESPONSE_BUFFER_INDEX_NAME + + DEFAULT_RESULT_INDEX + " index failed with status : " + searchResponse.status()); } else { @@ -80,7 +80,7 @@ private JSONObject searchInSparkIndex(QueryBuilder query) { @VisibleForTesting void deleteInSparkIndex(String id) { - DeleteRequest deleteRequest = new DeleteRequest(SPARK_RESPONSE_BUFFER_INDEX_NAME); + DeleteRequest deleteRequest = new DeleteRequest(DEFAULT_RESULT_INDEX); deleteRequest.id(id); ActionFuture deleteResponseActionFuture; try { diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java index cf638effc6..5d106e0582 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceImplSpecTest.java @@ -5,6 +5,7 @@ package org.opensearch.sql.spark.asyncquery; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_ENABLED_SETTING; import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.SPARK_EXECUTION_SESSION_LIMIT_SETTING; import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME; @@ -12,7 +13,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID; import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_SESSION_CLASS_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import static org.opensearch.sql.spark.execution.session.SessionModel.SESSION_DOC_TYPE; import static org.opensearch.sql.spark.execution.statement.StatementModel.SESSION_ID; import static org.opensearch.sql.spark.execution.statement.StatementModel.STATEMENT_DOC_TYPE; @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import joptsimple.internal.Strings; import lombok.Getter; import org.junit.After; import org.junit.Before; @@ -111,6 +112,7 @@ public void setup() { dataSourceService.createDataSource( new DataSourceMetadata( DATASOURCE, + Strings.EMPTY, DataSourceType.S3GLUE, ImmutableList.of(), ImmutableMap.of( @@ -124,7 +126,7 @@ public void setup() { "noauth"), null)); stateStore = new StateStore(client, clusterService); - createIndex(SPARK_RESPONSE_BUFFER_INDEX_NAME); + createIndex(DEFAULT_RESULT_INDEX); } @After @@ -328,7 +330,12 @@ public void datasourceWithBasicAuth() { dataSourceService.createDataSource( new DataSourceMetadata( - "mybasicauth", DataSourceType.S3GLUE, ImmutableList.of(), properties, null)); + "mybasicauth", + Strings.EMPTY, + DataSourceType.S3GLUE, + ImmutableList.of(), + properties, + null)); LocalEMRSClient emrsClient = new LocalEMRSClient(); AsyncQueryExecutorService asyncQueryExecutorService = createAsyncQueryExecutorService(emrsClient); diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java index fefc951dd7..bbaf6f0f59 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/AsyncQueryExecutionResponseReaderTest.java @@ -10,8 +10,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.constants.TestConstants.EMR_JOB_ID; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import java.util.Map; import org.apache.lucene.search.TotalHits; @@ -79,7 +79,7 @@ public void testInvalidSearchResponse() { () -> jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)); Assertions.assertEquals( "Fetching result from " - + SPARK_RESPONSE_BUFFER_INDEX_NAME + + DEFAULT_RESULT_INDEX + " index failed with status : " + RestStatus.NO_CONTENT, exception.getMessage()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java b/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java index e234454021..bad26a2792 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/response/SparkResponseTest.java @@ -9,8 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; +import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX; import static org.opensearch.sql.spark.constants.TestConstants.EMR_CLUSTER_ID; -import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_RESPONSE_BUFFER_INDEX_NAME; import java.util.Map; import org.apache.lucene.search.TotalHits; @@ -69,7 +69,7 @@ public void testInvalidSearchResponse() { assertThrows(RuntimeException.class, () -> sparkResponse.getResultFromOpensearchIndex()); Assertions.assertEquals( "Fetching result from " - + SPARK_RESPONSE_BUFFER_INDEX_NAME + + DEFAULT_RESULT_INDEX + " index failed with status : " + RestStatus.NO_CONTENT, exception.getMessage());