diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
index d1ebf21e24..412db50e85 100644
--- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
+++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java
@@ -132,18 +132,6 @@ private void executeIndexOp(
         flintIndexOpAlter.apply(indexMetadata);
         break;
       case VACUUM:
-        // Try to perform drop operation first
-        FlintIndexOp tryDropOp =
-            new FlintIndexOpDrop(
-                stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
-        try {
-          tryDropOp.apply(indexMetadata);
-        } catch (IllegalStateException e) {
-          // Drop failed possibly due to invalid initial state
-        }
-
-        // Continue to delete index data physically if state is DELETED
-        // which means previous transaction succeeds
         FlintIndexOp indexVacuumOp =
             new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
         indexVacuumOp.apply(indexMetadata);
diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java
index ee201b5151..76adddf89d 100644
--- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java
+++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java
@@ -19,7 +19,6 @@
 import com.amazonaws.services.emrserverless.model.CancelJobRunResult;
 import com.amazonaws.services.emrserverless.model.GetJobRunResult;
 import com.amazonaws.services.emrserverless.model.JobRun;
-import com.amazonaws.services.emrserverless.model.ValidationException;
 import com.google.common.collect.Lists;
 import java.util.Base64;
 import java.util.List;
@@ -27,6 +26,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.Test;
 import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.opensearch.action.delete.DeleteRequest;
 import org.opensearch.action.get.GetRequest;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
 import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob;
@@ -63,22 +63,15 @@ public class IndexQuerySpecVacuumTest extends AsyncQueryExecutorServiceSpec {
               .isSpecialCharacter(true));
 
   @Test
-  public void shouldVacuumIndexInRefreshingState() {
+  public void shouldVacuumIndexInDeletedState() {
     List<List<Object>> testCases =
         Lists.cartesianProduct(
             FLINT_TEST_DATASETS,
-            List.of(REFRESHING),
+            List.of(DELETED),
             List.of(
-                // Happy case that there is job running
                 Pair.of(
                     DEFAULT_OP,
-                    () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled"))),
-                // Cancel EMR-S job, but not job running
-                Pair.of(
-                    () -> {
-                      throw new ValidationException("Job run is not in a cancellable state");
-                    },
-                    DEFAULT_OP)));
+                    () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));
 
     runVacuumTestSuite(
         testCases,
@@ -90,32 +83,11 @@ public void shouldVacuumIndexInRefreshingState() {
   }
 
   @Test
-  public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() {
-    List<List<Object>> testCases =
-        Lists.cartesianProduct(
-            FLINT_TEST_DATASETS,
-            List.of(REFRESHING),
-            List.of(
-                Pair.of(
-                    DEFAULT_OP,
-                    () -> new GetJobRunResult().withJobRun(new JobRun().withState("Running")))));
-
-    runVacuumTestSuite(
-        testCases,
-        (mockDS, response) -> {
-          assertEquals("FAILED", response.getStatus());
-          assertEquals("Cancel job operation timed out.", response.getError());
-          assertTrue(indexExists(mockDS.indexName));
-          assertTrue(indexDocExists(mockDS.latestId));
-        });
-  }
-
-  @Test
-  public void shouldNotVacuumIndexInVacuumingState() {
+  public void shouldNotVacuumIndexInOtherStates() {
     List<List<Object>> testCases =
         Lists.cartesianProduct(
             FLINT_TEST_DATASETS,
-            List.of(VACUUMING),
+            List.of(EMPTY, CREATING, ACTIVE, REFRESHING, VACUUMING),
             List.of(
                 Pair.of(
                     () -> {
@@ -134,39 +106,29 @@ public void shouldNotVacuumIndexInVacuumingState() {
         });
   }
 
-  @Test
-  public void shouldVacuumIndexWithoutJobRunning() {
-    List<List<Object>> testCases =
-        Lists.cartesianProduct(
-            FLINT_TEST_DATASETS,
-            List.of(EMPTY, CREATING, ACTIVE, DELETED),
-            List.of(
-                Pair.of(
-                    DEFAULT_OP,
-                    () -> new GetJobRunResult().withJobRun(new JobRun().withState("Cancelled")))));
-
-    runVacuumTestSuite(
-        testCases,
-        (mockDS, response) -> {
-          assertEquals("SUCCESS", response.getStatus());
-          assertFalse(flintIndexExists(mockDS.indexName));
-          assertFalse(indexDocExists(mockDS.latestId));
-        });
-  }
-
   private void runVacuumTestSuite(
       List<List<Object>> testCases,
       BiConsumer<FlintDatasetMock, AsyncQueryExecutionResponse> assertion) {
     testCases.forEach(
         params -> {
           FlintDatasetMock mockDS = (FlintDatasetMock) params.get(0);
-          FlintIndexState state = (FlintIndexState) params.get(1);
-          EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
-          EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();
-
-          AsyncQueryExecutionResponse response =
-              runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
-          assertion.accept(mockDS, response);
+          try {
+            FlintIndexState state = (FlintIndexState) params.get(1);
+            EMRApiCall cancelJobRun = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getLeft();
+            EMRApiCall getJobRunResult = ((Pair<EMRApiCall, EMRApiCall>) params.get(2)).getRight();
+
+            AsyncQueryExecutionResponse response =
+                runVacuumTest(mockDS, state, cancelJobRun, getJobRunResult);
+            assertion.accept(mockDS, response);
+          } finally {
+            // Clean up because we simulate parameterized test in single unit test method
+            if (flintIndexExists(mockDS.indexName)) {
+              mockDS.deleteIndex();
+            }
+            if (indexDocExists(mockDS.latestId)) {
+              deleteIndexDoc(mockDS.latestId);
+            }
+          }
         });
   }
 
@@ -229,6 +191,10 @@ private boolean indexDocExists(String docId) {
         .isExists();
   }
 
+  private void deleteIndexDoc(String docId) {
+    client.delete(new DeleteRequest(DATASOURCE_TO_REQUEST_INDEX.apply("mys3"), docId)).actionGet();
+  }
+
   private FlintDatasetMock mockDataset(String query, FlintIndexType indexType, String indexName) {
     FlintDatasetMock dataset = new FlintDatasetMock(query, "", indexType, indexName);
     dataset.latestId(Base64.getEncoder().encodeToString(indexName.getBytes()));