Skip to content

Commit

Permalink
Add integ test
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Jul 11, 2024
1 parent 9f6a0de commit 4358e83
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode._
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.SchedulerMode
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.recommendations.DataTypeSkippingStrategy
Expand Down Expand Up @@ -140,7 +141,8 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
if (indexRefresh.refreshMode == AUTO) {
if (indexRefresh.refreshMode == AUTO && SchedulerMode.INTERNAL == index.options
.schedulerMode()) {
logInfo("Scheduling index state monitor")
flintIndexMonitor.startMonitor(indexName)
latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
// Checkpoint location is required if mandatory option set or external scheduler is used
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = options.checkpointLocation()
if (flintSparkConf.isCheckpointMandatory || SchedulerMode.EXTERNAL ==
options.schedulerMode()) {
if (SchedulerMode.EXTERNAL == options.schedulerMode()) {
require(
checkpointLocation.isDefined,
"Checkpoint location is required for external scheduler")
}
if (flintSparkConf.isCheckpointMandatory) {
require(
checkpointLocation.isDefined,
s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
refresh
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,27 @@ class FlintSparkIndexValidationITSuite extends FlintSparkSuite with SparkHiveSup
}
}

// For every index flavor (skipping / covering / MV): creating an auto-refresh
// index in external scheduler mode without a checkpoint location must be rejected.
Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
  .foreach { statement =>
    test(
      s"should fail to create auto refresh Flint index if scheduler_mode is external and no checkpoint location: $statement") {
      withTable(testTable) {
        sql(s"CREATE TABLE $testTable (name STRING) USING JSON")

        // External scheduler mandates a checkpoint; expect the require() to fire.
        val error = intercept[IllegalArgumentException] {
          sql(s"""
               | $statement
               | WITH (
               |   auto_refresh = true,
               |   scheduler_mode = 'external'
               | )
               |""".stripMargin)
        }
        error.getMessage shouldBe
          "requirement failed: Checkpoint location is required for external scheduler"
      }
    }
  }

Seq(createSkippingIndexStatement, createCoveringIndexStatement, createMaterializedViewStatement)
.foreach { statement =>
test(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| "auto_refresh": "true",
| "incremental_refresh": "false",
| "checkpoint_location": "${checkpointDir.getAbsolutePath}",
| "watermark_delay": "30 Seconds"
| "watermark_delay": "30 Seconds",
| "scheduler_mode":"internal"
| },
| "latestId": "$testLatestId",
| "properties": {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,36 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite {
}
}

test("create materialized view with auto refresh and external scheduler") {
  withTempDir { checkpointDir =>
    // With an external scheduler no streaming job is started, so index data
    // only advances when a REFRESH statement is issued explicitly.
    val createMvStatement =
      s"""
         | CREATE MATERIALIZED VIEW $testMvName
         | AS $testQuery
         | WITH (
         |   auto_refresh = true,
         |   scheduler_mode = 'external',
         |   checkpoint_location = '${checkpointDir.getAbsolutePath}',
         |   watermark_delay = '1 Second'
         | )
         | """.stripMargin
    sql(createMvStatement)

    // Current number of rows materialized into the Flint index.
    def mvRowCount(): Long = flint.queryIndex(testFlintIndex).count()

    // First refresh picks up all source data present right now.
    sql(s"REFRESH MATERIALIZED VIEW $testMvName")
    mvRowCount() shouldBe 3

    // Newly inserted source rows stay invisible until the next refresh.
    sql(s"""
         | INSERT INTO $testTable VALUES
         | (TIMESTAMP '2023-10-01 04:00:00', 'F', 25, 'Vancouver')
         | """.stripMargin)
    mvRowCount() shouldBe 3

    // A second refresh incrementally folds in the new row.
    sql(s"REFRESH MATERIALIZED VIEW $testMvName")
    mvRowCount() shouldBe 4
  }
}

test("create materialized view with streaming job options") {
withTempDir { checkpointDir =>
sql(s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,39 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
indexData should have size 2
}

test("auto refresh skipping index successfully with external scheduler") {
  withTempDir { tempDir =>
    // Index options delegating refresh triggering to an external scheduler;
    // checkpoint location is mandatory in this mode.
    val externalSchedulerOptions = Map(
      "auto_refresh" -> "true",
      "scheduler_mode" -> "external",
      "checkpoint_location" -> tempDir.getAbsolutePath)
    flint
      .skippingIndex()
      .onTable(testTable)
      .addPartitions("year", "month")
      .options(FlintSparkIndexOptions(externalSchedulerOptions))
      .create()

    // Manual refresh returns no streaming job id and indexes both source files.
    flint.refreshIndex(testIndex) shouldBe empty
    flint.queryIndex(testIndex).collect().toSet should have size 2

    // Wipe the index data on purpose, then add a brand-new source file.
    openSearchClient.deleteByQuery(
      new DeleteByQueryRequest(testIndex).setQuery(QueryBuilders.matchAllQuery()),
      RequestOptions.DEFAULT)
    sql(s"""
         | INSERT INTO $testTable
         | PARTITION (year=2023, month=4)
         | VALUES ('Hello', 35, 'Vancouver')
         | """.stripMargin)

    // Incremental refresh should process only the newly added file.
    flint.refreshIndex(testIndex) shouldBe empty
    flint.queryIndex(testIndex).collect().toSet should have size 1
  }
}

test("update skipping index successfully") {
// Create full refresh Flint index
flint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,35 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite with ExplainSuit
indexData.count() shouldBe 2
}

test("create skipping index with auto refresh and external scheduler") {
  withTempDir { checkpointDir =>
    // External scheduler mode: no background streaming job, so every data
    // movement below is driven by an explicit REFRESH statement.
    sql(s"""
         | CREATE SKIPPING INDEX ON $testTable
         | ( year PARTITION )
         | WITH (
         |   auto_refresh = true,
         |   scheduler_mode = 'external',
         |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
         | )
         | """.stripMargin)

    // Current number of indexed entries.
    def indexedCount(): Long = flint.queryIndex(testIndex).count()

    // First refresh covers all source data present right now.
    sql(s"REFRESH SKIPPING INDEX ON $testTable")
    indexedCount() shouldBe 2

    // New partition data is not indexed until the next refresh runs.
    sql(s"""
         | INSERT INTO $testTable
         | PARTITION (year=2023, month=5)
         | VALUES ('Hello', 50, 'Vancouver')
         |""".stripMargin)
    indexedCount() shouldBe 2

    // Second refresh picks up the inserted partition.
    sql(s"REFRESH SKIPPING INDEX ON $testTable")
    indexedCount() shouldBe 3
  }
}

test("create skipping index with max size value set") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
Expand Down

0 comments on commit 4358e83

Please sign in to comment.