Skip to content

Commit

Permalink
[Feature] Flint query scheduler part1 - integrate job scheduler plugin (
Browse files Browse the repository at this point in the history
#2834)

* [Feature] Flint query scheduler part1 - integrate job scheduler plugin

Signed-off-by: Louis Chu <[email protected]>

* Add comments

Signed-off-by: Louis Chu <[email protected]>

* Add unit test

Signed-off-by: Louis Chu <[email protected]>

* Remove test rest API

Signed-off-by: Louis Chu <[email protected]>

* Fix doc test

Signed-off-by: Louis Chu <[email protected]>

* Add more tests

Signed-off-by: Louis Chu <[email protected]>

* Fix IT

Signed-off-by: Louis Chu <[email protected]>

* Fix IT with security

Signed-off-by: Louis Chu <[email protected]>

* Improve test coverage

Signed-off-by: Louis Chu <[email protected]>

* Fix integTest cluster

Signed-off-by: Louis Chu <[email protected]>

* Fix UT

Signed-off-by: Louis Chu <[email protected]>

* Update UT

Signed-off-by: Louis Chu <[email protected]>

* Fix bwc test

Signed-off-by: Louis Chu <[email protected]>

* Resolve comments

Signed-off-by: Louis Chu <[email protected]>

* Fix bwc test

Signed-off-by: Louis Chu <[email protected]>

* clean up doc test

Signed-off-by: Louis Chu <[email protected]>

* Resolve comments

Signed-off-by: Louis Chu <[email protected]>

* Fix UT

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
(cherry picked from commit 3daf64f)
  • Loading branch information
noCharger committed Aug 1, 2024
1 parent 97388d0 commit 6fc6608
Show file tree
Hide file tree
Showing 26 changed files with 1,401 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ gen
.worktrees
http-client.env.json
/doctest/sql-cli/
/doctest/opensearch-job-scheduler/
.factorypath
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
18 changes: 11 additions & 7 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ statement
| SHOW namespaces ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=stringLit)? #showNamespaces
| createTableHeader (LEFT_PAREN colDefinitionList RIGHT_PAREN)? tableProvider?
| createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
Expand Down Expand Up @@ -211,14 +210,14 @@ statement
identifierReference AS className=stringLit
(USING resource (COMMA resource)*)? #createFunction
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF errorCapturingNot EXISTS)?
identifierReference LEFT_PAREN parameters=colDefinitionList? RIGHT_PAREN
(RETURNS (dataType | TABLE LEFT_PAREN returnParams=colTypeList RIGHT_PAREN))?
routineCharacteristics
RETURN (query | expression) #createUserDefinedFunction
identifierReference LEFT_PAREN parameters=colDefinitionList? RIGHT_PAREN
(RETURNS (dataType | TABLE LEFT_PAREN returnParams=colTypeList RIGHT_PAREN))?
routineCharacteristics
RETURN (query | expression) #createUserDefinedFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
| DECLARE (OR REPLACE)? VARIABLE?
| DECLARE (OR REPLACE)? variable?
identifierReference dataType? variableDefaultExpression? #createVariable
| DROP TEMPORARY VARIABLE (IF EXISTS)? identifierReference #dropVariable
| DROP TEMPORARY variable (IF EXISTS)? identifierReference #dropVariable
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
(statement|setResetStatement) #explain
| SHOW TABLES ((FROM | IN) identifierReference)?
Expand Down Expand Up @@ -439,6 +438,11 @@ namespaces
| SCHEMAS
;

variable
: VARIABLE
| VAR
;

describeFuncName
: identifierReference
| stringLit
Expand Down
3 changes: 3 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down Expand Up @@ -97,6 +99,7 @@ jacocoTestCoverageVerification {
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

/** Scheduler class for managing asynchronous query jobs. */
public class OpenSearchAsyncQueryScheduler {
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
"async-query-scheduler-index-mapping.yml";
private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
"async-query-scheduler-index-settings.yml";
private static final Logger LOG = LogManager.getLogger();

private Client client;
private ClusterService clusterService;

/** Loads job resources, setting up required services and job runner instance. */
public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
this.client = client;
this.clusterService = clusterService;
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
OpenSearchRefreshIndexJob.getJobRunnerInstance();
openSearchRefreshIndexJob.setClusterService(clusterService);
openSearchRefreshIndexJob.setThreadPool(threadPool);
openSearchRefreshIndexJob.setClient(client);
}

/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
createAsyncQuerySchedulerIndex();
}
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
indexRequest.id(request.getName());
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
IndexResponse indexResponse;
try {
indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
indexResponse = indexResponseActionFuture.actionGet();
} catch (VersionConflictEngineException exception) {
throw new IllegalArgumentException("A job already exists with name: " + request.getName());
} catch (Throwable e) {
LOG.error("Failed to schedule job : {}", request.getName(), e);
throw new RuntimeException(e);
}

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.debug("Job : {} successfully created", request.getName());
} else {
throw new RuntimeException(
"Schedule job failed with result : " + indexResponse.getResult().getLowercase());
}
}

/** Unschedules a job by marking it as disabled and updating its last update time. */
public void unscheduleJob(String jobId) throws IOException {
assertIndexExists();
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request);
}

/** Updates an existing job with new parameters. */
public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
assertIndexExists();
UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
UpdateResponse updateResponse;
try {
ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
updateResponse = updateResponseActionFuture.actionGet();
} catch (DocumentMissingException exception) {
throw new IllegalArgumentException("Job: " + request.getName() + " doesn't exist");
} catch (Throwable e) {
LOG.error("Failed to update job : {}", request.getName(), e);
throw new RuntimeException(e);
}

if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)
|| updateResponse.getResult().equals(DocWriteResponse.Result.NOOP)) {
LOG.debug("Job : {} successfully updated", request.getName());
} else {
throw new RuntimeException(
"Update job failed with result : " + updateResponse.getResult().getLowercase());
}
}

/** Removes a job by deleting its document from the index. */
public void removeJob(String jobId) {
assertIndexExists();
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();

if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) {
LOG.debug("Job : {} successfully deleted", jobId);
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
} else {
throw new RuntimeException(
"Remove job failed with result : " + deleteResponse.getResult().getLowercase());
}
}

/** Creates the async query scheduler index with specified mappings and settings. */
@VisibleForTesting
void createAsyncQuerySchedulerIndex() {
try {
InputStream mappingFileStream =
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
createIndexRequest.mapping(
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
createIndexRequest.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
client.admin().indices().create(createIndexRequest);
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();

if (createIndexResponse.isAcknowledged()) {
LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
throw new RuntimeException(
"Internal server error while creating "
+ SCHEDULER_INDEX_NAME
+ " index: "
+ e.getMessage(),
e);
}
}

private void assertIndexExists() {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
throw new IllegalStateException("Job index does not exist.");
}
}

/** Returns the job runner instance for the scheduler. */
public static ScheduledJobRunner getJobRunner() {
return OpenSearchRefreshIndexJob.getJobRunnerInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import java.io.IOException;
import java.time.Instant;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;

public class OpenSearchRefreshIndexJobRequestParser {

private static Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder =
OpenSearchRefreshIndexJobRequest.builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
builder.jobName(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
builder.jobType(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
builder.enabled(parser.booleanValue());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
builder.enabledTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
builder.lastUpdateTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
builder.schedule(ScheduleParser.parse(parser));
break;
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
builder.lockDurationSeconds(parser.longValue());
break;
case OpenSearchRefreshIndexJobRequest.JITTER:
builder.jitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return builder.build();
};
}
}
Loading

0 comments on commit 6fc6608

Please sign in to comment.