From 10d2805b08a06d349b9c1ba625390b04931b0996 Mon Sep 17 00:00:00 2001 From: Tomoyuki MORITA Date: Mon, 6 May 2024 13:26:48 -0700 Subject: [PATCH] Refactor IndexDMLHandler and related classes (#2644) Signed-off-by: Tomoyuki Morita (cherry picked from commit 45122ec67bfb517ea6ab445624fa82b8a1663a4f) --- .../org/opensearch/sql/plugin/SQLPlugin.java | 6 +- .../cluster/ClusterManagerEventListener.java | 17 +- .../FlintStreamingJobHouseKeeperTask.java | 21 +- .../sql/spark/dispatcher/IndexDMLHandler.java | 64 +- .../spark/dispatcher/QueryHandlerFactory.java | 17 +- .../spark/dispatcher/RefreshQueryHandler.java | 16 +- .../flint/IndexDMLResultStorageService.java | 12 + ...penSearchIndexDMLResultStorageService.java | 25 + .../spark/flint/operation/FlintIndexOp.java | 6 +- .../flint/operation/FlintIndexOpAlter.java | 10 +- .../flint/operation/FlintIndexOpCancel.java | 13 +- .../flint/operation/FlintIndexOpDrop.java | 13 +- .../flint/operation/FlintIndexOpFactory.java | 42 ++ .../flint/operation/FlintIndexOpVacuum.java | 9 +- .../config/AsyncExecutorServiceModule.java | 27 +- .../AsyncQueryExecutorServiceSpec.java | 33 +- .../FlintStreamingJobHouseKeeperTaskTest.java | 648 +++++++----------- .../spark/dispatcher/IndexDMLHandlerTest.java | 26 +- .../dispatcher/SparkQueryDispatcherTest.java | 27 +- .../flint/operation/FlintIndexOpTest.java | 18 +- 20 files changed, 486 insertions(+), 564 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index bc0a084f8c..16fd46c253 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -79,10 +79,9 @@ import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.cluster.ClusterManagerEventListener; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -227,8 +226,7 @@ public Collection createComponents( environment.settings(), dataSourceService, injector.getInstance(FlintIndexMetadataServiceImpl.class), - injector.getInstance(StateStore.class), - injector.getInstance(EMRServerlessClientFactory.class)); + injector.getInstance(FlintIndexOpFactory.class)); return ImmutableList.of( dataSourceService, injector.getInstance(AsyncQueryExecutorService.class), diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java index f04c6cb830..6c660f073c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java @@ -21,9 +21,8 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.threadpool.Scheduler.Cancellable; import org.opensearch.threadpool.ThreadPool; @@ -37,8 +36,7 @@ public class ClusterManagerEventListener implements LocalNodeClusterManagerListe private Clock clock; private DataSourceService dataSourceService; private FlintIndexMetadataService flintIndexMetadataService; - private StateStore stateStore; - private EMRServerlessClientFactory emrServerlessClientFactory; + private FlintIndexOpFactory flintIndexOpFactory; private Duration sessionTtlDuration; private Duration resultTtlDuration; private TimeValue streamingJobHouseKeepingInterval; @@ -56,8 +54,7 @@ public ClusterManagerEventListener( Settings settings, DataSourceService dataSourceService, FlintIndexMetadataService flintIndexMetadataService, - StateStore stateStore, - EMRServerlessClientFactory emrServerlessClientFactory) { + FlintIndexOpFactory flintIndexOpFactory) { this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; @@ -65,8 +62,7 @@ public ClusterManagerEventListener( this.clock = clock; this.dataSourceService = dataSourceService; this.flintIndexMetadataService = flintIndexMetadataService; - this.stateStore = stateStore; - this.emrServerlessClientFactory = emrServerlessClientFactory; + this.flintIndexOpFactory = flintIndexOpFactory; this.sessionTtlDuration = toDuration(sessionTtl.get(settings)); this.resultTtlDuration = toDuration(resultTtl.get(settings)); this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings); @@ -151,10 +147,7 @@ private void initializeStreamingJobHouseKeeperCron() { flintStreamingJobHouseKeeperCron = threadPool.scheduleWithFixedDelay( new FlintStreamingJobHouseKeeperTask( - dataSourceService, - flintIndexMetadataService, - stateStore, - emrServerlessClientFactory), + dataSourceService, flintIndexMetadataService, flintIndexOpFactory), streamingJobHouseKeepingInterval, executorName()); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java index 27221f1b72..31b1ecb49c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java +++ b/spark/src/main/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTask.java @@ -17,13 +17,10 @@ import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; /** Cleaner task which alters the active streaming jobs of a disabled datasource. */ @RequiredArgsConstructor @@ -31,8 +28,7 @@ public class FlintStreamingJobHouseKeeperTask implements Runnable { private final DataSourceService dataSourceService; private final FlintIndexMetadataService flintIndexMetadataService; - private final StateStore stateStore; - private final EMRServerlessClientFactory emrServerlessClientFactory; + private final FlintIndexOpFactory flintIndexOpFactory; private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class); protected static final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -95,9 +91,7 @@ private void dropAutoRefreshIndex( String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) { // When the datasource is deleted. Possibly Replace with VACUUM Operation. LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex); - FlintIndexOpDrop flintIndexOpDrop = - new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient()); - flintIndexOpDrop.apply(flintIndexMetadata); + flintIndexOpFactory.getDrop(datasourceName).apply(flintIndexMetadata); LOGGER.info("Successfully dropped index: {}", autoRefreshIndex); } @@ -106,14 +100,7 @@ private void alterAutoRefreshIndex( LOGGER.info("Attempting to alter index: {}", autoRefreshIndex); FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false"); - FlintIndexOpAlter flintIndexOpAlter = - new FlintIndexOpAlter( - flintIndexOptions, - stateStore, - datasourceName, - emrServerlessClientFactory.getClient(), - flintIndexMetadataService); - flintIndexOpAlter.apply(flintIndexMetadata); + flintIndexOpFactory.getAlter(flintIndexOptions, datasourceName).apply(flintIndexMetadata); LOGGER.info("Successfully altered index: {}", autoRefreshIndex); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java index 412db50e85..dfd5316f6c 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandler.java @@ -7,7 +7,6 @@ import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD; import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD; -import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult; import com.amazonaws.services.emrserverless.model.JobRunState; import java.util.Map; @@ -16,24 +15,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.json.JSONObject; -import org.opensearch.client.Client; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata; -import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.response.JobExecutionResponseReader; /** Handle Index DML query. includes * DROP * ALT? */ @@ -45,15 +40,10 @@ public class IndexDMLHandler extends AsyncQueryHandler { public static final String DROP_INDEX_JOB_ID = "dropIndexJobId"; public static final String DML_QUERY_JOB_ID = "DMLQueryJobId"; - private final EMRServerlessClient emrServerlessClient; - private final JobExecutionResponseReader jobExecutionResponseReader; - private final FlintIndexMetadataService flintIndexMetadataService; - - private final StateStore stateStore; - - private final Client client; + private final IndexDMLResultStorageService indexDMLResultStorageService; + private final FlintIndexOpFactory flintIndexOpFactory; public static boolean isIndexDMLQuery(String jobId) { return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId); @@ -67,14 +57,16 @@ public DispatchQueryResponse submit( try { IndexQueryDetails indexDetails = context.getIndexQueryDetails(); FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails); - executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata); + + getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata); + AsyncQueryId asyncQueryId = storeIndexDMLResult( dispatchQueryRequest, dataSourceMetadata, JobRunState.SUCCESS.toString(), StringUtils.EMPTY, - startTime); + getElapsedTimeSince(startTime)); return new DispatchQueryResponse( asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null); } catch (Exception e) { @@ -85,7 +77,7 @@ public DispatchQueryResponse submit( dataSourceMetadata, JobRunState.FAILED.toString(), e.getMessage(), - startTime); + getElapsedTimeSince(startTime)); return new DispatchQueryResponse( asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null); } @@ -96,7 +88,7 @@ private AsyncQueryId storeIndexDMLResult( DataSourceMetadata dataSourceMetadata, String status, String error, - long startTime) { + long queryRunTime) { AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName()); IndexDMLResult indexDMLResult = new IndexDMLResult( @@ -104,38 +96,26 @@ private AsyncQueryId storeIndexDMLResult( status, error, dispatchQueryRequest.getDatasource(), - System.currentTimeMillis() - startTime, + queryRunTime, System.currentTimeMillis()); - createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult); + indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName()); return asyncQueryId; } - private void executeIndexOp( - DispatchQueryRequest dispatchQueryRequest, - IndexQueryDetails indexQueryDetails, - FlintIndexMetadata indexMetadata) { + private long getElapsedTimeSince(long startTime) { + return System.currentTimeMillis() - startTime; + } + + private FlintIndexOp getIndexOp( + DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) { switch (indexQueryDetails.getIndexQueryActionType()) { case DROP: - FlintIndexOp dropOp = - new FlintIndexOpDrop( - stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient); - dropOp.apply(indexMetadata); - break; + return flintIndexOpFactory.getDrop(dispatchQueryRequest.getDatasource()); case ALTER: - FlintIndexOpAlter flintIndexOpAlter = - new FlintIndexOpAlter( - indexQueryDetails.getFlintIndexOptions(), - stateStore, - dispatchQueryRequest.getDatasource(), - emrServerlessClient, - flintIndexMetadataService); - flintIndexOpAlter.apply(indexMetadata); - break; + return flintIndexOpFactory.getAlter( + indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource()); case VACUUM: - FlintIndexOp indexVacuumOp = - new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client); - indexVacuumOp.apply(indexMetadata); - break; + return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource()); default: throw new IllegalStateException( String.format( diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java index 1713bed4e2..f994d9c728 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/QueryHandlerFactory.java @@ -6,11 +6,11 @@ package org.opensearch.sql.spark.dispatcher; import lombok.RequiredArgsConstructor; -import org.opensearch.client.Client; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.session.SessionManager; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -19,10 +19,10 @@ public class QueryHandlerFactory { private final JobExecutionResponseReader jobExecutionResponseReader; private final FlintIndexMetadataService flintIndexMetadataService; - private final Client client; private final SessionManager sessionManager; private final LeaseManager leaseManager; - private final StateStore stateStore; + private final IndexDMLResultStorageService indexDMLResultStorageService; + private final FlintIndexOpFactory flintIndexOpFactory; private final EMRServerlessClientFactory emrServerlessClientFactory; public RefreshQueryHandler getRefreshQueryHandler() { @@ -30,8 +30,8 @@ public RefreshQueryHandler getRefreshQueryHandler() { emrServerlessClientFactory.getClient(), jobExecutionResponseReader, flintIndexMetadataService, - stateStore, - leaseManager); + leaseManager, + flintIndexOpFactory); } public StreamingQueryHandler getStreamingQueryHandler() { @@ -50,10 +50,9 @@ public InteractiveQueryHandler getInteractiveQueryHandler() { public IndexDMLHandler getIndexDMLHandler() { return new IndexDMLHandler( - emrServerlessClientFactory.getClient(), jobExecutionResponseReader, flintIndexMetadataService, - stateStore, - client); + indexDMLResultStorageService, + flintIndexOpFactory); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java index d55408f62e..aeb5c1b35f 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/RefreshQueryHandler.java @@ -13,11 +13,10 @@ import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.JobType; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.operation.FlintIndexOp; -import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -25,19 +24,17 @@ public class RefreshQueryHandler extends BatchQueryHandler { private final FlintIndexMetadataService flintIndexMetadataService; - private final StateStore stateStore; - private final EMRServerlessClient emrServerlessClient; + private final FlintIndexOpFactory flintIndexOpFactory; public RefreshQueryHandler( EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataService flintIndexMetadataService, - StateStore stateStore, - LeaseManager leaseManager) { + LeaseManager leaseManager, + FlintIndexOpFactory flintIndexOpFactory) { super(emrServerlessClient, jobExecutionResponseReader, leaseManager); this.flintIndexMetadataService = flintIndexMetadataService; - this.stateStore = stateStore; - this.emrServerlessClient = emrServerlessClient; + this.flintIndexOpFactory = flintIndexOpFactory; } @Override @@ -51,8 +48,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) { "Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName())); } FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName()); - FlintIndexOp jobCancelOp = - new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient); + FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName); jobCancelOp.apply(indexMetadata); return asyncQueryJobMetadata.getQueryId().getId(); } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java new file mode 100644 index 0000000000..4a046564f5 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/IndexDMLResultStorageService.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; + +public interface IndexDMLResultStorageService { + IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java new file mode 100644 index 0000000000..eeb2921449 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchIndexDMLResultStorageService.java @@ -0,0 +1,25 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.datasource.model.DataSourceMetadata; +import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult; +import org.opensearch.sql.spark.execution.statestore.StateStore; + +@RequiredArgsConstructor +public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultStorageService { + + private final DataSourceService dataSourceService; + private final StateStore stateStore; + + @Override + public IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName) { + DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName); + return stateStore.create(result, IndexDMLResult::copy, dataSourceMetadata.getResultIndex()); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index 8d5e301631..edfd0aace2 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -21,6 +21,7 @@ import org.jetbrains.annotations.NotNull; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; @@ -33,6 +34,7 @@ public abstract class FlintIndexOp { private final StateStore stateStore; private final String datasourceName; + private final EMRServerlessClientFactory emrServerlessClientFactory; /** Apply operation on {@link FlintIndexMetadata} */ public void apply(FlintIndexMetadata metadata) { @@ -140,11 +142,11 @@ private void commit(FlintIndexStateModel flintIndex) { /*** * Common operation between AlterOff and Drop. So moved to FlintIndexOp. */ - public void cancelStreamingJob( - EMRServerlessClient emrServerlessClient, FlintIndexStateModel flintIndexStateModel) + public void cancelStreamingJob(FlintIndexStateModel flintIndexStateModel) throws InterruptedException, TimeoutException { String applicationId = flintIndexStateModel.getApplicationId(); String jobId = flintIndexStateModel.getJobId(); + EMRServerlessClient emrServerlessClient = emrServerlessClientFactory.getClient(); try { emrServerlessClient.cancelJobRun( flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true); diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java index 7db4f6a4c6..31e33539a1 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java @@ -8,7 +8,7 @@ import lombok.SneakyThrows; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; @@ -22,7 +22,6 @@ */ public class FlintIndexOpAlter extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class); - private final EMRServerlessClient emrServerlessClient; private final FlintIndexMetadataService flintIndexMetadataService; private final FlintIndexOptions flintIndexOptions; @@ -30,10 +29,9 @@ public FlintIndexOpAlter( FlintIndexOptions flintIndexOptions, StateStore stateStore, String datasourceName, - EMRServerlessClient emrServerlessClient, + EMRServerlessClientFactory emrServerlessClientFactory, FlintIndexMetadataService flintIndexMetadataService) { - super(stateStore, datasourceName); - this.emrServerlessClient = emrServerlessClient; + super(stateStore, datasourceName, emrServerlessClientFactory); this.flintIndexMetadataService = flintIndexMetadataService; this.flintIndexOptions = flintIndexOptions; } @@ -55,7 +53,7 @@ void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintInde "Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); this.flintIndexMetadataService.updateIndexToManualRefresh( flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions); - cancelStreamingJob(emrServerlessClient, flintIndexStateModel); + cancelStreamingJob(flintIndexStateModel); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java index 2317c5b6dc..0962e2a16b 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java @@ -8,7 +8,7 @@ import lombok.SneakyThrows; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; @@ -18,12 +18,11 @@ public class FlintIndexOpCancel extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); - private final EMRServerlessClient emrServerlessClient; - public FlintIndexOpCancel( - StateStore stateStore, String datasourceName, EMRServerlessClient emrServerlessClient) { - super(stateStore, datasourceName); - this.emrServerlessClient = emrServerlessClient; + StateStore stateStore, + String datasourceName, + EMRServerlessClientFactory emrServerlessClientFactory) { + super(stateStore, datasourceName, emrServerlessClientFactory); } // Only in refreshing state, the job is cancellable in case of REFRESH query. @@ -43,7 +42,7 @@ void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintInde LOG.debug( "Performing drop index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); - cancelStreamingJob(emrServerlessClient, flintIndexStateModel); + cancelStreamingJob(flintIndexStateModel); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java index 586c346863..0f71b3bc70 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java @@ -8,7 +8,7 @@ import lombok.SneakyThrows; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.sql.spark.client.EMRServerlessClient; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; @@ -17,12 +17,11 @@ public class FlintIndexOpDrop extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); - private final EMRServerlessClient emrServerlessClient; - public FlintIndexOpDrop( - StateStore stateStore, String datasourceName, EMRServerlessClient emrServerlessClient) { - super(stateStore, datasourceName); - this.emrServerlessClient = emrServerlessClient; + StateStore stateStore, + String datasourceName, + EMRServerlessClientFactory emrServerlessClientFactory) { + super(stateStore, datasourceName, emrServerlessClientFactory); } public boolean validate(FlintIndexState state) { @@ -44,7 +43,7 @@ void runOp(FlintIndexMetadata flintIndexMetadata, FlintIndexStateModel flintInde LOG.debug( "Performing drop index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); - cancelStreamingJob(emrServerlessClient, flintIndexStateModel); + cancelStreamingJob(flintIndexStateModel); } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java new file mode 100644 index 0000000000..6fc2261ade --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint.operation; + +import lombok.RequiredArgsConstructor; +import org.opensearch.client.Client; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; +import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; + +@RequiredArgsConstructor +public class FlintIndexOpFactory { + private final StateStore stateStore; + private final Client client; + private final FlintIndexMetadataService flintIndexMetadataService; + private final EMRServerlessClientFactory emrServerlessClientFactory; + + public FlintIndexOpDrop getDrop(String datasource) { + return new FlintIndexOpDrop(stateStore, datasource, emrServerlessClientFactory); + } + + public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) { + return new FlintIndexOpAlter( + flintIndexOptions, + stateStore, + datasource, + emrServerlessClientFactory, + flintIndexMetadataService); + } + + public FlintIndexOpVacuum getVacuum(String datasource) { + return new FlintIndexOpVacuum(stateStore, datasource, client, emrServerlessClientFactory); + } + + public FlintIndexOpCancel getCancel(String datasource) { + return new FlintIndexOpCancel(stateStore, datasource, emrServerlessClientFactory); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index cf204450e7..4287d9c7c9 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -10,6 +10,7 @@ import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; @@ -23,8 +24,12 @@ public class FlintIndexOpVacuum extends FlintIndexOp { /** OpenSearch client. */ private final Client client; - public FlintIndexOpVacuum(StateStore stateStore, String datasourceName, Client client) { - super(stateStore, datasourceName); + public FlintIndexOpVacuum( + StateStore stateStore, + String datasourceName, + Client client, + EMRServerlessClientFactory emrServerlessClientFactory) { + super(stateStore, datasourceName, emrServerlessClientFactory); this.client = client; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index f93d065855..1d890ce346 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -30,6 +30,9 @@ import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; @@ -76,21 +79,37 @@ public SparkQueryDispatcher sparkQueryDispatcher( public QueryHandlerFactory queryhandlerFactory( JobExecutionResponseReader jobExecutionResponseReader, FlintIndexMetadataServiceImpl flintIndexMetadataReader, - NodeClient client, SessionManager sessionManager, DefaultLeaseManager defaultLeaseManager, - StateStore stateStore, + IndexDMLResultStorageService indexDMLResultStorageService, + FlintIndexOpFactory flintIndexOpFactory, EMRServerlessClientFactory emrServerlessClientFactory) { return new QueryHandlerFactory( jobExecutionResponseReader, flintIndexMetadataReader, - client, sessionManager, defaultLeaseManager, - stateStore, + indexDMLResultStorageService, + flintIndexOpFactory, emrServerlessClientFactory); } + @Provides + public FlintIndexOpFactory flintIndexOpFactory( + StateStore stateStore, + NodeClient client, + FlintIndexMetadataServiceImpl flintIndexMetadataService, + EMRServerlessClientFactory emrServerlessClientFactory) { + return new FlintIndexOpFactory( + stateStore, client, flintIndexMetadataService, emrServerlessClientFactory); + } + + @Provides + public IndexDMLResultStorageService indexDMLResultStorageService( + DataSourceService dataSourceService, StateStore stateStore) { + return new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore); + } + @Provides public SessionManager sessionManager( StateStore stateStore, diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index fdd094259f..b1c7f68388 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -64,14 +64,18 @@ import org.opensearch.sql.spark.execution.session.SessionModel; import org.opensearch.sql.spark.execution.session.SessionState; import org.opensearch.sql.spark.execution.statestore.StateStore; +import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.test.OpenSearchIntegTestCase; public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { + public static final String MYS3_DATASOURCE = "mys3"; public static final String MYGLUE_DATASOURCE = "my_glue"; @@ -81,6 +85,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected DataSourceServiceImpl dataSourceService; protected StateStore stateStore; protected ClusterSettings clusterSettings; + protected FlintIndexMetadataService flintIndexMetadataService; @Override protected Collection> nodePlugins() { @@ -88,6 +93,7 @@ protected Collection> nodePlugins() { } public static class TestSettingPlugin extends Plugin { + @Override public List> getSettings() { return OpenSearchSettings.pluginSettings(); @@ -148,6 +154,13 @@ public void setup() { stateStore = new StateStore(client, clusterService); createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings()); + flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + } + + protected FlintIndexOpFactory getFlintIndexOpFactory( + EMRServerlessClientFactory emrServerlessClientFactory) { + return new FlintIndexOpFactory( + stateStore, client, flintIndexMetadataService, emrServerlessClientFactory); } @After @@ -205,10 +218,14 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new QueryHandlerFactory( jobExecutionResponseReader, new FlintIndexMetadataServiceImpl(client), - client, new SessionManager(stateStore, emrServerlessClientFactory, pluginSettings), new DefaultLeaseManager(pluginSettings, stateStore), - stateStore, + new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore), + new FlintIndexOpFactory( + stateStore, + client, + new FlintIndexMetadataServiceImpl(client), + emrServerlessClientFactory), emrServerlessClientFactory); SparkQueryDispatcher sparkQueryDispatcher = new SparkQueryDispatcher( @@ -269,6 +286,17 @@ public void setJobState(JobRunState jobState) { } } + protected LocalEMRSClient getCancelledLocalEmrsClient() { + return new LocalEMRSClient() { + public GetJobRunResult getJobRunResult(String applicationId, String jobId) { + super.getJobRunResult(applicationId, jobId); + JobRun jobRun = new JobRun(); + jobRun.setState("cancelled"); + return new GetJobRunResult().withJobRun(jobRun); + } + }; + } + public static class LocalEMRServerlessClientFactory implements EMRServerlessClientFactory { @Override @@ -333,6 +361,7 @@ public String loadResultIndexMappings() { @RequiredArgsConstructor public class FlintDatasetMock { + final String query; final String refreshQuery; final FlintIndexType indexType; diff --git a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java index 80542ba2e0..6bcf9c6308 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java @@ -21,7 +21,6 @@ import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceSpec; import org.opensearch.sql.spark.asyncquery.model.MockFlintIndex; import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob; -import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; @@ -34,70 +33,40 @@ public class FlintStreamingJobHouseKeeperTaskTest extends AsyncQueryExecutorServ @Test @SneakyThrows public void testStreamingJobHouseKeeperWhenDataSourceDisabled() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(3); emrsClient.startJobRunCalled(0); @@ -108,64 +77,74 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { .getValue()); } + private ImmutableList getMockFlintIndices() { + return ImmutableList.of(getSkipping(), getCovering(), getMv()); + } + + private MockFlintIndex getMv() { + return new MockFlintIndex( + client, + "flint_my_glue_mydb_mv", + FlintIndexType.MATERIALIZED_VIEW, + "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\") "); + } + + private MockFlintIndex getCovering() { + return new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_covering_index", + FlintIndexType.COVERING, + "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + } + + private MockFlintIndex getSkipping() { + return new MockFlintIndex( + client, + "flint_my_glue_mydb_http_logs_skipping_index", + FlintIndexType.SKIPPING, + "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," + + " incremental_refresh=true, output_mode=\"complete\")"); + } + @Test @SneakyThrows public void testStreamingJobHouseKeeperWhenCancelJobGivesTimeout() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); LocalEMRSClient emrsClient = new LocalEMRSClient(); - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.REFRESHING); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(9); emrsClient.startJobRunCalled(0); @@ -179,62 +158,41 @@ public void testStreamingJobHouseKeeperWhenCancelJobGivesTimeout() { @Test @SneakyThrows public void testSimulateConcurrentJobHouseKeeperExecution() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); LocalEMRSClient emrsClient = new LocalEMRSClient(); - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); FlintStreamingJobHouseKeeperTask.isRunning.compareAndSet(false, true); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.REFRESHING); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(0); emrsClient.getJobRunResultCalled(0); emrsClient.startJobRunCalled(0); @@ -249,70 +207,40 @@ public void testSimulateConcurrentJobHouseKeeperExecution() { @SneakyThrows @Test public void testStreamingJobClearnerWhenDataSourceIsDeleted() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); this.dataSourceService.deleteDataSource(MYGLUE_DATASOURCE); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.DELETED); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.DELETED); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(3); emrsClient.startJobRunCalled(0); @@ -326,69 +254,39 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { @Test @SneakyThrows public void testStreamingJobHouseKeeperWhenDataSourceIsNeitherDisabledNorDeleted() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.REFRESHING); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.REFRESHING); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(0); emrsClient.getJobRunResultCalled(0); emrsClient.startJobRunCalled(0); @@ -413,14 +311,15 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { return new GetJobRunResult().withJobRun(jobRun); } }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); + emrsClient.getJobRunResultCalled(0); emrsClient.startJobRunCalled(0); emrsClient.cancelJobRunCalled(0); @@ -438,24 +337,16 @@ public void testStreamingJobHouseKeeperWhenFlintIndexIsCorrupted() throws Interr new MockFlintIndex(client(), indexName, FlintIndexType.COVERING, null); mockFlintIndex.createIndex(); changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); + emrsClient.getJobRunResultCalled(0); emrsClient.startJobRunCalled(0); emrsClient.cancelJobRunCalled(0); @@ -479,7 +370,6 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { return new GetJobRunResult().withJobRun(jobRun); } }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataService() { @Override @@ -493,10 +383,12 @@ public void updateIndexToManualRefresh( }; FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); + Assertions.assertFalse(FlintStreamingJobHouseKeeperTask.isRunning.get()); emrsClient.getJobRunResultCalled(0); emrsClient.startJobRunCalled(0); @@ -511,70 +403,40 @@ public void updateIndexToManualRefresh( @Test @SneakyThrows public void testStreamingJobHouseKeeperMultipleTimesWhenDataSourceDisabled() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); changeDataSourceStatus(MYGLUE_DATASOURCE, DISABLED); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(3); emrsClient.startJobRunCalled(0); @@ -588,16 +450,15 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { Thread thread2 = new Thread(flintStreamingJobHouseKeeperTask); thread2.start(); thread2.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.ACTIVE); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("false", options.get("auto_refresh")); - }); + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.ACTIVE); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("false", options.get("auto_refresh")); + }); // No New Calls and Errors emrsClient.cancelJobRunCalled(3); @@ -613,70 +474,40 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { @SneakyThrows @Test public void testRunStreamingJobHouseKeeperWhenDataSourceIsDeleted() { - MockFlintIndex SKIPPING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_skipping_index", - FlintIndexType.SKIPPING, - "ALTER SKIPPING INDEX ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex COVERING = - new MockFlintIndex( - client, - "flint_my_glue_mydb_http_logs_covering_index", - FlintIndexType.COVERING, - "ALTER INDEX covering ON my_glue.mydb.http_logs WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\")"); - MockFlintIndex MV = - new MockFlintIndex( - client, - "flint_my_glue_mydb_mv", - FlintIndexType.MATERIALIZED_VIEW, - "ALTER MATERIALIZED VIEW my_glue.mydb.mv WITH (auto_refresh=false," - + " incremental_refresh=true, output_mode=\"complete\") "); + ImmutableList mockFlintIndices = getMockFlintIndices(); Map indexJobMapping = new HashMap<>(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - INDEX.createIndex(); - MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); - indexJobMapping.put(INDEX, flintIndexJob); - HashMap existingOptions = new HashMap<>(); - existingOptions.put("auto_refresh", "true"); - // Making Index Auto Refresh - INDEX.updateIndexOptions(existingOptions, false); - flintIndexJob.refreshing(); - }); + mockFlintIndices.forEach( + INDEX -> { + INDEX.createIndex(); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + indexJobMapping.put(INDEX, flintIndexJob); + HashMap existingOptions = new HashMap<>(); + existingOptions.put("auto_refresh", "true"); + // Making Index Auto Refresh + INDEX.updateIndexOptions(existingOptions, false); + flintIndexJob.refreshing(); + }); this.dataSourceService.deleteDataSource(MYGLUE_DATASOURCE); - LocalEMRSClient emrsClient = - new LocalEMRSClient() { - @Override - public GetJobRunResult getJobRunResult(String applicationId, String jobId) { - super.getJobRunResult(applicationId, jobId); - JobRun jobRun = new JobRun(); - jobRun.setState("cancelled"); - return new GetJobRunResult().withJobRun(jobRun); - } - }; - EMRServerlessClientFactory emrServerlessClientFactory = () -> emrsClient; + LocalEMRSClient emrsClient = getCancelledLocalEmrsClient(); FlintIndexMetadataService flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); FlintStreamingJobHouseKeeperTask flintStreamingJobHouseKeeperTask = new FlintStreamingJobHouseKeeperTask( - dataSourceService, flintIndexMetadataService, stateStore, emrServerlessClientFactory); + dataSourceService, flintIndexMetadataService, getFlintIndexOpFactory(() -> emrsClient)); + Thread thread = new Thread(flintStreamingJobHouseKeeperTask); thread.start(); thread.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.DELETED); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); + + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.DELETED); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(3); emrsClient.startJobRunCalled(0); @@ -690,16 +521,15 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { Thread thread2 = new Thread(flintStreamingJobHouseKeeperTask); thread2.start(); thread2.join(); - ImmutableList.of(SKIPPING, COVERING, MV) - .forEach( - INDEX -> { - MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); - flintIndexJob.assertState(FlintIndexState.DELETED); - Map mappings = INDEX.getIndexMappings(); - Map meta = (HashMap) mappings.get("_meta"); - Map options = (Map) meta.get("options"); - Assertions.assertEquals("true", options.get("auto_refresh")); - }); + mockFlintIndices.forEach( + INDEX -> { + MockFlintSparkJob flintIndexJob = indexJobMapping.get(INDEX); + flintIndexJob.assertState(FlintIndexState.DELETED); + Map mappings = INDEX.getIndexMappings(); + Map meta = (HashMap) mappings.get("_meta"); + Map options = (Map) meta.get("options"); + Assertions.assertEquals("true", options.get("auto_refresh")); + }); // No New Calls and Errors emrsClient.cancelJobRunCalled(3); emrsClient.getJobRunResultCalled(3); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java index 045de66d0a..aade6ff63b 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/IndexDMLHandlerTest.java @@ -24,35 +24,32 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.client.Client; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; -import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest; import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse; import org.opensearch.sql.spark.dispatcher.model.IndexQueryActionType; import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @ExtendWith(MockitoExtension.class) class IndexDMLHandlerTest { - @Mock private EMRServerlessClient emrServerlessClient; @Mock private JobExecutionResponseReader jobExecutionResponseReader; @Mock private FlintIndexMetadataService flintIndexMetadataService; - @Mock private StateStore stateStore; - @Mock private Client client; + @Mock private IndexDMLResultStorageService indexDMLResultStorageService; + @Mock private FlintIndexOpFactory flintIndexOpFactory; @Test public void getResponseFromExecutor() { - JSONObject result = - new IndexDMLHandler(null, null, null, null, null).getResponseFromExecutor(null); + JSONObject result = new IndexDMLHandler(null, null, null, null).getResponseFromExecutor(null); assertEquals("running", result.getString(STATUS_FIELD)); assertEquals("", result.getString(ERROR_FIELD)); @@ -62,11 +59,10 @@ public void getResponseFromExecutor() { public void testWhenIndexDetailsAreNotFound() { IndexDMLHandler indexDMLHandler = new IndexDMLHandler( - emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, - stateStore, - client); + indexDMLResultStorageService, + flintIndexOpFactory); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, @@ -94,8 +90,10 @@ public void testWhenIndexDetailsAreNotFound() { .build(); Mockito.when(flintIndexMetadataService.getFlintIndexMetadata(any())) .thenReturn(new HashMap<>()); + DispatchQueryResponse dispatchQueryResponse = indexDMLHandler.submit(dispatchQueryRequest, dispatchQueryContext); + Assertions.assertNotNull(dispatchQueryResponse.getQueryId()); } @@ -104,11 +102,10 @@ public void testWhenIndexDetailsWithInvalidQueryActionType() { FlintIndexMetadata flintIndexMetadata = mock(FlintIndexMetadata.class); IndexDMLHandler indexDMLHandler = new IndexDMLHandler( - emrServerlessClient, jobExecutionResponseReader, flintIndexMetadataService, - stateStore, - client); + indexDMLResultStorageService, + flintIndexOpFactory); DispatchQueryRequest dispatchQueryRequest = new DispatchQueryRequest( EMRS_APPLICATION_ID, @@ -139,6 +136,7 @@ public void testWhenIndexDetailsWithInvalidQueryActionType() { flintMetadataMap.put(indexQueryDetails.openSearchIndexName(), flintIndexMetadata); when(flintIndexMetadataService.getFlintIndexMetadata(indexQueryDetails.openSearchIndexName())) .thenReturn(flintMetadataMap); + indexDMLHandler.submit(dispatchQueryRequest, dispatchQueryContext); } diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 8de5fe3fb4..36264e49c6 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -54,7 +54,6 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.opensearch.client.Client; import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasource.model.DataSourceMetadata; import org.opensearch.sql.datasource.model.DataSourceType; @@ -72,8 +71,9 @@ import org.opensearch.sql.spark.execution.statement.Statement; import org.opensearch.sql.spark.execution.statement.StatementId; import org.opensearch.sql.spark.execution.statement.StatementState; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.LeaseManager; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; @@ -86,13 +86,10 @@ public class SparkQueryDispatcherTest { @Mock private DataSourceService dataSourceService; @Mock private JobExecutionResponseReader jobExecutionResponseReader; @Mock private FlintIndexMetadataService flintIndexMetadataService; - - @Mock(answer = RETURNS_DEEP_STUBS) - private Client openSearchClient; - @Mock private SessionManager sessionManager; - @Mock private LeaseManager leaseManager; + @Mock private IndexDMLResultStorageService indexDMLResultStorageService; + @Mock private FlintIndexOpFactory flintIndexOpFactory; @Mock(answer = RETURNS_DEEP_STUBS) private Session session; @@ -100,8 +97,6 @@ public class SparkQueryDispatcherTest { @Mock(answer = RETURNS_DEEP_STUBS) private Statement statement; - @Mock private StateStore stateStore; - private SparkQueryDispatcher sparkQueryDispatcher; private final AsyncQueryId QUERY_ID = AsyncQueryId.newAsyncQueryId(DS_NAME); @@ -114,13 +109,14 @@ void setUp() { new QueryHandlerFactory( jobExecutionResponseReader, flintIndexMetadataService, - openSearchClient, sessionManager, leaseManager, - stateStore, + indexDMLResultStorageService, + flintIndexOpFactory, emrServerlessClientFactory); sparkQueryDispatcher = new SparkQueryDispatcher(dataSourceService, sessionManager, queryHandlerFactory); + new SparkQueryDispatcher(dataSourceService, sessionManager, queryHandlerFactory); } @Test @@ -405,7 +401,6 @@ void testDispatchIndexQuery() { tags, true, "query_execution_result_my_glue"); - when(emrServerlessClient.startJobRun(expected)).thenReturn(EMR_JOB_ID); DataSourceMetadata dataSourceMetadata = constructMyGlueDataSourceMetadata(); when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata("my_glue")) @@ -420,6 +415,7 @@ void testDispatchIndexQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -513,6 +509,7 @@ void testDispatchQueryWithoutATableAndDataSourceName() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -563,6 +560,7 @@ void testDispatchIndexQueryWithoutADatasourceName() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -613,6 +611,7 @@ void testDispatchMaterializedViewQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -659,6 +658,7 @@ void testDispatchShowMVQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -751,6 +751,7 @@ void testDispatchDescribeIndexQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -962,7 +963,9 @@ void testGetQueryResponseWithSuccess() { queryResult.put(DATA_FIELD, resultMap); when(jobExecutionResponseReader.getResultFromOpensearchIndex(EMR_JOB_ID, null)) .thenReturn(queryResult); + JSONObject result = sparkQueryDispatcher.getQueryResponse(asyncQueryJobMetadata()); + verify(jobExecutionResponseReader, times(1)).getResultFromOpensearchIndex(EMR_JOB_ID, null); Assertions.assertEquals( new HashSet<>(Arrays.asList(DATA_FIELD, STATUS_FIELD, ERROR_FIELD)), result.keySet()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java index 5755d03baa..b3dc65a5fe 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java @@ -13,6 +13,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; @@ -22,6 +23,7 @@ public class FlintIndexOpTest { @Mock private StateStore mockStateStore; + @Mock private EMRServerlessClientFactory mockEmrServerlessClientFactory; @Test public void testApplyWithTransitioningStateFailure() { @@ -42,7 +44,8 @@ public void testApplyWithTransitioningStateFailure() { .thenReturn(Optional.of(fakeModel)); when(mockStateStore.updateState(any(), any(), any(), any())) .thenThrow(new RuntimeException("Transitioning state failed")); - FlintIndexOp flintIndexOp = new TestFlintIndexOp(mockStateStore, "myS3"); + FlintIndexOp flintIndexOp = + new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -70,7 +73,8 @@ public void testApplyWithCommitFailure() { .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 2)) .thenThrow(new RuntimeException("Commit state failed")) .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 3)); - FlintIndexOp flintIndexOp = new TestFlintIndexOp(mockStateStore, "myS3"); + FlintIndexOp flintIndexOp = + new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -98,7 +102,8 @@ public void testApplyWithRollBackFailure() { .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 2)) .thenThrow(new RuntimeException("Commit state failed")) .thenThrow(new RuntimeException("Rollback failure")); - FlintIndexOp flintIndexOp = new TestFlintIndexOp(mockStateStore, "myS3"); + FlintIndexOp flintIndexOp = + new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -107,8 +112,11 @@ public void testApplyWithRollBackFailure() { static class TestFlintIndexOp extends FlintIndexOp { - public TestFlintIndexOp(StateStore stateStore, String datasourceName) { - super(stateStore, datasourceName); + public TestFlintIndexOp( + StateStore stateStore, + String datasourceName, + EMRServerlessClientFactory emrServerlessClientFactory) { + super(stateStore, datasourceName, emrServerlessClientFactory); } @Override