diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index aac0fb44e..303084970 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -46,6 +46,8 @@ dropSkippingIndexStatement coveringIndexStatement : createCoveringIndexStatement + | refreshCoveringIndexStatement + | dropCoveringIndexStatement ; createCoveringIndexStatement @@ -54,6 +56,14 @@ createCoveringIndexStatement (WITH LEFT_PAREN propertyList RIGHT_PAREN)? ; +refreshCoveringIndexStatement + : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier + ; + +dropCoveringIndexStatement + : DROP INDEX indexName=identifier ON tableName=multipartIdentifier + ; + indexColTypeList : indexColType (COMMA indexColType)* ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala index 35ccf7dc9..4e15c26a2 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/covering/FlintSparkCoveringIndexAstBuilder.scala @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor} import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled} -import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext} import org.apache.spark.sql.catalyst.plans.logical.Command @@ -43,4 +43,28 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C Seq.empty } } + + override def visitRefreshCoveringIndexStatement( + ctx: RefreshCoveringIndexStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val indexName = ctx.indexName.getText + val tableName = ctx.tableName.getText + val flintIndexName = getFlintIndexName(indexName, tableName) + + flint.refreshIndex(flintIndexName, RefreshMode.FULL) + Seq.empty + } + } + + override def visitDropCoveringIndexStatement( + ctx: DropCoveringIndexStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val indexName = ctx.indexName.getText + val tableName = getFullTableName(flint, ctx.tableName) + val flintIndexName = getFlintIndexName(indexName, tableName) + + flint.deleteIndex(flintIndexName) + Seq.empty + } + } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index f21dcffa7..04b3ed0c8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import scala.Option.empty + import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper @@ -46,4 +48,32 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val indexData = flint.queryIndex(testFlintIndex) indexData.count() shouldBe 2 } + + test("create covering index with manual refresh") { + sql(s""" + | CREATE INDEX $testIndex ON $testTable + | (name, age) + |""".stripMargin) + + val indexData = flint.queryIndex(testFlintIndex) + + flint.describeIndex(testFlintIndex) shouldBe defined + indexData.count() shouldBe 0 + + sql(s"REFRESH INDEX $testIndex ON $testTable") + indexData.count() shouldBe 2 + } + + test("drop covering index") { + flint + .coveringIndex() + .name(testIndex) + .onTable(testTable) + .addIndexColumns("name", "age") + .create() + + sql(s"DROP INDEX $testIndex ON $testTable") + + flint.describeIndex(testFlintIndex) shouldBe empty + } }