Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop Streaming Jobs When datasource is disabled/deleted. #2559

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public enum Key {
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
public void deleteDataSourceMetadata(String datasourceName) {
DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME);
deleteRequest.id(datasourceName);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
Expand Down
35 changes: 35 additions & 0 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,38 @@ Request::
}
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
===============================

Description
-----------
This setting specifies the interval at which the streaming job housekeeper runs to clean up streaming jobs associated with deleted and disabled data sources.
The default configuration executes this cleanup every 15 minutes.

* Default Value: 15 minutes

To modify the TTL to 30 minutes for example, use this command:

Request ::

sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
... -d '{"transient":{"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval":"30m"}}'
{
"acknowledged": true,
"persistent": {},
"transient": {
"plugins": {
"query": {
"executionengine": {
"spark": {
"streamingjobs": {
"housekeeper": {
"interval": "30m"
}
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT("streaming_job_housekeeper_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -193,6 +194,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING =
Setting.positiveTimeSetting(
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL.getKeyValue(),
timeValueMinutes(15),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -313,6 +321,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)));
register(
settingBuilder,
clusterSettings,
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL,
STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
new Updater((Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL)));
defaultSettings = settingBuilder.build();
}

Expand Down Expand Up @@ -384,6 +398,7 @@ public static List<Setting<?>> pluginSettings() {
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING)
.add(STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -220,8 +223,13 @@ public Collection<Object> createComponents(
Clock.systemUTC(),
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataServiceImpl.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobHouseKeeperCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private TimeValue streamingJobHouseKeepingInterval;
private boolean isAutoIndexManagementEnabled;

public ClusterManagerEventListener(
Expand All @@ -41,16 +51,25 @@
Clock clock,
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<TimeValue> streamingJobHouseKeepingInterval,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {

Check warning on line 60 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L60

Added line #L60 was not covered by tests
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;

Check warning on line 69 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L66-L69

Added lines #L66 - L69 were not covered by tests
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);

Check warning on line 72 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L72

Added line #L72 was not covered by tests

clusterService
.getClusterSettings()
Expand Down Expand Up @@ -87,6 +106,16 @@
}
}
});

clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(

Check warning on line 112 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L110-L112

Added lines #L110 - L112 were not covered by tests
streamingJobHouseKeepingInterval,
it -> {
this.streamingJobHouseKeepingInterval = it;
cancel(flintStreamingJobHouseKeeperCron);
initializeStreamingJobHouseKeeperCron();
});

Check warning on line 118 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L115-L118

Added lines #L115 - L118 were not covered by tests
}

@Override
Expand All @@ -104,6 +133,19 @@
}
});
}
initializeStreamingJobHouseKeeperCron();
}

Check warning on line 137 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L136-L137

Added lines #L136 - L137 were not covered by tests

private void initializeStreamingJobHouseKeeperCron() {
flintStreamingJobHouseKeeperCron =
threadPool.scheduleWithFixedDelay(

Check warning on line 141 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L140-L141

Added lines #L140 - L141 were not covered by tests
new FlintStreamingJobHouseKeeperTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
streamingJobHouseKeepingInterval,
executorName());

Check warning on line 148 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L148

Added line #L148 was not covered by tests
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +167,8 @@
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobHouseKeeperCron);
flintStreamingJobHouseKeeperCron = null;

Check warning on line 171 in spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java

View check run for this annotation

Codecov / codecov/patch

spark/src/main/java/org/opensearch/sql/spark/cluster/ClusterManagerEventListener.java#L170-L171

Added lines #L170 - L171 were not covered by tests
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobHouseKeeperTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting housekeeping task for auto refresh streaming jobs.");
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = getAllAutoRefreshIndices();
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
String datasourceName = getDataSourceName(flintIndexMetadata);
try {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getDataSourceMetadata(datasourceName);
if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) {
LOGGER.info("Datasource is disabled for autoRefreshIndex: {}", autoRefreshIndex);
alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
} else {
LOGGER.debug("Datasource is enabled for autoRefreshIndex : {}", autoRefreshIndex);
}
} catch (DataSourceNotFoundException exception) {
LOGGER.info("Datasource is deleted for autoRefreshIndex: {}", autoRefreshIndex);
try {
dropAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
} catch (IllegalStateException illegalStateException) {
LOGGER.debug(
"AutoRefresh index: {} is not in valid state for deletion.",
autoRefreshIndex);
}
}
} catch (Exception exception) {
LOGGER.error(
"Failed to alter/cancel index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
}
});
LOGGER.info("Finished housekeeping task for auto refresh streaming jobs.");
} catch (Throwable error) {
LOGGER.error("Error while running the streaming job cleaner task: {}", error.getMessage());
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
} finally {
isRunning.set(false);
}
}

private void dropAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
FlintIndexOpDrop flintIndexOpDrop =
new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpDrop.apply(flintIndexMetadata);
LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
}

private void alterAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
}

private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) {
String kind = flintIndexMetadata.getKind();
switch (kind) {
case "mv":
return flintIndexMetadata.getName().split("\\.")[0];
case "skipping":
case "covering":
return flintIndexMetadata.getSource().split("\\.")[0];
default:
throw new IllegalArgumentException(String.format("Unknown flint index kind: %s", kind));
}
}

private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata("flint_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class FlintIndexMetadata {
private final String jobId;
private final String appId;
private final String latestId;
private final String kind;
private final String source;
private final String name;
private final FlintIndexOptions flintIndexOptions;

public Optional<String> getLatestId() {
Expand Down
Loading
Loading