Skip to content

Commit

Permalink
Percent encode opensearch index name (#2564)
Browse files Browse the repository at this point in the history
* percent encode opensearch index name

Signed-off-by: Sean Kao <[email protected]>

* spec test vacuum

Signed-off-by: Sean Kao <[email protected]>

* spotlessApply

Signed-off-by: Sean Kao <[email protected]>

---------

Signed-off-by: Sean Kao <[email protected]>
  • Loading branch information
seankao-az authored Mar 19, 2024
1 parent b375a28 commit e17962f
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.apache.commons.lang3.StringUtils.strip;

import java.util.Set;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -19,6 +20,9 @@ public class IndexQueryDetails {

public static final String STRIP_CHARS = "`";

// Characters that percentEncode() replaces with their "%XX" (two-digit hex) form before the
// index name is lower-cased in openSearchIndexName().
// NOTE(review): '%' itself is not in this set, so a literal '%' in a name passes through
// unencoded — confirm this matches the encoding applied wherever the index is created.
private static final Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private String indexName;
private FullyQualifiedTableName fullyQualifiedTableName;
// by default, auto_refresh = false;
Expand Down Expand Up @@ -103,6 +107,21 @@ public String openSearchIndexName() {
indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
break;
}
return indexName.toLowerCase();
return percentEncode(indexName).toLowerCase();
}

/**
 * Percent-encodes every character of the given name that is listed in
 * {@code INVALID_INDEX_NAME_CHARS}; all other characters are copied through unchanged.
 *
 * @param indexName raw index name, possibly containing characters OpenSearch rejects
 * @return the name with each invalid character replaced by {@code %} plus its two-digit
 *     upper-case hex code
 */
private String percentEncode(String indexName) {
  StringBuilder encoded = new StringBuilder(indexName.length());
  for (int i = 0; i < indexName.length(); i++) {
    char current = indexName.charAt(i);
    if (!INVALID_INDEX_NAME_CHARS.contains(current)) {
      // Valid character: keep as-is.
      encoded.append(current);
      continue;
    }
    encoded.append(String.format("%%%02X", (int) current));
  }
  return encoded.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,20 +334,31 @@ public class FlintDatasetMock {
final FlintIndexType indexType;
final String indexName;
boolean isLegacy = false;
boolean isSpecialCharacter = false;
String latestId;

/**
 * Fluent setter for the legacy flag. When set, createIndex() loads mappings from
 * "flint-index-mappings" instead of "flint-index-mappings/0.1.1".
 *
 * @param isLegacy whether this mock represents a legacy index
 * @return this mock, for chaining
 */
FlintDatasetMock isLegacy(boolean isLegacy) {
this.isLegacy = isLegacy;
return this;
}

/**
 * Fluent setter for the special-character flag. When set, createIndex() creates the index from
 * "flint_special_character_index.json" and returns before the per-index-type switch.
 *
 * @param isSpecialCharacter whether the mocked index name contains percent-encoded characters
 * @return this mock, for chaining
 */
FlintDatasetMock isSpecialCharacter(boolean isSpecialCharacter) {
this.isSpecialCharacter = isSpecialCharacter;
return this;
}

/**
 * Fluent setter for the latest id associated with this mock.
 *
 * @param latestId id to record on this mock
 * @return this mock, for chaining
 */
FlintDatasetMock latestId(String latestId) {
this.latestId = latestId;
return this;
}

public void createIndex() {
String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1";
if (isSpecialCharacter) {
createIndexWithMappings(
indexName, loadMappings(pathPrefix + "/" + "flint_special_character_index.json"));
return;
}
switch (indexType) {
case SKIPPING:
createIndexWithMappings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import org.opensearch.sql.spark.rest.model.LangType;

public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
// Back-quoted table name containing: space , : " + / \ | ? # > <
private final String specialName = "`test ,:\"+/\\|?#><`";
// specialName without the back-quotes, each special character percent-encoded with lower-case
// hex digits (e.g. ' ' -> %20, ',' -> %2c), as it is expected to appear in the index name.
private final String encodedName = "test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c";

public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs";
public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs";
public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics";
public final String REFRESH_SCI = "REFRESH SKIPPING INDEX on mys3.default." + specialName;

public final FlintDatasetMock LEGACY_SKIPPING =
new FlintDatasetMock(
Expand All @@ -53,6 +57,15 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
"flint_mys3_default_http_logs_metrics")
.isLegacy(true);

// Legacy skipping-index mock for the table named by specialName. The last constructor argument
// (presumably the expected OpenSearch index name — confirm against the FlintDatasetMock
// constructor) embeds encodedName. isSpecialCharacter(true) makes createIndex() load
// flint_special_character_index.json from the legacy mappings directory.
public final FlintDatasetMock LEGACY_SPECIAL_CHARACTERS =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default." + specialName,
REFRESH_SCI,
FlintIndexType.SKIPPING,
"flint_mys3_default_" + encodedName + "_skipping_index")
.isLegacy(true)
.isSpecialCharacter(true);

public final FlintDatasetMock SKIPPING =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default.http_logs",
Expand All @@ -74,6 +87,16 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
FlintIndexType.MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics")
.latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz");
// Non-legacy skipping-index mock for the table named by specialName.
// The latestId literal is the Base64 encoding of
// "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
// i.e. the same string passed as the last constructor argument.
public final FlintDatasetMock SPECIAL_CHARACTERS =
new FlintDatasetMock(
"DROP SKIPPING INDEX ON mys3.default." + specialName,
REFRESH_SCI,
FlintIndexType.SKIPPING,
"flint_mys3_default_" + encodedName + "_skipping_index")
.isSpecialCharacter(true)
.latestId(
"ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg=");

public final String CREATE_SI_AUTO =
"CREATE SKIPPING INDEX ON mys3.default.http_logs"
+ "(l_orderkey VALUE_SET) WITH (auto_refresh = true)";
Expand All @@ -93,7 +116,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec {
*/
@Test
public void legacyBasicDropAndFetchAndCancel() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -141,7 +164,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
*/
@Test
public void legacyDropIndexNoJobRunning() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -178,7 +201,7 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
*/
@Test
public void legacyDropIndexCancelJobTimeout() {
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV)
ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
// Mock EMR-S always return running.
Expand Down Expand Up @@ -209,14 +232,48 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
});
}

/**
 * Legacy flow (no state index support) for a table name that contains characters requiring
 * percent-encoding. The stubbed EMR-S client rejects every cancel request as not cancellable.
 *
 * <p>Expectation: (1) Drop Index response is SUCCESS
 */
@Test
public void legacyDropIndexSpecialCharacter() {
  FlintDatasetMock dataset = LEGACY_SPECIAL_CHARACTERS;

  // EMR-S stub whose cancel call always fails.
  LocalEMRSClient rejectingClient =
      new LocalEMRSClient() {
        @Override
        public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
          throw new IllegalArgumentException("Job run is not in a cancellable state");
        }
      };
  EMRServerlessClientFactory clientFactory = () -> rejectingClient;
  AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(clientFactory);

  // Create the mocked Flint index.
  dataset.createIndex();

  // Submit the drop index statement.
  CreateAsyncQueryRequest dropRequest =
      new CreateAsyncQueryRequest(dataset.query, DATASOURCE, LangType.SQL, null);
  CreateAsyncQueryResponse dropResponse = queryService.createAsyncQuery(dropRequest);

  // Fetch the outcome and verify it succeeded without error.
  AsyncQueryExecutionResponse dropResult =
      queryService.getAsyncQueryResults(dropResponse.getQueryId());
  assertEquals("SUCCESS", dropResult.getStatus());
  assertNull(dropResult.getError());
}

/**
* Happy case. expectation is
*
* <p>(1) Drop Index response is SUCCESS (2) change index state to: DELETED
*/
@Test
public void dropAndFetchAndCancel() {
ImmutableList.of(SKIPPING, COVERING, MV)
ImmutableList.of(SKIPPING, COVERING, MV, SPECIAL_CHARACTERS)
.forEach(
mockDS -> {
LocalEMRSClient emrsClient =
Expand Down Expand Up @@ -584,6 +641,47 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
});
}

/**
 * Drop of an index whose table name contains characters requiring percent-encoding. The stubbed
 * EMR-S client rejects every cancel request as not cancellable, i.e. no job is running.
 *
 * <p>Expectation: (1) Drop Index response is SUCCESS (2) change index state to: DELETED
 */
@Test
public void dropIndexSpecialCharacter() {
  FlintDatasetMock dataset = SPECIAL_CHARACTERS;

  // EMR-S stub whose cancel call always fails.
  LocalEMRSClient rejectingClient =
      new LocalEMRSClient() {
        @Override
        public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
          throw new IllegalArgumentException("Job run is not in a cancellable state");
        }
      };
  EMRServerlessClientFactory clientFactory = () -> rejectingClient;
  AsyncQueryExecutorService queryService = createAsyncQueryExecutorService(clientFactory);

  // Create the mocked Flint index, then put the mocked Flint job into the refreshing state.
  dataset.createIndex();
  MockFlintSparkJob indexJob = new MockFlintSparkJob(stateStore, dataset.latestId, DATASOURCE);
  indexJob.refreshing();

  // Submit the drop index statement.
  CreateAsyncQueryRequest dropRequest =
      new CreateAsyncQueryRequest(dataset.query, DATASOURCE, LangType.SQL, null);
  CreateAsyncQueryResponse dropResponse = queryService.createAsyncQuery(dropRequest);

  // Fetch the outcome and verify it succeeded without error.
  AsyncQueryExecutionResponse dropResult =
      queryService.getAsyncQueryResults(dropResponse.getQueryId());
  assertEquals("SUCCESS", dropResult.getStatus());
  assertNull(dropResult.getError());

  // The drop must leave the index in the DELETED state.
  indexJob.assertState(FlintIndexState.DELETED);
}

