Skip to content

Commit

Permalink
Add where clause support in create statement (#2366)
Browse files Browse the repository at this point in the history
* Upload latest grammar with where clause support in create statement

Signed-off-by: Chen Dai <[email protected]>

* Fix MV index name bug

Signed-off-by: Chen Dai <[email protected]>

* Add UT for create index statement with where clause

Signed-off-by: Chen Dai <[email protected]>

* Fix spotless format

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
(cherry picked from commit bb82d85)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Oct 25, 2023
1 parent a43d28d commit a63af39
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 31 deletions.
10 changes: 10 additions & 0 deletions spark/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ createSkippingIndexStatement
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName
LEFT_PAREN indexColTypeList RIGHT_PAREN
whereClause?
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

Expand Down Expand Up @@ -58,6 +59,7 @@ createCoveringIndexStatement
: CREATE INDEX (IF NOT EXISTS)? indexName
ON tableName
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
whereClause?
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

Expand Down Expand Up @@ -115,6 +117,14 @@ materializedViewQuery
: .+?
;

whereClause
: WHERE filterCondition
;

filterCondition
: .+?
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
Expand Down
1 change: 1 addition & 0 deletions spark/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ SHOW: 'SHOW';
TRUE: 'TRUE';
VIEW: 'VIEW';
VIEWS: 'VIEWS';
WHERE: 'WHERE';
WITH: 'WITH';


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.apache.commons.lang3.StringUtils.strip;
import static org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails.STRIP_CHARS;

import java.util.Arrays;
import lombok.Data;
import lombok.NoArgsConstructor;
Expand Down Expand Up @@ -40,4 +43,23 @@ public FullyQualifiedTableName(String fullyQualifiedName) {
tableName = parts[0];
}
}

