
FlintStreamingJobCleanerTask Implementation
Signed-off-by: Vamsi Manohar <[email protected]>
vmmusings committed Mar 19, 2024
1 parent e17962f commit 8b6975e
Showing 18 changed files with 1,218 additions and 116 deletions.
@@ -48,7 +48,9 @@ public enum Key {
"plugins.query.executionengine.spark.session_inactivity_timeout_millis"),

/** Async query Settings * */
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled");
ASYNC_QUERY_ENABLED("plugins.query.executionengine.async_query.enabled"),
STREAMING_JOB_HOUSEKEEPER_INTERVAL(
"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval");

@Getter private final String keyValue;

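For context, the new key is consumed through the SQL plugin's Settings abstraction like any other key. A minimal sketch, assuming the plugin's generic getSettingValue(Key) accessor on org.opensearch.sql.common.setting.Settings; the HousekeeperIntervalReader helper class is purely illustrative and not part of this commit:

import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.common.setting.Settings;

// Illustrative only: resolve the configured housekeeper interval.
// The 15-minute default comes from the OpenSearchSettings registration further down in this diff.
public final class HousekeeperIntervalReader {
  private HousekeeperIntervalReader() {}

  public static TimeValue housekeeperInterval(Settings settings) {
    return settings.getSettingValue(Settings.Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL);
  }
}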
@@ -165,6 +165,7 @@ public void updateDataSourceMetadata(DataSourceMetadata dataSourceMetadata) {
public void deleteDataSourceMetadata(String datasourceName) {
DeleteRequest deleteRequest = new DeleteRequest(DATASOURCE_INDEX_NAME);
deleteRequest.id(datasourceName);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture;
try (ThreadContext.StoredContext storedContext =
client.threadPool().getThreadContext().stashContext()) {
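Setting the refresh policy on the delete request presumably makes a datasource deletion visible to searches right away, so the new housekeeper task can observe deleted datasources without waiting for the next index refresh. The call relies on an import that is not visible in the excerpt above:

// Needed by the setRefreshPolicy(...) call shown above; IMMEDIATE forces a refresh after the delete.
import org.opensearch.action.support.WriteRequest;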
35 changes: 35 additions & 0 deletions docs/user/admin/settings.rst
@@ -595,3 +595,38 @@ Request::
}
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
=======================================================================

Description
-----------
This setting specifies how often the streaming job housekeeper runs to clean up streaming jobs associated with deleted or disabled data sources.
By default, the cleanup runs every 15 minutes.

* Default Value: 15 minutes

To change the interval to 30 minutes, for example, use this command:

Request::

    sh$ curl -sS -H 'Content-Type: application/json' -X PUT localhost:9200/_cluster/settings \
    ... -d '{"transient":{"plugins.query.executionengine.spark.streamingjobs.housekeeper.interval":"30m"}}'
    {
      "acknowledged": true,
      "persistent": {},
      "transient": {
        "plugins": {
          "query": {
            "executionengine": {
              "spark": {
                "streamingjobs": {
                  "housekeeper": {
                    "interval": "30m"
                  }
                }
              }
            }
          }
        }
      }
    }
@@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT("streaming_job_housekeeper_task_failure_count");

private String name;

@@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
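The new counter follows the existing numerical-metric pattern. A minimal sketch of recording a housekeeping failure, assuming the legacy Metrics registry (Metrics.getInstance().getNumericalMetric(...).increment()) used elsewhere in the plugin; the HousekeeperMetrics wrapper is illustrative only:

import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;

// Illustrative only: bump the new failure counter when a housekeeping run fails.
public final class HousekeeperMetrics {
  private HousekeeperMetrics() {}

  public static void recordFailure() {
    Metrics.getInstance()
        .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
        .increment();
  }
}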
@@ -7,6 +7,7 @@

import static org.opensearch.common.settings.Settings.EMPTY;
import static org.opensearch.common.unit.TimeValue.timeValueDays;
import static org.opensearch.common.unit.TimeValue.timeValueMinutes;
import static org.opensearch.sql.common.setting.Settings.Key.ENCYRPTION_MASTER_KEY;

import com.google.common.annotations.VisibleForTesting;
@@ -193,6 +194,13 @@ public class OpenSearchSettings extends Settings {
Setting.Property.NodeScope,
Setting.Property.Dynamic);

public static final Setting<TimeValue> STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING =
Setting.positiveTimeSetting(
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL.getKeyValue(),
timeValueMinutes(15),
Setting.Property.NodeScope,
Setting.Property.Dynamic);

/** Construct OpenSearchSetting. The OpenSearchSetting must be singleton. */
@SuppressWarnings("unchecked")
public OpenSearchSettings(ClusterSettings clusterSettings) {
@@ -313,6 +321,12 @@ public OpenSearchSettings(ClusterSettings clusterSettings) {
Key.SESSION_INACTIVITY_TIMEOUT_MILLIS,
SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING,
new Updater((Key.SESSION_INACTIVITY_TIMEOUT_MILLIS)));
register(
settingBuilder,
clusterSettings,
Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL,
STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
new Updater((Key.STREAMING_JOB_HOUSEKEEPER_INTERVAL)));
defaultSettings = settingBuilder.build();
}

@@ -384,6 +398,7 @@ public static List<Setting<?>> pluginSettings() {
.add(AUTO_INDEX_MANAGEMENT_ENABLED_SETTING)
.add(DATASOURCES_LIMIT_SETTING)
.add(SESSION_INACTIVITY_TIMEOUT_MILLIS_SETTING)
.add(STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING)
.build();
}

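Because the setting is registered with Setting.Property.Dynamic, callers can read its effective value (falling back to the 15-minute default) and react to runtime updates. A minimal sketch using standard OpenSearch APIs; the HousekeeperSettingExample class and method names are illustrative, not part of this commit:

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;

// Illustrative only: read the current interval and subscribe to dynamic updates.
public final class HousekeeperSettingExample {
  private HousekeeperSettingExample() {}

  public static TimeValue currentInterval(ClusterService clusterService) {
    return OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING.get(
        clusterService.getSettings());
  }

  public static void watchInterval(ClusterService clusterService) {
    clusterService
        .getClusterSettings()
        .addSettingsUpdateConsumer(
            OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
            newInterval -> {
              // e.g. cancel and reschedule the housekeeper, as ClusterManagerEventListener does below
            });
  }
}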
10 changes: 9 additions & 1 deletion plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
@@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataServiceImpl;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
@@ -220,8 +223,13 @@ public Collection<Object> createComponents(
Clock.systemUTC(),
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.STREAMING_JOB_HOUSEKEEPER_INTERVAL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataServiceImpl.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
@@ -19,19 +19,29 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobHouseKeeperCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private TimeValue streamingJobHouseKeepingInterval;
private boolean isAutoIndexManagementEnabled;

public ClusterManagerEventListener(
@@ -41,16 +51,25 @@ public ClusterManagerEventListener(
Clock clock,
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<TimeValue> streamingJobHouseKeepingInterval,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));
this.streamingJobHouseKeepingInterval = streamingJobHouseKeepingInterval.get(settings);

clusterService
.getClusterSettings()
@@ -87,6 +106,16 @@ public ClusterManagerEventListener(
}
}
});

clusterService
.getClusterSettings()
.addSettingsUpdateConsumer(
streamingJobHouseKeepingInterval,
it -> {
this.streamingJobHouseKeepingInterval = it;
cancel(flintStreamingJobHouseKeeperCron);
initializeStreamingJobHouseKeeperCron();
});
}

@Override
@@ -104,6 +133,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobHouseKeeperCron();
}

private void initializeStreamingJobHouseKeeperCron() {
flintStreamingJobHouseKeeperCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobHouseKeeperTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
streamingJobHouseKeepingInterval,
executorName());
}

private void reInitializeFlintIndexRetention() {
@@ -125,6 +167,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobHouseKeeperCron);
flintStreamingJobHouseKeeperCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
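The FlintStreamingJobHouseKeeperTask class itself is among the changed files not shown in this excerpt. Based only on the constructor call above and the new failure metric, a rough illustrative sketch of its shape might look like the following; the run() comments describe presumed behaviour, not the actual implementation, and the metrics call assumes the legacy registry used elsewhere in the plugin:

import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;

// Rough sketch only -- the real class ships in this commit but is not visible here.
// Collaborators mirror the constructor call in ClusterManagerEventListener above.
public class FlintStreamingJobHouseKeeperTask implements Runnable {
  private final DataSourceService dataSourceService;
  private final FlintIndexMetadataService flintIndexMetadataService;
  private final StateStore stateStore;
  private final EMRServerlessClientFactory emrServerlessClientFactory;

  public FlintStreamingJobHouseKeeperTask(
      DataSourceService dataSourceService,
      FlintIndexMetadataService flintIndexMetadataService,
      StateStore stateStore,
      EMRServerlessClientFactory emrServerlessClientFactory) {
    this.dataSourceService = dataSourceService;
    this.flintIndexMetadataService = flintIndexMetadataService;
    this.stateStore = stateStore;
    this.emrServerlessClientFactory = emrServerlessClientFactory;
  }

  @Override
  public void run() {
    try {
      // Presumed behaviour: enumerate Flint auto-refresh (streaming) indices, and for each
      // index whose datasource has been deleted or disabled, stop or cancel the backing
      // EMR Serverless streaming job and record the updated index state in the state store.
    } catch (Throwable error) {
      // Surface failures through the counter added in this commit.
      Metrics.getInstance()
          .getNumericalMetric(MetricName.STREAMING_JOB_HOUSEKEEPER_TASK_FAILURE_COUNT)
          .increment();
    }
  }
}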