Skip to content

Commit

Permalink
Introduce FlintIndexStateModelService
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed May 6, 2024
1 parent 45122ec commit adc544a
Show file tree
Hide file tree
Showing 23 changed files with 358 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

@RequiredArgsConstructor
public class BatchQueryHandler extends AsyncQueryHandler {
private final EMRServerlessClient emrServerlessClient;
private final JobExecutionResponseReader jobExecutionResponseReader;
protected final EMRServerlessClient emrServerlessClient;
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@

/** Handle Streaming Query. */
public class StreamingQueryHandler extends BatchQueryHandler {
private final EMRServerlessClient emrServerlessClient;

public StreamingQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.emrServerlessClient = emrServerlessClient;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import java.util.Locale;
import lombok.experimental.UtilityClass;

@UtilityClass
public class OpenSearchStateStoreUtil {

public static String getIndexName(String datasourceName) {
return String.format(
"%s_%s", SPARK_REQUEST_BUFFER_INDEX_NAME, datasourceName.toLowerCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Optional;

/**
* Abstraction over flint index state storage.
* Flint index state will maintain the status of each flint index.
*/
public interface FlintIndexStateModelService {
FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName);

Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName);

FlintIndexStateModel updateFlintIndexState(
FlintIndexStateModel flintIndexStateModel,
FlintIndexState flintIndexState,
String datasourceName);

boolean deleteFlintIndexStateModel(String id, String datasourceName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;

@RequiredArgsConstructor
public class OpenSearchFlintIndexStateModelService implements FlintIndexStateModelService {
private final StateStore stateStore;

@Override
public FlintIndexStateModel updateFlintIndexState(
FlintIndexStateModel flintIndexStateModel,
FlintIndexState flintIndexState,
String datasourceName) {
return stateStore.updateState(
flintIndexStateModel,
flintIndexState,
FlintIndexStateModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName) {
return stateStore.get(
id,
FlintIndexStateModel::fromXContent,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName) {
return stateStore.create(
flintIndexStateModel,
FlintIndexStateModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public boolean deleteFlintIndexStateModel(String id, String datasourceName) {
return stateStore.delete(id, OpenSearchStateStoreUtil.getIndexName(datasourceName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
Expand All @@ -22,17 +19,17 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Flint Index Operation. */
@RequiredArgsConstructor
public abstract class FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final FlintIndexStateModelService flintIndexStateModelService;
private final String datasourceName;
private final EMRServerlessClientFactory emrServerlessClientFactory;

Expand All @@ -57,8 +54,10 @@ public void apply(FlintIndexMetadata metadata) {
} catch (Throwable e) {
LOG.error("Rolling back transient log due to transaction operation failure", e);
try {
updateFlintIndexState(stateStore, datasourceName)
.apply(transitionedFlintIndexStateModel, initialFlintIndexStateModel.getIndexState());
flintIndexStateModelService.updateFlintIndexState(
transitionedFlintIndexStateModel,
initialFlintIndexStateModel.getIndexState(),
datasourceName);
} catch (Exception ex) {
LOG.error("Failed to rollback transient log", ex);
}
Expand All @@ -70,7 +69,7 @@ public void apply(FlintIndexMetadata metadata) {
@NotNull
private FlintIndexStateModel getFlintIndexStateModel(String latestId) {
Optional<FlintIndexStateModel> flintIndexOptional =
getFlintIndexState(stateStore, datasourceName).apply(latestId);
flintIndexStateModelService.getFlintIndexStateModel(latestId, datasourceName);
if (flintIndexOptional.isEmpty()) {
String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId);
LOG.error(errorMsg);
Expand Down Expand Up @@ -111,7 +110,8 @@ private FlintIndexStateModel moveToTransitioningState(FlintIndexStateModel flint
FlintIndexState transitioningState = transitioningState();
try {
flintIndex =
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, transitioningState());
flintIndexStateModelService.updateFlintIndexState(
flintIndex, transitioningState(), datasourceName);
} catch (Exception e) {
String errorMsg =
String.format(Locale.ROOT, "Moving to transition state:%s failed.", transitioningState);
Expand All @@ -127,9 +127,10 @@ private void commit(FlintIndexStateModel flintIndex) {
try {
if (stableState == FlintIndexState.NONE) {
LOG.info("Deleting index state with docId: " + flintIndex.getLatestId());
deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId());
flintIndexStateModelService.deleteFlintIndexStateModel(
flintIndex.getLatestId(), datasourceName);
} else {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
flintIndexStateModelService.updateFlintIndexState(flintIndex, stableState, datasourceName);
}
} catch (Exception e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand All @@ -27,11 +27,11 @@ public class FlintIndexOpAlter extends FlintIndexOp {

public FlintIndexOpAlter(
FlintIndexOptions flintIndexOptions,
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory,
FlintIndexMetadataService flintIndexMetadataService) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOptions = flintIndexOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Cancel refreshing job for refresh query when user clicks cancel button on UI. */
public class FlintIndexOpCancel extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

public FlintIndexOpCancel(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
}

// Only in refreshing state, the job is cancellable in case of REFRESH query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/**
* Operation to drop Flint index
*/
public class FlintIndexOpDrop extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

public FlintIndexOpDrop(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
}

public boolean validate(FlintIndexState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,37 @@
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final StateStore stateStore;
private final FlintIndexStateModelService flintIndexStateModelService;
private final Client client;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;

public FlintIndexOpDrop getDrop(String datasource) {
return new FlintIndexOpDrop(stateStore, datasource, emrServerlessClientFactory);
return new FlintIndexOpDrop(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
}

public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
return new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
flintIndexStateModelService,
datasource,
emrServerlessClientFactory,
flintIndexMetadataService);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(stateStore, datasource, client, emrServerlessClientFactory);
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, client, emrServerlessClientFactory);
}

public FlintIndexOpCancel getCancel(String datasource) {
return new FlintIndexOpCancel(stateStore, datasource, emrServerlessClientFactory);
return new FlintIndexOpCancel(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {
Expand All @@ -25,11 +25,11 @@ public class FlintIndexOpVacuum extends FlintIndexOp {
private final Client client;

public FlintIndexOpVacuum(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
Client client,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService;
import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
Expand Down Expand Up @@ -96,12 +98,17 @@ public QueryHandlerFactory queryhandlerFactory(

@Provides
public FlintIndexOpFactory flintIndexOpFactory(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
NodeClient client,
FlintIndexMetadataServiceImpl flintIndexMetadataService,
EMRServerlessClientFactory emrServerlessClientFactory) {
return new FlintIndexOpFactory(
stateStore, client, flintIndexMetadataService, emrServerlessClientFactory);
flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory);
}

@Provides
public FlintIndexStateModelService flintIndexStateModelService(StateStore stateStore) {
return new OpenSearchFlintIndexStateModelService(stateStore);
}

@Provides
Expand Down
Loading

0 comments on commit adc544a

Please sign in to comment.