diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 122fea601..82096e322 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -226,7 +226,6 @@ class FlintSpark(val spark: SparkSession) extends Logging { // TODO: share same transaction for now flintIndexMonitor.stopMonitor(indexName) stopRefreshingJob(indexName) - flintClient.deleteIndex(indexName) true }) } catch { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index c79069b9b..5eb208f14 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -31,7 +31,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create covering index with metadata successfully") { @@ -126,6 +126,6 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { .onTable(testTable) .addIndexColumns("address") .create() - flint.deleteIndex(getFlintIndexName(newIndex, testTable)) + deleteTestIndex(getFlintIndexName(newIndex, testTable)) } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala index 27419b616..3ad5f0d0f 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala @@ -15,7 +15,7 @@ import org.opensearch.flint.core.FlintOptions import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName -import org.scalatest.matchers.must.Matchers.{defined, have} +import org.scalatest.matchers.must.Matchers.defined import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the} import org.apache.spark.sql.Row @@ -38,7 +38,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create covering index with auto refresh") { @@ -252,8 +252,7 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite { val result = sql(s"SHOW INDEX ON $testTable") checkAnswer(result, Seq(Row(testIndex), Row("idx_address"))) - flint.deleteIndex(getFlintIndexName("idx_address", testTable)) - flint.deleteIndex(getSkippingIndexName(testTable)) + deleteTestIndex(getFlintIndexName("idx_address", testTable), getSkippingIndexName(testTable)) } test("describe covering index") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala index 8df2bc472..98ce7b9b6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobITSuite.scala @@ -29,17 +29,7 @@ class FlintSparkIndexJobITSuite extends OpenSearchTransactionSuite with Matchers } override def afterEach(): Unit = { - - /** - * Todo, if state is not valid, will throw IllegalStateException. Should check flint - * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if - * failed, delete index itself. - */ - try { - flint.deleteIndex(testIndex) - } catch { - case _: IllegalStateException => deleteIndex(testIndex) - } + deleteTestIndex(testIndex) super.afterEach() } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala index ddbfeeb16..d9588d281 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexJobSqlITSuite.scala @@ -117,7 +117,7 @@ class FlintSparkIndexJobSqlITSuite extends FlintSparkSuite with Matchers { try { test(new AssertionHelper(flintIndexName, checkpointDir)) } finally { - flint.deleteIndex(flintIndexName) + deleteTestIndex(flintIndexName) } } } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 4af147939..78bf35850 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -65,13 +65,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc FlintSparkIndexMonitor.indexMonitorTracker.clear() try { - flint.deleteIndex(testFlintIndex) - } catch { - // Index maybe end up with failed state in some test - case _: IllegalStateException => - openSearchClient - .indices() - .delete(new DeleteIndexRequest(testFlintIndex), RequestOptions.DEFAULT) + deleteTestIndex(testFlintIndex) } finally { super.afterEach() } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 1b16a9e16..ad1a2dbeb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -38,7 +38,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create materialized view with metadata successfully") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala index f9bd3967a..f8cd6eb0e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewSqlITSuite.scala @@ -44,7 +44,7 @@ class FlintSparkMaterializedViewSqlITSuite extends FlintSparkSuite { override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testFlintIndex) + deleteTestIndex(testFlintIndex) } test("create materialized view with auto refresh") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index e3fb467e6..c4d4282e8 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -38,7 +38,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { super.afterEach() // Delete all test indices - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("create skipping index with metadata successfully") { @@ -581,7 +581,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | } |""".stripMargin) - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("can build skipping index for varchar and char and rewrite applicable query") { @@ -627,7 +627,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { useFlintSparkSkippingFileIndex(hasIndexFilter( col("varchar_col") === "sample varchar" && col("char_col") === paddedChar)) - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } // Custom matcher to check if a SparkPlan uses FlintSparkSkippingFileIndex diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala index 21de15de7..a862fd162 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala @@ -36,7 +36,7 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite { protected override def afterEach(): Unit = { super.afterEach() - flint.deleteIndex(testIndex) + deleteTestIndex(testIndex) } test("create skipping index with auto refresh") { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 29b8b95a6..2e47117a7 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -12,6 +12,9 @@ import scala.concurrent.duration.TimeUnit import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.when import org.mockito.invocation.InvocationOnMock +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.client.RequestOptions +import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchSuite import org.scalatestplus.mockito.MockitoSugar.mock @@ -46,6 +49,29 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit FlintSparkIndexMonitor.executor = mockExecutor } + protected def deleteTestIndex(testIndexNames: String*): Unit = { + testIndexNames.foreach(testIndex => { + /** + * Todo, if state is not valid, will throw IllegalStateException. Should check flint + * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) + * if failed, delete index itself. + */ + try { + flint.deleteIndex(testIndex) + flint.vacuumIndex(testIndex) + } catch { + case _: IllegalStateException => + if (openSearchClient + .indices() + .exists(new GetIndexRequest(testIndex), RequestOptions.DEFAULT)) { + openSearchClient + .indices() + .delete(new DeleteIndexRequest(testIndex), RequestOptions.DEFAULT) + } + } + }) + } + protected def awaitStreamingComplete(jobId: String): Unit = { val job = spark.streams.get(jobId) failAfter(streamingTimeout) { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala index 643a35516..48740cad4 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkTransactionITSuite.scala @@ -14,8 +14,6 @@ import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.client.indices.GetIndexRequest import org.opensearch.flint.OpenSearchTransactionSuite -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry -import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.DELETED import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL} import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName import org.scalatest.matchers.should.Matchers @@ -39,11 +37,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match * .isRefresh before cleanup resource. Current solution, (1) try to delete flint index, (2) if * failed, delete index itself. */ - try { - flint.deleteIndex(testFlintIndex) - } catch { - case _: IllegalStateException => deleteIndex(testFlintIndex) - } + deleteTestIndex(testFlintIndex) super.afterEach() } @@ -108,27 +102,19 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match latest("jobStartTime").asInstanceOf[Number].longValue() should be > prevStartTime } - test("delete index") { + test("delete and vacuum index") { flint .skippingIndex() .onTable(testTable) .addPartitions("year", "month") .create() - flint.deleteIndex(testFlintIndex) + // Logical delete index + flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") - } - - test("vacuum index") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - deleteLogically(testLatestId) - flint.vacuumIndex(testFlintIndex) // Both index data and metadata log should be vacuumed + flint.vacuumIndex(testFlintIndex) openSearchClient .indices() .exists(new GetIndexRequest(testFlintIndex), RequestOptions.DEFAULT) shouldBe false @@ -137,25 +123,6 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match RequestOptions.DEFAULT) shouldBe false } - test("should recreate index if logical deleted") { - flint - .skippingIndex() - .onTable(testTable) - .addPartitions("year", "month") - .create() - - // Simulate that user deletes index data manually - flint.deleteIndex(testFlintIndex) - latestLogEntry(testLatestId) should contain("state" -> "deleted") - - // Simulate that user recreate the index - flint - .skippingIndex() - .onTable(testTable) - .addValueSet("name") - .create() - } - test("should not recreate index if index data still exists") { flint .skippingIndex() @@ -164,7 +131,7 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() // Simulate that PPL plugin leaves index data as logical deleted - deleteLogically(testLatestId) + flint.deleteIndex(testFlintIndex) latestLogEntry(testLatestId) should contain("state" -> "deleted") // Simulate that user recreate the index but forgot to cleanup index data @@ -176,16 +143,4 @@ class FlintSparkTransactionITSuite extends OpenSearchTransactionSuite with Match .create() } should have message s"Flint index $testFlintIndex already exists" } - - private def deleteLogically(latestId: String): Unit = { - val response = openSearchClient - .get(new GetRequest(testMetaLogIndex, latestId), RequestOptions.DEFAULT) - - val latest = new FlintMetadataLogEntry( - latestId, - response.getSeqNo, - response.getPrimaryTerm, - response.getSourceAsMap) - updateLatestLogEntry(latest, DELETED) - } }