Skip to content

Commit

Permalink
Add integration test (IT) for materialized view (MV)
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 31, 2023
1 parent 9be54b3 commit d36c7e8
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
}
.assertIndexData(indexData => indexData should have size 5)
.stopStreamingJob()
.run(s""" INSERT INTO $testTable VALUES
.run(s"""
| INSERT INTO $testTable VALUES
| (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver')
|""".stripMargin)
.run(s"RECOVER INDEX JOB $testSkippingIndex")
Expand All @@ -65,14 +66,39 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
}
.assertIndexData(indexData => indexData should have size 5)
.stopStreamingJob()
.run(s""" INSERT INTO $testTable VALUES
.run(s"""
| INSERT INTO $testTable VALUES
| (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver')
|""".stripMargin)
.run(s"RECOVER INDEX JOB $testCoveringIndex")
.assertIndexData(indexData => indexData should have size 6)
}
}

  // Verifies that an auto-refresh materialized view's streaming job can be
  // recovered after being stopped: create the MV with a checkpoint, stop the
  // refresh job, insert new data, recover the job, and confirm the new row
  // is picked up (index size grows from 5 to 6).
  test("recover materialized view refresh job") {
    withFlintIndex(testMvIndex) { assertion =>
      assertion
        .run { checkpointDir =>
          // checkpoint_location is required so the recovered job can resume
          // from the same streaming offsets.
          s""" CREATE MATERIALIZED VIEW $testMv
             | AS
             | $testMvQuery
             | WITH (
             |   auto_refresh = true,
             |   checkpoint_location = '${checkpointDir.getAbsolutePath}'
             | )
             |""".stripMargin
        }
        // Initial refresh should have indexed the 5 seed rows.
        .assertIndexData(indexData => indexData should have size 5)
        .stopStreamingJob()
        // Insert while the job is down; this row is only visible after recovery.
        .run(s"""
             | INSERT INTO $testTable VALUES
             |   (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver')
             |""".stripMargin)
        .run(s"RECOVER INDEX JOB $testMvIndex")
        .assertIndexData(indexData => indexData should have size 6)
    }
  }

private def withFlintIndex(flintIndexName: String)(test: AssertionHelper => Unit): Unit = {
withTable(testTable) {
createTimeSeriesTable(testTable)
Expand All @@ -87,6 +113,9 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
}
}

/**
* Recover test assertion helper that de-duplicates test code.
*/
private class AssertionHelper(flintIndexName: String, checkpointDir: File) {

def run(createIndex: File => String): AssertionHelper = {
Expand All @@ -109,17 +138,12 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers {
spark.streams.get(findJobId(flintIndexName)).stop()
this
}
}

private def startMaterializedViewIndexJob(checkpointDir: File): Unit = {
sql(s"""
| CREATE MATERIALIZED VIEW $testMv
| AS
| $testMvQuery
| WITH (
| auto_refresh = true,
| checkpoint_location = '${checkpointDir.getAbsolutePath}'
| )
|""".stripMargin)
private def findJobId(indexName: String): String = {
val job = spark.streams.active.find(_.name == indexName)
job
.map(_.id.toString)
.getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,13 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
}

protected def findJobId(indexName: String): String = {
val job = spark.streams.active.find(_.name == indexName)
job
.map(_.id.toString)
.getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName"))
}

protected def awaitStreamingComplete(jobId: String): Unit = {
val job = spark.streams.get(jobId)
failAfter(streamingTimeout) {
job.processAllAvailable()
}
}

protected def stopStreamingJob(jobId: String): Unit = {
spark.streams.get(jobId).stop()
}

protected def createPartitionedTable(testTable: String): Unit = {
sql(s"""
| CREATE TABLE $testTable
Expand Down

0 comments on commit d36c7e8

Please sign in to comment.