/**
* No Job running, expectation is
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec {
mockDataset(
"VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics",
MATERIALIZED_VIEW,
"flint_mys3_default_http_logs_metrics"));
"flint_mys3_default_http_logs_metrics"),
mockDataset(
"VACUUM SKIPPING INDEX ON mys3.default.`test ,:\"+/\\|?#><`",
SKIPPING,
"flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index")
.isSpecialCharacter(true));

@Test
public void shouldVacuumIndexInRefreshingState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,19 @@ public void materializedViewIndexNameNotFullyQualified() {
.build()
.openSearchIndexName());
}

/**
 * Checks that characters in the table name that are invalid for an OpenSearch index come out
 * percent-encoded (lower-cased) in the generated skipping index name.
 */
@Test
public void sanitizedIndexName() {
  FullyQualifiedTableName tableWithSpecialChars =
      new FullyQualifiedTableName("mys3.default.`test ,:\"+/\\|?#><`");

  String actualIndexName =
      IndexQueryDetails.builder()
          .indexName("invalid")
          .fullyQualifiedTableName(tableWithSpecialChars)
          .indexOptions(new FlintIndexOptions())
          .indexQueryActionType(IndexQueryActionType.DROP)
          .indexType(FlintIndexType.SKIPPING)
          .build()
          .openSearchIndexName();

  assertEquals(
      "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
      actualIndexName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
"options": {},
"source": "mys3.default.`test ,:\"+/\\|?#><`",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
},
"latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg="
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"_meta": {
"kind": "skipping",
"indexedColumns": [
{
"columnType": "int",
"kind": "VALUE_SET",
"columnName": "status"
}
],
"name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index",
"options": {},
"source": "mys3.default.`test ,:\"+/\\|?#><`",
"version": "0.1.0",
"properties": {
"env": {
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p",
"SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q"
}
}
}
}

0 comments on commit e17962f

Please sign in to comment.