From d36c7e85658a543af047905347a469b7047d0dcc Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 31 Oct 2023 14:05:42 -0700 Subject: [PATCH] Add IT for MV Signed-off-by: Chen Dai --- .../spark/FlintSparkIndexJobSqlITSuite.scala | 50 ++++++++++++++----- .../flint/spark/FlintSparkSuite.scala | 11 ---- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala index e4d05e61c..b32bb9f5f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -42,7 +42,8 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { } .assertIndexData(indexData => indexData should have size 5) .stopStreamingJob() - .run(s""" INSERT INTO $testTable VALUES + .run(s""" + | INSERT INTO $testTable VALUES | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') |""".stripMargin) .run(s"RECOVER INDEX JOB $testSkippingIndex") @@ -65,7 +66,8 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { } .assertIndexData(indexData => indexData should have size 5) .stopStreamingJob() - .run(s""" INSERT INTO $testTable VALUES + .run(s""" + | INSERT INTO $testTable VALUES | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') |""".stripMargin) .run(s"RECOVER INDEX JOB $testCoveringIndex") @@ -73,6 +75,30 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { } } + test("recover materialized view refresh job") { + withFlintIndex(testMvIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE MATERIALIZED VIEW $testMv + | AS + | $testMvQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testMvIndex") + .assertIndexData(indexData => indexData should have size 6) + } + } + private def withFlintIndex(flintIndexName: String)(test: AssertionHelper => Unit): Unit = { withTable(testTable) { createTimeSeriesTable(testTable) @@ -87,6 +113,9 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { } } + /** + * Recover test assertion helper that de-duplicates test code. + */ private class AssertionHelper(flintIndexName: String, checkpointDir: File) { def run(createIndex: File => String): AssertionHelper = { @@ -109,17 +138,12 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { spark.streams.get(findJobId(flintIndexName)).stop() this } - } - private def startMaterializedViewIndexJob(checkpointDir: File): Unit = { - sql(s""" - | CREATE MATERIALIZED VIEW $testMv - | AS - | $testMvQuery - | WITH ( - | auto_refresh = true, - | checkpoint_location = '${checkpointDir.getAbsolutePath}' - | ) - |""".stripMargin) + private def findJobId(indexName: String): String = { + val job = spark.streams.active.find(_.name == indexName) + job + .map(_.id.toString) + .getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName")) + } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index d15371ed6..168279eb3 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -31,13 +31,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit setFlintSparkConf(CHECKPOINT_MANDATORY, "false") } - protected def findJobId(indexName: String): String = { - val job = spark.streams.active.find(_.name == indexName) - job - .map(_.id.toString) - .getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName")) - } - protected def awaitStreamingComplete(jobId: String): Unit = { val job = spark.streams.get(jobId) failAfter(streamingTimeout) { @@ -45,10 +38,6 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit } } - protected def stopStreamingJob(jobId: String): Unit = { - spark.streams.get(jobId).stop() - } - protected def createPartitionedTable(testTable: String): Unit = { sql(s""" | CREATE TABLE $testTable