/**
* Convert qualified name to Flint name concat by underscore.
*
* @return Flint name
*/
public String toFlintName() {
StringBuilder builder = new StringBuilder();
if (datasourceName != null) {
builder.append(strip(datasourceName, STRIP_CHARS)).append("_");
}
if (schemaName != null) {
builder.append(strip(schemaName, STRIP_CHARS)).append("_");
}
if (tableName != null) {
builder.append(strip(tableName, STRIP_CHARS));
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.spark.dispatcher.model;

import static org.apache.commons.lang3.StringUtils.strip;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -83,32 +85,19 @@ public String openSearchIndexName() {
switch (getIndexType()) {
case COVERING:
indexName =
"flint"
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
"flint_"
+ fullyQualifiedTableName.toFlintName()
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(getIndexName(), STRIP_CHARS)
+ strip(getIndexName(), STRIP_CHARS)
+ "_"
+ getIndexType().getSuffix();
break;
case SKIPPING:
indexName =
"flint"
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getDatasourceName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getSchemaName(), STRIP_CHARS)
+ "_"
+ StringUtils.strip(fullyQualifiedTableName.getTableName(), STRIP_CHARS)
+ "_"
+ getIndexType().getSuffix();
"flint_" + fullyQualifiedTableName.toFlintName() + "_" + getIndexType().getSuffix();
break;
case MATERIALIZED_VIEW:
indexName = "flint" + "_" + StringUtils.strip(getMvName(), STRIP_CHARS).toLowerCase();
indexName = "flint_" + new FullyQualifiedTableName(mvName).toFlintName();
break;
}
return indexName.toLowerCase();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,81 @@ public void skippingIndexName() {
.build()
.openSearchIndexName());
}

@Test
public void coveringIndexName() {
assertEquals(
"flint_mys3_default_http_logs_idx_status_index",
IndexQueryDetails.builder()
.indexName("idx_status")
.fullyQualifiedTableName(new FullyQualifiedTableName("mys3.default.http_logs"))
.indexType(FlintIndexType.COVERING)
.build()
.openSearchIndexName());
}

@Test
public void materializedViewIndexName() {
assertEquals(
"flint_mys3_default_http_logs_metrics",
IndexQueryDetails.builder()
.mvName("mys3.default.http_logs_metrics")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());
}

@Test
public void materializedViewIndexNameWithBackticks() {
assertEquals(
"flint_mys3_default_http_logs_metrics",
IndexQueryDetails.builder()
.mvName("`mys3`.`default`.`http_logs_metrics`")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());
}

@Test
public void materializedViewIndexNameWithDots() {
assertEquals(
"flint_mys3_default_http_logs_metrics.1026",
IndexQueryDetails.builder()
.mvName("`mys3`.`default`.`http_logs_metrics.1026`")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());
}

@Test
public void materializedViewIndexNameWithDotsInCatalogName() {
// FIXME: should not use ctx.getText which is hard to split
assertEquals(
"flint_mys3_1026_default`.`http_logs_metrics",
IndexQueryDetails.builder()
.mvName("`mys3.1026`.`default`.`http_logs_metrics`")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());
}

@Test
public void materializedViewIndexNameNotFullyQualified() {
// Normally this should not happen and can add precondition check once confirmed.
assertEquals(
"flint_default_http_logs_metrics",
IndexQueryDetails.builder()
.mvName("default.http_logs_metrics")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());

assertEquals(
"flint_http_logs_metrics",
IndexQueryDetails.builder()
.mvName("http_logs_metrics")
.indexType(FlintIndexType.MATERIALIZED_VIEW)
.build()
.openSearchIndexName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,57 @@ void testErrorScenarios() {
}

@Test
void testExtractionFromFlintIndexQueries() {
String createCoveredIndexQuery =
"CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) WITH"
+ " (auto_refresh = true)";
Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(createCoveredIndexQuery));
IndexQueryDetails indexQueryDetails =
SQLQueryUtils.extractIndexDetails(createCoveredIndexQuery);
FullyQualifiedTableName fullyQualifiedTableName =
indexQueryDetails.getFullyQualifiedTableName();
Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName());
Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
void testExtractionFromFlintSkippingIndexQueries() {
String[] createSkippingIndexQueries = {
"CREATE SKIPPING INDEX ON myS3.default.alb_logs (l_orderkey VALUE_SET)",
"CREATE SKIPPING INDEX IF NOT EXISTS"
+ " ON myS3.default.alb_logs (l_orderkey VALUE_SET) "
+ " WITH (auto_refresh = true)",
"CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET)"
+ " WITH (auto_refresh = true)",
"CREATE SKIPPING INDEX ON myS3.default.alb_logs(l_orderkey VALUE_SET) "
+ " WHERE elb_status_code = 500 "
+ " WITH (auto_refresh = true)"
};

for (String query : createSkippingIndexQueries) {
Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query);
IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query);
FullyQualifiedTableName fullyQualifiedTableName =
indexQueryDetails.getFullyQualifiedTableName();

Assertions.assertNull(indexQueryDetails.getIndexName());
Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
}
}

@Test
void testExtractionFromFlintCoveringIndexQueries() {
String[] createCoveredIndexQueries = {
"CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)",
"CREATE INDEX IF NOT EXISTS elb_and_requestUri "
+ " ON myS3.default.alb_logs(l_orderkey, l_quantity) "
+ " WITH (auto_refresh = true)",
"CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity)"
+ " WITH (auto_refresh = true)",
"CREATE INDEX elb_and_requestUri ON myS3.default.alb_logs(l_orderkey, l_quantity) "
+ " WHERE elb_status_code = 500 "
+ " WITH (auto_refresh = true)"
};

for (String query : createCoveredIndexQueries) {
Assertions.assertTrue(SQLQueryUtils.isFlintExtensionQuery(query), "Failed query: " + query);
IndexQueryDetails indexQueryDetails = SQLQueryUtils.extractIndexDetails(query);
FullyQualifiedTableName fullyQualifiedTableName =
indexQueryDetails.getFullyQualifiedTableName();

Assertions.assertEquals("elb_and_requestUri", indexQueryDetails.getIndexName());
Assertions.assertEquals("myS3", fullyQualifiedTableName.getDatasourceName());
Assertions.assertEquals("default", fullyQualifiedTableName.getSchemaName());
Assertions.assertEquals("alb_logs", fullyQualifiedTableName.getTableName());
}
}

@Test
Expand Down

0 comments on commit a63af39

Please sign in to comment.