Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce FlintIndexStateModelService #2658

Merged
merged 2 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

@RequiredArgsConstructor
public class BatchQueryHandler extends AsyncQueryHandler {
private final EMRServerlessClient emrServerlessClient;
private final JobExecutionResponseReader jobExecutionResponseReader;
protected final EMRServerlessClient emrServerlessClient;
protected final JobExecutionResponseReader jobExecutionResponseReader;
protected final LeaseManager leaseManager;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@

/** Handle Streaming Query. */
public class StreamingQueryHandler extends BatchQueryHandler {
private final EMRServerlessClient emrServerlessClient;

public StreamingQueryHandler(
EMRServerlessClient emrServerlessClient,
JobExecutionResponseReader jobExecutionResponseReader,
LeaseManager leaseManager) {
super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
this.emrServerlessClient = emrServerlessClient;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.execution.statestore;

import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME;

import java.util.Locale;
import lombok.experimental.UtilityClass;

@UtilityClass
public class OpenSearchStateStoreUtil {

public static String getIndexName(String datasourceName) {
return String.format(
"%s_%s", SPARK_REQUEST_BUFFER_INDEX_NAME, datasourceName.toLowerCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Optional;

/**
* Abstraction over flint index state storage. Flint index state will maintain the status of each
* flint index.
*/
public interface FlintIndexStateModelService {
FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName);

Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName);

FlintIndexStateModel updateFlintIndexState(
FlintIndexStateModel flintIndexStateModel,
FlintIndexState flintIndexState,
String datasourceName);

boolean deleteFlintIndexStateModel(String id, String datasourceName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.flint;

import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.spark.execution.statestore.OpenSearchStateStoreUtil;
import org.opensearch.sql.spark.execution.statestore.StateStore;

@RequiredArgsConstructor
public class OpenSearchFlintIndexStateModelService implements FlintIndexStateModelService {
private final StateStore stateStore;

@Override
public FlintIndexStateModel updateFlintIndexState(
FlintIndexStateModel flintIndexStateModel,
FlintIndexState flintIndexState,
String datasourceName) {
return stateStore.updateState(
flintIndexStateModel,
flintIndexState,
FlintIndexStateModel::copyWithState,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName) {
return stateStore.get(
id,
FlintIndexStateModel::fromXContent,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, String datasourceName) {
return stateStore.create(
flintIndexStateModel,
FlintIndexStateModel::copy,
OpenSearchStateStoreUtil.getIndexName(datasourceName));
}

@Override
public boolean deleteFlintIndexStateModel(String id, String datasourceName) {
return stateStore.delete(id, OpenSearchStateStoreUtil.getIndexName(datasourceName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
package org.opensearch.sql.spark.flint.operation;

import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
Expand All @@ -22,17 +19,17 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Flint Index Operation. */
@RequiredArgsConstructor
public abstract class FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

private final StateStore stateStore;
private final FlintIndexStateModelService flintIndexStateModelService;
private final String datasourceName;
private final EMRServerlessClientFactory emrServerlessClientFactory;

Expand All @@ -57,8 +54,10 @@ public void apply(FlintIndexMetadata metadata) {
} catch (Throwable e) {
LOG.error("Rolling back transient log due to transaction operation failure", e);
try {
updateFlintIndexState(stateStore, datasourceName)
.apply(transitionedFlintIndexStateModel, initialFlintIndexStateModel.getIndexState());
flintIndexStateModelService.updateFlintIndexState(
transitionedFlintIndexStateModel,
initialFlintIndexStateModel.getIndexState(),
datasourceName);
} catch (Exception ex) {
LOG.error("Failed to rollback transient log", ex);
}
Expand All @@ -70,7 +69,7 @@ public void apply(FlintIndexMetadata metadata) {
@NotNull
private FlintIndexStateModel getFlintIndexStateModel(String latestId) {
Optional<FlintIndexStateModel> flintIndexOptional =
getFlintIndexState(stateStore, datasourceName).apply(latestId);
flintIndexStateModelService.getFlintIndexStateModel(latestId, datasourceName);
if (flintIndexOptional.isEmpty()) {
String errorMsg = String.format(Locale.ROOT, "no state found. docId: %s", latestId);
LOG.error(errorMsg);
Expand Down Expand Up @@ -111,7 +110,8 @@ private FlintIndexStateModel moveToTransitioningState(FlintIndexStateModel flint
FlintIndexState transitioningState = transitioningState();
try {
flintIndex =
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, transitioningState());
flintIndexStateModelService.updateFlintIndexState(
flintIndex, transitioningState(), datasourceName);
} catch (Exception e) {
String errorMsg =
String.format(Locale.ROOT, "Moving to transition state:%s failed.", transitioningState);
Expand All @@ -127,9 +127,10 @@ private void commit(FlintIndexStateModel flintIndex) {
try {
if (stableState == FlintIndexState.NONE) {
LOG.info("Deleting index state with docId: " + flintIndex.getLatestId());
deleteFlintIndexState(stateStore, datasourceName).apply(flintIndex.getLatestId());
flintIndexStateModelService.deleteFlintIndexStateModel(
flintIndex.getLatestId(), datasourceName);
} else {
updateFlintIndexState(stateStore, datasourceName).apply(flintIndex, stableState);
flintIndexStateModelService.updateFlintIndexState(flintIndex, stableState, datasourceName);
}
} catch (Exception e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand All @@ -27,11 +27,11 @@ public class FlintIndexOpAlter extends FlintIndexOp {

public FlintIndexOpAlter(
FlintIndexOptions flintIndexOptions,
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory,
FlintIndexMetadataService flintIndexMetadataService) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.flintIndexMetadataService = flintIndexMetadataService;
this.flintIndexOptions = flintIndexOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Cancel refreshing job for refresh query when user clicks cancel button on UI. */
public class FlintIndexOpCancel extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

public FlintIndexOpCancel(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
}

// Only in refreshing state, the job is cancellable in case of REFRESH query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Operation to drop Flint index */
public class FlintIndexOpDrop extends FlintIndexOp {
private static final Logger LOG = LogManager.getLogger();

public FlintIndexOpDrop(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
}

public boolean validate(FlintIndexState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,37 @@
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
private final StateStore stateStore;
private final FlintIndexStateModelService flintIndexStateModelService;
private final Client client;
private final FlintIndexMetadataService flintIndexMetadataService;
private final EMRServerlessClientFactory emrServerlessClientFactory;

public FlintIndexOpDrop getDrop(String datasource) {
return new FlintIndexOpDrop(stateStore, datasource, emrServerlessClientFactory);
return new FlintIndexOpDrop(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
}

public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
return new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
flintIndexStateModelService,
datasource,
emrServerlessClientFactory,
flintIndexMetadataService);
}

public FlintIndexOpVacuum getVacuum(String datasource) {
return new FlintIndexOpVacuum(stateStore, datasource, client, emrServerlessClientFactory);
return new FlintIndexOpVacuum(
flintIndexStateModelService, datasource, client, emrServerlessClientFactory);
}

public FlintIndexOpCancel getCancel(String datasource) {
return new FlintIndexOpCancel(stateStore, datasource, emrServerlessClientFactory);
return new FlintIndexOpCancel(
flintIndexStateModelService, datasource, emrServerlessClientFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexState;
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;

/** Flint index vacuum operation. */
public class FlintIndexOpVacuum extends FlintIndexOp {
Expand All @@ -25,11 +25,11 @@ public class FlintIndexOpVacuum extends FlintIndexOp {
private final Client client;

public FlintIndexOpVacuum(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
String datasourceName,
Client client,
EMRServerlessClientFactory emrServerlessClientFactory) {
super(stateStore, datasourceName, emrServerlessClientFactory);
super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory);
this.client = client;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.opensearch.sql.spark.execution.session.SessionManager;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.OpenSearchFlintIndexStateModelService;
import org.opensearch.sql.spark.flint.OpenSearchIndexDMLResultStorageService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
import org.opensearch.sql.spark.leasemanager.DefaultLeaseManager;
Expand Down Expand Up @@ -96,12 +98,17 @@ public QueryHandlerFactory queryhandlerFactory(

@Provides
public FlintIndexOpFactory flintIndexOpFactory(
StateStore stateStore,
FlintIndexStateModelService flintIndexStateModelService,
NodeClient client,
FlintIndexMetadataServiceImpl flintIndexMetadataService,
EMRServerlessClientFactory emrServerlessClientFactory) {
return new FlintIndexOpFactory(
stateStore, client, flintIndexMetadataService, emrServerlessClientFactory);
flintIndexStateModelService, client, flintIndexMetadataService, emrServerlessClientFactory);
}

@Provides
public FlintIndexStateModelService flintIndexStateModelService(StateStore stateStore) {
return new OpenSearchFlintIndexStateModelService(stateStore);
}

@Provides
Expand Down
Loading
Loading