Skip to content

Commit

Permalink
Add IF NOT EXISTS for skipping and covering index with IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 25, 2023
1 parent e72f054 commit 0884216
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ High level API is dependent on query engine implementation. Please see Query Eng
#### Skipping Index

```sql
CREATE SKIPPING INDEX
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
Expand Down Expand Up @@ -161,7 +161,7 @@ DROP SKIPPING INDEX ON alb_logs
#### Covering Index

```sql
CREATE INDEX name ON <object>
CREATE INDEX [IF NOT EXISTS] name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName=multipartIdentifier
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName=multipartIdentifier
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand All @@ -51,7 +52,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
: CREATE INDEX (IF NOT EXISTS)? indexName=identifier
ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand Down
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,11 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,18 @@ class FlintSpark(val spark: SparkSession) {
*
* @param index
* Flint index to create
* @param ignoreIfExists
* If true, skip creation when the index already exists instead of throwing IllegalStateException
*/
def createIndex(index: FlintSparkIndex): Unit = {
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
if (flintClient.exists(indexName)) {
throw new IllegalStateException(
s"A table can only have one Flint skipping index: Flint index $indexName is found")
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
flintClient.createIndex(indexName, index.metadata())
}
flintClient.createIndex(indexName, index.metadata())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {

/**
* Create Flint index.
*
* @param ignoreIfExists
* if true, skip creation when the index already exists instead of failing
*/
def create(): Unit = flint.createIndex(buildIndex())
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

/**
* Build method for concrete builder class to implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
val colName = indexColCtx.multipartIdentifier().getText
indexBuilder.addIndexColumns(colName)
}
indexBuilder.create()

val ignoreIfExists = ctx.EXISTS() != null
indexBuilder.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

val ignoreIfExists = ctx.EXISTS() != null
indexBuilder.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create covering index if not exists") {
  // The IF NOT EXISTS statement is issued twice: first to create the index,
  // then again once the index is already present.
  val createIfNotExists =
    s"""
       | CREATE INDEX IF NOT EXISTS $testIndex
       | ON $testTable (name, age)
       |""".stripMargin

  sql(createIfNotExists)
  flint.describeIndex(testFlintIndex) shouldBe defined

  // A plain CREATE INDEX on the existing index is expected to be rejected
  assertThrows[IllegalStateException] {
    sql(s"""
           | CREATE INDEX $testIndex
           | ON $testTable (name, age)
           |""".stripMargin)
  }

  // With IF NOT EXISTS the same request is expected to complete without error
  sql(createIfNotExists)
}

test("drop covering index") {
flint
.coveringIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

Expand Down Expand Up @@ -78,6 +75,28 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index if not exists") {
  // The IF NOT EXISTS statement is issued twice: first to create the index,
  // then again once the index is already present.
  val createIfNotExists =
    s"""
       | CREATE SKIPPING INDEX
       | IF NOT EXISTS
       | ON $testTable ( year PARTITION )
       | """.stripMargin

  sql(createIfNotExists)
  flint.describeIndex(testIndex) shouldBe defined

  // A plain CREATE SKIPPING INDEX on the existing index is expected to be rejected
  assertThrows[IllegalStateException] {
    sql(s"""
           | CREATE SKIPPING INDEX
           | ON $testTable ( year PARTITION )
           | """.stripMargin)
  }

  // With IF NOT EXISTS the same request is expected to complete without error
  sql(createIfNotExists)
}

test("describe skipping index") {
flint
.skippingIndex()
Expand Down

0 comments on commit 0884216

Please sign in to comment.