Skip to content

Commit

Permalink
Add refresh and drop statement support
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Sep 18, 2023
1 parent afe9d3e commit ad7f353
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 1 deletion.
10 changes: 10 additions & 0 deletions flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ dropSkippingIndexStatement

coveringIndexStatement
: createCoveringIndexStatement
| refreshCoveringIndexStatement
| dropCoveringIndexStatement
;

createCoveringIndexStatement
Expand All @@ -54,6 +56,14 @@ createCoveringIndexStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshCoveringIndexStatement
: REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
;

dropCoveringIndexStatement
: DROP INDEX indexName=identifier ON tableName=multipartIdentifier
;

indexColTypeList
: indexColType (COMMA indexColType)*
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSpark.RefreshMode
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor}
import org.opensearch.flint.spark.sql.FlintSparkSqlAstBuilder.{getFullTableName, isAutoRefreshEnabled}
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.CreateCoveringIndexStatementContext
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.{CreateCoveringIndexStatementContext, DropCoveringIndexStatementContext, RefreshCoveringIndexStatementContext}

import org.apache.spark.sql.catalyst.plans.logical.Command

Expand Down Expand Up @@ -43,4 +43,28 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[C
Seq.empty
}
}

override def visitRefreshCoveringIndexStatement(
ctx: RefreshCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = ctx.indexName.getText
val tableName = ctx.tableName.getText
val flintIndexName = getFlintIndexName(indexName, tableName)

flint.refreshIndex(flintIndexName, RefreshMode.FULL)
Seq.empty
}
}

override def visitDropCoveringIndexStatement(
ctx: DropCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = ctx.indexName.getText
val tableName = getFullTableName(flint, ctx.tableName)
val flintIndexName = getFlintIndexName(indexName, tableName)

flint.deleteIndex(flintIndexName)
Seq.empty
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import scala.Option.empty

import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.scalatest.matchers.must.Matchers.defined
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
Expand Down Expand Up @@ -46,4 +48,32 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
val indexData = flint.queryIndex(testFlintIndex)
indexData.count() shouldBe 2
}

test("create covering index with manual refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
|""".stripMargin)

val indexData = flint.queryIndex(testFlintIndex)

flint.describeIndex(testFlintIndex) shouldBe defined
indexData.count() shouldBe 0

sql(s"REFRESH INDEX $testIndex ON $testTable")
indexData.count() shouldBe 2
}

test("drop covering index") {
flint
.coveringIndex()
.name(testIndex)
.onTable(testTable)
.addIndexColumns("name", "age")
.create()

sql(s"DROP INDEX $testIndex ON $testTable")

flint.describeIndex(testFlintIndex) shouldBe empty
}
}

0 comments on commit ad7f353

Please sign in to comment.