diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index af6e463231..d1ebf21e24 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -25,7 +25,6 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; -import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.execution.statement.StatementState; import org.opensearch.sql.spark.execution.statestore.StateStore; @@ -117,18 +116,10 @@ private void executeIndexOp( FlintIndexMetadata indexMetadata) { switch (indexQueryDetails.getIndexQueryActionType()) { case DROP: - case VACUUM: FlintIndexOp dropOp = new FlintIndexOpDrop( stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); dropOp.apply(indexMetadata); - - // For vacuum, continue to delete index data physically - if (indexQueryDetails.getIndexQueryActionType() == IndexQueryActionType.VACUUM) { - FlintIndexOp indexVacuumOp = - new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client); - indexVacuumOp.apply(indexMetadata); - } break; case ALTER: FlintIndexOpAlter flintIndexOpAlter = @@ -140,6 +131,23 @@ private void executeIndexOp( flintIndexMetadataService); flintIndexOpAlter.apply(indexMetadata); break; + case VACUUM: + // Try to perform drop operation first + FlintIndexOp tryDropOp = + new FlintIndexOpDrop( + stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); + try { + tryDropOp.apply(indexMetadata); + } catch (IllegalStateException e) { + // Drop failed possibly due to invalid initial state + } + + // Continue to delete index data physically if state is DELETED + // which means previous transaction succeeds + FlintIndexOp indexVacuumOp = + new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client); + indexVacuumOp.apply(indexMetadata); + break; default: throw new IllegalStateException( String.format( diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index 06221dc6de..cf204450e7 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -5,7 +5,6 @@ package org.opensearch.sql.spark.flint.operation; -import java.util.Base64; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; @@ -41,12 +40,9 @@ FlintIndexState transitioningState() { @Override public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) { - // Decode Flint index name from latest ID - String flintIndexName = new String(Base64.getDecoder().decode(flintIndex.getId())); - LOG.info("Vacuuming Flint index {}", flintIndexName); - - // Delete OpenSearch index - DeleteIndexRequest request = new DeleteIndexRequest().indices(flintIndexName); + LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); + DeleteIndexRequest request = + new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName()); AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); LOG.info("OpenSearch index delete result: {}", response.isAcknowledged()); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index 4691b35332..67c89c791c 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -7,12 +7,11 @@ import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE; -import static org.opensearch.sql.spark.flint.FlintIndexState.CANCELLING; +import static org.opensearch.sql.spark.flint.FlintIndexState.CREATING; import static org.opensearch.sql.spark.flint.FlintIndexState.DELETED; -import static org.opensearch.sql.spark.flint.FlintIndexState.DELETING; import static org.opensearch.sql.spark.flint.FlintIndexState.EMPTY; -import static org.opensearch.sql.spark.flint.FlintIndexState.FAILED; import static org.opensearch.sql.spark.flint.FlintIndexState.REFRESHING; +import static org.opensearch.sql.spark.flint.FlintIndexState.VACUUMING; import static org.opensearch.sql.spark.flint.FlintIndexType.COVERING; import static org.opensearch.sql.spark.flint.FlintIndexType.MATERIALIZED_VIEW; import static org.opensearch.sql.spark.flint.FlintIndexType.SKIPPING; @@ -25,7 +24,6 @@ import java.util.List; import java.util.function.BiConsumer; import org.apache.commons.lang3.tuple.Pair; -import org.junit.Ignore; import org.junit.Test; import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.opensearch.action.get.GetRequest; @@ -106,12 +104,12 @@ public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() { }); } - @Ignore - public void shouldNotVacuumIndexInFailedState() { + @Test + public void shouldNotVacuumIndexInVacuumingState() { List> testCases = Lists.cartesianProduct( FLINT_TEST_DATASETS, - List.of(FAILED), + List.of(VACUUMING), List.of( Pair.of( () -> { @@ -124,18 +122,18 @@ public void shouldNotVacuumIndexInFailedState() { runVacuumTestSuite( testCases, (mockDS, response) -> { - assertEquals("SUCCESS", response.getStatus()); + assertEquals("FAILED", response.getStatus()); assertTrue(flintIndexExists(mockDS.indexName)); assertTrue(indexDocExists(mockDS.latestId)); }); } - @Ignore - public void shouldVacuumIndexInCancellingState() { + @Test + public void shouldVacuumIndexWithoutJobRunning() { List> testCases = Lists.cartesianProduct( FLINT_TEST_DATASETS, - List.of(CANCELLING), + List.of(EMPTY, CREATING, ACTIVE, DELETED), List.of( Pair.of( DEFAULT_OP, @@ -150,30 +148,6 @@ public void shouldVacuumIndexInCancellingState() { }); } - @Ignore - public void shouldVacuumIndexWithoutJobRunning() { - List> testCases = - Lists.cartesianProduct( - FLINT_TEST_DATASETS, - List.of(EMPTY, ACTIVE, DELETING, DELETED), - List.of( - Pair.of( - () -> { - throw new AssertionError("should not call cancelJobRun"); - }, - () -> { - throw new AssertionError("should not call getJobRunResult"); - }))); - - runVacuumTestSuite( - testCases, - (mockDS, response) -> { - assertEquals("SUCCESS", response.getStatus()); - assertFalse(flintIndexExists(mockDS.indexName)); - assertFalse(indexDocExists(mockDS.latestId)); - }); - } - private void runVacuumTestSuite( List> testCases, BiConsumer assertion) {