[Backport 2.x] Refactor IndexDMLHandler and related classes (#2644) #2675

Closed
SQLPlugin.java
@@ -79,10 +79,9 @@
 import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
 import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
 import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
-import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
 import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
 import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
 import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
@@ -227,8 +226,7 @@ public Collection<Object> createComponents(
             environment.settings(),
             dataSourceService,
             injector.getInstance(FlintIndexMetadataServiceImpl.class),
-            injector.getInstance(StateStore.class),
-            injector.getInstance(EMRServerlessClientFactory.class));
+            injector.getInstance(FlintIndexOpFactory.class));
     return ImmutableList.of(
         dataSourceService,
         injector.getInstance(AsyncQueryExecutorService.class),
ClusterManagerEventListener.java
@@ -21,9 +21,8 @@
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.sql.datasource.DataSourceService;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
-import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.threadpool.Scheduler.Cancellable;
 import org.opensearch.threadpool.ThreadPool;

@@ -37,8 +36,7 @@ public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {
   private Clock clock;
   private DataSourceService dataSourceService;
   private FlintIndexMetadataService flintIndexMetadataService;
-  private StateStore stateStore;
-  private EMRServerlessClientFactory emrServerlessClientFactory;
+  private FlintIndexOpFactory flintIndexOpFactory;
   private Duration sessionTtlDuration;
   private Duration resultTtlDuration;
   private TimeValue streamingJobHouseKeepingInterval;
@@ -56,17 +54,15 @@ public ClusterManagerEventListener(
       Settings settings,
       DataSourceService dataSourceService,
       FlintIndexMetadataService flintIndexMetadataService,
-      StateStore stateStore,
-      EMRServerlessClientFactory emrServerlessClientFactory) {
+      FlintIndexOpFactory flintIndexOpFactory) {
     this.clusterService = clusterService;
     this.threadPool = threadPool;
     this.client = client;
     this.clusterService.addLocalNodeClusterManagerListener(this);
     this.clock = clock;
     this.dataSourceService = dataSourceService;
     this.flintIndexMetadataService = flintIndexMetadataService;
-    this.stateStore = stateStore;
-    this.emrServerlessClientFactory = emrServerlessClientFactory;
+    this.flintIndexOpFactory = flintIndexOpFactory;
     this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
     this.resultTtlDuration = toDuration(resultTtl.get(settings));
     this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);
@@ -151,10 +147,7 @@ private void initializeStreamingJobHouseKeeperCron() {
     flintStreamingJobHouseKeeperCron =
         threadPool.scheduleWithFixedDelay(
             new FlintStreamingJobHouseKeeperTask(
-                dataSourceService,
-                flintIndexMetadataService,
-                stateStore,
-                emrServerlessClientFactory),
+                dataSourceService, flintIndexMetadataService, flintIndexOpFactory),
             streamingJobHouseKeepingInterval,
             executorName());
   }
FlintStreamingJobHouseKeeperTask.java
@@ -17,22 +17,18 @@
 import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
 import org.opensearch.sql.legacy.metrics.MetricName;
 import org.opensearch.sql.legacy.metrics.Metrics;
-import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
 import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;

 /** Cleaner task which alters the active streaming jobs of a disabled datasource. */
 @RequiredArgsConstructor
 public class FlintStreamingJobHouseKeeperTask implements Runnable {

   private final DataSourceService dataSourceService;
   private final FlintIndexMetadataService flintIndexMetadataService;
-  private final StateStore stateStore;
-  private final EMRServerlessClientFactory emrServerlessClientFactory;
+  private final FlintIndexOpFactory flintIndexOpFactory;

   private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
   protected static final AtomicBoolean isRunning = new AtomicBoolean(false);
@@ -95,9 +91,7 @@ private void dropAutoRefreshIndex(
       String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
     // When the datasource is deleted. Possibly Replace with VACUUM Operation.
     LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
-    FlintIndexOpDrop flintIndexOpDrop =
-        new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
-    flintIndexOpDrop.apply(flintIndexMetadata);
+    flintIndexOpFactory.getDrop(datasourceName).apply(flintIndexMetadata);
     LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
   }

@@ -106,14 +100,7 @@ private void alterAutoRefreshIndex(
     LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
     FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
     flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
-    FlintIndexOpAlter flintIndexOpAlter =
-        new FlintIndexOpAlter(
-            flintIndexOptions,
-            stateStore,
-            datasourceName,
-            emrServerlessClientFactory.getClient(),
-            flintIndexMetadataService);
-    flintIndexOpAlter.apply(flintIndexMetadata);
+    flintIndexOpFactory.getAlter(flintIndexOptions, datasourceName).apply(flintIndexMetadata);
     LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
   }

IndexDMLHandler.java
@@ -7,7 +7,6 @@

 import static org.opensearch.sql.spark.data.constants.SparkConstants.ERROR_FIELD;
 import static org.opensearch.sql.spark.data.constants.SparkConstants.STATUS_FIELD;
-import static org.opensearch.sql.spark.execution.statestore.StateStore.createIndexDMLResult;

 import com.amazonaws.services.emrserverless.model.JobRunState;
 import java.util.Map;
@@ -16,24 +15,20 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.json.JSONObject;
-import org.opensearch.client.Client;
 import org.opensearch.sql.datasource.model.DataSourceMetadata;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryId;
 import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
-import org.opensearch.sql.spark.client.EMRServerlessClient;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
 import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
 import org.opensearch.sql.spark.dispatcher.model.IndexQueryDetails;
 import org.opensearch.sql.spark.execution.statement.StatementState;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
+import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
 import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpVacuum;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;

 /** Handle Index DML query. includes * DROP * ALT? */
@@ -45,15 +40,10 @@ public class IndexDMLHandler extends AsyncQueryHandler {
   public static final String DROP_INDEX_JOB_ID = "dropIndexJobId";
   public static final String DML_QUERY_JOB_ID = "DMLQueryJobId";

-  private final EMRServerlessClient emrServerlessClient;
-
   private final JobExecutionResponseReader jobExecutionResponseReader;
-
   private final FlintIndexMetadataService flintIndexMetadataService;
-
-  private final StateStore stateStore;
-
-  private final Client client;
+  private final IndexDMLResultStorageService indexDMLResultStorageService;
+  private final FlintIndexOpFactory flintIndexOpFactory;

   public static boolean isIndexDMLQuery(String jobId) {
     return DROP_INDEX_JOB_ID.equalsIgnoreCase(jobId) || DML_QUERY_JOB_ID.equalsIgnoreCase(jobId);
@@ -67,14 +57,16 @@ public DispatchQueryResponse submit(
     try {
       IndexQueryDetails indexDetails = context.getIndexQueryDetails();
       FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
-      executeIndexOp(dispatchQueryRequest, indexDetails, indexMetadata);
+
+      getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);
+
       AsyncQueryId asyncQueryId =
           storeIndexDMLResult(
               dispatchQueryRequest,
               dataSourceMetadata,
               JobRunState.SUCCESS.toString(),
               StringUtils.EMPTY,
-              startTime);
+              getElapsedTimeSince(startTime));
       return new DispatchQueryResponse(
           asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
     } catch (Exception e) {
@@ -85,7 +77,7 @@ public DispatchQueryResponse submit(
               dataSourceMetadata,
               JobRunState.FAILED.toString(),
               e.getMessage(),
-              startTime);
+              getElapsedTimeSince(startTime));
       return new DispatchQueryResponse(
           asyncQueryId, DML_QUERY_JOB_ID, dataSourceMetadata.getResultIndex(), null);
     }
@@ -96,46 +88,34 @@ private AsyncQueryId storeIndexDMLResult(
       DataSourceMetadata dataSourceMetadata,
       String status,
       String error,
-      long startTime) {
+      long queryRunTime) {
     AsyncQueryId asyncQueryId = AsyncQueryId.newAsyncQueryId(dataSourceMetadata.getName());
     IndexDMLResult indexDMLResult =
         new IndexDMLResult(
             asyncQueryId.getId(),
             status,
             error,
             dispatchQueryRequest.getDatasource(),
-            System.currentTimeMillis() - startTime,
+            queryRunTime,
             System.currentTimeMillis());
-    createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex()).apply(indexDMLResult);
+    indexDMLResultStorageService.createIndexDMLResult(indexDMLResult, dataSourceMetadata.getName());
     return asyncQueryId;
   }

-  private void executeIndexOp(
-      DispatchQueryRequest dispatchQueryRequest,
-      IndexQueryDetails indexQueryDetails,
-      FlintIndexMetadata indexMetadata) {
+  private long getElapsedTimeSince(long startTime) {
+    return System.currentTimeMillis() - startTime;
+  }
+
+  private FlintIndexOp getIndexOp(
+      DispatchQueryRequest dispatchQueryRequest, IndexQueryDetails indexQueryDetails) {
     switch (indexQueryDetails.getIndexQueryActionType()) {
       case DROP:
-        FlintIndexOp dropOp =
-            new FlintIndexOpDrop(
-                stateStore, dispatchQueryRequest.getDatasource(), emrServerlessClient);
-        dropOp.apply(indexMetadata);
-        break;
+        return flintIndexOpFactory.getDrop(dispatchQueryRequest.getDatasource());
       case ALTER:
-        FlintIndexOpAlter flintIndexOpAlter =
-            new FlintIndexOpAlter(
-                indexQueryDetails.getFlintIndexOptions(),
-                stateStore,
-                dispatchQueryRequest.getDatasource(),
-                emrServerlessClient,
-                flintIndexMetadataService);
-        flintIndexOpAlter.apply(indexMetadata);
-        break;
+        return flintIndexOpFactory.getAlter(
+            indexQueryDetails.getFlintIndexOptions(), dispatchQueryRequest.getDatasource());
       case VACUUM:
-        FlintIndexOp indexVacuumOp =
-            new FlintIndexOpVacuum(stateStore, dispatchQueryRequest.getDatasource(), client);
-        indexVacuumOp.apply(indexMetadata);
-        break;
+        return flintIndexOpFactory.getVacuum(dispatchQueryRequest.getDatasource());
       default:
         throw new IllegalStateException(
             String.format(
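Note: FlintIndexOpFactory itself is not part of this diff. Based only on the call sites above (getDrop, getAlter, getVacuum here, plus getCancel in RefreshQueryHandler below) and the constructor arguments the removed code used to pass, a rough sketch of what the factory could look like follows; the field set and the op constructors it delegates to are assumptions, and the real class may differ.

// Hypothetical sketch of FlintIndexOpFactory, inferred from how it is called in
// this PR; the actual ops may also have been refactored to take other arguments.
package org.opensearch.sql.spark.flint.operation;

import lombok.RequiredArgsConstructor;
import org.opensearch.client.Client;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;

@RequiredArgsConstructor
public class FlintIndexOpFactory {
  // Assumed dependencies: exactly what the removed call sites passed to the
  // FlintIndexOp* constructors before this refactor.
  private final StateStore stateStore;
  private final Client client;
  private final FlintIndexMetadataService flintIndexMetadataService;
  private final EMRServerlessClientFactory emrServerlessClientFactory;

  public FlintIndexOpDrop getDrop(String datasource) {
    return new FlintIndexOpDrop(
        stateStore, datasource, emrServerlessClientFactory.getClient());
  }

  public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) {
    return new FlintIndexOpAlter(
        flintIndexOptions,
        stateStore,
        datasource,
        emrServerlessClientFactory.getClient(),
        flintIndexMetadataService);
  }

  public FlintIndexOpVacuum getVacuum(String datasource) {
    return new FlintIndexOpVacuum(stateStore, datasource, client);
  }

  public FlintIndexOpCancel getCancel(String datasource) {
    return new FlintIndexOpCancel(
        stateStore, datasource, emrServerlessClientFactory.getClient());
  }
}

Centralizing construction this way lets every caller depend on a single factory instead of on StateStore, the EMR serverless client, and the OpenSearch Client individually, which is what removes those fields from the handlers in the rest of this diff.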
QueryHandlerFactory.java
@@ -6,11 +6,11 @@
 package org.opensearch.sql.spark.dispatcher;

 import lombok.RequiredArgsConstructor;
-import org.opensearch.client.Client;
 import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
 import org.opensearch.sql.spark.execution.session.SessionManager;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
+import org.opensearch.sql.spark.flint.IndexDMLResultStorageService;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.sql.spark.leasemanager.LeaseManager;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;

@@ -19,19 +19,19 @@ public class QueryHandlerFactory {

   private final JobExecutionResponseReader jobExecutionResponseReader;
   private final FlintIndexMetadataService flintIndexMetadataService;
-  private final Client client;
   private final SessionManager sessionManager;
   private final LeaseManager leaseManager;
-  private final StateStore stateStore;
+  private final IndexDMLResultStorageService indexDMLResultStorageService;
+  private final FlintIndexOpFactory flintIndexOpFactory;
   private final EMRServerlessClientFactory emrServerlessClientFactory;

   public RefreshQueryHandler getRefreshQueryHandler() {
     return new RefreshQueryHandler(
         emrServerlessClientFactory.getClient(),
         jobExecutionResponseReader,
         flintIndexMetadataService,
-        stateStore,
-        leaseManager);
+        leaseManager,
+        flintIndexOpFactory);
   }

   public StreamingQueryHandler getStreamingQueryHandler() {
@@ -50,10 +50,9 @@ public InteractiveQueryHandler getInteractiveQueryHandler() {

   public IndexDMLHandler getIndexDMLHandler() {
     return new IndexDMLHandler(
-        emrServerlessClientFactory.getClient(),
         jobExecutionResponseReader,
         flintIndexMetadataService,
-        stateStore,
-        client);
+        indexDMLResultStorageService,
+        flintIndexOpFactory);
   }
 }
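QueryHandlerFactory is annotated with Lombok's @RequiredArgsConstructor, so after this change its generated constructor takes the final fields in declaration order, with IndexDMLResultStorageService and FlintIndexOpFactory replacing the raw Client and StateStore. A hedged call-site fragment follows; the actual module that wires this up is not part of the diff, and the argument variables are placeholders for instances that module would already hold.

// Hypothetical call-site fragment, not taken from this PR: the constructor
// argument order mirrors the final-field declaration order in the diff above.
QueryHandlerFactory queryHandlerFactory =
    new QueryHandlerFactory(
        jobExecutionResponseReader,
        flintIndexMetadataService,
        sessionManager,
        leaseManager,
        indexDMLResultStorageService, // new dependency introduced by this refactor
        flintIndexOpFactory,          // new dependency introduced by this refactor
        emrServerlessClientFactory);

IndexDMLHandler indexDMLHandler = queryHandlerFactory.getIndexDMLHandler();
RefreshQueryHandler refreshQueryHandler = queryHandlerFactory.getRefreshQueryHandler();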
RefreshQueryHandler.java
@@ -13,31 +13,28 @@
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
 import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
 import org.opensearch.sql.spark.dispatcher.model.JobType;
-import org.opensearch.sql.spark.execution.statestore.StateStore;
 import org.opensearch.sql.spark.flint.FlintIndexMetadata;
 import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
 import org.opensearch.sql.spark.flint.operation.FlintIndexOp;
-import org.opensearch.sql.spark.flint.operation.FlintIndexOpCancel;
+import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory;
 import org.opensearch.sql.spark.leasemanager.LeaseManager;
 import org.opensearch.sql.spark.response.JobExecutionResponseReader;

 /** Handle Refresh Query. */
 public class RefreshQueryHandler extends BatchQueryHandler {

   private final FlintIndexMetadataService flintIndexMetadataService;
-  private final StateStore stateStore;
-  private final EMRServerlessClient emrServerlessClient;
+  private final FlintIndexOpFactory flintIndexOpFactory;

   public RefreshQueryHandler(
       EMRServerlessClient emrServerlessClient,
       JobExecutionResponseReader jobExecutionResponseReader,
       FlintIndexMetadataService flintIndexMetadataService,
-      StateStore stateStore,
-      LeaseManager leaseManager) {
+      LeaseManager leaseManager,
+      FlintIndexOpFactory flintIndexOpFactory) {
     super(emrServerlessClient, jobExecutionResponseReader, leaseManager);
     this.flintIndexMetadataService = flintIndexMetadataService;
-    this.stateStore = stateStore;
-    this.emrServerlessClient = emrServerlessClient;
+    this.flintIndexOpFactory = flintIndexOpFactory;
   }

   @Override
@@ -51,8 +48,7 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
               "Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
     }
     FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
-    FlintIndexOp jobCancelOp =
-        new FlintIndexOpCancel(stateStore, datasourceName, emrServerlessClient);
+    FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
     jobCancelOp.apply(indexMetadata);
     return asyncQueryJobMetadata.getQueryId().getId();
   }
IndexDMLResultStorageService.java (new file)
@@ -0,0 +1,12 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.sql.spark.flint;
+
+import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
+
+public interface IndexDMLResultStorageService {
+  IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName);
+}
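The backing implementation of this new interface is not shown in the diff above. A plausible sketch follows, assuming it reuses StateStore's existing createIndexDMLResult helper (the static import removed from IndexDMLHandler) and resolves the datasource's result index through DataSourceService; the class name and the lookup call are assumptions, not part of this PR.

// Hypothetical implementation sketch; names other than the interface itself are
// assumptions and may not match the class actually shipped with this change.
package org.opensearch.sql.spark.flint;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.dispatcher.model.IndexDMLResult;
import org.opensearch.sql.spark.execution.statestore.StateStore;

@RequiredArgsConstructor
public class OpenSearchIndexDMLResultStorageService implements IndexDMLResultStorageService {

  private final DataSourceService dataSourceService;
  private final StateStore stateStore;

  @Override
  public IndexDMLResult createIndexDMLResult(IndexDMLResult result, String datasourceName) {
    // Resolve the datasource's result index, then persist through StateStore,
    // mirroring what IndexDMLHandler previously did inline.
    DataSourceMetadata dataSourceMetadata = dataSourceService.getDataSourceMetadata(datasourceName);
    return StateStore.createIndexDMLResult(stateStore, dataSourceMetadata.getResultIndex())
        .apply(result);
  }
}

Passing the datasource name instead of a concrete result-index name keeps IndexDMLHandler free of storage details, which is the point of introducing the interface.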