Introduce async query scheduler (#668) (#691)
* Introduce async query scheduler

Signed-off-by: Louis Chu <[email protected]>

* Update IT and doc

Signed-off-by: Louis Chu <[email protected]>

* Resolve comments

Signed-off-by: Louis Chu <[email protected]>

* Bugfix

Signed-off-by: Louis Chu <[email protected]>

* Add more tests

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
(cherry picked from commit 92121a3)
noCharger authored Sep 24, 2024
1 parent a2e65f9 commit 31bdd15
Showing 38 changed files with 1,721 additions and 102 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -22,7 +22,7 @@ Version compatibility:
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
- | 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.13+ |
+ | 0.5.0 | 11+ | 3.5.1 | 2.12.14 | 2.17+ |

## Flint Extension Usage

3 changes: 3 additions & 0 deletions build.sbt
@@ -13,6 +13,7 @@ lazy val jacksonVersion = "2.15.2"
// The transitive opensearch jackson-databind dependency version should align with Spark jackson databind dependency version.
// Issue: https://github.com/opensearch-project/opensearch-spark/issues/442
lazy val opensearchVersion = "2.6.0"
lazy val opensearchMavenVersion = "2.6.0.0"
lazy val icebergVersion = "1.5.0"

val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
@@ -81,6 +82,7 @@ lazy val flintCore = (project in file("flint-core"))
exclude ("com.fasterxml.jackson.core", "jackson-databind")
exclude ("com.fasterxml.jackson.core", "jackson-core")
exclude ("org.apache.httpcomponents.client5", "httpclient5"),
"org.opensearch" % "opensearch-job-scheduler-spi" % opensearchMavenVersion,
"dev.failsafe" % "failsafe" % "3.3.2",
"com.amazonaws" % "aws-java-sdk" % "1.12.397" % "provided"
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
@@ -115,6 +117,7 @@ lazy val flintCommons = (project in file("flint-commons"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
4 changes: 4 additions & 0 deletions docs/index.md
@@ -426,6 +426,7 @@ WITH (

Users can provide the following options in the `WITH` clause of an alter statement:
+ `auto_refresh`: This is required for an alter statement. Currently, an alter statement must change the auto refresh option from its original value.
+ `scheduler_mode`: A mode string (`internal` or `external`) that specifies how `auto_refresh` is scheduled. `checkpoint_location` is required for the external scheduler (see the sketch after this list).
+ `refresh_interval`
+ `incremental_refresh`
+ `checkpoint_location`
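For illustration only (not part of this commit), a spark-shell style sketch of an alter statement that switches an index to the external scheduler; the table name is hypothetical, and the exact ALTER grammar depends on the index type documented elsewhere in this file:

// Assumes an existing SparkSession `spark` with the Flint extension loaded and a
// checkpoint_location already configured, as required by the external scheduler.
spark.sql(
  """ALTER SKIPPING INDEX ON glue.default.http_logs
    |WITH (auto_refresh = true, scheduler_mode = 'external')""".stripMargin)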
@@ -524,6 +525,9 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.customAWSCredentialsProvider`: default is empty.
- `spark.datasource.flint.customFlintMetadataLogServiceClass`: default is empty.
- `spark.datasource.flint.customFlintIndexMetadataServiceClass`: default is empty.
- `spark.datasource.flint.customFlintSchedulerClass`: default is empty.
- `spark.flint.job.externalScheduler.enabled`: default is false. When enabled, Flint auto refresh schedules index refresh jobs through an external scheduler instead of inside Spark.
- `spark.flint.job.externalScheduler.interval`: default is 5 minutes. A refresh-interval string at which the external scheduler triggers index refresh (see the sketch after this list).
- `spark.datasource.flint.write.id_name`: no default value.
- `spark.datasource.flint.ignore.id_column` : default value is true.
- `spark.datasource.flint.write.batch_size`: The number of documents written to Flint in a single batch request. Default value is Integer.MAX_VALUE.
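A minimal sketch (not from this commit) of enabling the external scheduler when building a SparkSession; the application name and the 10-minute interval are hypothetical values:

import org.apache.spark.sql.SparkSession

// Turn on external scheduling and override the default 5-minute trigger interval.
val spark = SparkSession.builder()
  .appName("flint-external-scheduler-example")
  .config("spark.flint.job.externalScheduler.enabled", "true")
  .config("spark.flint.job.externalScheduler.interval", "10 minutes")
  .getOrCreate()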
New file: AsyncQueryScheduler.java
@@ -0,0 +1,69 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.scheduler;


import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. Copied from https://github.com/opensearch-project/sql/blob/main/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java */
public interface AsyncQueryScheduler {

/**
* Schedules a new job in the system. This method creates a new job entry based on the provided
* request parameters.
*
* <p>Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
* already scheduled job.
*
* <p>Use cases: - Changing the schedule of an existing job - Modifying query parameters of a
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
* used when you want to temporarily stop a job from running but keep its configuration and
* history in the system.
*
* <p>Use cases: - Pausing a job that's causing issues without losing its configuration -
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
* At minimum, it must include the jobId.
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
* its associated data from the system.
*
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
* At minimum, it must include the jobId.
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
}
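As a usage illustration (not part of this commit), a minimal Scala implementation of the interface; the class name and println logging are hypothetical, and a real scheduler would persist each request, for example into the .async-query-scheduler index defined later in this change:

import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest

// No-op scheduler that only logs each lifecycle call; useful for wiring tests.
class LoggingAsyncQueryScheduler extends AsyncQueryScheduler {

  override def scheduleJob(request: AsyncQuerySchedulerRequest): Unit =
    println(s"scheduleJob: ${request.getJobId}")

  override def updateJob(request: AsyncQuerySchedulerRequest): Unit =
    println(s"updateJob: ${request.getJobId}")

  override def unscheduleJob(request: AsyncQuerySchedulerRequest): Unit =
    println(s"unscheduleJob: ${request.getJobId}")

  override def removeJob(request: AsyncQuerySchedulerRequest): Unit =
    println(s"removeJob: ${request.getJobId}")
}

A custom implementation like this is presumably what the spark.datasource.flint.customFlintSchedulerClass option documented above refers to.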
New file: AsyncQuerySchedulerRequest.java
@@ -0,0 +1,76 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.common.scheduler.model;

import lombok.Builder;
import lombok.Data;

import java.time.Instant;

/**
* Represents a job request for a scheduled task.
*/
@Builder
@Data
public class AsyncQuerySchedulerRequest {

/**
* The AWS accountId used to identify the resource.
*/
String accountId;

/**
* The unique identifier for the scheduler job.
*/
String jobId;

/**
* The name of the data source on which the scheduled query will be executed.
*/
String dataSource;

/**
* The scheduled query to be executed.
*/
String scheduledQuery;

/**
* The language in which the query is written, such as SQL, PPL (Piped Processing Language), etc.
*/
String queryLang;

/**
* The interval expression defining the frequency of the job execution.
* Typically expressed as a time-based pattern (e.g. 5 minutes).
*/
String interval;

/**
* Indicates whether the scheduled job is currently enabled or not.
*/
boolean enabled;

/**
* The timestamp of the last update made to this job.
*/
Instant lastUpdateTime;

/**
* The timestamp when this job was enabled.
*/
Instant enabledTime;

/**
* The duration, in seconds, for which the job remains locked.
* This lock is used to prevent concurrent executions of the same job, ensuring that only one instance of the job runs at a time.
*/
Long lockDurationSeconds;

/**
* The jitter value to add randomness to the execution schedule, helping to avoid the thundering herd problem.
*/
Double jitter;
}
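For illustration (not part of this commit), constructing a request with the Lombok-generated builder from Scala; every field value below is hypothetical:

import java.time.Instant

import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest

// Runnable from a Scala REPL (or wrapped in an object) with flint-commons on the classpath.
val request: AsyncQuerySchedulerRequest = AsyncQuerySchedulerRequest.builder()
  .accountId("123456789012")
  .jobId("flint_myglue_default_http_logs_skipping_index")
  .dataSource("myglue")
  .scheduledQuery("REFRESH SKIPPING INDEX ON myglue.default.http_logs")
  .queryLang("SQL")
  .interval("10 minutes")
  .enabled(true)
  .lastUpdateTime(Instant.now())
  .build()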
New file: mapping for the .async-query-scheduler index (YAML)
@@ -0,0 +1,47 @@
---
##
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
##

# Schema file for the .async-query-scheduler index
# Also "dynamic" is set to "false" so that other fields cannot be added.
dynamic: false
properties:
  accountId:
    type: keyword
  jobId:
    type: keyword
  dataSource:
    type: keyword
  scheduledQuery:
    type: text
  queryLang:
    type: keyword
  lastUpdateTime:
    type: date
    format: epoch_millis
  enabledTime:
    type: date
    format: epoch_millis
  schedule:
    properties:
      initialDelay:
        type: long
      interval:
        properties:
          start_time:
            type: date
            format: "strict_date_time||epoch_millis"
          period:
            type: integer
          unit:
            type: keyword
  enabled:
    type: boolean
  lockDurationSeconds:
    type: long
    null_value: -1
  jitter:
    type: double
    null_value: 0.0
New file: settings for the .async-query-scheduler index (YAML)
@@ -0,0 +1,11 @@
---
##
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
##

# Settings file for the .async-query-scheduler index
index:
  number_of_shards: "1"
  auto_expand_replicas: "0-2"
  number_of_replicas: "0"
FlintOptions.java
@@ -99,6 +99,8 @@ public class FlintOptions implements Serializable {

public static final String CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS = "customFlintIndexMetadataServiceClass";

public static final String CUSTOM_FLINT_SCHEDULER_CLASS = "customFlintSchedulerClass";

public static final String SUPPORT_SHARD = "read.support_shard";

public static final String DEFAULT_SUPPORT_SHARD = "true";
@@ -176,6 +178,17 @@ public int getSocketTimeoutMillis() {
public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}

/**
* Get the AWS accountId from the cluster name.
* Flint cluster name is in the format of "accountId:clusterName".
* @return the AWS accountId
*/
public String getAWSAccountId() {
String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", "");
String[] parts = clusterName.split(":");
return parts.length == 2 ? parts[0] : "";
}

public String getSystemIndexName() {
return options.getOrDefault(SYSTEM_INDEX_KEY_NAME, "");
@@ -210,4 +223,8 @@ public boolean supportShard() {
public long getBulkRequestRateLimitPerNode() {
return Long.parseLong(options.getOrDefault(BULK_REQUEST_RATE_LIMIT_PER_NODE, DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE));
}

public String getCustomAsyncQuerySchedulerClass() {
return options.getOrDefault(CUSTOM_FLINT_SCHEDULER_CLASS, "");
}
}
FlintSparkConf.scala
@@ -167,6 +167,15 @@ object FlintSparkConf {
.doc("Enable hybrid scan to include latest source data not refreshed to index yet")
.createWithDefault("false")

val EXTERNAL_SCHEDULER_ENABLED = FlintConfig("spark.flint.job.externalScheduler.enabled")
.doc("Enable external scheduler for index refresh")
.createWithDefault("false")

val EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD =
FlintConfig("spark.flint.job.externalScheduler.interval")
.doc("Interval threshold in minutes for external scheduler to trigger index refresh")
.createWithDefault("5 minutes")

val CHECKPOINT_LOCATION_ROOT_DIR = FlintConfig("spark.flint.index.checkpointLocation.rootDir")
.doc("Root directory of a user specified checkpoint location for index refresh")
.createOptional()
@@ -207,6 +216,11 @@ object FlintSparkConf {
.datasourceOption()
.doc("custom Flint index metadata service class")
.createOptional()
val CUSTOM_FLINT_SCHEDULER_CLASS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.CUSTOM_FLINT_SCHEDULER_CLASS}")
.datasourceOption()
.doc("custom Flint scheduler class")
.createOptional()
val QUERY =
FlintConfig("spark.flint.job.query")
.doc("Flint query for batch and streaming job")
@@ -278,6 +292,11 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean

def isExternalSchedulerEnabled: Boolean = EXTERNAL_SCHEDULER_ENABLED.readFrom(reader).toBoolean

def externalSchedulerIntervalThreshold(): String =
EXTERNAL_SCHEDULER_INTERVAL_THRESHOLD.readFrom(reader)

def checkpointLocationRootDir: Option[String] = CHECKPOINT_LOCATION_ROOT_DIR.readFrom(reader)

def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean
Expand Down Expand Up @@ -324,6 +343,7 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
DATA_SOURCE_NAME,
CUSTOM_FLINT_METADATA_LOG_SERVICE_CLASS,
CUSTOM_FLINT_INDEX_METADATA_SERVICE_CLASS,
CUSTOM_FLINT_SCHEDULER_CLASS,
SESSION_ID,
REQUEST_INDEX,
METADATA_ACCESS_AWS_CREDENTIALS_PROVIDER,
(Remaining changed files are not shown here.)