Commit ed6ba7f

Add more IT
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Sep 21, 2023
1 parent fdccca7 · commit ed6ba7f
Showing 3 changed files with 31 additions and 11 deletions.
FlintSpark.scala

@@ -174,8 +174,8 @@ class FlintSpark(val spark: SparkSession) {
    */
   def deleteIndex(indexName: String): Boolean = {
     if (flintClient.exists(indexName)) {
-      flintClient.deleteIndex(indexName)
       stopRefreshingJob(indexName)
+      flintClient.deleteIndex(indexName)
       true
     } else {
       false
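
The reordering above stops the index's incremental refresh job before the index itself is deleted, so a still-running streaming job cannot attempt to write to an index that no longer exists. A minimal sketch of the pattern, assuming a stand-in IndexClient trait and Spark's public StreamingQueryManager API (the real stopRefreshingJob may work differently):

import org.apache.spark.sql.SparkSession

// Stand-in for the Flint client interface seen in the diff (assumption).
trait IndexClient {
  def exists(indexName: String): Boolean
  def deleteIndex(indexName: String): Unit
}

def stopThenDelete(spark: SparkSession, client: IndexClient, indexName: String): Boolean =
  if (client.exists(indexName)) {
    // Stop the streaming refresh job first so it cannot write to a deleted index.
    spark.streams.active.find(_.name == indexName).foreach(_.stop())
    client.deleteIndex(indexName)
    true
  } else {
    false
  }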
SparkSqlAstBuilder.scala

@@ -21,12 +21,16 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils.string
 trait SparkSqlAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] {
 
   override def visitPropertyList(ctx: PropertyListContext): FlintSparkIndexOptions = {
-    val properties = ctx.property.asScala.map { property =>
-      val key = visitPropertyKey(property.key)
-      val value = visitPropertyValue(property.value)
-      key -> value
+    if (ctx == null) {
+      FlintSparkIndexOptions.empty
+    } else {
+      val properties = ctx.property.asScala.map { property =>
+        val key = visitPropertyKey(property.key)
+        val value = visitPropertyValue(property.value)
+        key -> value
+      }
+      FlintSparkIndexOptions(properties.toMap)
     }
-    FlintSparkIndexOptions(properties.toMap)
   }
 
   override def visitPropertyKey(key: PropertyKeyContext): String = {
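
The null guard above matters because the WITH (...) options clause is optional in the Flint SQL grammar: when the clause is absent, the ANTLR-generated parser hands the visitor a null PropertyListContext, and without the guard the visitor would throw a NullPointerException. A minimal sketch of the same pattern, using hypothetical stand-in classes rather than the generated Flint contexts:

// Hypothetical stand-ins for the ANTLR-generated context classes.
final case class PropertyCtx(key: String, value: String)
final class PropertyListCtx(val properties: Seq[PropertyCtx])

final case class IndexOptions(options: Map[String, String])
object IndexOptions { val empty: IndexOptions = IndexOptions(Map.empty) }

// ANTLR passes null when an optional rule did not match, so guard first.
def visitPropertyList(ctx: PropertyListCtx): IndexOptions =
  if (ctx == null) IndexOptions.empty
  else IndexOptions(ctx.properties.map(p => p.key -> p.value).toMap)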
FlintSparkSkippingIndexSqlITSuite.scala

@@ -7,16 +7,12 @@ package org.opensearch.flint.spark
 
 import scala.Option.empty
 
-import org.opensearch.flint.OpenSearchSuite
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
 import org.scalatest.matchers.must.Matchers.defined
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
-import org.apache.spark.FlintSuite
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
-import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
-import org.apache.spark.sql.streaming.StreamTest
 
 class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
 
@@ -59,6 +55,26 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
     indexData.count() shouldBe 2
   }
 
+  test("create skipping index with streaming job options") {
+    withTempDir { checkpointDir =>
+      sql(s"""
+           | CREATE SKIPPING INDEX ON $testTable
+           | ( year PARTITION )
+           | WITH (
+           |   auto_refresh = true,
+           |   refresh_interval = '5 Seconds',
+           |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
+           | )
+           | """.stripMargin)
+
+      val index = flint.describeIndex(testIndex)
+      index shouldBe defined
+      index.get.options.autoRefresh() shouldBe true
+      index.get.options.refreshInterval() shouldBe Some("5 Seconds")
+      index.get.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)
+    }
+  }
+
   test("create skipping index with manual refresh") {
     sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
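
For context, the three options exercised by the new test line up with standard Spark Structured Streaming writer settings. A hedged sketch of that mapping (an assumption about how the Flint refresh job consumes the options, not code from this commit):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Rough mapping of the SQL WITH options onto Spark's streaming writer.
def startRefreshJob(indexData: DataFrame, checkpointPath: String): StreamingQuery =
  indexData.writeStream
    .format("console")                              // sink chosen only to make the sketch runnable
    .option("checkpointLocation", checkpointPath)   // checkpoint_location
    .trigger(Trigger.ProcessingTime("5 seconds"))   // refresh_interval = '5 Seconds'
    .start()                                        // job keeps running while auto_refresh = true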
