Skip to content

Commit

Permalink
Add Flint Index Purging Logic
Browse files Browse the repository at this point in the history
- Introduce dynamic settings for enabling/disabling purging and controlling index TTL.
- Reuse default result index name as a common prefix for all result indices.
- Change result index to a non-hidden index for better user experience.
- Allow custom result index specification in the data source.
- Move default result index name from spark to core package to avoid cross-package references.
- Add validation for provided result index name in the data source.
- Use pattern prefix + data source name for default result index naming.

Testing:
- Verified old documents are purged in a cluster setup.
- Checked result index naming with and without custom names, ensuring validation is applied.

Note: Tests will be added in a subsequent PR.

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Oct 26, 2023
1 parent e16da37 commit 289400e
Show file tree
Hide file tree
Showing 24 changed files with 581 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ public enum Key {
SPARK_EXECUTION_ENGINE_CONFIG("plugins.query.executionengine.spark.config"),
CLUSTER_NAME("cluster.name"),
SPARK_EXECUTION_SESSION_ENABLED("plugins.query.executionengine.spark.session.enabled"),
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit");
SPARK_EXECUTION_SESSION_LIMIT("plugins.query.executionengine.spark.session.limit"),
SESSION_INDEX_TTL("plugins.query.executionengine.spark.session.index.ttl"),
RESULT_INDEX_TTL("plugins.query.executionengine.spark.result.index.ttl"),
AUTO_INDEX_MANAGEMENT_ENABLED(
"plugins.query.executionengine.spark.auto_index_management.enabled");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.AllArgsConstructor;
import java.util.function.Function;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -25,11 +25,24 @@

@Getter
@Setter
@AllArgsConstructor
@EqualsAndHashCode
@JsonIgnoreProperties(ignoreUnknown = true)
public class DataSourceMetadata {

public static final String DEFAULT_RESULT_INDEX = "query_execution_result";
public static final int MAX_RESULT_INDEX_NAME_SIZE = 255;
// OS doesn't allow uppercase: https://tinyurl.com/yse2xdbx
public static final String RESULT_INDEX_NAME_PATTERN = "[a-z0-9_-]+";
// Error messages returned by validateCustomResultIndex. Declared final: these are
// shared constants and must not be reassignable at runtime.
public static final String INVALID_RESULT_INDEX_NAME_SIZE =
    "Result index name size must contain less than "
        + MAX_RESULT_INDEX_NAME_SIZE
        + " characters";
public static final String INVALID_CHAR_IN_RESULT_INDEX_NAME =
    "Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and"
        + " _(underscore)";
public static final String INVALID_RESULT_INDEX_PREFIX =
    "Result index must start with " + DEFAULT_RESULT_INDEX;

@JsonProperty private String name;

@JsonProperty private String description;
Expand All @@ -44,18 +57,32 @@ public class DataSourceMetadata {

@JsonProperty private String resultIndex;

public static Function<String, String> DATASOURCE_TO_RESULT_INDEX =
datasourceName -> String.format("%s_%s", DEFAULT_RESULT_INDEX, datasourceName);

/**
 * Creates data source metadata, falling back to the default per-datasource result index
 * ({@code query_execution_result_<name>}) when no custom result index is supplied.
 *
 * @param name data source name
 * @param description human-readable description
 * @param connector connector type backing this data source
 * @param allowedRoles roles permitted to use this data source
 * @param properties connector-specific configuration properties
 * @param resultIndex custom result index name, or null to use the default
 * @throws IllegalArgumentException if {@code resultIndex} fails validation
 */
public DataSourceMetadata(
    String name,
    String description,
    DataSourceType connector,
    List<String> allowedRoles,
    Map<String, String> properties,
    String resultIndex) {
  // Fail fast: reject an invalid result index before any field is assigned,
  // rather than populating state first and throwing afterwards.
  String errorMessage = validateCustomResultIndex(resultIndex);
  if (errorMessage != null) {
    throw new IllegalArgumentException(errorMessage);
  }

  this.name = name;
  this.connector = connector;
  this.description = description;
  this.properties = properties;
  this.allowedRoles = allowedRoles;
  // Null means "no custom index requested" — derive the default from the data source name.
  this.resultIndex =
      resultIndex == null ? DATASOURCE_TO_RESULT_INDEX.apply(name) : resultIndex;
}

public DataSourceMetadata() {
Expand All @@ -71,9 +98,26 @@ public DataSourceMetadata() {
/**
 * Builds the metadata entry describing the built-in OpenSearch data source:
 * empty description, no role restrictions, no properties, and the default result index.
 */
public static DataSourceMetadata defaultOpenSearchDataSourceMetadata() {
  DataSourceMetadata openSearchMetadata =
      new DataSourceMetadata(
          DEFAULT_DATASOURCE_NAME,
          StringUtils.EMPTY,
          DataSourceType.OPENSEARCH,
          Collections.emptyList(),
          ImmutableMap.of(),
          null);
  return openSearchMetadata;
}

/**
 * Validates a user-supplied custom result index name.
 *
 * <p>Checks, in order: maximum length, allowed characters (lowercase alphanumerics,
 * hyphen, underscore), and the mandatory {@code query_execution_result} prefix. The
 * prefix requirement lets index-management (TTL/purging) logic target all result
 * indices with a single pattern.
 *
 * @param resultIndex custom result index name; null means "use the default" and is valid
 * @return an error message describing the first violation found, or null if valid
 */
public String validateCustomResultIndex(String resultIndex) {
  if (resultIndex == null) {
    // No custom index requested; nothing to validate.
    return null;
  }
  if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
    return INVALID_RESULT_INDEX_NAME_SIZE;
  }
  if (!resultIndex.matches(RESULT_INDEX_NAME_PATTERN)) {
    return INVALID_CHAR_IN_RESULT_INDEX_NAME;
  }
  // Nullness was handled above, so no redundant null check is needed here.
  if (!resultIndex.startsWith(DEFAULT_RESULT_INDEX)) {
    return INVALID_RESULT_INDEX_PREFIX;
  }
  return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.DataSourceSchemaName;
import org.opensearch.sql.analysis.symbol.Namespace;
Expand Down Expand Up @@ -197,6 +198,7 @@ public Set<DataSourceMetadata> getDataSourceMetadata(boolean isDefaultDataSource
ds ->
new DataSourceMetadata(
ds.getName(),
StringUtils.EMPTY,
ds.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -62,6 +63,7 @@ void testIterator() {
dataSource ->
new DataSourceMetadata(
dataSource.getName(),
StringUtils.EMPTY,
dataSource.getConnectorType(),
Collections.emptyList(),
ImmutableMap.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -382,6 +383,7 @@ void testRemovalOfAuthorizationInfo() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand All @@ -407,6 +409,7 @@ void testRemovalOfAuthorizationInfoForAccessKeyAndSecretKye() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down Expand Up @@ -434,6 +437,7 @@ void testRemovalOfAuthorizationInfoForGlueWithRoleARN() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testGlue",
StringUtils.EMPTY,
DataSourceType.S3GLUE,
Collections.singletonList("glue_access"),
properties,
Expand Down Expand Up @@ -498,6 +502,7 @@ void testGetRawDataSourceMetadata() {
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
"testDS",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
Collections.singletonList("prometheus_access"),
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testToDataSourceMetadataFromJson() {
dataSourceMetadata.setConnector(DataSourceType.PROMETHEUS);
dataSourceMetadata.setAllowedRoles(List.of("prometheus_access"));
dataSourceMetadata.setProperties(Map.of("prometheus.uri", "https://localhost:9090"));
dataSourceMetadata.setResultIndex("query_execution_result2");
Gson gson = new Gson();
String json = gson.toJson(dataSourceMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -103,6 +104,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand All @@ -116,6 +118,7 @@ public void updateDataSourceAPITest() {
DataSourceMetadata updateDSM =
new DataSourceMetadata(
"update_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://randomtest.com:9090"),
Expand Down Expand Up @@ -175,6 +178,7 @@ public void deleteDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"delete_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down Expand Up @@ -214,6 +218,7 @@ public void getAllDataSourceTest() {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"get_all_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "https://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -44,6 +45,7 @@ protected void init() throws InterruptedException, IOException {
DataSourceMetadata createDSM =
new DataSourceMetadata(
"my_prometheus",
StringUtils.EMPTY,
DataSourceType.PROMETHEUS,
ImmutableList.of(),
ImmutableMap.of("prometheus.uri", "http://localhost:9090"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.opensearch.setting;

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.MemorySizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.LegacySettings;
import org.opensearch.sql.common.setting.Settings;

Expand Down Expand Up @@ -149,6 +151,27 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

// TTL for Spark session state documents; documents older than this are purged
// when auto index management is enabled. Default: 14 days. Dynamic cluster setting.
public static final Setting<TimeValue> SESSION_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.SESSION_INDEX_TTL.getKeyValue(),
timeValueDays(14),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

// TTL for query result documents in result indices. Default: 60 days.
// Dynamic cluster setting.
public static final Setting<TimeValue> RESULT_INDEX_TTL_SETTING =
Setting.positiveTimeSetting(
Key.RESULT_INDEX_TTL.getKeyValue(),
timeValueDays(60),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

// Master switch for automatic session/result index purging. Enabled by default;
// can be toggled dynamically without a restart.
public static final Setting<Boolean> AUTO_INDEX_MANAGEMENT_ENABLED_SETTING =
Setting.boolSetting(
Key.AUTO_INDEX_MANAGEMENT_ENABLED.getKeyValue(),
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -231,6 +254,24 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SPARK_EXECUTION_SESSION_LIMIT,
SPARK_EXECUTION_SESSION_LIMIT_SETTING,
new Updater(Key.SPARK_EXECUTION_SESSION_LIMIT));
register(
settingBuilder,
clusterSettings,
Key.SESSION_INDEX_TTL,
SESSION_INDEX_TTL_SETTING,
new Updater(Key.SESSION_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.RESULT_INDEX_TTL,
RESULT_INDEX_TTL_SETTING,
new Updater(Key.RESULT_INDEX_TTL));
register(
settingBuilder,
clusterSettings,
Key.AUTO_INDEX_MANAGEMENT_ENABLED,
AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
new Updater(Key.AUTO_INDEX_MANAGEMENT_ENABLED));
registerNonDynamicSettings(
settingBuilder, clusterSettings, Key.CLUSTER_NAME, ClusterName.CLUSTER_NAME_SETTING);
defaultSettings = settingBuilder.build();
Expand Down Expand Up @@ -298,6 +339,9 @@ public static List<Setting<?>> pluginSettings() {
.add(SPARK_EXECUTION_ENGINE_CONFIG)
.add(SPARK_EXECUTION_SESSION_ENABLED_SETTING)
.add(SPARK_EXECUTION_SESSION_LIMIT_SETTING)
.add(SESSION_INDEX_TTL_SETTING)
.add(RESULT_INDEX_TTL_SETTING)
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.build();
}

Expand Down
15 changes: 14 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.google.common.collect.ImmutableSet;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -89,6 +90,7 @@
import org.opensearch.sql.spark.asyncquery.OpensearchAsyncQueryJobMetadataStorageService;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfig;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplier;
import org.opensearch.sql.spark.config.SparkExecutionEngineConfigSupplierImpl;
Expand Down Expand Up @@ -245,7 +247,18 @@ public Collection<Object> createComponents(
});

injector = modules.createInjector();
return ImmutableList.of(dataSourceService, asyncQueryExecutorService);
ClusterManagerEventListener clusterManagerEventListener =
new ClusterManagerEventListener(
clusterService,
threadPool,
client,
Clock.systemUTC(),
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
return ImmutableList.of(
dataSourceService, asyncQueryExecutorService, clusterManagerEventListener);
}

@Override
Expand Down
6 changes: 5 additions & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,11 @@ jacocoTestCoverageVerification {
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel'
'org.opensearch.sql.spark.execution.statement.StatementModel',
// TODO: add tests for purging flint indices
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener',
'org.opensearch.sql.spark.cluster.FlintIndexRetention',
'org.opensearch.sql.spark.cluster.IndexCleanup'
]
limit {
counter = 'LINE'
Expand Down
Loading

0 comments on commit 289400e

Please sign in to comment.