Skip to content

Commit

Permalink
FlintStreamingJobCleanerTask Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <[email protected]>
  • Loading branch information
vmmusings committed Mar 19, 2024
1 parent 8374cb6 commit 428a5e3
Show file tree
Hide file tree
Showing 16 changed files with 905 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public enum Key {
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

@Getter private final String keyValue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
public void deleteDataSourceMetadata(String datasourceName) {
DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME);
deleteRequest.id(datasourceName);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT("streaming_job_housekeeper_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -193,6 +194,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING =
Setting.positiveTimeSetting(
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL.getKeyValue(),
timeValueMinutes(14),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
Expand Down Expand Up @@ -313,6 +321,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)));
register(
settingBuilder,
clusterSettings,
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL,
STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
new Updater((Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL)));
defaultSettings = settingBuilder.build();
}

Expand Down
10 changes: 9 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -220,8 +223,13 @@ public Collection<Object> createComponents(
Clock.systemUTC(),
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataServiceImpl.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobHouseKeeperCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private TimeValue streamingJobHouseKeepingInterval;
private boolean isAutoIndexManagementEnabled;

public ClusterManagerEventListener(
Expand All @@ -41,16 +51,25 @@ public ClusterManagerEventListener(
Clock clock,
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<TimeValue> streamingJobHouseKeepingInterval,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);

clusterService
.getClusterSettings()
Expand Down Expand Up @@ -87,6 +106,16 @@ public ClusterManagerEventListener(
}
}
});

clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(
streamingJobHouseKeepingInterval,
it -> {
this.streamingJobHouseKeepingInterval = it;
cancel(flintStreamingJobHouseKeeperCron);
initializeStreamingJobHouseKeeperCron();
});
}

@Override
Expand All @@ -104,6 +133,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobHouseKeeperCron();
}

private void initializeStreamingJobHouseKeeperCron() {
flintStreamingJobHouseKeeperCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobHouseKeeperTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
streamingJobHouseKeepingInterval,
executorName());
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +167,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobHouseKeeperCron);
flintStreamingJobHouseKeeperCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasources.exceptions.DataSourceNotFoundException;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpDrop;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobHouseKeeperTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobHouseKeeperTask.class);
protected static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting the cleaner task for disabled and deleted data sources.");
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap = getAllAutoRefreshIndices();
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
String datasourceName = getDataSourceName(flintIndexMetadata);
try {
DataSourceMetadata dataSourceMetadata =
this.dataSourceService.getDataSourceMetadata(datasourceName);
if (dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED) {
LOGGER.info("Datasource is disabled for autoRefreshIndex: {}", autoRefreshIndex);
alterAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
} else {
LOGGER.debug("Datasource is enabled for autoRefreshIndex : {}", autoRefreshIndex);
}
} catch (DataSourceNotFoundException exception) {
// Datasource disabled.
LOGGER.info("Datasource is deleted for autoRefreshIndex: {}", autoRefreshIndex);
dropAutoRefreshIndex(autoRefreshIndex, flintIndexMetadata, datasourceName);
}
} catch (Exception exception) {
LOGGER.error(
"Failed to alter/cancel index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
}
});
} catch (Throwable error) {
LOGGER.error("Error while running the streaming job cleaner task: {}", error.getMessage());
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.increment();
} finally {
isRunning.set(false);
}
}

private void dropAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
// When the datasource is deleted. Possibly Replace with VACUUM Operation.
LOGGER.info("Attempting to drop auto refresh index: {}", autoRefreshIndex);
FlintIndexOpDrop flintIndexOpDrop =
new FlintIndexOpDrop(stateStore, datasourceName, emrServerlessClientFactory.getClient());
flintIndexOpDrop.apply(flintIndexMetadata);
LOGGER.info("Successfully dropped index: {}", autoRefreshIndex);
}

private void alterAutoRefreshIndex(
String autoRefreshIndex, FlintIndexMetadata flintIndexMetadata, String datasourceName) {
LOGGER.info("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
datasourceName,
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
}

private String getDataSourceName(FlintIndexMetadata flintIndexMetadata) {
String kind = flintIndexMetadata.getKind();
switch (kind) {
case "mv":
return flintIndexMetadata.getName().split("\\.")[0];
case "skipping":
case "covering":
return flintIndexMetadata.getSource().split("\\.")[0];
default:
throw new IllegalArgumentException(String.format("Unknown flint index kind: %s", kind));
}
}

private Map<String, FlintIndexMetadata> getAllAutoRefreshIndices() {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata("flint_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class FlintIndexMetadata {
private final String jobId;
private final String appId;
private final String latestId;
private final String kind;
private final String source;
private final String name;
private final FlintIndexOptions flintIndexOptions;

public Optional<String> getLatestId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
import static org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions.WATERMARK_DELAY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.APP_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.ENV_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.KIND_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.LATEST_ID_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.META_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.NAME_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.OPTIONS_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.PROPERTIES_KEY;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SERVERLESS_EMR_JOB_ID;
import static org.opensearch.sql.spark.flint.FlintIndexMetadata.SOURCE_KEY;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -149,9 +152,15 @@ private FlintIndexMetadata fromMetadata(String indexName, Map<String, Object> me
String jobId = (String) envMap.get(SERVERLESS_EMR_JOB_ID);
String appId = (String) envMap.getOrDefault(APP_ID, null);
String latestId = (String) metaMap.getOrDefault(LATEST_ID_KEY, null);
String kind = (String) metaMap.getOrDefault(KIND_KEY, null);
String name = (String) metaMap.getOrDefault(NAME_KEY, null);
String source = (String) metaMap.getOrDefault(SOURCE_KEY, null);
flintIndexMetadataBuilder.jobId(jobId);
flintIndexMetadataBuilder.appId(appId);
flintIndexMetadataBuilder.latestId(latestId);
flintIndexMetadataBuilder.name(name);
flintIndexMetadataBuilder.kind(kind);
flintIndexMetadataBuilder.source(source);
flintIndexMetadataBuilder.opensearchIndexName(indexName);
flintIndexMetadataBuilder.flintIndexOptions(flintIndexOptions);
return flintIndexMetadataBuilder.build();
Expand Down
Loading

0 comments on commit 428a5e3

Please sign in to comment.