Skip to content

Commit

Permalink
Add IT
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jul 19, 2024
1 parent dbb2d87 commit aa0a0e9
Showing 1 changed file with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@

package org.opensearch.flint.spark

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.client.RequestOptions
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.scalatest.matchers.should.Matchers

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
class FlintSparkIndexSqlITSuite extends FlintSparkSuite with Matchers {

private val testTableName = "index_test"
private val testTableQualifiedName = s"spark_catalog.default.$testTableName"
Expand Down Expand Up @@ -99,6 +106,42 @@ class FlintSparkIndexSqlITSuite extends FlintSparkSuite {
FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2"))
}

test("show flint indexes with extended information") {
  // Create a skipping index with auto-refresh and wait for the initial
  // micro batch to finish so the streaming job is in a steady state.
  flint
    .skippingIndex()
    .onTable(testTableQualifiedName)
    .addValueSet("name")
    .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true")))
    .create()
  flint.refreshIndex(testSkippingFlintIndex)
  val activeJob = spark.streams.active.find(_.name == testSkippingFlintIndex)
  awaitStreamingComplete(activeJob.get.id.toString)

  try {
    // Asynchronously make the Flint index readonly, then insert a row to
    // trigger the next micro batch. The batch is expected to fail on write
    // because of the index.blocks.write setting.
    // NOTE(review): a failure inside this Future (before awaitMonitor picks
    // up the streaming error) would be silently swallowed — confirm the
    // monitor timeout covers that case.
    Future {
      Thread.sleep(5000)
      openSearchClient
        .indices()
        .putSettings(
          new UpdateSettingsRequest(testSkippingFlintIndex).settings(
            Map("index.blocks.write" -> true).asJava),
          RequestOptions.DEFAULT)
      sql(
        s"INSERT INTO $testTableQualifiedName VALUES (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')")
    }

    // Block until the index monitor observes the streaming failure and
    // records it; the thrown/stored error is the expected outcome here.
    flint.flintIndexMonitor.awaitMonitor(Some(testSkippingFlintIndex))

    // The extended output must expose an "error" column whose value carries
    // the underlying OpenSearch write-block failure.
    val df = sql("SHOW FLINT INDEX EXTENDED IN spark_catalog")
    df.columns should contain("error")
    df.select(col("error")).collect().head.getString(0) should include("OpenSearchException")
  } finally {
    // Always drop the (now readonly) index, even when an assertion above
    // fails; otherwise the write-blocked index leaks into later tests.
    deleteTestIndex(testSkippingFlintIndex)
  }
}

test("should return empty when show flint index in empty database") {
  // Listing Flint indexes in a database where none were created must
  // produce an empty result set.
  val shown = sql(s"SHOW FLINT INDEX IN spark_catalog.default")
  checkAnswer(shown, Seq.empty)
}
Expand Down

0 comments on commit aa0a0e9

Please sign in to comment.