Skip to content

Commit

Permalink
Fix jacoco test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Mar 19, 2024
1 parent 24c37c1 commit 2fd27b8
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType;
import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
import org.opensearch.sql.spark.execution.statement.StatementState;
import org.opensearch.sql.spark.execution.statestore.StateStore;
Expand Down Expand Up @@ -117,18 +116,10 @@ private void executeIndexOp(
FlintIndexMetadata indexMetadata) {
switch (indexQueryDetails.getIndexQueryActionType()) {
case DROP:
case VACUUM:
FlintIndexOp dropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
dropOp.apply(indexMetadata);

// For vacuum, continue to delete index data physically
if (indexQueryDetails.getIndexQueryActionType() == IndexQueryActionType.VACUUM) {
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
}
break;
case ALTER:
FlintIndexOpAlter flintIndexOpAlter =
Expand All @@ -140,6 +131,23 @@ private void executeIndexOp(
flintIndexMetadataService);
flintIndexOpAlter.apply(indexMetadata);
break;
case VACUUM:
// Try to perform drop operation first
FlintIndexOp tryDropOp =
new FlintIndexOpDrop(
stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
try {
tryDropOp.apply(indexMetadata);
} catch (IllegalStateException e) {
// Drop failed possibly due to invalid initial state
}

// Continue to delete index data physically if state is DELETED
// which means previous transaction succeeds
FlintIndexOp indexVacuumOp =
new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
indexVacuumOp.apply(indexMetadata);
break;
default:
throw new IllegalStateException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.sql.spark.flint.operation;

import java.util.Base64;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
Expand Down Expand Up @@ -41,12 +40,9 @@ FlintIndexState transitioningState() {

@Override
public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) {
// Decode Flint index name from latest ID
String flintIndexName = new String(Base64.getDecoder().decode(flintIndex.getId()));
LOG.info("Vacuuming Flint index {}", flintIndexName);

// Delete OpenSearch index
DeleteIndexRequest request = new DeleteIndexRequest().indices(flintIndexName);
LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName());
DeleteIndexRequest request =
new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName());
AcknowledgedResponse response = client.admin().indices().delete(request).actionGet();
LOG.info("OpenSearch index delete result: {}", response.isAcknowledged());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@

import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX;
import static org.opensearch.sql.spark.flint.FlintIndexState.ACTIVE;
import static org.opensearch.sql.spark.flint.FlintIndexState.CANCELLING;
import static org.opensearch.sql.spark.flint.FlintIndexState.CREATING;
import static org.opensearch.sql.spark.flint.FlintIndexState.DELETED;
import static org.opensearch.sql.spark.flint.FlintIndexState.DELETING;
import static org.opensearch.sql.spark.flint.FlintIndexState.EMPTY;
import static org.opensearch.sql.spark.flint.FlintIndexState.FAILED;
import static org.opensearch.sql.spark.flint.FlintIndexState.REFRESHING;
import static org.opensearch.sql.spark.flint.FlintIndexState.VACUUMING;
import static org.opensearch.sql.spark.flint.FlintIndexType.COVERING;
import static org.opensearch.sql.spark.flint.FlintIndexType.MATERIALIZED_VIEW;
import static org.opensearch.sql.spark.flint.FlintIndexType.SKIPPING;
Expand All @@ -25,7 +24,6 @@
import java.util.List;
import java.util.function.BiConsumer;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.Ignore;
import org.junit.Test;
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.opensearch.action.get.GetRequest;
Expand Down Expand Up @@ -106,12 +104,12 @@ public void shouldNotVacuumIndexInRefreshingStateIfCancelTimeout() {
});
}

@Ignore
public void shouldNotVacuumIndexInFailedState() {
@Test
public void shouldNotVacuumIndexInVacuumingState() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(FAILED),
List.of(VACUUMING),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
Expand All @@ -124,18 +122,18 @@ public void shouldNotVacuumIndexInFailedState() {
runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("SUCCESS", response.getStatus());
assertEquals("FAILED", response.getStatus());
assertTrue(flintIndexExists(mockDS.indexName));
assertTrue(indexDocExists(mockDS.latestId));
});
}

@Ignore
public void shouldVacuumIndexInCancellingState() {
@Test
public void shouldVacuumIndexWithoutJobRunning() {
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(CANCELLING),
List.of(EMPTY, CREATING, ACTIVE, DELETED),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
DEFAULT_OP,
Expand All @@ -150,30 +148,6 @@ public void shouldVacuumIndexInCancellingState() {
});
}

// NOTE(review): this is the pre-change (removed) version of the test; it was
// @Ignore'd rather than run. The commit replaces it with a parameterized
// variant covering states EMPTY, CREATING, ACTIVE, DELETED.
@Ignore
public void shouldVacuumIndexWithoutJobRunning() {
// Build the cartesian product of every mock Flint dataset with each index
// state in which no EMR-S job should be running, paired with EMR API stubs
// that fail the test if either cancelJobRun or getJobRunResult is invoked.
List<List<Object>> testCases =
Lists.cartesianProduct(
FLINT_TEST_DATASETS,
List.of(EMPTY, ACTIVE, DELETING, DELETED),
List.of(
Pair.<EMRApiCall, EMRApiCall>of(
() -> {
throw new AssertionError("should not call cancelJobRun");
},
() -> {
throw new AssertionError("should not call getJobRunResult");
})));

// Expect vacuum to succeed and to physically remove both the Flint index
// and its state doc (latestId) — presumably asserting the VACUUM path
// deletes data without touching EMR-S; verify against runVacuumTestSuite.
runVacuumTestSuite(
testCases,
(mockDS, response) -> {
assertEquals("SUCCESS", response.getStatus());
assertFalse(flintIndexExists(mockDS.indexName));
assertFalse(indexDocExists(mockDS.latestId));
});
}

private void runVacuumTestSuite(
List<List<Object>> testCases,
BiConsumer<FlintDatasetMock, AsyncQueryExecutionResponse> assertion) {
Expand Down

0 comments on commit 2fd27b8

Please sign in to comment.