From 33a5f0a9ae71819f3dc79471a977b6fff86cb32e Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 28 Sep 2023 11:20:01 -0700 Subject: [PATCH] Add IF NOT EXISTS for skipping and covering index with IT (#42) Signed-off-by: Chen Dai --- docs/index.md | 4 ++-- .../main/antlr4/FlintSparkSqlExtensions.g4 | 6 +++-- .../src/main/antlr4/SparkSqlBase.g4 | 3 +++ .../opensearch/flint/spark/FlintSpark.scala | 12 ++++++---- .../flint/spark/FlintSparkIndexBuilder.scala | 6 ++++- .../FlintSparkCoveringIndexAstBuilder.scala | 3 ++- .../FlintSparkSkippingIndexAstBuilder.scala | 3 ++- .../FlintSparkCoveringIndexSqlITSuite.scala | 20 +++++++++++++++++ .../FlintSparkSkippingIndexSqlITSuite.scala | 22 +++++++++++++++++++ 9 files changed, 68 insertions(+), 11 deletions(-) diff --git a/docs/index.md b/docs/index.md index 7b8d7aef1..8f7a79a2e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -121,7 +121,7 @@ High level API is dependent on query engine implementation. Please see Query Eng #### Skipping Index ```sql -CREATE SKIPPING INDEX +CREATE SKIPPING INDEX [IF NOT EXISTS] ON ( column [, ...] ) WHERE @@ -161,7 +161,7 @@ DROP SKIPPING INDEX ON alb_logs #### Covering Index ```sql -CREATE INDEX name ON +CREATE INDEX [IF NOT EXISTS] name ON ( column [, ...] ) WHERE WITH ( options ) diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index 12f69680e..eb2cca410 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -27,7 +27,8 @@ skippingIndexStatement ; createSkippingIndexStatement - : CREATE SKIPPING INDEX ON tableName=multipartIdentifier + : CREATE SKIPPING INDEX (IF NOT EXISTS)? + ON tableName=multipartIdentifier LEFT_PAREN indexColTypeList RIGHT_PAREN (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; @@ -53,7 +54,8 @@ coveringIndexStatement ; createCoveringIndexStatement - : CREATE INDEX indexName=identifier ON tableName=multipartIdentifier + : CREATE INDEX (IF NOT EXISTS)? indexName=identifier + ON tableName=multipartIdentifier LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 4d8b12370..4ac1ced5c 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -158,9 +158,12 @@ CREATE: 'CREATE'; DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; DROP: 'DROP'; +EXISTS: 'EXISTS'; FALSE: 'FALSE'; +IF: 'IF'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index c910f28dc..b3de3c4b6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -76,14 +76,18 @@ class FlintSpark(val spark: SparkSession) { * * @param index * Flint index to create + * @param ignoreIfExists + * Ignore existing index */ - def createIndex(index: FlintSparkIndex): Unit = { + def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { val indexName = index.name() if (flintClient.exists(indexName)) { - throw new IllegalStateException( - s"A table can only have one Flint skipping index: Flint index $indexName is found") + if (!ignoreIfExists) { + throw new IllegalStateException(s"Flint index $indexName already exists") + } + } else { + flintClient.createIndex(indexName, index.metadata()) } - flintClient.createIndex(indexName, index.metadata()) } /** diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala index 95e351f7d..2212826dc 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexBuilder.scala @@ -49,8 +49,12 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) { /** * Create Flint index. + * + * @param ignoreIfExists + * ignore existing index */ - def create(): Unit = flint.createIndex(buildIndex()) + def create(ignoreIfExists: Boolean = false): Unit = + flint.createIndex(buildIndex(), ignoreIfExists) /** * Build method for concrete builder class to implement diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 689dfaefc..65a87c568 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -40,10 +40,11 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A indexBuilder.addIndexColumns(colName) } + val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder .options(indexOptions) - .create() + .create(ignoreIfExists) // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala index c6cd0ae48..dc8132a25 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/skipping/FlintSparkSkippingIndexAstBuilder.scala @@ -44,10 +44,11 @@ trait FlintSparkSkippingIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A } } + val ignoreIfExists = ctx.EXISTS() != null val indexOptions = visitPropertyList(ctx.propertyList()) indexBuilder .options(indexOptions) - .create() + .create(ignoreIfExists) // Trigger auto refresh if enabled if (indexOptions.autoRefresh()) { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 66d19a261..78ec1619b 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -67,6 +67,26 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create covering index if not exists") { + sql(s""" + | CREATE INDEX IF NOT EXISTS $testIndex + | ON $testTable (name, age) + |""".stripMargin) + flint.describeIndex(testFlintIndex) shouldBe defined + + // Expect error without IF NOT EXISTS, otherwise success + assertThrows[IllegalStateException] { + sql(s""" + | CREATE INDEX $testIndex + | ON $testTable (name, age) + |""".stripMargin) + } + sql(s""" + | CREATE INDEX IF NOT EXISTS $testIndex + | ON $testTable (name, age) + |""".stripMargin) + } + test("show all covering index on the source table") { flint .coveringIndex() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 5846cab23..eec9dfca9 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -94,6 +94,28 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { indexData.count() shouldBe 2 } + test("create skipping index if not exists") { + sql(s""" + | CREATE SKIPPING INDEX + | IF NOT EXISTS + | ON $testTable ( year PARTITION ) + | """.stripMargin) + flint.describeIndex(testIndex) shouldBe defined + + // Expect error without IF NOT EXISTS, otherwise success + assertThrows[IllegalStateException] { + sql(s""" + | CREATE SKIPPING INDEX + | ON $testTable ( year PARTITION ) + | """.stripMargin) + } + sql(s""" + | CREATE SKIPPING INDEX + | IF NOT EXISTS + | ON $testTable ( year PARTITION ) + | """.stripMargin) + } + test("describe skipping index") { flint .skippingIndex()