From fda5dadb107c26f86064ce1429047818fb5d41f4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 6 Nov 2023 12:30:25 -0800 Subject: [PATCH] Add recover index job statement (#119) * Add recover command syntax and API Signed-off-by: Chen Dai * Refactor IT with assertion helper Signed-off-by: Chen Dai * Add IT for MV Signed-off-by: Chen Dai * Update user manual and add test for backticks Signed-off-by: Chen Dai * Add more logging and IT on FlintSpark API layer Signed-off-by: Chen Dai * Reformat sql text in IT Signed-off-by: Chen Dai * Add recovering transient state Signed-off-by: Chen Dai * Detect streaming job state in update task Signed-off-by: Chen Dai * Address PR comment Signed-off-by: Chen Dai --------- Signed-off-by: Chen Dai --- docs/index.md | 14 ++ .../metadata/log/FlintMetadataLogEntry.scala | 1 + .../main/antlr4/FlintSparkSqlExtensions.g4 | 9 + .../src/main/antlr4/SparkSqlBase.g4 | 2 + .../opensearch/flint/spark/FlintSpark.scala | 79 +++++++-- .../spark/sql/FlintSparkSqlAstBuilder.scala | 2 + .../job/FlintSparkIndexJobAstBuilder.scala | 26 +++ .../flint/OpenSearchTransactionSuite.scala | 5 + .../spark/FlintSparkIndexJobITSuite.scala | 107 ++++++++++++ .../spark/FlintSparkIndexJobSqlITSuite.scala | 159 ++++++++++++++++++ 10 files changed, 394 insertions(+), 10 deletions(-) create mode 100644 flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala create mode 100644 integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala diff --git a/docs/index.md b/docs/index.md index 05a38dbeb..65dfa9203 100644 --- a/docs/index.md +++ b/docs/index.md @@ -262,6 +262,20 @@ WITH ( ) ``` +### Index Job Management + +Currently Flint index job ID is same as internal Flint index name in [OpenSearch](./index.md#OpenSearch) section below. 
+ +```sql +RECOVER INDEX JOB +``` + +Example: + +```sql +RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index` +``` + ## Index Store ### OpenSearch diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala index 8b67ac2ae..1761a142a 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/log/FlintMetadataLogEntry.scala @@ -74,6 +74,7 @@ object FlintMetadataLogEntry { val DELETING: IndexState.Value = Value("deleting") val DELETED: IndexState.Value = Value("deleted") val FAILED: IndexState.Value = Value("failed") + val RECOVERING: IndexState.Value = Value("recovering") val UNKNOWN: IndexState.Value = Value("unknown") def from(s: String): IndexState.Value = { diff --git a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 index e44944fcf..cb2e14144 100644 --- a/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 +++ b/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4 @@ -18,6 +18,7 @@ statement : skippingIndexStatement | coveringIndexStatement | materializedViewStatement + | indexJobManagementStatement ; skippingIndexStatement @@ -109,6 +110,14 @@ dropMaterializedViewStatement : DROP MATERIALIZED VIEW mvName=multipartIdentifier ; +indexJobManagementStatement + : recoverIndexJobStatement + ; + +recoverIndexJobStatement + : RECOVER INDEX JOB identifier + ; + /* * Match all remaining tokens in non-greedy way * so WITH clause won't be captured by this rule. diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 597a1e585..fe6fd3c66 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -165,10 +165,12 @@ IF: 'IF'; IN: 'IN'; INDEX: 'INDEX'; INDEXES: 'INDEXES'; +JOB: 'JOB'; MATERIALIZED: 'MATERIALIZED'; NOT: 'NOT'; ON: 'ON'; PARTITION: 'PARTITION'; +RECOVER: 'RECOVER'; REFRESH: 'REFRESH'; SHOW: 'SHOW'; TRUE: 'TRUE'; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 2713f464a..461011ce5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -5,7 +5,7 @@ package org.opensearch.flint.spark -import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit} import scala.collection.JavaConverters._ @@ -47,7 +47,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer /** Scheduler for updating index state regularly as needed, such as incremental refreshing */ - private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) + var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1) /** * Data source name. Assign empty string in case of backward compatibility. 
TODO: remove this in @@ -113,8 +113,10 @@ class FlintSpark(val spark: SparkSession) extends Logging { if (latest == null) { // in case transaction capability is disabled flintClient.createIndex(indexName, metadata) } else { + logInfo(s"Creating index with metadata log entry ID ${latest.id}") flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id))) }) + logInfo("Create index complete") } catch { case e: Exception => logError("Failed to create Flint index", e) @@ -146,9 +148,11 @@ class FlintSpark(val spark: SparkSession) extends Logging { .finalLog(latest => { // Change state to active if full, otherwise update index state regularly if (mode == FULL) { + logInfo("Updating index state to active") latest.copy(state = ACTIVE) } else { // Schedule regular update and return log entry as refreshing state + logInfo("Scheduling index state updater") scheduleIndexStateUpdate(indexName) latest } @@ -229,6 +233,41 @@ class FlintSpark(val spark: SparkSession) extends Logging { throw new IllegalStateException("Failed to delete Flint index") } } else { + logInfo("Flint index to be deleted doesn't exist") + false + } + } + + /** + * Recover index job. + * + * @param indexName + * index name + */ + def recoverIndex(indexName: String): Boolean = { + logInfo(s"Recovering Flint index $indexName") + val index = describeIndex(indexName) + if (index.exists(_.options.autoRefresh())) { + try { + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state)) + .transientLog(latest => latest.copy(state = RECOVERING)) + .finalLog(latest => { + scheduleIndexStateUpdate(indexName) + latest.copy(state = REFRESHING) + }) + .commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL)) + + logInfo("Recovery complete") + true + } catch { + case e: Exception => + logError("Failed to recover Flint index", e) + throw new IllegalStateException("Failed to recover Flint index") + } + } else { + logInfo("Index to be recovered either doesn't exist or not auto refreshed") false } } @@ -249,15 +288,28 @@ class FlintSpark(val spark: SparkSession) extends Logging { spark.streams.active.exists(_.name == indexName) private def scheduleIndexStateUpdate(indexName: String): Unit = { - executor.scheduleAtFixedRate( + var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel() + task = executor.scheduleAtFixedRate( () => { - logInfo("Scheduler triggers index log entry update") + logInfo(s"Scheduler triggers index log entry update for $indexName") try { - flintClient - .startTransaction(indexName, dataSourceName) - .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest) // timestamp will update automatically - .commit(latest => logInfo("Updating log entry to " + latest)) + if (isIncrementalRefreshing(indexName)) { + logInfo("Streaming job is still active") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(latest => latest.state == REFRESHING) + .finalLog(latest => latest) // timestamp will update automatically + .commit(_ => {}) + } else { + logError("Streaming job is not active. 
Cancelling update task") + flintClient + .startTransaction(indexName, dataSourceName) + .initialLog(_ => true) + .finalLog(latest => latest.copy(state = FAILED)) + .commit(_ => {}) + task.cancel(true) + logInfo("Update task is cancelled") + } } catch { case e: Exception => logError("Failed to update index log entry", e) @@ -274,6 +326,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { index: FlintSparkIndex, indexName: String, mode: RefreshMode): Option[String] = { + logInfo(s"Refreshing index $indexName in $mode mode") val options = index.options val tableName = index.metadata().source @@ -288,17 +341,19 @@ class FlintSpark(val spark: SparkSession) extends Logging { .save(indexName) } - mode match { + val jobId = mode match { case FULL if isIncrementalRefreshing(indexName) => throw new IllegalStateException( s"Index $indexName is incremental refreshing and cannot be manual refreshed") case FULL => + logInfo("Start refreshing index in batch style") batchRefresh() None // Flint index has specialized logic and capability for incremental refresh case INCREMENTAL if index.isInstanceOf[StreamingRefresh] => + logInfo("Start refreshing index in streaming style") val job = index .asInstanceOf[StreamingRefresh] @@ -313,6 +368,7 @@ class FlintSpark(val spark: SparkSession) extends Logging { // Otherwise, fall back to foreachBatch + batch refresh case INCREMENTAL => + logInfo("Start refreshing index in foreach streaming style") val job = spark.readStream .options(options.extraSourceOptions(tableName)) .table(tableName) @@ -325,6 +381,9 @@ class FlintSpark(val spark: SparkSession) extends Logging { .start() Some(job.id.toString) } + + logInfo("Refresh index complete") + jobId } private def stopRefreshingJob(indexName: String): Unit = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 606cb88eb..011bb37fe 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder @@ -25,6 +26,7 @@ class FlintSparkSqlAstBuilder with FlintSparkSkippingIndexAstBuilder with FlintSparkCoveringIndexAstBuilder with FlintSparkMaterializedViewAstBuilder + with FlintSparkIndexJobAstBuilder with SparkSqlAstBuilder { override def visit(tree: ParseTree): LogicalPlan = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala new file mode 100644 index 000000000..87dd429e5 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/job/FlintSparkIndexJobAstBuilder.scala @@ -0,0 +1,26 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.job + 
+import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.RecoverIndexJobStatementContext + +import org.apache.spark.sql.catalyst.plans.logical.Command + +/** + * Flint Spark AST builder that builds Spark command for Flint index job management statement. + */ +trait FlintSparkIndexJobAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => + + override def visitRecoverIndexJobStatement(ctx: RecoverIndexJobStatementContext): Command = { + FlintSparkSqlCommand() { flint => + val jobId = ctx.identifier().getText + flint.recoverIndex(jobId) + Seq.empty + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index 1e7077799..35b6688e5 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint import java.util.Collections +import java.util.concurrent.ScheduledExecutorService import scala.collection.JavaConverters.mapAsScalaMapConverter @@ -20,6 +21,7 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.IndexState import org.opensearch.flint.core.storage.FlintOpenSearchClient._ import org.opensearch.flint.spark.FlintSparkSuite +import org.scalatestplus.mockito.MockitoSugar.mock /** * Transaction test base suite that creates the metadata log index which enables transaction @@ -33,6 +35,9 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() spark.conf.set("spark.flint.datasource.name", testDataSourceName) + + // Replace executor to avoid impact on IT + flint.executor = mock[ScheduledExecutorService] } override def beforeEach(): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala new file mode 100644 index 000000000..27693c63b --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -0,0 +1,107 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.util.Base64 + +import scala.collection.JavaConverters.mapAsJavaMapConverter + +import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName +import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} +import org.scalatest.matchers.should.Matchers + +class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers { + + /** Test table and index name */ + private val testTable = "spark_catalog.default.test" + private val testIndex = getSkippingIndexName(testTable) + private val latestId = Base64.getEncoder.encodeToString(testIndex.getBytes) + + override def beforeAll(): Unit = { + super.beforeAll() + createPartitionedTable(testTable) + } + + override def afterEach(): Unit = { + super.afterEach() // 
must clean up metadata log first and then delete + flint.deleteIndex(testIndex) + } + + test("recover should exit if index doesn't exist") { + flint.recoverIndex("non_exist_index") shouldBe false + } + + test("recover should exit if index is not auto refreshed") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .create() + flint.recoverIndex(testIndex) shouldBe false + } + + test("recover should succeed if index exists and is auto refreshed") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + flint.recoverIndex(testIndex) shouldBe true + spark.streams.active.exists(_.name == testIndex) + latestLogEntry(latestId) should contain("state" -> "refreshing") + } + + Seq(EMPTY, CREATING, DELETING, DELETED, RECOVERING, UNKNOWN).foreach { state => + test(s"recover should fail if index is in $state state") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + updateLatestLogEntry( + new FlintMetadataLogEntry( + latestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(latestId).asJava), + state) + + assertThrows[IllegalStateException] { + flint.recoverIndex(testIndex) shouldBe true + } + } + } + + Seq(ACTIVE, REFRESHING, FAILED).foreach { state => + test(s"recover should succeed if index is in $state state") { + flint + .skippingIndex() + .onTable(testTable) + .addPartitions("year") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) + .create() + + updateLatestLogEntry( + new FlintMetadataLogEntry( + latestId, + UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + latestLogEntry(latestId).asJava), + state) + + flint.recoverIndex(testIndex) shouldBe true + spark.streams.active.exists(_.name == testIndex) + latestLogEntry(latestId) should contain("state" -> "refreshing") + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala new file mode 100644 index 000000000..ddbfeeb16 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import java.io.File + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.scalatest.matchers.should.Matchers + +import org.apache.spark.sql.Row + +/** + * This suite doesn't enable transaction to avoid side effect of scheduled update task. 
+ */ +class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { + + private val testTable = "spark_catalog.default.index_job_test" + private val testSkippingIndex = FlintSparkSkippingIndex.getSkippingIndexName(testTable) + + /** Covering index names */ + private val testIndex = "test_ci" + private val testCoveringIndex = FlintSparkCoveringIndex.getFlintIndexName(testIndex, testTable) + + /** Materialized view names and query */ + private val testMv = "spark_catalog.default.mv_test" + private val testMvQuery = s"SELECT name, age FROM $testTable" + private val testMvIndex = FlintSparkMaterializedView.getFlintIndexName(testMv) + + test("recover skipping index refresh job") { + withFlintIndex(testSkippingIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE SKIPPING INDEX ON $testTable + | (name VALUE_SET) + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testSkippingIndex") + .assertIndexData(indexData => indexData should have size 6) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 06:00:00', 'G', 40, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB `$testSkippingIndex`") // test backtick name + .assertIndexData(indexData => indexData should have size 7) + + } + } + + test("recover covering index refresh job") { + withFlintIndex(testCoveringIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE INDEX $testIndex ON $testTable + | (time, name) + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testCoveringIndex") + .assertIndexData(indexData => indexData should have size 6) + } + } + + test("recover materialized view refresh job") { + withFlintIndex(testMvIndex) { assertion => + assertion + .run { checkpointDir => + s""" CREATE MATERIALIZED VIEW $testMv + | AS + | $testMvQuery + | WITH ( + | auto_refresh = true, + | checkpoint_location = '${checkpointDir.getAbsolutePath}' + | ) + |""".stripMargin + } + .assertIndexData(indexData => indexData should have size 5) + .stopStreamingJob() + .run(s""" + | INSERT INTO $testTable VALUES + | (TIMESTAMP '2023-10-01 05:00:00', 'F', 35, 'Vancouver') + |""".stripMargin) + .run(s"RECOVER INDEX JOB $testMvIndex") + .assertIndexData(indexData => indexData should have size 6) + } + } + + private def withFlintIndex(flintIndexName: String)(test: AssertionHelper => Unit): Unit = { + withTable(testTable) { + createTimeSeriesTable(testTable) + + withTempDir { checkpointDir => + try { + test(new AssertionHelper(flintIndexName, checkpointDir)) + } finally { + flint.deleteIndex(flintIndexName) + } + } + } + } + + /** + * Recover test assertion helper that de-duplicates test code. 
+ */ + private class AssertionHelper(flintIndexName: String, checkpointDir: File) { + + def run(createIndex: File => String): AssertionHelper = { + sql(createIndex(checkpointDir)) + this + } + + def run(sqlText: String): AssertionHelper = { + sql(sqlText) + this + } + + def assertIndexData(assertion: Array[Row] => Unit): AssertionHelper = { + awaitStreamingComplete(findJobId(flintIndexName)) + assertion(flint.queryIndex(flintIndexName).collect()) + this + } + + def stopStreamingJob(): AssertionHelper = { + spark.streams.get(findJobId(flintIndexName)).stop() + this + } + + private def findJobId(indexName: String): String = { + val job = spark.streams.active.find(_.name == indexName) + job + .map(_.id.toString) + .getOrElse(throw new RuntimeException(s"Streaming job not found for index $indexName")) + } + } +}
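
Below is a minimal usage sketch, not part of the patch itself, showing how the `RECOVER INDEX JOB` statement and the `FlintSpark.recoverIndex` API introduced above could be invoked. The extensions class name, data source value, and application name are assumptions for illustration; the index name is the example used in the docs change above. Behavior notes in the comments follow the patch: `recoverIndex` returns `false` when the index is missing or not auto-refreshed, and fails with `IllegalStateException` when the index is in a non-recoverable state.

```scala
import org.apache.spark.sql.SparkSession

import org.opensearch.flint.spark.FlintSpark

object RecoverIndexJobExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("flint-recover-index-job-example") // illustrative app name
      // Assumed: the Flint SQL extensions are registered so RECOVER INDEX JOB parses.
      .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
      // Data source name read by FlintSpark; placeholder value, set as in the IT suite.
      .config("spark.flint.datasource.name", "myDataSource")
      .getOrCreate()

    // SQL path added by this patch: the job ID is the internal Flint index name,
    // so backticks are used when the name needs quoting.
    spark.sql("RECOVER INDEX JOB `flint_spark_catalog_default_test_skipping_index`")

    // API path: returns false if the index doesn't exist or isn't auto-refreshed;
    // throws IllegalStateException if the index state is not active, refreshing, or failed.
    val flint = new FlintSpark(spark)
    val restarted = flint.recoverIndex("flint_spark_catalog_default_test_skipping_index")
    println(s"Streaming job restarted: $restarted")
  }
}
```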