Skip to content

Commit

Permalink
Add IF NOT EXISTS for skipping and covering index with IT (opensearch…
Browse files Browse the repository at this point in the history
…-project#42)

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Sep 28, 2023
1 parent 132795c commit 33a5f0a
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 11 deletions.
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ High level API is dependent on query engine implementation. Please see Query Eng
#### Skipping Index

```sql
CREATE SKIPPING INDEX
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
Expand Down Expand Up @@ -161,7 +161,7 @@ DROP SKIPPING INDEX ON alb_logs
#### Covering Index

```sql
CREATE INDEX name ON <object>
CREATE INDEX [IF NOT EXISTS] name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH ( options )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName=multipartIdentifier
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName=multipartIdentifier
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand All @@ -53,7 +54,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
: CREATE INDEX (IF NOT EXISTS)? indexName=identifier
ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand Down
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,18 @@ class FlintSpark(val spark: SparkSession) {
*
* @param index
* Flint index to create
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex): Unit = {
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
if (flintClient.exists(indexName)) {
throw new IllegalStateException(
s"A table can only have one Flint skipping index: Flint index $indexName is found")
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
flintClient.createIndex(indexName, index.metadata())
}
flintClient.createIndex(indexName, index.metadata())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {

/**
* Create Flint index.
*
* @param ignoreIfExists
* ignore existing index
*/
def create(): Unit = flint.createIndex(buildIndex())
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

/**
* Build method for concrete builder class to implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
indexBuilder.addIndexColumns(colName)
}

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
.options(indexOptions)
.create()
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}
}

val ignoreIfExists = ctx.EXISTS() != null
val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
.options(indexOptions)
.create()
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create covering index if not exists") {
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
flint.describeIndex(testFlintIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE INDEX $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}

test("show all covering index on the source table") {
flint
.coveringIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,28 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index if not exists") {
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
flint.describeIndex(testIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE SKIPPING INDEX
| ON $testTable ( year PARTITION )
| """.stripMargin)
}
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
}

test("describe skipping index") {
flint
.skippingIndex()
Expand Down

0 comments on commit 33a5f0a

Please sign in to comment.