From daf4f8f0f9c72255511d0e2a6eb5c86824f67b9e Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Fri, 26 Apr 2024 06:14:16 +0000 Subject: [PATCH] Introduce FlintIndexStateModelService Signed-off-by: Tomoyuki Morita --- .../spark/dispatcher/BatchQueryHandler.java | 4 +- .../dispatcher/StreamingQueryHandler.java | 2 - .../statestore/OpenSearchStateStoreUtil.java | 20 +++++ .../flint/FlintIndexStateModelService.java | 22 ++++++ ...OpenSearchFlintIndexStateModelService.java | 50 ++++++++++++ .../spark/flint/operation/FlintIndexOp.java | 23 +++--- .../flint/operation/FlintIndexOpAlter.java | 6 +- .../flint/operation/FlintIndexOpCancel.java | 6 +- .../flint/operation/FlintIndexOpDrop.java | 6 +- .../flint/operation/FlintIndexOpFactory.java | 15 ++-- .../flint/operation/FlintIndexOpVacuum.java | 6 +- .../config/AsyncExecutorServiceModule.java | 11 ++- .../AsyncQueryExecutorServiceSpec.java | 8 +- .../AsyncQueryGetResultSpecTest.java | 3 +- .../asyncquery/IndexQuerySpecAlterTest.java | 48 ++++++++---- .../spark/asyncquery/IndexQuerySpecTest.java | 43 +++++++---- .../asyncquery/IndexQuerySpecVacuumTest.java | 3 +- .../asyncquery/model/MockFlintSparkJob.java | 41 ++++------ .../FlintStreamingJobHouseKeeperTaskTest.java | 21 +++-- .../dispatcher/SparkQueryDispatcherTest.java | 2 + .../OpenSearchStateStoreUtilTest.java | 20 +++++ ...SearchFlintIndexStateModelServiceTest.java | 77 +++++++++++++++++++ .../flint/operation/FlintIndexOpTest.java | 32 ++++---- 23 files changed, 351 insertions(+), 118 deletions(-) create mode 100644 spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtil.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java create mode 100644 spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtilTest.java create mode 100644 spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java index e9356e5bed..c5cbc1e539 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java @@ -30,8 +30,8 @@ @RequiredArgsConstructor public class BatchQueryHandler extends AsyncQueryHandler { - private final EMRServerlessClient emrServerlessClient; - private final JobExecutionResponseReader jobExecutionResponseReader; + protected final EMRServerlessClient emrServerlessClient; + protected final JobExecutionResponseReader jobExecutionResponseReader; protected final LeaseManager leaseManager; @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java index 8170b41c66..08c10e04cc 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java +++ b/spark/src/main/java/org/opensearch/sql/spark/dispatcher/StreamingQueryHandler.java @@ -28,14 +28,12 @@ /** Handle Streaming Query. */ public class StreamingQueryHandler extends BatchQueryHandler { - private final EMRServerlessClient emrServerlessClient; public StreamingQueryHandler( EMRServerlessClient emrServerlessClient, JobExecutionResponseReader jobExecutionResponseReader, LeaseManager leaseManager) { super(emrServerlessClient, jobExecutionResponseReader, leaseManager); - this.emrServerlessClient = emrServerlessClient; } @Override diff --git a/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtil.java b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtil.java new file mode 100644 index 0000000000..da9d166fcf --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtil.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; + +import java.util.Locale; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class OpenSearchStateStoreUtil { + + public static String getIndexName(String datasourceName) { + return String.format( + "%s_%s", SPARK_REQUEST_BUFFER_INDEX_NAME, datasourceName.toLowerCase(Locale.ROOT)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java new file mode 100644 index 0000000000..ce89a7f2af --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/FlintIndexStateModelService.java @@ -0,0 +1,22 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import java.util.Optional; + +public interface FlintIndexStateModelService { + FlintIndexStateModel createFlintIndexStateModel( + FlintIndexStateModel flintIndexStateModel, String datasourceName); + + Optional getFlintIndexStateModel(String id, String datasourceName); + + FlintIndexStateModel updateFlintIndexState( + FlintIndexStateModel flintIndexStateModel, + FlintIndexState flintIndexState, + String datasourceName); + + boolean deleteFlintIndexStateModel(String id, String datasourceName); +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java new file mode 100644 index 0000000000..2db3930821 --- /dev/null +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelService.java @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil; +import org.opensearch.sql.spark.execution.statestore.StateStore; + +@RequiredArgsConstructor +public class OpenSearchFlintIndexStateModelService implements FlintIndexStateModelService { + private final StateStore stateStore; + + @Override + public FlintIndexStateModel updateFlintIndexState( + FlintIndexStateModel flintIndexStateModel, + FlintIndexState flintIndexState, + String datasourceName) { + return stateStore.updateState( + flintIndexStateModel, + flintIndexState, + FlintIndexStateModel::copyWithState, + OpenSearchStateStoreUtil.getIndexName(datasourceName)); + } + + @Override + public Optional getFlintIndexStateModel(String id, String datasourceName) { + return stateStore.get( + id, + FlintIndexStateModel::fromXContent, + OpenSearchStateStoreUtil.getIndexName(datasourceName)); + } + + @Override + public FlintIndexStateModel createFlintIndexStateModel( + FlintIndexStateModel flintIndexStateModel, String datasourceName) { + return stateStore.create( + flintIndexStateModel, + FlintIndexStateModel::copy, + OpenSearchStateStoreUtil.getIndexName(datasourceName)); + } + + @Override + public boolean deleteFlintIndexStateModel(String id, String datasourceName) { + return stateStore.delete(id, OpenSearchStateStoreUtil.getIndexName(datasourceName)); + } +} diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java index edfd0aace2..0b1ccc988e 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOp.java @@ -6,9 +6,6 @@ package org.opensearch.sql.spark.flint.operation; import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE; -import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState; -import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState; -import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState; import com.amazonaws.services.emrserverless.model.ValidationException; import java.util.Locale; @@ -22,17 +19,17 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.client.EMRServerlessClient; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; /** Flint Index Operation. */ @RequiredArgsConstructor public abstract class FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); - private final StateStore stateStore; + private final FlintIndexStateModelService flintIndexStateModelService; private final String datasourceName; private final EMRServerlessClientFactory emrServerlessClientFactory; @@ -57,8 +54,10 @@ public void apply(FlintIndexMetadata metadata) { } catch (Throwable e) { LOG.error("Rolling back transient log due to transaction operation failure", e); try { - updateFlintIndexState(stateStore, datasourceName) - .apply(transitionedFlintIndexStateModel, initialFlintIndexStateModel.getIndexState()); + flintIndexStateModelService.updateFlintIndexState( + transitionedFlintIndexStateModel, + initialFlintIndexStateModel.getIndexState(), + datasourceName); } catch (Exception ex) { LOG.error("Failed to rollback transient log", ex); } @@ -70,7 +69,7 @@ public void apply(FlintIndexMetadata metadata) { @NotNull private FlintIndexStateModel getFlintIndexStateModel(String latestId) { Optional flintIndexOptional = - getFlintIndexState(stateStore, datasourceName).apply(latestId); + flintIndexStateModelService.getFlintIndexStateModel(latestId, datasourceName); if (flintIndexOptional.isEmpty()) { String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId); LOG.error(errorMsg); @@ -111,7 +110,8 @@ private FlintIndexStateModel moveToTransitioningState(FlintIndexStateModel flint FlintIndexState transitioningState = transitioningState(); try { flintIndex = - updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, transitioningState()); + flintIndexStateModelService.updateFlintIndexState( + flintIndex, transitioningState(), datasourceName); } catch (Exception e) { String errorMsg = String.format(Locale.ROOT, "Moving to transition state:%s failed.", transitioningState); @@ -127,9 +127,10 @@ private void commit(FlintIndexStateModel flintIndex) { try { if (stableState == FlintIndexState.NONE) { LOG.info("Deleting index state with docId: " + flintIndex.getLatestId()); - deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId()); + flintIndexStateModelService.deleteFlintIndexStateModel( + flintIndex.getLatestId(), datasourceName); } else { - updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState); + flintIndexStateModelService.updateFlintIndexState(flintIndex, stableState, datasourceName); } } catch (Exception e) { String errorMsg = diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java index 31e33539a1..9955320253 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java @@ -10,11 +10,11 @@ import org.apache.logging.log4j.Logger; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; /** * Index Operation for Altering the flint index. Only handles alter operation when @@ -27,11 +27,11 @@ public class FlintIndexOpAlter extends FlintIndexOp { public FlintIndexOpAlter( FlintIndexOptions flintIndexOptions, - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, String datasourceName, EMRServerlessClientFactory emrServerlessClientFactory, FlintIndexMetadataService flintIndexMetadataService) { - super(stateStore, datasourceName, emrServerlessClientFactory); + super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); this.flintIndexMetadataService = flintIndexMetadataService; this.flintIndexOptions = flintIndexOptions; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java index 0962e2a16b..02c8e39c66 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpCancel.java @@ -9,20 +9,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; /** Cancel refreshing job for refresh query when user clicks cancel button on UI. */ public class FlintIndexOpCancel extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); public FlintIndexOpCancel( - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, String datasourceName, EMRServerlessClientFactory emrServerlessClientFactory) { - super(stateStore, datasourceName, emrServerlessClientFactory); + super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); } // Only in refreshing state, the job is cancellable in case of REFRESH query. diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java index 0f71b3bc70..d22cde952d 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java @@ -9,19 +9,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; public class FlintIndexOpDrop extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); public FlintIndexOpDrop( - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, String datasourceName, EMRServerlessClientFactory emrServerlessClientFactory) { - super(stateStore, datasourceName, emrServerlessClientFactory); + super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); } public boolean validate(FlintIndexState state) { diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java index 6fc2261ade..b102e43d59 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java @@ -9,34 +9,37 @@ import org.opensearch.client.Client; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; @RequiredArgsConstructor public class FlintIndexOpFactory { - private final StateStore stateStore; + private final FlintIndexStateModelService flintIndexStateModelService; private final Client client; private final FlintIndexMetadataService flintIndexMetadataService; private final EMRServerlessClientFactory emrServerlessClientFactory; public FlintIndexOpDrop getDrop(String datasource) { - return new FlintIndexOpDrop(stateStore, datasource, emrServerlessClientFactory); + return new FlintIndexOpDrop( + flintIndexStateModelService, datasource, emrServerlessClientFactory); } public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) { return new FlintIndexOpAlter( flintIndexOptions, - stateStore, + flintIndexStateModelService, datasource, emrServerlessClientFactory, flintIndexMetadataService); } public FlintIndexOpVacuum getVacuum(String datasource) { - return new FlintIndexOpVacuum(stateStore, datasource, client, emrServerlessClientFactory); + return new FlintIndexOpVacuum( + flintIndexStateModelService, datasource, client, emrServerlessClientFactory); } public FlintIndexOpCancel getCancel(String datasource) { - return new FlintIndexOpCancel(stateStore, datasource, emrServerlessClientFactory); + return new FlintIndexOpCancel( + flintIndexStateModelService, datasource, emrServerlessClientFactory); } } diff --git a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index 4287d9c7c9..ffd09e16a4 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/spark/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -11,10 +11,10 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.client.Client; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; /** Flint index vacuum operation. */ public class FlintIndexOpVacuum extends FlintIndexOp { @@ -25,11 +25,11 @@ public class FlintIndexOpVacuum extends FlintIndexOp { private final Client client; public FlintIndexOpVacuum( - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, String datasourceName, Client client, EMRServerlessClientFactory emrServerlessClientFactory) { - super(stateStore, datasourceName, emrServerlessClientFactory); + super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); this.client = client; } diff --git a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 1d890ce346..dfc8e4042a 100644 --- a/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/spark/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -30,7 +30,9 @@ import org.opensearch.sql.spark.execution.session.SessionManager; import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.IndexDMLResultStorageService; +import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; @@ -96,12 +98,17 @@ public QueryHandlerFactory queryhandlerFactory( @Provides public FlintIndexOpFactory flintIndexOpFactory( - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, NodeClient client, FlintIndexMetadataServiceImpl flintIndexMetadataService, EMRServerlessClientFactory emrServerlessClientFactory) { return new FlintIndexOpFactory( - stateStore, client, flintIndexMetadataService, emrServerlessClientFactory); + flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory); + } + + @Provides + public FlintIndexStateModelService flintIndexStateModelService(StateStore stateStore) { + return new OpenSearchFlintIndexStateModelService(stateStore); } @Provides diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index b1c7f68388..84a2128821 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -66,7 +66,9 @@ import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; import org.opensearch.sql.spark.flint.FlintIndexType; +import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService; import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService; import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager; @@ -86,6 +88,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected StateStore stateStore; protected ClusterSettings clusterSettings; protected FlintIndexMetadataService flintIndexMetadataService; + protected FlintIndexStateModelService flintIndexStateModelService; @Override protected Collection> nodePlugins() { @@ -155,12 +158,13 @@ public void setup() { createIndexWithMappings(dm.getResultIndex(), loadResultIndexMappings()); createIndexWithMappings(otherDm.getResultIndex(), loadResultIndexMappings()); flintIndexMetadataService = new FlintIndexMetadataServiceImpl(client); + flintIndexStateModelService = new OpenSearchFlintIndexStateModelService(stateStore); } protected FlintIndexOpFactory getFlintIndexOpFactory( EMRServerlessClientFactory emrServerlessClientFactory) { return new FlintIndexOpFactory( - stateStore, client, flintIndexMetadataService, emrServerlessClientFactory); + flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory); } @After @@ -222,7 +226,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( new DefaultLeaseManager(pluginSettings, stateStore), new OpenSearchIndexDMLResultStorageService(dataSourceService, stateStore), new FlintIndexOpFactory( - stateStore, + flintIndexStateModelService, client, new FlintIndexMetadataServiceImpl(client), emrServerlessClientFactory), diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java index 10598d110c..6dcc2c17af 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryGetResultSpecTest.java @@ -53,7 +53,8 @@ public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec { @Before public void doSetUp() { - mockIndexState = new MockFlintSparkJob(stateStore, mockIndex.latestId, MYS3_DATASOURCE); + mockIndexState = + new MockFlintSparkJob(flintIndexStateModelService, mockIndex.latestId, MYS3_DATASOURCE); } @Test diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java index ddefebcf77..d49e3883da 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecAlterTest.java @@ -68,7 +68,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -135,7 +136,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -215,7 +217,8 @@ public CancelJobRunResult cancelJobRun( mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -277,7 +280,8 @@ public void testAlterIndexQueryConvertingToAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -341,7 +345,8 @@ public void testAlterIndexQueryWithOutAnyAutoRefresh() { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -414,7 +419,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -487,7 +493,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -554,7 +561,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -614,7 +622,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -676,7 +685,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -738,7 +748,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index @@ -797,7 +808,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, true); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. alter index @@ -854,7 +866,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.updating(); // 1. alter index @@ -919,7 +932,8 @@ public CancelJobRunResult cancelJobRun( mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -982,7 +996,8 @@ public CancelJobRunResult cancelJobRun( mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index @@ -1046,7 +1061,8 @@ public CancelJobRunResult cancelJobRun( mockDS.updateIndexOptions(existingOptions, false); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.getLatestId(), MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.getLatestId(), MYS3_DATASOURCE); flintIndexJob.active(); // 1. alter index diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java index 864a87586f..09addccdbb 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java @@ -294,7 +294,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -352,7 +353,8 @@ public CancelJobRunResult cancelJobRun( mockDS.createIndex(); // Mock index state in refresh state. MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -397,7 +399,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index @@ -441,7 +444,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // 1. drop index @@ -490,7 +494,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.active(); // 1. drop index @@ -536,7 +541,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.creating(); // 1. drop index @@ -582,7 +588,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); // 1. drop index CreateAsyncQueryResponse response = @@ -634,7 +641,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); flintIndexJob.deleting(); // 1. drop index @@ -679,7 +687,7 @@ public CancelJobRunResult cancelJobRun( mockDS.createIndex(); // Mock index state in refresh state. MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYGLUE_DATASOURCE); + new MockFlintSparkJob(flintIndexStateModelService, mockDS.latestId, MYGLUE_DATASOURCE); flintIndexJob.refreshing(); // 1.drop index @@ -752,7 +760,7 @@ public void concurrentRefreshJobLimitNotApplied() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob(flintIndexStateModelService, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto refresh @@ -777,7 +785,7 @@ public void concurrentRefreshJobLimitAppliedToDDLWithAuthRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob(flintIndexStateModelService, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -805,7 +813,7 @@ public void concurrentRefreshJobLimitAppliedToRefresh() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob(flintIndexStateModelService, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); // query with auto_refresh = true. @@ -832,7 +840,7 @@ public void concurrentRefreshJobLimitNotAppliedToDDL() { COVERING.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, COVERING.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob(flintIndexStateModelService, COVERING.latestId, MYS3_DATASOURCE); flintIndexJob.refreshing(); CreateAsyncQueryResponse asyncQueryResponse = @@ -905,7 +913,8 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = @@ -948,7 +957,8 @@ public GetJobRunResult getJobRunResult( mockDS.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, mockDS.latestId, MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, mockDS.latestId, MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = @@ -990,7 +1000,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockFlintIndex.createIndex(); // Mock index state MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, indexName + "_latest_id", MYS3_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, indexName + "_latest_id", MYS3_DATASOURCE); // 1. Submit REFRESH statement CreateAsyncQueryResponse response = diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java index 76adddf89d..c9660c8d87 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecVacuumTest.java @@ -164,7 +164,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) { mockDS.createIndex(); // Mock index state doc - MockFlintSparkJob flintIndexJob = new MockFlintSparkJob(stateStore, mockDS.latestId, "mys3"); + MockFlintSparkJob flintIndexJob = + new MockFlintSparkJob(flintIndexStateModelService, mockDS.latestId, "mys3"); flintIndexJob.transition(state); // Vacuum index diff --git a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java index 4cfdb6a9a9..4c58ea472f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java +++ b/spark/src/test/java/org/opensearch/sql/spark/asyncquery/model/MockFlintSparkJob.java @@ -11,18 +11,19 @@ import java.util.Optional; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; public class MockFlintSparkJob { private FlintIndexStateModel stateModel; - private StateStore stateStore; + private FlintIndexStateModelService flintIndexStateModelService; private String datasource; - public MockFlintSparkJob(StateStore stateStore, String latestId, String datasource) { + public MockFlintSparkJob( + FlintIndexStateModelService flintIndexStateModelService, String latestId, String datasource) { assertNotNull(latestId); - this.stateStore = stateStore; + this.flintIndexStateModelService = flintIndexStateModelService; this.datasource = datasource; stateModel = new FlintIndexStateModel( @@ -35,54 +36,42 @@ public MockFlintSparkJob(StateStore stateStore, String latestId, String datasour "", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - stateModel = StateStore.createFlintIndexState(stateStore, datasource).apply(stateModel); + stateModel = flintIndexStateModelService.createFlintIndexStateModel(stateModel, datasource); } public void transition(FlintIndexState newState) { stateModel = - StateStore.updateFlintIndexState(stateStore, datasource).apply(stateModel, newState); + flintIndexStateModelService.updateFlintIndexState(stateModel, newState, datasource); } public void refreshing() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.REFRESHING); + transition(FlintIndexState.REFRESHING); } public void active() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.ACTIVE); + transition(FlintIndexState.ACTIVE); } public void creating() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.CREATING); + transition(FlintIndexState.CREATING); } public void updating() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.UPDATING); + transition(FlintIndexState.UPDATING); } public void deleting() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.DELETING); + transition(FlintIndexState.DELETING); } public void deleted() { - stateModel = - StateStore.updateFlintIndexState(stateStore, datasource) - .apply(stateModel, FlintIndexState.DELETED); + transition(FlintIndexState.DELETED); } public void assertState(FlintIndexState expected) { Optional stateModelOpt = - StateStore.getFlintIndexState(stateStore, datasource).apply(stateModel.getId()); - assertTrue((stateModelOpt.isPresent())); + flintIndexStateModelService.getFlintIndexStateModel(stateModel.getId(), datasource); + assertTrue(stateModelOpt.isPresent()); assertEquals(expected, stateModelOpt.get().getIndexState()); } } diff --git a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java index 6bcf9c6308..aa4684811f 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/cluster/FlintStreamingJobHouseKeeperTaskTest.java @@ -39,7 +39,8 @@ public void testStreamingJobHouseKeeperWhenDataSourceDisabled() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -117,7 +118,8 @@ public void testStreamingJobHouseKeeperWhenCancelJobGivesTimeout() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -164,7 +166,8 @@ public void testSimulateConcurrentJobHouseKeeperExecution() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -213,7 +216,8 @@ public void testStreamingJobClearnerWhenDataSourceIsDeleted() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -260,7 +264,8 @@ public void testStreamingJobHouseKeeperWhenDataSourceIsNeitherDisabledNorDeleted INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -409,7 +414,8 @@ public void testStreamingJobHouseKeeperMultipleTimesWhenDataSourceDisabled() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); @@ -480,7 +486,8 @@ public void testRunStreamingJobHouseKeeperWhenDataSourceIsDeleted() { INDEX -> { INDEX.createIndex(); MockFlintSparkJob flintIndexJob = - new MockFlintSparkJob(stateStore, INDEX.getLatestId(), MYGLUE_DATASOURCE); + new MockFlintSparkJob( + flintIndexStateModelService, INDEX.getLatestId(), MYGLUE_DATASOURCE); indexJobMapping.put(INDEX, flintIndexJob); HashMap existingOptions = new HashMap<>(); existingOptions.put("auto_refresh", "true"); diff --git a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index b71548cdac..19be7fd9fb 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -300,6 +300,7 @@ void testDispatchSelectQueryWithNoAuthIndexStoreDatasource() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); @@ -704,6 +705,7 @@ void testRefreshIndexQuery() { LangType.SQL, EMRS_EXECUTION_ROLE, TEST_CLUSTER_NAME)); + verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture()); Assertions.assertEquals(expected, startJobRequestArgumentCaptor.getValue()); Assertions.assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId()); diff --git a/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtilTest.java b/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtilTest.java new file mode 100644 index 0000000000..318080ff2d --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/execution/statestore/OpenSearchStateStoreUtilTest.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.execution.statestore; + +import static org.junit.Assert.assertEquals; + +import org.junit.jupiter.api.Test; + +public class OpenSearchStateStoreUtilTest { + + @Test + void getIndexName() { + String result = OpenSearchStateStoreUtil.getIndexName("DATASOURCE"); + + assertEquals(".query_execution_request_datasource", result); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java new file mode 100644 index 0000000000..aebc136b93 --- /dev/null +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/OpenSearchFlintIndexStateModelServiceTest.java @@ -0,0 +1,77 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.flint; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.util.Optional; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.spark.execution.statestore.StateStore; + +@ExtendWith(MockitoExtension.class) +public class OpenSearchFlintIndexStateModelServiceTest { + + public static final String DATASOURCE = "DATASOURCE"; + public static final String ID = "ID"; + + @Mock StateStore mockStateStore; + @Mock FlintIndexStateModel flintIndexStateModel; + @Mock FlintIndexState flintIndexState; + @Mock FlintIndexStateModel responseFlintIndexStateModel; + + @InjectMocks OpenSearchFlintIndexStateModelService openSearchFlintIndexStateModelService; + + @Test + void updateFlintIndexState() { + when(mockStateStore.updateState(any(), any(), any(), any())) + .thenReturn(responseFlintIndexStateModel); + + FlintIndexStateModel result = + openSearchFlintIndexStateModelService.updateFlintIndexState( + flintIndexStateModel, flintIndexState, DATASOURCE); + + assertEquals(responseFlintIndexStateModel, result); + } + + @Test + void getFlintIndexStateModel() { + when(mockStateStore.get(any(), any(), any())) + .thenReturn(Optional.of(responseFlintIndexStateModel)); + + Optional result = + openSearchFlintIndexStateModelService.getFlintIndexStateModel("ID", DATASOURCE); + + assertEquals(responseFlintIndexStateModel, result.get()); + } + + @Test + void createFlintIndexStateModel() { + when(mockStateStore.create(any(), any(), any())).thenReturn(responseFlintIndexStateModel); + + FlintIndexStateModel result = + openSearchFlintIndexStateModelService.createFlintIndexStateModel( + flintIndexStateModel, DATASOURCE); + + assertEquals(responseFlintIndexStateModel, result); + } + + @Test + void deleteFlintIndexStateModel() { + when(mockStateStore.delete(any(), any())).thenReturn(true); + + boolean result = + openSearchFlintIndexStateModelService.deleteFlintIndexStateModel(ID, DATASOURCE); + + assertTrue(result); + } +} diff --git a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java index b3dc65a5fe..6c2a3a81a4 100644 --- a/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java +++ b/spark/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpTest.java @@ -1,10 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.sql.spark.flint.operation; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.sql.spark.execution.statestore.StateStore.DATASOURCE_TO_REQUEST_INDEX; import java.util.Optional; import org.junit.jupiter.api.Assertions; @@ -14,15 +18,15 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; -import org.opensearch.sql.spark.execution.statestore.StateStore; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; +import org.opensearch.sql.spark.flint.FlintIndexStateModelService; @ExtendWith(MockitoExtension.class) public class FlintIndexOpTest { - @Mock private StateStore mockStateStore; + @Mock private FlintIndexStateModelService flintIndexStateModelService; @Mock private EMRServerlessClientFactory mockEmrServerlessClientFactory; @Test @@ -40,12 +44,12 @@ public void testApplyWithTransitioningStateFailure() { "", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - when(mockStateStore.get(eq("latestId"), any(), eq(DATASOURCE_TO_REQUEST_INDEX.apply("myS3")))) + when(flintIndexStateModelService.getFlintIndexStateModel(eq("latestId"), any())) .thenReturn(Optional.of(fakeModel)); - when(mockStateStore.updateState(any(), any(), any(), any())) + when(flintIndexStateModelService.updateFlintIndexState(any(), any(), any())) .thenThrow(new RuntimeException("Transitioning state failed")); FlintIndexOp flintIndexOp = - new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); + new TestFlintIndexOp(flintIndexStateModelService, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -67,14 +71,14 @@ public void testApplyWithCommitFailure() { "", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - when(mockStateStore.get(eq("latestId"), any(), eq(DATASOURCE_TO_REQUEST_INDEX.apply("myS3")))) + when(flintIndexStateModelService.getFlintIndexStateModel(eq("latestId"), any())) .thenReturn(Optional.of(fakeModel)); - when(mockStateStore.updateState(any(), any(), any(), any())) + when(flintIndexStateModelService.updateFlintIndexState(any(), any(), any())) .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 2)) .thenThrow(new RuntimeException("Commit state failed")) .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 3)); FlintIndexOp flintIndexOp = - new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); + new TestFlintIndexOp(flintIndexStateModelService, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -96,14 +100,14 @@ public void testApplyWithRollBackFailure() { "", SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM); - when(mockStateStore.get(eq("latestId"), any(), eq(DATASOURCE_TO_REQUEST_INDEX.apply("myS3")))) + when(flintIndexStateModelService.getFlintIndexStateModel(eq("latestId"), any())) .thenReturn(Optional.of(fakeModel)); - when(mockStateStore.updateState(any(), any(), any(), any())) + when(flintIndexStateModelService.updateFlintIndexState(any(), any(), any())) .thenReturn(FlintIndexStateModel.copy(fakeModel, 1, 2)) .thenThrow(new RuntimeException("Commit state failed")) .thenThrow(new RuntimeException("Rollback failure")); FlintIndexOp flintIndexOp = - new TestFlintIndexOp(mockStateStore, "myS3", mockEmrServerlessClientFactory); + new TestFlintIndexOp(flintIndexStateModelService, "myS3", mockEmrServerlessClientFactory); IllegalStateException illegalStateException = Assertions.assertThrows(IllegalStateException.class, () -> flintIndexOp.apply(metadata)); Assertions.assertEquals( @@ -113,10 +117,10 @@ public void testApplyWithRollBackFailure() { static class TestFlintIndexOp extends FlintIndexOp { public TestFlintIndexOp( - StateStore stateStore, + FlintIndexStateModelService flintIndexStateModelService, String datasourceName, EMRServerlessClientFactory emrServerlessClientFactory) { - super(stateStore, datasourceName, emrServerlessClientFactory); + super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); } @Override