From b620a561e178a366d17932d2ad84951f9d2d3a4e Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 25 Oct 2023 17:30:46 -0700 Subject: [PATCH] Add where clause support in create statement (#2366) (#2370) * Upload latest grammar with where clause support in create statement * Fix MV index name bug * Add UT for create index statement with where clause * Fix spotless format --------- (cherry picked from commit bb82d8545b71d586d262a00f7f52b46c4e533f1c) Signed-off-by: Chen Dai Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 10 +++ spark/src/main/antlr/SparkSqlBase.g4 | 1 + .../model/FullyQualifiedTableName.java | 22 ++++++ .../dispatcher/model/IndexQueryDetails.java | 25 ++---- .../spark/flint/IndexQueryDetailsTest.java | 77 +++++++++++++++++++ .../sql/spark/utils/SQLQueryUtilsTest.java | 64 +++++++++++---- 6 files changed, 168 insertions(+), 31 deletions(-) diff --git a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 index f48c276e44..e44944fcff 100644 --- a/spark/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/spark/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -31,6 +31,7 @@ createSkippingIndexStatement : CREATE SKIPPING INDEX (IF NOT EXISTS)? ON tableName LEFT_PAREN indexColTypeList RIGHT_PAREN + whereClause? (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; @@ -58,6 +59,7 @@ createCoveringIndexStatement : CREATE INDEX (IF NOT EXISTS)? indexName ON tableName LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN + whereClause? (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; @@ -115,6 +117,14 @@ materializedViewQuery : .+? ; +whereClause + : WHERE filterCondition + ; + +filterCondition + : .+? + ; + indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/spark/src/main/antlr/SparkSqlBase.g4 b/spark/src/main/antlr/SparkSqlBase.g4 index 533d851ba6..597a1e5856 100644 --- a/spark/src/main/antlr/SparkSqlBase.g4 +++ b/spark/src/main/antlr/SparkSqlBase.g4 @@ -174,6 +174,7 @@ SHOW: 'SHOW'; TRUE: 'TRUE'; VIEW: 'VIEW'; VIEWS: 'VIEWS'; +WHERE: 'WHERE'; WITH: 'WITH'; diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java index 5a9fe4d31f..fc1513241f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/FullyQualifiedTableName.java @@ -5,6 +5,9 @@ package org.opensearch.sql.spark.dispatcher.model; +import static org.apache.commons.lang3.StringUtils.strip; +import static org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails.STRIP_CHARS; + import java.util.Arrays; import lombok.Data; import lombok.NoArgsConstructor; @@ -40,4 +43,23 @@ public FullyQualifiedTableName(String fullyQualifiedName) { tableName = parts[0]; } } + + /** + * Convert qualified name to Flint name concat by underscore. + * + * @return Flint name + */ + public String toFlintName() { + StringBuilder builder = new StringBuilder(); + if (datasourceName != null) { + builder.append(strip(datasourceName, STRIP_CHARS)).append("_"); + } + if (schemaName != null) { + builder.append(strip(schemaName, STRIP_CHARS)).append("_"); + } + if (tableName != null) { + builder.append(strip(tableName, STRIP_CHARS)); + } + return builder.toString(); + } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java index 5b4326a10e..576b0772d2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/model/IndexQueryDetails.java @@ -5,6 +5,8 @@ package org.opensearch.sql.spark.dispatcher.model; +import static org.apache.commons.lang3.StringUtils.strip; + import lombok.EqualsAndHashCode; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -83,32 +85,19 @@ public String openSearchIndexName() { switch (getIndexType()) { case COVERING: indexName = - "flint" - + "_" - + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) + "flint_" + + fullyQualifiedTableName.toFlintName() + "_" - + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) - + "_" - + StringUtils.strip(getIndexName(), STRIP_CHARS) + + strip(getIndexName(), STRIP_CHARS) + "_" + getIndexType().getSuffix(); break; case SKIPPING: indexName = - "flint" - + "_" - + StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS) - + "_" - + StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS) - + "_" - + getIndexType().getSuffix(); + "flint_" + fullyQualifiedTableName.toFlintName() + "_" + getIndexType().getSuffix(); break; case MATERIALIZED_VIEW: - indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase(); + indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName(); break; } return indexName.toLowerCase(); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java index e725ddc21e..6299dee0ca 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/IndexQueryDetailsTest.java @@ -26,4 +26,81 @@ public void skippingIndexName() { .build() .openSearchIndexName()); } + + @Test + public void coveringIndexName() { + assertEquals( + "flint_mys3_default_http_logs_idx_status_index", + IndexQueryDetails.builder() + .indexName("idx_status") + .fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs")) + .indexType(FlintIndexType.COVERING) + .build() + .openSearchIndexName()); + } + + @Test + public void materializedViewIndexName() { + assertEquals( + "flint_mys3_default_http_logs_metrics", + IndexQueryDetails.builder() + .mvName("mys3.default.http_logs_metrics") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + } + + @Test + public void materializedViewIndexNameWithBackticks() { + assertEquals( + "flint_mys3_default_http_logs_metrics", + IndexQueryDetails.builder() + .mvName("`mys3`.`default`.`http_logs_metrics`") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + } + + @Test + public void materializedViewIndexNameWithDots() { + assertEquals( + "flint_mys3_default_http_logs_metrics.1026", + IndexQueryDetails.builder() + .mvName("`mys3`.`default`.`http_logs_metrics.1026`") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + } + + @Test + public void materializedViewIndexNameWithDotsInCatalogName() { + // FIXME: should not use ctx.getText which is hard to split + assertEquals( + "flint_mys3_1026_default`.`http_logs_metrics", + IndexQueryDetails.builder() + .mvName("`mys3.1026`.`default`.`http_logs_metrics`") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + } + + @Test + public void materializedViewIndexNameNotFullyQualified() { + // Normally this should not happen and can add precondition check once confirmed. + assertEquals( + "flint_default_http_logs_metrics", + IndexQueryDetails.builder() + .mvName("default.http_logs_metrics") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + + assertEquals( + "flint_http_logs_metrics", + IndexQueryDetails.builder() + .mvName("http_logs_metrics") + .indexType(FlintIndexType.MATERIALIZED_VIEW) + .build() + .openSearchIndexName()); + } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java index c86d7656d6..f5226206ab 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/utils/SQLQueryUtilsTest.java @@ -102,19 +102,57 @@ void testErrorScenarios() { } @Test - void testExtractionFromFlintIndexQueries() { - String createCoveredIndexQuery = - "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) WITH" - + " (auto_refresh = true)"; - Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(createCoveredIndexQuery)); - IndexQueryDetails indexQueryDetails = - SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery); - FullyQualifiedTableName fullyQualifiedTableName = - indexQueryDetails.getFullyQualifiedTableName(); - Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName()); - Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); - Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); - Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); + void testExtractionFromFlintSkippingIndexQueries() { + String[] createSkippingIndexQueries = { + "CREATE SKIPPING INDEX ON myS3.default.alb_logs (l_orderkey VALUE_SET)", + "CREATE SKIPPING INDEX IF NOT EXISTS" + + " ON myS3.default.alb_logs (l_orderkey VALUE_SET) " + + " WITH (auto_refresh = true)", + "CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET)" + + " WITH (auto_refresh = true)", + "CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET) " + + " WHERE elb_status_code = 500 " + + " WITH (auto_refresh = true)" + }; + + for (String query : createSkippingIndexQueries) { + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query); + IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query); + FullyQualifiedTableName fullyQualifiedTableName = + indexQueryDetails.getFullyQualifiedTableName(); + + Assertions.assertNull(indexQueryDetails.getIndexName()); + Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); + Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); + Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); + } + } + + @Test + void testExtractionFromFlintCoveringIndexQueries() { + String[] createCoveredIndexQueries = { + "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)", + "CREATE INDEX IF NOT EXISTS elb_and_requestUri " + + " ON myS3.default.alb_logs(l_orderkey, l_quantity) " + + " WITH (auto_refresh = true)", + "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)" + + " WITH (auto_refresh = true)", + "CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) " + + " WHERE elb_status_code = 500 " + + " WITH (auto_refresh = true)" + }; + + for (String query : createCoveredIndexQueries) { + Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query); + IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query); + FullyQualifiedTableName fullyQualifiedTableName = + indexQueryDetails.getFullyQualifiedTableName(); + + Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName()); + Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName()); + Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName()); + Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName()); + } } @Test