From e17962f675c0600339cb0c8920efaee813396e2b Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 19 Mar 2024 14:03:15 -0700 Subject: [PATCH] Percent encode opensearch index name (#2564) * percent encode opensearch index name Signed-off-by: Sean Kao * spec test vacuum Signed-off-by: Sean Kao * spotlessApply Signed-off-by: Sean Kao --------- Signed-off-by: Sean Kao --- .../dispatcher/model/IndexQueryDetails.java | 21 +++- .../AsyncQueryExecutorServiceSpec.java | 11 ++ .../spark/asyncquery/IndexQuerySpecTest.java | 106 +++++++++++++++++- .../asyncquery/IndexQuerySpecVacuumTest.java | 7 +- .../spark/flint/IndexQueryDetailsTest.java | 15 +++ .../0.1.1/flint_special_character_index.json | 23 ++++ .../flint_special_character_index.json | 22 ++++ 7 files changed, 199 insertions(+), 6 deletions(-) create mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_special_character_index.json diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java index 7ecd784792..5596d1b425 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java @@ -7,6 +7,7 @@ import static org.apache.commons.lang3.StringUtils.strip; +import java.util.Set; import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -19,6 +20,9 @@ public class IndexQueryDetails { public static final String STRIP_CHARS = "`"; + private static final Set INVALID_INDEX_NAME_CHARS = + Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); + private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; @@ -103,6 +107,21 @@ public String openSearchIndexName() { indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName(); break; } - return indexName.toLowerCase(); + return percentEncode(indexName).toLowerCase(); + } + + /* + * Percent-encode invalid OpenSearch index name characters. + */ + private String percentEncode(String indexName) { + StringBuilder builder = new StringBuilder(indexName.length()); + for (char ch : indexName.toCharArray()) { + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + builder.append(String.format("%%%02X", (int) ch)); + } else { + builder.append(ch); + } + } + return builder.toString(); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c1532d5c10..cb2c34dca0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -334,6 +334,7 @@ public class FlintDatasetMock { final FlintIndexType indexType; final String indexName; boolean isLegacy = false; + boolean isSpecialCharacter = false; String latestId; FlintDatasetMock isLegacy(boolean isLegacy) { @@ -341,6 +342,11 @@ FlintDatasetMock isLegacy(boolean isLegacy) { return this; } + FlintDatasetMock isSpecialCharacter(boolean isSpecialCharacter) { + this.isSpecialCharacter = isSpecialCharacter; + return this; + } + FlintDatasetMock latestId(String latestId) { this.latestId = latestId; return this; @@ -348,6 +354,11 @@ FlintDatasetMock latestId(String latestId) { public void createIndex() { String pathPrefix = isLegacy ? "flint-index-mappings" : "flint-index-mappings/0.1.1"; + if (isSpecialCharacter) { + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_special_character_index.json")); + return; + } switch (indexType) { case SKIPPING: createIndexWithMappings( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 132074de63..19f68d5969 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -27,9 +27,13 @@ import org.opensearch.sql.spark.rest.model.LangType; public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { + private final String specialName = "`test ,:\"+/\\|?#><`"; + private final String encodedName = "test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c"; + public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs"; public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs"; public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mys3.default.http_logs_metrics"; + public final String REFRESH_SCI = "REFRESH SKIPPING INDEX on mys3.default." + specialName; public final FlintDatasetMock LEGACY_SKIPPING = new FlintDatasetMock( @@ -53,6 +57,15 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { "flint_mys3_default_http_logs_metrics") .isLegacy(true); + public final FlintDatasetMock LEGACY_SPECIAL_CHARACTERS = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default." + specialName, + REFRESH_SCI, + FlintIndexType.SKIPPING, + "flint_mys3_default_" + encodedName + "_skipping_index") + .isLegacy(true) + .isSpecialCharacter(true); + public final FlintDatasetMock SKIPPING = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", @@ -74,6 +87,16 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { FlintIndexType.MATERIALIZED_VIEW, "flint_mys3_default_http_logs_metrics") .latestId("ZmxpbnRfbXlzM19kZWZhdWx0X2h0dHBfbG9nc19tZXRyaWNz"); + public final FlintDatasetMock SPECIAL_CHARACTERS = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default." + specialName, + REFRESH_SCI, + FlintIndexType.SKIPPING, + "flint_mys3_default_" + encodedName + "_skipping_index") + .isSpecialCharacter(true) + .latestId( + "ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg="); + public final String CREATE_SI_AUTO = "CREATE SKIPPING INDEX ON mys3.default.http_logs" + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)"; @@ -93,7 +116,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { */ @Test public void legacyBasicDropAndFetchAndCancel() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -141,7 +164,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { */ @Test public void legacyDropIndexNoJobRunning() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -178,7 +201,7 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { */ @Test public void legacyDropIndexCancelJobTimeout() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { // Mock EMR-S always return running. @@ -209,6 +232,40 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { }); } + /** + * Legacy Test, without state index support. Not EMR-S job running. expectation is + * + *

(1) Drop Index response is SUCCESS + */ + @Test + public void legacyDropIndexSpecialCharacter() { + FlintDatasetMock mockDS = LEGACY_SPECIAL_CHARACTERS; + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock flint index + mockDS.createIndex(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + } + /** * Happy case. expectation is * @@ -216,7 +273,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { */ @Test public void dropAndFetchAndCancel() { - ImmutableList.of(SKIPPING, COVERING, MV) + ImmutableList.of(SKIPPING, COVERING, MV, SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -584,6 +641,47 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { }); } + /** + * Cancel EMR-S job, but not job running. expectation is + * + *

(1) Drop Index response is SUCCESS (2) change index state to: DELETED + */ + @Test + public void dropIndexSpecialCharacter() { + FlintDatasetMock mockDS = SPECIAL_CHARACTERS; + // Mock EMR-S job is not running + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock flint index + mockDS.createIndex(); + // Mock index state in refresh state. + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + flintIndexJob.refreshing(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + } + /** * No Job running, expectation is * diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index 67c89c791c..1a07ae8634 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -54,7 +54,12 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec { mockDataset( "VACUUM MATERIALIZED VIEW mys3.default.http_logs_metrics", MATERIALIZED_VIEW, - "flint_mys3_default_http_logs_metrics")); + "flint_mys3_default_http_logs_metrics"), + mockDataset( + "VACUUM SKIPPING INDEX ON mys3.default.`test ,:\"+/\\|?#><`", + SKIPPING, + "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index") + .isSpecialCharacter(true)); @Test public void shouldVacuumIndexInRefreshingState() { diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java index cddc790d5e..4d52baee92 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java @@ -104,4 +104,19 @@ public void materializedViewIndexNameNotFullyQualified() { .build() .openSearchIndexName()); } + + @Test + public void sanitizedIndexName() { + assertEquals( + "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + IndexQueryDetails.builder() + .indexName("invalid") + .fullyQualifiedTableName( + new FullyQualifiedTableName("mys3.default.`test ,:\"+/\\|?#><`")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build() + .openSearchIndexName()); + } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json new file mode 100644 index 0000000000..72c83c59fa --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json @@ -0,0 +1,23 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + "options": {}, + "source": "mys3.default.`test ,:\"+/\\|?#><`", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + }, + "latestId": "ZmxpbnRfbXlzM19kZWZhdWx0X3Rlc3QlMjAlMmMlM2ElMjIlMmIlMmYlNWMlN2MlM2YlMjMlM2UlM2Nfc2tpcHBpbmdfaW5kZXg=" + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json b/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json new file mode 100644 index 0000000000..95ae75545f --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json @@ -0,0 +1,22 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + "options": {}, + "source": "mys3.default.`test ,:\"+/\\|?#><`", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } +}