From 3f69da454a386fc09578a7e81c957d935b28ce1a Mon Sep 17 00:00:00 2001 From: Sean Kao Date: Tue, 19 Mar 2024 08:30:44 -0700 Subject: [PATCH] percent encode opensearch index name Signed-off-by: Sean Kao --- .../dispatcher/model/IndexQueryDetails.java | 22 +++- .../AsyncQueryExecutorServiceSpec.java | 11 ++ .../spark/asyncquery/IndexQuerySpecTest.java | 106 +++++++++++++++++- .../spark/flint/IndexQueryDetailsTest.java | 14 +++ .../0.1.1/flint_special_character_index.json | 23 ++++ .../flint_special_character_index.json | 22 ++++ 6 files changed, 193 insertions(+), 5 deletions(-) create mode 100644 spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json create mode 100644 spark/src/test/resources/flint-index-mappings/flint_special_character_index.json diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java index 7ecd784792..38342fb15a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java @@ -7,6 +7,8 @@ import static org.apache.commons.lang3.StringUtils.strip; +import java.util.Set; + import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -19,6 +21,9 @@ public class IndexQueryDetails { public static final String STRIP_CHARS = "`"; + private static final Set INVALID_INDEX_NAME_CHARS = + Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<'); + private String indexName; private FullyQualifiedTableName fullyQualifiedTableName; // by default, auto_refresh = false; @@ -103,6 +108,21 @@ public String openSearchIndexName() { indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName(); break; } - return indexName.toLowerCase(); + return percentEncode(indexName).toLowerCase(); + } + + /* + * Percent-encode invalid 
OpenSearch index name characters. + */ + private String percentEncode(String indexName) { + StringBuilder builder = new StringBuilder(indexName.length()); + for (char ch : indexName.toCharArray()) { + if (INVALID_INDEX_NAME_CHARS.contains(ch)) { + builder.append(String.format("%%%02X", (int) ch)); + } else { + builder.append(ch); + } + } + return builder.toString(); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index c1532d5c10..cb2c34dca0 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -334,6 +334,7 @@ public class FlintDatasetMock { final FlintIndexType indexType; final String indexName; boolean isLegacy = false; + boolean isSpecialCharacter = false; String latestId; FlintDatasetMock isLegacy(boolean isLegacy) { @@ -341,6 +342,11 @@ FlintDatasetMock isLegacy(boolean isLegacy) { return this; } + FlintDatasetMock isSpecialCharacter(boolean isSpecialCharacter) { + this.isSpecialCharacter = isSpecialCharacter; + return this; + } + FlintDatasetMock latestId(String latestId) { this.latestId = latestId; return this; @@ -348,6 +354,11 @@ FlintDatasetMock latestId(String latestId) { public void createIndex() { String pathPrefix = isLegacy ? 
"flint-index-mappings" : "flint-index-mappings/0.1.1"; + if (isSpecialCharacter) { + createIndexWithMappings( + indexName, loadMappings(pathPrefix + "/" + "flint_special_character_index.json")); + return; + } switch (indexType) { case SKIPPING: createIndexWithMappings( diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 25b94f2d11..3329655c9d 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -27,9 +27,13 @@ import org.opensearch.sql.spark.rest.model.LangType; public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { + private final String specialName = "`test ,:\"+/\\|?#><`"; + private final String encodedName = "test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c"; + public final String REFRESH_SI = "REFRESH SKIPPING INDEX on mys3.default.http_logs"; public final String REFRESH_CI = "REFRESH INDEX covering ON mys3.default.http_logs"; public final String REFRESH_MV = "REFRESH MATERIALIZED VIEW mv"; + public final String REFRESH_SCI = "REFRESH SKIPPING INDEX on mys3.default." + specialName; public final FlintDatasetMock LEGACY_SKIPPING = new FlintDatasetMock( @@ -50,6 +54,15 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") .isLegacy(true); + public final FlintDatasetMock LEGACY_SPECIAL_CHARACTERS = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default." 
+ specialName, + REFRESH_SCI, + FlintIndexType.SKIPPING, + "flint_mys3_default_" + encodedName + "_skipping_index") + .isLegacy(true) + .isSpecialCharacter(true); + public final FlintDatasetMock SKIPPING = new FlintDatasetMock( "DROP SKIPPING INDEX ON mys3.default.http_logs", @@ -68,6 +81,16 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { new FlintDatasetMock( "DROP MATERIALIZED VIEW mv", REFRESH_MV, FlintIndexType.MATERIALIZED_VIEW, "flint_mv") .latestId("mvid"); + + public final FlintDatasetMock SPECIAL_CHARACTERS = + new FlintDatasetMock( + "DROP SKIPPING INDEX ON mys3.default." + specialName, + REFRESH_SCI, + FlintIndexType.SKIPPING, + "flint_mys3_default_" + encodedName + "_skipping_index") + .isSpecialCharacter(true) + .latestId("specialcharacterindexid"); + public final String CREATE_SI_AUTO = "CREATE SKIPPING INDEX ON mys3.default.http_logs" + "(l_orderkey VALUE_SET) WITH (auto_refresh = true)"; @@ -87,7 +110,7 @@ public class IndexQuerySpecTest extends AsyncQueryExecutorServiceSpec { */ @Test public void legacyBasicDropAndFetchAndCancel() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -135,7 +158,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { */ @Test public void legacyDropIndexNoJobRunning() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -172,7 +195,7 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { */ @Test public void legacyDropIndexCancelJobTimeout() { - ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV) + ImmutableList.of(LEGACY_SKIPPING, LEGACY_COVERING, LEGACY_MV, LEGACY_SPECIAL_CHARACTERS) .forEach( mockDS -> { // Mock EMR-S always return 
running. @@ -203,6 +226,40 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { }); } + /** + * Legacy Test, without state index support. Not EMR-S job running. expectation is + * + *
<p>
(1) Drop Index response is SUCCESS + */ + @Test + public void legacyDropIndexSpecialCharacter() { + FlintDatasetMock mockDS = LEGACY_SPECIAL_CHARACTERS; + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock flint index + mockDS.createIndex(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. + AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + } + /** * Happy case. expectation is * * @@ -210,7 +267,7 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { */ @Test public void dropAndFetchAndCancel() { - ImmutableList.of(SKIPPING, COVERING, MV) + ImmutableList.of(SKIPPING, COVERING, MV, SPECIAL_CHARACTERS) .forEach( mockDS -> { LocalEMRSClient emrsClient = @@ -578,6 +635,47 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { }); } + /** + * Cancel EMR-S job, but not job running. expectation is + * + *
<p>
(1) Drop Index response is SUCCESS (2) change index state to: DELETED + */ + @Test + public void dropIndexSpecialCharacter() { + FlintDatasetMock mockDS = SPECIAL_CHARACTERS; + // Mock EMR-S job is not running + LocalEMRSClient emrsClient = + new LocalEMRSClient() { + @Override + public CancelJobRunResult cancelJobRun(String applicationId, String jobId) { + throw new IllegalArgumentException("Job run is not in a cancellable state"); + } + }; + EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + AsyncQueryExecutorService asyncQueryExecutorService = + createAsyncQueryExecutorService(emrServerlessClientFactory); + + // Mock flint index + mockDS.createIndex(); + // Mock index state in refresh state. + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, mockDS.latestId, DATASOURCE); + flintIndexJob.refreshing(); + + // 1.drop index + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest(mockDS.query, DATASOURCE, LangType.SQL, null)); + + // 2.fetch result. 
+ AsyncQueryExecutionResponse asyncQueryResults = + asyncQueryExecutorService.getAsyncQueryResults(response.getQueryId()); + assertEquals("SUCCESS", asyncQueryResults.getStatus()); + assertNull(asyncQueryResults.getError()); + + flintIndexJob.assertState(FlintIndexState.DELETED); + } + /** * No Job running, expectation is * diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java index cddc790d5e..0fe92af54f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java @@ -104,4 +104,18 @@ public void materializedViewIndexNameNotFullyQualified() { .build() .openSearchIndexName()); } + + @Test + public void sanitizedIndexName() { + assertEquals( + "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + IndexQueryDetails.builder() + .indexName("invalid") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.`test ,:\"+/\\|?#><`")) + .indexOptions(new FlintIndexOptions()) + .indexQueryActionType(IndexQueryActionType.DROP) + .indexType(FlintIndexType.SKIPPING) + .build() + .openSearchIndexName()); + } } diff --git a/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json new file mode 100644 index 0000000000..63943d17f3 --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/0.1.1/flint_special_character_index.json @@ -0,0 +1,23 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + "options": {}, + "source": "mys3.default.`test ,:\"+/\\|?#><`", + "version": "0.1.0", + "properties": { + "env": { + 
"SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + }, + "latestId": "specialcharacterindexid" + } +} diff --git a/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json b/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json new file mode 100644 index 0000000000..95ae75545f --- /dev/null +++ b/spark/src/test/resources/flint-index-mappings/flint_special_character_index.json @@ -0,0 +1,22 @@ +{ + "_meta": { + "kind": "skipping", + "indexedColumns": [ + { + "columnType": "int", + "kind": "VALUE_SET", + "columnName": "status" + } + ], + "name": "flint_mys3_default_test%20%2c%3a%22%2b%2f%5c%7c%3f%23%3e%3c_skipping_index", + "options": {}, + "source": "mys3.default.`test ,:\"+/\\|?#><`", + "version": "0.1.0", + "properties": { + "env": { + "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID": "00fd777k3k3ls20p", + "SERVERLESS_EMR_JOB_ID": "00fdmvv9hp8u0o0q" + } + } + } +}