Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo committed Oct 27, 2023
1 parent 4d0ec33 commit 003fe92
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,21 @@ public DataSourceMetadata(
List<String> allowedRoles,
Map<String, String> properties,
String resultIndex) {
String errorMessage = validateCustomResultIndex(resultIndex);

this.name = name;
this.connector = connector;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;

String errorMessage = validateCustomResultIndex(resultIndex);
if (errorMessage != null) {
throw new IllegalArgumentException(errorMessage);
}
if (resultIndex == null) {
this.resultIndex = DATASOURCE_TO_RESULT_INDEX.apply(name);
this.resultIndex = fromNameToCustomResultIndex();
} else {
this.resultIndex = resultIndex;
}

this.connector = connector;
this.description = description;
this.properties = properties;
this.allowedRoles = allowedRoles;
}

public DataSourceMetadata() {
Expand Down Expand Up @@ -120,4 +119,34 @@ public String validateCustomResultIndex(String resultIndex) {
}
return null;
}

/**
* Since we are using datasource name to create result index, we need to make sure that the final
* name is valid
*
* @param resultIndex result index name
* @return valid result index name
*/
private String convertToValidResultIndex(String resultIndex) {
// Limit Length
if (resultIndex.length() > MAX_RESULT_INDEX_NAME_SIZE) {
resultIndex = resultIndex.substring(0, MAX_RESULT_INDEX_NAME_SIZE);
}

// Pattern Matching: Remove characters that don't match the pattern
StringBuilder validChars = new StringBuilder();
for (char c : resultIndex.toCharArray()) {
if (String.valueOf(c).matches(RESULT_INDEX_NAME_PATTERN)) {
validChars.append(c);
}
}
return validChars.toString();
}

public String fromNameToCustomResultIndex() {
if (name == null) {
throw new IllegalArgumentException("Datasource name cannot be null");
}
return convertToValidResultIndex(DATASOURCE_TO_RESULT_INDEX.apply(name.toLowerCase()));
}
}
2 changes: 1 addition & 1 deletion spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ jacocoTestCoverageVerification {
'org.opensearch.sql.spark.execution.session.SessionModel',
'org.opensearch.sql.spark.execution.statement.StatementModel',
// TODO: add tests for purging flint indices
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener',
'org.opensearch.sql.spark.cluster.ClusterManagerEventListener*',
'org.opensearch.sql.spark.cluster.FlintIndexRetention',
'org.opensearch.sql.spark.cluster.IndexCleanup'
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,8 @@

package org.opensearch.sql.spark.cluster;

import java.util.Arrays;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -21,7 +15,6 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.reindex.DeleteByQueryAction;
import org.opensearch.index.reindex.DeleteByQueryRequest;
import org.opensearch.index.store.StoreStats;

/** Clean up the old docs for indices. */
public class IndexCleanup {
Expand All @@ -35,60 +28,6 @@ public IndexCleanup(Client client, ClusterService clusterService) {
this.clusterService = clusterService;
}

Check warning on line 29 in spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/IndexCleanup.java#L26-L29

Added lines #L26 - L29 were not covered by tests

/**
* delete docs when shard size is bigger than max limitation.
*
* @param indexName index name
* @param maxShardSize max shard size
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsBasedOnShardSize(
String indexName,
long maxShardSize,
QueryBuilder queryForDeleteByQueryRequest,
ActionListener<Boolean> listener) {

if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
LOG.debug("skip as the index:{} doesn't exist", indexName);
return;
}

ActionListener<IndicesStatsResponse> indicesStatsResponseListener =
ActionListener.wrap(
indicesStatsResponse -> {
// Check if any shard size is bigger than maxShardSize
boolean cleanupNeeded =
Arrays.stream(indicesStatsResponse.getShards())
.map(ShardStats::getStats)
.filter(Objects::nonNull)
.map(CommonStats::getStore)
.filter(Objects::nonNull)
.map(StoreStats::getSizeInBytes)
.anyMatch(size -> size > maxShardSize);

if (cleanupNeeded) {
deleteDocsByQuery(
indexName,
queryForDeleteByQueryRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure));
} else {
listener.onResponse(false);
}
},
listener::onFailure);

getCheckpointShardStoreStats(indexName, indicesStatsResponseListener);
}

private void getCheckpointShardStoreStats(
String indexName, ActionListener<IndicesStatsResponse> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.store();
indicesStatsRequest.indices(indexName);
client.admin().indices().stats(indicesStatsRequest, listener);
}

/**
* Delete docs based on query request
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.sql.spark.asyncquery;

import static org.opensearch.sql.opensearch.setting.OpenSearchSettings.*;
import static org.opensearch.sql.datasource.model.DataSourceMetadata.DEFAULT_RESULT_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.DEFAULT_CLASS_NAME;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_REQUEST_INDEX;
import static org.opensearch.sql.spark.data.constants.SparkConstants.FLINT_JOB_SESSION_ID;
Expand All @@ -29,6 +28,7 @@
import com.google.common.collect.ImmutableSet;
import java.util.*;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
Expand Down Expand Up @@ -112,10 +112,10 @@ public void setup() {
clusterSettings = clusterService.getClusterSettings();
pluginSettings = new OpenSearchSettings(clusterSettings);
dataSourceService = createDataSourceService();
dataSourceService.createDataSource(
DataSourceMetadata dataSourceMetadata =
new DataSourceMetadata(
DATASOURCE,
Strings.EMPTY,
StringUtils.EMPTY,
DataSourceType.S3GLUE,
ImmutableList.of(),
ImmutableMap.of(
Expand All @@ -127,9 +127,10 @@ public void setup() {
"http://localhost:9200",
"glue.indexstore.opensearch.auth",
"noauth"),
null));
null);
dataSourceService.createDataSource(dataSourceMetadata);
stateStore = new StateStore(client, clusterService);
createIndex(DEFAULT_RESULT_INDEX);
createIndex(dataSourceMetadata.fromNameToCustomResultIndex());
}

@After
Expand Down Expand Up @@ -341,7 +342,7 @@ public void datasourceWithBasicAuth() {
dataSourceService.createDataSource(
new DataSourceMetadata(
"mybasicauth",
Strings.EMPTY,
StringUtils.EMPTY,
DataSourceType.S3GLUE,
ImmutableList.of(),
properties,
Expand Down Expand Up @@ -382,7 +383,7 @@ public void withSessionCreateAsyncQueryFailed() {
assertTrue(statementModel.isPresent());
assertEquals(StatementState.WAITING, statementModel.get().getStatementState());

// 2. fetch async query result. not result write to SPARK_RESPONSE_BUFFER_INDEX_NAME yet.
// 2. fetch async query result. not result write to DEFAULT_RESULT_INDEX yet.
// mock failed statement.
StatementModel submitted = statementModel.get();
StatementModel mocked =
Expand Down Expand Up @@ -502,7 +503,7 @@ public void datasourceNameIncludeUppercase() {
dataSourceService.createDataSource(
new DataSourceMetadata(
"TESTS3",
Strings.EMPTY,
StringUtils.EMPTY,
DataSourceType.S3GLUE,
ImmutableList.of(),
ImmutableMap.of(
Expand Down

0 comments on commit 003fe92

Please sign in to comment.