From b2403ca4fa1bbba2a1ac8827b6f7aeefe48f9f32 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Mon, 24 Jun 2024 08:50:57 -0700 Subject: [PATCH] Abstract FlintIndex client (#2755) * Abstract FlintIndex client Signed-off-by: Tomoyuki Morita * Fix log Signed-off-by: Tomoyuki Morita * Fix test function name Signed-off-by: Tomoyuki Morita --------- Signed-off-by: Tomoyuki Morita --- .../sql/spark/flint/FlintIndexClient.java | 11 ++ .../flint/operation/FlintIndexOpFactory.java | 6 +- .../flint/operation/FlintIndexOpVacuum.java | 15 +- .../operation/FlintIndexOpFactoryTest.java | 51 ++++++ .../operation/FlintIndexOpVacuumTest.java | 164 ++++++++++++++++++ .../flint/OpenSearchFlintIndexClient.java | 27 +++ .../config/AsyncExecutorServiceModule.java | 14 +- .../AsyncQueryExecutorServiceSpec.java | 11 +- 8 files changed, 282 insertions(+), 17 deletions(-) create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexClient.java create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java create mode 100644 async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClient.java diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexClient.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexClient.java new file mode 100644 index 0000000000..af1a23d8d1 --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/FlintIndexClient.java @@ -0,0 +1,11 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +/** Interface to abstract access to the FlintIndex */ +public interface FlintIndexClient { + void deleteIndex(String indexName); +} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java index b102e43d59..14cf9fa7c9 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java @@ -6,16 +6,16 @@ package org.opensearch.sql.spark.flint.operation; import lombok.RequiredArgsConstructor; -import org.opensearch.client.Client; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; @RequiredArgsConstructor public class FlintIndexOpFactory { private final FlintIndexStateModelService flintIndexStateModelService; - private final Client client; + private final FlintIndexClient flintIndexClient; private final FlintIndexMetadataService flintIndexMetadataService; private final EMRServerlessClientFactory emrServerlessClientFactory; @@ -35,7 +35,7 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da public FlintIndexOpVacuum getVacuum(String datasource) { return new FlintIndexOpVacuum( - flintIndexStateModelService, datasource, client, emrServerlessClientFactory); + flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory); } public FlintIndexOpCancel getCancel(String datasource) { diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index ffd09e16a4..a0ef955adf 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -7,10 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; -import org.opensearch.client.Client; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; @@ -22,15 +20,15 @@ public class FlintIndexOpVacuum extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); /** OpenSearch client. */ - private final Client client; + private final FlintIndexClient flintIndexClient; public FlintIndexOpVacuum( FlintIndexStateModelService flintIndexStateModelService, String datasourceName, - Client client, + FlintIndexClient flintIndexClient, EMRServerlessClientFactory emrServerlessClientFactory) { super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); - this.client = client; + this.flintIndexClient = flintIndexClient; } @Override @@ -46,10 +44,7 @@ FlintIndexState transitioningState() { @Override public void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintIndex) { LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); - DeleteIndexRequest request = - new DeleteIndexRequest().indices(flintIndexMetadata.getOpensearchIndexName()); - AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); - LOG.info("OpenSearch index delete result: {}", response.isAcknowledged()); + flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName()); } @Override diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java new file mode 100644 index 0000000000..3bf438aeb9 --- /dev/null +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.flint.FlintIndexClient; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; + +@ExtendWith(MockitoExtension.class) +class FlintIndexOpFactoryTest { + public static final String DATASOURCE_NAME = "DATASOURCE_NAME"; + + @Mock private FlintIndexStateModelService flintIndexStateModelService; + @Mock private FlintIndexClient flintIndexClient; + @Mock private FlintIndexMetadataService flintIndexMetadataService; + @Mock private EMRServerlessClientFactory emrServerlessClientFactory; + + @InjectMocks FlintIndexOpFactory flintIndexOpFactory; + + @Test + void getDrop() { + assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME)); + } + + @Test + void getAlter() { + assertNotNull(flintIndexOpFactory.getAlter(new FlintIndexOptions(), DATASOURCE_NAME)); + } + + @Test + void getVacuum() { + assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME)); + } + + @Test + void getCancel() { + assertNotNull(flintIndexOpFactory.getDrop(DATASOURCE_NAME)); + } +} diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java new file mode 100644 index 0000000000..60fa13dc93 --- /dev/null +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.flint.FlintIndexClient; +import org.opensearch.sql.spark.flint.FlintIndexMetadata; +import org.opensearch.sql.spark.flint.FlintIndexState; +import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; + +@ExtendWith(MockitoExtension.class) +class FlintIndexOpVacuumTest { + + public static final String DATASOURCE_NAME = "DATASOURCE_NAME"; + public static final String LATEST_ID = "LATEST_ID"; + public static final String INDEX_NAME = "INDEX_NAME"; + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID = + FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build(); + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID = + FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build(); + @Mock FlintIndexClient flintIndexClient; + @Mock FlintIndexStateModelService flintIndexStateModelService; + @Mock EMRServerlessClientFactory emrServerlessClientFactory; + @Mock FlintIndexStateModel flintIndexStateModel; + @Mock FlintIndexStateModel transitionedFlintIndexStateModel; + + RuntimeException testException = new RuntimeException("Test Exception"); + + FlintIndexOpVacuum flintIndexOpVacuum; + + @BeforeEach + public void setUp() { + flintIndexOpVacuum = + new FlintIndexOpVacuum( + flintIndexStateModelService, + DATASOURCE_NAME, + flintIndexClient, + emrServerlessClientFactory); + } + + @Test + public void testApplyWithEmptyLatestId() { + flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID); + + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } + + @Test + public void testApplyWithFlintIndexStateNotFound() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.empty()); + + assertThrows( + IllegalStateException.class, + () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + } + + @Test + public void testApplyWithNotDeletedState() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.ACTIVE); + + assertThrows( + IllegalStateException.class, + () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + } + + @Test + public void testApplyWithUpdateFlintIndexStateThrow() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); + when(flintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME)) + .thenThrow(testException); + + assertThrows( + IllegalStateException.class, + () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + } + + @Test + public void testApplyWithRunOpThrow() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); + when(flintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME)) + .thenReturn(transitionedFlintIndexStateModel); + doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME); + + assertThrows( + Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + + verify(flintIndexStateModelService) + .updateFlintIndexState( + transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME); + } + + @Test + public void testApplyWithRunOpThrowAndRollbackThrow() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); + when(flintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME)) + .thenReturn(transitionedFlintIndexStateModel); + doThrow(testException).when(flintIndexClient).deleteIndex(INDEX_NAME); + when(flintIndexStateModelService.updateFlintIndexState( + transitionedFlintIndexStateModel, FlintIndexState.DELETED, DATASOURCE_NAME)) + .thenThrow(testException); + + assertThrows( + Exception.class, () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + } + + @Test + public void testApplyWithDeleteFlintIndexStateModelThrow() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); + when(flintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME)) + .thenReturn(transitionedFlintIndexStateModel); + when(flintIndexStateModelService.deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenThrow(testException); + + assertThrows( + IllegalStateException.class, + () -> flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID)); + } + + @Test + public void testApplyHappyPath() { + when(flintIndexStateModelService.getFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME)) + .thenReturn(Optional.of(flintIndexStateModel)); + when(flintIndexStateModel.getIndexState()).thenReturn(FlintIndexState.DELETED); + when(flintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, FlintIndexState.VACUUMING, DATASOURCE_NAME)) + .thenReturn(transitionedFlintIndexStateModel); + when(transitionedFlintIndexStateModel.getLatestId()).thenReturn(LATEST_ID); + + flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITH_LATEST_ID); + + verify(flintIndexStateModelService).deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME); + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClient.java b/async-query/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClient.java new file mode 100644 index 0000000000..7a655f0678 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexClient.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import lombok.RequiredArgsConstructor; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; + +@RequiredArgsConstructor +public class OpenSearchFlintIndexClient implements FlintIndexClient { + private static final Logger LOG = LogManager.getLogger(); + + private final Client client; + + @Override + public void deleteIndex(String indexName) { + DeleteIndexRequest request = new DeleteIndexRequest().indices(indexName); + AcknowledgedResponse response = client.admin().indices().delete(request).actionGet(); + LOG.info("OpenSearch index delete result: {}", response.isAcknowledged()); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 7287dc0201..d75b6616f7 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -42,9 +42,11 @@ import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; +import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.OpenSearchFlintIndexClient; import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; @@ -124,11 +126,19 @@ public QueryHandlerFactory queryhandlerFactory( @Provides public FlintIndexOpFactory flintIndexOpFactory( FlintIndexStateModelService flintIndexStateModelService, - NodeClient client, + FlintIndexClient flintIndexClient, FlintIndexMetadataServiceImpl flintIndexMetadataService, EMRServerlessClientFactory emrServerlessClientFactory) { return new FlintIndexOpFactory( - flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory); + flintIndexStateModelService, + flintIndexClient, + flintIndexMetadataService, + emrServerlessClientFactory); + } + + @Provides + public FlintIndexClient flintIndexClient(NodeClient nodeClient) { + return new OpenSearchFlintIndexClient(nodeClient); } @Provides diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index f69a3ff44e..a5935db2c9 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -77,10 +77,12 @@ import org.opensearch.sql.spark.execution.xcontent.FlintIndexStateModelXContentSerializer; import org.opensearch.sql.spark.execution.xcontent.SessionModelXContentSerializer; import org.opensearch.sql.spark.execution.xcontent.StatementModelXContentSerializer; +import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.flint.OpenSearchFlintIndexClient; import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; @@ -100,6 +102,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected org.opensearch.sql.common.setting.Settings pluginSettings; protected SessionConfigSupplier sessionConfigSupplier; protected NodeClient client; + protected FlintIndexClient flintIndexClient; protected DataSourceServiceImpl dataSourceService; protected ClusterSettings clusterSettings; protected FlintIndexMetadataService flintIndexMetadataService; @@ -142,6 +145,7 @@ public void setup() { .putList(DATASOURCE_URI_HOSTS_DENY_LIST.getKey(), Collections.emptyList()) .build()) .get(); + flintIndexClient = new OpenSearchFlintIndexClient(client); dataSourceService = createDataSourceService(); DataSourceMetadata dm = new DataSourceMetadata.Builder() @@ -191,7 +195,10 @@ public void setup() { protected FlintIndexOpFactory getFlintIndexOpFactory( EMRServerlessClientFactory emrServerlessClientFactory) { return new FlintIndexOpFactory( - flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory); + flintIndexStateModelService, + flintIndexClient, + flintIndexMetadataService, + emrServerlessClientFactory); } @After @@ -260,7 +267,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore), new FlintIndexOpFactory( flintIndexStateModelService, - client, + flintIndexClient, new FlintIndexMetadataServiceImpl(client), emrServerlessClientFactory), emrServerlessClientFactory,