From 9cabe09095bcef6da20ef1063e09b7ace699fca4 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 18 Jan 2024 11:09:15 -0800 Subject: [PATCH] Fix broken IT and update user manual Signed-off-by: Chen Dai --- docs/index.md | 4 ---- .../opensearch/flint/spark/FlintSpark.scala | 7 ------- .../FlintSparkCoveringIndexITSuite.scala | 1 + .../FlintSparkSkippingIndexITSuite.scala | 19 +------------------ 4 files changed, 2 insertions(+), 29 deletions(-) diff --git a/docs/index.md b/docs/index.md index 88c2bc5e6..1e89a6aa1 100644 --- a/docs/index.md +++ b/docs/index.md @@ -515,10 +515,6 @@ CREATE INDEX Idx_elb ON alb_logs ... For now, only single or conjunct conditions (conditions connected by AND) in WHERE clause can be optimized by skipping index. -### Index Refresh Job Management - -Manual refreshing a table which already has skipping index being auto-refreshed, will be prevented. However, this assumption relies on the condition that the incremental refresh job is actively running in the same Spark cluster, which can be identified when performing the check. - ## Integration ### AWS EMR Spark Integration - Using execution role diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index cbcd517de..aa9dae660 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -320,9 +320,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { spark.read.format(FLINT_DATASOURCE).load(indexName) } - private def isIncrementalRefreshing(indexName: String): Boolean = - spark.streams.active.exists(_.name == indexName) - // TODO: move to separate class private def doRefreshIndex( index: FlintSparkIndex, @@ -344,10 +341,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { } val jobId = mode match { - case MANUAL if isIncrementalRefreshing(indexName) => - throw new IllegalStateException( - s"Index $indexName is incremental refreshing and cannot be manual refreshed") - case MANUAL => logInfo("Start refreshing index in batch style") batchRefresh() diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 0d745bbb2..a177a9d1d 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -96,6 +96,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .name(testIndex) .onTable(testTable) .addIndexColumns("name", "age") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() val jobId = flint.refreshIndex(testFlintIndex) diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 9e9bf6d86..a3bdb11f2 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -173,6 +173,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { .skippingIndex() .onTable(testTable) .addPartitions("year", "month") + .options(FlintSparkIndexOptions(Map("auto_refresh" -> "true"))) .create() val jobId = flint.refreshIndex(testIndex) @@ -187,24 +188,6 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { indexData should have size 2 } - test("should fail to manual refresh an incremental refreshing index") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - - val jobId = flint.refreshIndex(testIndex) - val job = spark.streams.get(jobId.get) - failAfter(streamingTimeout) { - job.processAllAvailable() - } - - assertThrows[IllegalStateException] { - flint.refreshIndex(testIndex) - } - } - test("can have only 1 skipping index on a table") { flint .skippingIndex()