Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support IF NOT EXISTS in create statement #42

Merged
merged 4 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ High level API is dependent on query engine implementation. Please see Query Eng
#### Skipping Index

```sql
CREATE SKIPPING INDEX
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
Expand Down Expand Up @@ -161,7 +161,7 @@ DROP SKIPPING INDEX ON alb_logs
#### Covering Index

```sql
CREATE INDEX name ON <object>
CREATE INDEX [IF NOT EXISTS] name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ skippingIndexStatement
;

createSkippingIndexStatement
: CREATE SKIPPING INDEX ON tableName=multipartIdentifier
: CREATE SKIPPING INDEX (IF NOT EXISTS)?
ON tableName=multipartIdentifier
LEFT_PAREN indexColTypeList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand All @@ -53,7 +54,8 @@ coveringIndexStatement
;

createCoveringIndexStatement
: CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
: CREATE INDEX (IF NOT EXISTS)? indexName=identifier
ON tableName=multipartIdentifier
LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;
Expand Down
3 changes: 3 additions & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ CREATE: 'CREATE';
DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
FALSE: 'FALSE';
IF: 'IF';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
NOT: 'NOT';
ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,18 @@ class FlintSpark(val spark: SparkSession) {
*
* @param index
* Flint index to create
* @param ignoreIfExists
* Ignore existing index
*/
def createIndex(index: FlintSparkIndex): Unit = {
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
if (flintClient.exists(indexName)) {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved
throw new IllegalStateException(
s"A table can only have one Flint skipping index: Flint index $indexName is found")
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
flintClient.createIndex(indexName, index.metadata())
}
flintClient.createIndex(indexName, index.metadata())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {

/**
* Create Flint index.
*
* @param ignoreIfExists
* ignore existing index
*/
def create(): Unit = flint.createIndex(buildIndex())
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)

/**
* Build method for concrete builder class to implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
val colName = indexColCtx.multipartIdentifier().getText
indexBuilder.addIndexColumns(colName)
}
indexBuilder.create()

val ignoreIfExists = ctx.EXISTS() != null
indexBuilder.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
case MIN_MAX => indexBuilder.addMinMax(colName)
}
}
indexBuilder.create()

val ignoreIfExists = ctx.EXISTS() != null
indexBuilder.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (isAutoRefreshEnabled(ctx.propertyList())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create covering index if not exists") {
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
flint.describeIndex(testFlintIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE INDEX $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}
sql(s"""
| CREATE INDEX IF NOT EXISTS $testIndex
| ON $testTable (name, age)
|""".stripMargin)
}

test("show all covering index on the source table") {
flint
.coveringIndex()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIn
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper

import org.apache.spark.FlintSuite
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

Expand Down Expand Up @@ -78,6 +75,28 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
indexData.count() shouldBe 2
}

test("create skipping index if not exists") {
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
flint.describeIndex(testIndex) shouldBe defined

// Expect error without IF NOT EXISTS, otherwise success
assertThrows[IllegalStateException] {
sql(s"""
| CREATE SKIPPING INDEX
| ON $testTable ( year PARTITION )
| """.stripMargin)
}
sql(s"""
| CREATE SKIPPING INDEX
| IF NOT EXISTS
| ON $testTable ( year PARTITION )
| """.stripMargin)
}

test("describe skipping index") {
flint
.skippingIndex()
Expand Down
Loading