From 83d3687e20f41d9a45f7ea99dacc9155f24650c1 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 3 Sep 2024 16:45:23 -0700 Subject: [PATCH] Add more UTs Signed-off-by: Louis Chu --- .../asyncquery/AsyncQueryCoreIntegTest.java | 110 +++++++++++++++++- .../operation/FlintIndexOpVacuumTest.java | 61 ++++++++-- 2 files changed, 160 insertions(+), 11 deletions(-) diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index f2ff4bafec..ff5cc8df04 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -208,6 +208,31 @@ public void createDropIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); } + @Test + public void createDropIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "DROP INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + + // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler + verify(asyncQueryScheduler).unscheduleJob(indexName); + } + @Test public void createVacuumIndexQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -230,6 +255,34 @@ public void createVacuumIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); } + @Test + public void createVacuumIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + + // Verifying that the index is deleted + verify(flintIndexClient).deleteIndex(indexName); + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + + // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler + verify(asyncQueryScheduler).removeJob(indexName); + } + @Test public void createAlterIndexQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -261,6 +314,41 @@ public void createAlterIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); } + @Test + public void createAlterIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "ALTER INDEX index_name ON table_name WITH (auto_refresh = false)", + DATASOURCE_NAME, + LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + + verify(flintIndexMetadataService) + .updateIndexToManualRefresh( + eq(indexName), flintIndexOptionsArgumentCaptor.capture(), eq(asyncQueryRequestContext)); + + FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue(); + assertFalse(flintIndexOptions.autoRefresh()); + + // Verifying that unscheduleJob is called on asyncQueryScheduler for external scheduler + verify(asyncQueryScheduler).unscheduleJob(indexName); + + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + } + @Test public void createStreamingQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -510,7 +598,8 @@ private void givenSparkExecutionEngineConfigIsSupplied() { .build()); } - private void givenFlintIndexMetadataExists(String indexName) { + private void givenFlintIndexMetadataExists( + String indexName, FlintIndexOptions flintIndexOptions) { when(flintIndexMetadataService.getFlintIndexMetadata(indexName, asyncQueryRequestContext)) .thenReturn( ImmutableMap.of( @@ -519,10 +608,27 @@ private void givenFlintIndexMetadataExists(String indexName) { .appId(APPLICATION_ID) .jobId(JOB_ID) .opensearchIndexName(indexName) - .flintIndexOptions(new FlintIndexOptions()) + .flintIndexOptions(flintIndexOptions) .build())); } + // Overload method for default FlintIndexOptions usage + private void givenFlintIndexMetadataExists(String indexName) { + givenFlintIndexMetadataExists(indexName, new FlintIndexOptions()); + } + + // Method to set up FlintIndexMetadata with external scheduler + private void givenFlintIndexMetadataExistsWithExternalScheduler(String indexName) { + givenFlintIndexMetadataExists(indexName, createExternalSchedulerFlintIndexOptions()); + } + + // Helper method for creating FlintIndexOptions with external scheduler + private FlintIndexOptions createExternalSchedulerFlintIndexOptions() { + FlintIndexOptions options = new FlintIndexOptions(); + options.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); + return options; + } + private void givenValidDataSourceMetadataExist() { when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( DATASOURCE_NAME, asyncQueryRequestContext)) diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java index e73913aa7c..e736b0e93a 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,17 +33,13 @@ class FlintIndexOpVacuumTest { public static final String DATASOURCE_NAME = "DATASOURCE_NAME"; public static final String LATEST_ID = "LATEST_ID"; public static final String INDEX_NAME = "INDEX_NAME"; + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID = - FlintIndexMetadata.builder() - .latestId(LATEST_ID) - .opensearchIndexName(INDEX_NAME) - .flintIndexOptions(new FlintIndexOptions()) - .build(); + createFlintIndexMetadataWithLatestId(); + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID = - FlintIndexMetadata.builder() - .opensearchIndexName(INDEX_NAME) - .flintIndexOptions(new FlintIndexOptions()) - .build(); + createFlintIndexMetadataWithoutLatestId(); + @Mock FlintIndexClient flintIndexClient; @Mock FlintIndexStateModelService flintIndexStateModelService; @Mock EMRServerlessClientFactory emrServerlessClientFactory; @@ -66,6 +63,34 @@ public void setUp() { asyncQueryScheduler); } + // Helper method to create FlintIndexMetadata with latest ID + private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() { + return FlintIndexMetadata.builder() + .latestId(LATEST_ID) + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); + } + + // Helper method to create FlintIndexMetadata without latest ID + private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() { + return FlintIndexMetadata.builder() + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); + } + + // Helper method to create FlintIndexMetadata with external scheduler + private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() { + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); + + return FlintIndexMetadata.builder() + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(flintIndexOptions) + .build(); + } + @Test public void testApplyWithEmptyLatestId() { flintIndexOpVacuum.apply(FLINT_INDEX_METADATA_WITHOUT_LATEST_ID, asyncQueryRequestContext); @@ -218,4 +243,22 @@ public void testApplyHappyPath() { .deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext); verify(flintIndexClient).deleteIndex(INDEX_NAME); } + + @Test + public void testRunOpWithExternalScheduler() { + FlintIndexMetadata flintIndexMetadata = createFlintIndexMetadataWithExternalScheduler(); + flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); + + verify(asyncQueryScheduler).removeJob(INDEX_NAME); + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } + + @Test + public void testRunOpWithoutExternalScheduler() { + FlintIndexMetadata flintIndexMetadata = FLINT_INDEX_METADATA_WITHOUT_LATEST_ID; + flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); + + verify(asyncQueryScheduler, never()).removeJob(INDEX_NAME); + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } }