From 729bb13247f12c3b7b91a92276e79430e9477db3 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 4 Sep 2024 15:32:22 -0700 Subject: [PATCH] Flint query scheduler part 2 (#2961) * Flint query scheduler part 2 Signed-off-by: Louis Chu * spotless apply Signed-off-by: Louis Chu * Add UT Signed-off-by: Louis Chu * Resolve comments Signed-off-by: Louis Chu * Add more UTs Signed-off-by: Louis Chu * Resolve comments Signed-off-by: Louis Chu * Use SQL thread pool Signed-off-by: Louis Chu --------- Signed-off-by: Louis Chu --- .../src/main/antlr/FlintSparkSqlExtensions.g4 | 5 +- .../src/main/antlr/SparkSqlBase.g4 | 1 + .../src/main/antlr/SqlBaseLexer.g4 | 2 + .../src/main/antlr/SqlBaseParser.g4 | 22 +- .../dispatcher/model/FlintIndexOptions.java | 6 + .../flint/operation/FlintIndexOpAlter.java | 12 +- .../flint/operation/FlintIndexOpDrop.java | 13 +- .../flint/operation/FlintIndexOpFactory.java | 13 +- .../flint/operation/FlintIndexOpVacuum.java | 11 +- .../spark/scheduler/AsyncQueryScheduler.java | 57 +++++ .../model/AsyncQuerySchedulerRequest.java | 31 +++ .../asyncquery/AsyncQueryCoreIntegTest.java | 110 ++++++++- .../dispatcher/SparkQueryDispatcherTest.java | 2 + .../operation/FlintIndexOpFactoryTest.java | 2 + .../operation/FlintIndexOpVacuumTest.java | 57 ++++- async-query/build.gradle | 2 +- .../OpenSearchAsyncQueryScheduler.java | 58 ++--- .../job/OpenSearchRefreshIndexJob.java | 93 -------- .../job/ScheduledAsyncQueryJobRunner.java | 116 ++++++++++ .../OpenSearchRefreshIndexJobRequest.java | 108 --------- .../model/ScheduledAsyncQueryJobRequest.java | 156 +++++++++++++ .../parser/IntervalScheduleParser.java | 100 +++++++++ ...nSearchScheduleQueryJobRequestParser.java} | 40 ++-- .../config/AsyncExecutorServiceModule.java | 16 +- .../async-query-scheduler-index-mapping.yml | 10 +- .../AsyncQueryExecutorServiceSpec.java | 10 +- .../OpenSearchAsyncQuerySchedulerTest.java | 63 +++--- .../job/OpenSearchRefreshIndexJobTest.java | 145 ------------ .../job/ScheduledAsyncQueryJobRunnerTest.java | 210 ++++++++++++++++++ .../OpenSearchRefreshIndexJobRequestTest.java | 81 ------- .../ScheduledAsyncQueryJobRequestTest.java | 210 ++++++++++++++++++ .../parser/IntervalScheduleParserTest.java | 122 ++++++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 23 +- 33 files changed, 1371 insertions(+), 536 deletions(-) create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java create mode 100644 async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java delete mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java delete mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java create mode 100644 async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java rename async-query/src/main/java/org/opensearch/sql/spark/scheduler/{OpenSearchRefreshIndexJobRequestParser.java => parser/OpenSearchScheduleQueryJobRequestParser.java} (57%) delete mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java delete mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java create mode 100644 async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java diff --git a/async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4 b/async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4 index 2e8d634dad..46e814e9f5 100644 --- a/async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4 +++ b/async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4 @@ -156,7 +156,10 @@ indexManagementStatement ; showFlintIndexStatement - : SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier + : SHOW FLINT (INDEX | INDEXES) + IN catalogDb=multipartIdentifier #showFlintIndex + | SHOW FLINT (INDEX | INDEXES) EXTENDED + IN catalogDb=multipartIdentifier #showFlintIndexExtended ; indexJobManagementStatement diff --git a/async-query-core/src/main/antlr/SparkSqlBase.g4 b/async-query-core/src/main/antlr/SparkSqlBase.g4 index 283981e471..c53c61adfd 100644 --- a/async-query-core/src/main/antlr/SparkSqlBase.g4 +++ b/async-query-core/src/main/antlr/SparkSqlBase.g4 @@ -163,6 +163,7 @@ DESC: 'DESC'; DESCRIBE: 'DESCRIBE'; DROP: 'DROP'; EXISTS: 'EXISTS'; +EXTENDED: 'EXTENDED'; FALSE: 'FALSE'; FLINT: 'FLINT'; IF: 'IF'; diff --git a/async-query-core/src/main/antlr/SqlBaseLexer.g4 b/async-query-core/src/main/antlr/SqlBaseLexer.g4 index bde298c23e..acfc0011f5 100644 --- a/async-query-core/src/main/antlr/SqlBaseLexer.g4 +++ b/async-query-core/src/main/antlr/SqlBaseLexer.g4 @@ -212,6 +212,7 @@ DIRECTORY: 'DIRECTORY'; DISTINCT: 'DISTINCT'; DISTRIBUTE: 'DISTRIBUTE'; DIV: 'DIV'; +DO: 'DO'; DOUBLE: 'DOUBLE'; DROP: 'DROP'; ELSE: 'ELSE'; @@ -467,6 +468,7 @@ WEEK: 'WEEK'; WEEKS: 'WEEKS'; WHEN: 'WHEN'; WHERE: 'WHERE'; +WHILE: 'WHILE'; WINDOW: 'WINDOW'; WITH: 'WITH'; WITHIN: 'WITHIN'; diff --git a/async-query-core/src/main/antlr/SqlBaseParser.g4 b/async-query-core/src/main/antlr/SqlBaseParser.g4 index c7aa56cf92..5b8805821b 100644 --- a/async-query-core/src/main/antlr/SqlBaseParser.g4 +++ b/async-query-core/src/main/antlr/SqlBaseParser.g4 @@ -63,6 +63,8 @@ compoundStatement : statement | setStatementWithOptionalVarKeyword | beginEndCompoundBlock + | ifElseStatement + | whileStatement ; setStatementWithOptionalVarKeyword @@ -71,6 +73,16 @@ setStatementWithOptionalVarKeyword LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword ; +whileStatement + : beginLabel? WHILE booleanExpression DO compoundBody END WHILE endLabel? + ; + +ifElseStatement + : IF booleanExpression THEN conditionalBodies+=compoundBody + (ELSE IF booleanExpression THEN conditionalBodies+=compoundBody)* + (ELSE elseBody=compoundBody)? END IF + ; + singleStatement : (statement|setResetStatement) SEMICOLON* EOF ; @@ -406,9 +418,9 @@ query ; insertInto - : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable - | INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable - | INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere + : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable + | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable + | INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause #insertIntoReplaceWhere | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir ; @@ -1522,6 +1534,7 @@ ansiNonReserved | DIRECTORY | DISTRIBUTE | DIV + | DO | DOUBLE | DROP | ESCAPED @@ -1723,6 +1736,7 @@ ansiNonReserved | VOID | WEEK | WEEKS + | WHILE | WINDOW | YEAR | YEARS @@ -1853,6 +1867,7 @@ nonReserved | DISTINCT | DISTRIBUTE | DIV + | DO | DOUBLE | DROP | ELSE @@ -2092,6 +2107,7 @@ nonReserved | VOID | WEEK | WEEKS + | WHILE | WHEN | WHERE | WINDOW diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java index 79af1c91ab..6c7cc7c5fb 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/dispatcher/model/FlintIndexOptions.java @@ -19,6 +19,7 @@ public class FlintIndexOptions { public static final String INCREMENTAL_REFRESH = "incremental_refresh"; public static final String CHECKPOINT_LOCATION = "checkpoint_location"; public static final String WATERMARK_DELAY = "watermark_delay"; + public static final String SCHEDULER_MODE = "scheduler_mode"; private final Map options = new HashMap<>(); public void setOption(String key, String value) { @@ -33,6 +34,11 @@ public boolean autoRefresh() { return Boolean.parseBoolean(getOption(AUTO_REFRESH).orElse("false")); } + public boolean isExternalScheduler() { + // Default is false, which means using internal scheduler to refresh the index. + return getOption(SCHEDULER_MODE).map(mode -> "external".equals(mode)).orElse(false); + } + public Map getProvidedOptions() { return new HashMap<>(options); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java index 4a00195ebf..de34803823 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpAlter.java @@ -16,6 +16,7 @@ import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; /** * Index Operation for Altering the flint index. Only handles alter operation when @@ -25,16 +26,19 @@ public class FlintIndexOpAlter extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(FlintIndexOpAlter.class); private final FlintIndexMetadataService flintIndexMetadataService; private final FlintIndexOptions flintIndexOptions; + private final AsyncQueryScheduler asyncQueryScheduler; public FlintIndexOpAlter( FlintIndexOptions flintIndexOptions, FlintIndexStateModelService flintIndexStateModelService, String datasourceName, EMRServerlessClientFactory emrServerlessClientFactory, - FlintIndexMetadataService flintIndexMetadataService) { + FlintIndexMetadataService flintIndexMetadataService, + AsyncQueryScheduler asyncQueryScheduler) { super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); this.flintIndexMetadataService = flintIndexMetadataService; this.flintIndexOptions = flintIndexOptions; + this.asyncQueryScheduler = asyncQueryScheduler; } @Override @@ -57,7 +61,11 @@ void runOp( "Running alter index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); this.flintIndexMetadataService.updateIndexToManualRefresh( flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext); - cancelStreamingJob(flintIndexStateModel); + if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); + } else { + cancelStreamingJob(flintIndexStateModel); + } } @Override diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java index fc9b644fc7..3fa5423c10 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpDrop.java @@ -14,16 +14,21 @@ import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; /** Operation to drop Flint index */ public class FlintIndexOpDrop extends FlintIndexOp { private static final Logger LOG = LogManager.getLogger(); + private final AsyncQueryScheduler asyncQueryScheduler; + public FlintIndexOpDrop( FlintIndexStateModelService flintIndexStateModelService, String datasourceName, - EMRServerlessClientFactory emrServerlessClientFactory) { + EMRServerlessClientFactory emrServerlessClientFactory, + AsyncQueryScheduler asyncQueryScheduler) { super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); + this.asyncQueryScheduler = asyncQueryScheduler; } public boolean validate(FlintIndexState state) { @@ -48,7 +53,11 @@ void runOp( LOG.debug( "Performing drop index operation for index: {}", flintIndexMetadata.getOpensearchIndexName()); - cancelStreamingJob(flintIndexStateModel); + if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName()); + } else { + cancelStreamingJob(flintIndexStateModel); + } } @Override diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java index 14cf9fa7c9..9f925e0bcf 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactory.java @@ -11,6 +11,7 @@ import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @RequiredArgsConstructor public class FlintIndexOpFactory { @@ -18,10 +19,11 @@ public class FlintIndexOpFactory { private final FlintIndexClient flintIndexClient; private final FlintIndexMetadataService flintIndexMetadataService; private final EMRServerlessClientFactory emrServerlessClientFactory; + private final AsyncQueryScheduler asyncQueryScheduler; public FlintIndexOpDrop getDrop(String datasource) { return new FlintIndexOpDrop( - flintIndexStateModelService, datasource, emrServerlessClientFactory); + flintIndexStateModelService, datasource, emrServerlessClientFactory, asyncQueryScheduler); } public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String datasource) { @@ -30,12 +32,17 @@ public FlintIndexOpAlter getAlter(FlintIndexOptions flintIndexOptions, String da flintIndexStateModelService, datasource, emrServerlessClientFactory, - flintIndexMetadataService); + flintIndexMetadataService, + asyncQueryScheduler); } public FlintIndexOpVacuum getVacuum(String datasource) { return new FlintIndexOpVacuum( - flintIndexStateModelService, datasource, flintIndexClient, emrServerlessClientFactory); + flintIndexStateModelService, + datasource, + flintIndexClient, + emrServerlessClientFactory, + asyncQueryScheduler); } public FlintIndexOpCancel getCancel(String datasource) { diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java index 06aaf8ef9f..324ddb5720 100644 --- a/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuum.java @@ -14,12 +14,14 @@ import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; /** Flint index vacuum operation. */ public class FlintIndexOpVacuum extends FlintIndexOp { - private static final Logger LOG = LogManager.getLogger(); + private final AsyncQueryScheduler asyncQueryScheduler; + /** OpenSearch client. */ private final FlintIndexClient flintIndexClient; @@ -27,9 +29,11 @@ public FlintIndexOpVacuum( FlintIndexStateModelService flintIndexStateModelService, String datasourceName, FlintIndexClient flintIndexClient, - EMRServerlessClientFactory emrServerlessClientFactory) { + EMRServerlessClientFactory emrServerlessClientFactory, + AsyncQueryScheduler asyncQueryScheduler) { super(flintIndexStateModelService, datasourceName, emrServerlessClientFactory); this.flintIndexClient = flintIndexClient; + this.asyncQueryScheduler = asyncQueryScheduler; } @Override @@ -48,6 +52,9 @@ public void runOp( FlintIndexStateModel flintIndex, AsyncQueryRequestContext asyncQueryRequestContext) { LOG.info("Vacuuming Flint index {}", flintIndexMetadata.getOpensearchIndexName()); + if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) { + asyncQueryScheduler.removeJob(flintIndexMetadata.getOpensearchIndexName()); + } flintIndexClient.deleteIndex(flintIndexMetadata.getOpensearchIndexName()); } diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java new file mode 100644 index 0000000000..8ac499081e --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java @@ -0,0 +1,57 @@ +package org.opensearch.sql.spark.scheduler; + +import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; + +/** Scheduler interface for scheduling asynchronous query jobs. */ +public interface AsyncQueryScheduler { + + /** + * Schedules a new job in the system. This method creates a new job entry based on the provided + * request parameters. + * + *

Use cases: - Creating a new periodic query execution - Setting up a scheduled data refresh + * task + * + * @param asyncQuerySchedulerRequest The request containing job configuration details + * @throws IllegalArgumentException if a job with the same name already exists + * @throws RuntimeException if there's an error during job creation + */ + void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); + + /** + * Updates an existing job with new parameters. This method modifies the configuration of an + * already scheduled job. + * + *

Use cases: - Changing the schedule of an existing job - Modifying query parameters of a + * scheduled job - Updating resource allocations for a job + * + * @param asyncQuerySchedulerRequest The request containing updated job configuration + * @throws IllegalArgumentException if the job to be updated doesn't exist + * @throws RuntimeException if there's an error during the update process + */ + void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest); + + /** + * Unschedules a job by marking it as disabled and updating its last update time. This method is + * used when you want to temporarily stop a job from running but keep its configuration and + * history in the system. + * + *

Use cases: - Pausing a job that's causing issues without losing its configuration - + * Temporarily disabling a job during maintenance or high-load periods - Allowing for easy + * re-enabling of the job in the future + * + * @param jobId The unique identifier of the job to unschedule + */ + void unscheduleJob(String jobId); + + /** + * Removes a job completely from the scheduler. This method permanently deletes the job and all + * its associated data from the system. + * + *

Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously + * created jobs - Freeing up resources by deleting unused job configurations + * + * @param jobId The unique identifier of the job to remove + */ + void removeJob(String jobId); +} diff --git a/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java new file mode 100644 index 0000000000..b54e5b30ce --- /dev/null +++ b/async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/model/AsyncQuerySchedulerRequest.java @@ -0,0 +1,31 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import java.time.Instant; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.opensearch.sql.spark.rest.model.LangType; + +/** Represents a job request for a scheduled task. */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class AsyncQuerySchedulerRequest { + protected String accountId; + // Scheduler jobid is the opensearch index name until we support multiple jobs per index + protected String jobId; + protected String dataSource; + protected String scheduledQuery; + protected LangType queryLang; + protected Object schedule; + protected boolean enabled; + protected Instant lastUpdateTime; + protected Instant enabledTime; + protected Long lockDurationSeconds; + protected Double jitter; +} diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java index 09767d16bd..226e0ff5eb 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java @@ -83,6 +83,7 @@ import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; /** * This tests async-query-core library end-to-end using mocked implementation of extension points. @@ -112,6 +113,7 @@ public class AsyncQueryCoreIntegTest { @Mock FlintIndexClient flintIndexClient; @Mock AsyncQueryRequestContext asyncQueryRequestContext; @Mock MetricsService metricsService; + @Mock AsyncQueryScheduler asyncQueryScheduler; @Mock SparkSubmitParametersBuilderProvider sparkSubmitParametersBuilderProvider; // storage services @@ -159,7 +161,8 @@ public void setUp() { flintIndexStateModelService, flintIndexClient, flintIndexMetadataService, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); QueryHandlerFactory queryHandlerFactory = new QueryHandlerFactory( jobExecutionResponseReader, @@ -205,6 +208,30 @@ public void createDropIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } + @Test + public void createDropIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "DROP INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + + verify(asyncQueryScheduler).unscheduleJob(indexName); + } + @Test public void createVacuumIndexQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -227,6 +254,32 @@ public void createVacuumIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } + @Test + public void createVacuumIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "VACUUM INDEX index_name ON table_name", DATASOURCE_NAME, LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + + verify(flintIndexClient).deleteIndex(indexName); + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + + verify(asyncQueryScheduler).removeJob(indexName); + } + @Test public void createAlterIndexQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -258,6 +311,40 @@ public void createAlterIndexQuery() { verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, JobType.BATCH); } + @Test + public void createAlterIndexQueryWithScheduler() { + givenSparkExecutionEngineConfigIsSupplied(); + givenValidDataSourceMetadataExist(); + when(queryIdProvider.getQueryId(any(), eq(asyncQueryRequestContext))).thenReturn(QUERY_ID); + + String indexName = "flint_datasource_name_table_name_index_name_index"; + givenFlintIndexMetadataExistsWithExternalScheduler(indexName); + + CreateAsyncQueryResponse response = + asyncQueryExecutorService.createAsyncQuery( + new CreateAsyncQueryRequest( + "ALTER INDEX index_name ON table_name WITH (auto_refresh = false)", + DATASOURCE_NAME, + LangType.SQL), + asyncQueryRequestContext); + + assertEquals(QUERY_ID, response.getQueryId()); + assertNull(response.getSessionId()); + verifyGetQueryIdCalled(); + + verify(flintIndexMetadataService) + .updateIndexToManualRefresh( + eq(indexName), flintIndexOptionsArgumentCaptor.capture(), eq(asyncQueryRequestContext)); + + FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue(); + assertFalse(flintIndexOptions.autoRefresh()); + + verify(asyncQueryScheduler).unscheduleJob(indexName); + + verifyCreateIndexDMLResultCalled(); + verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID); + } + @Test public void createStreamingQuery() { givenSparkExecutionEngineConfigIsSupplied(); @@ -507,7 +594,8 @@ private void givenSparkExecutionEngineConfigIsSupplied() { .build()); } - private void givenFlintIndexMetadataExists(String indexName) { + private void givenFlintIndexMetadataExists( + String indexName, FlintIndexOptions flintIndexOptions) { when(flintIndexMetadataService.getFlintIndexMetadata(indexName, asyncQueryRequestContext)) .thenReturn( ImmutableMap.of( @@ -516,9 +604,27 @@ private void givenFlintIndexMetadataExists(String indexName) { .appId(APPLICATION_ID) .jobId(JOB_ID) .opensearchIndexName(indexName) + .flintIndexOptions(flintIndexOptions) .build())); } + // Overload method for default FlintIndexOptions usage + private void givenFlintIndexMetadataExists(String indexName) { + givenFlintIndexMetadataExists(indexName, new FlintIndexOptions()); + } + + // Method to set up FlintIndexMetadata with external scheduler + private void givenFlintIndexMetadataExistsWithExternalScheduler(String indexName) { + givenFlintIndexMetadataExists(indexName, createExternalSchedulerFlintIndexOptions()); + } + + // Helper method for creating FlintIndexOptions with external scheduler + private FlintIndexOptions createExternalSchedulerFlintIndexOptions() { + FlintIndexOptions options = new FlintIndexOptions(); + options.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); + return options; + } + private void givenValidDataSourceMetadataExist() { when(dataSourceService.verifyDataSourceAccessAndGetRawMetadata( DATASOURCE_NAME, asyncQueryRequestContext)) diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java index 1587ce6638..d040db24b2 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcherTest.java @@ -86,6 +86,7 @@ import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @ExtendWith(MockitoExtension.class) public class SparkQueryDispatcherTest { @@ -108,6 +109,7 @@ public class SparkQueryDispatcherTest { @Mock private QueryIdProvider queryIdProvider; @Mock private AsyncQueryRequestContext asyncQueryRequestContext; @Mock private MetricsService metricsService; + @Mock private AsyncQueryScheduler asyncQueryScheduler; private DataSourceSparkParameterComposer dataSourceSparkParameterComposer = (datasourceMetadata, sparkSubmitParameters, dispatchQueryRequest, context) -> { sparkSubmitParameters.setConfigItem(FLINT_INDEX_STORE_AUTH_KEY, "basic"); diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java index 3bf438aeb9..62ac98f1a2 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpFactoryTest.java @@ -17,6 +17,7 @@ import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadataService; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @ExtendWith(MockitoExtension.class) class FlintIndexOpFactoryTest { @@ -26,6 +27,7 @@ class FlintIndexOpFactoryTest { @Mock private FlintIndexClient flintIndexClient; @Mock private FlintIndexMetadataService flintIndexMetadataService; @Mock private EMRServerlessClientFactory emrServerlessClientFactory; + @Mock private AsyncQueryScheduler asyncQueryScheduler; @InjectMocks FlintIndexOpFactory flintIndexOpFactory; diff --git a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java index 26858c18fe..08f8efd488 100644 --- a/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java +++ b/async-query-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -18,11 +19,13 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext; import org.opensearch.sql.spark.client.EMRServerlessClientFactory; +import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions; import org.opensearch.sql.spark.flint.FlintIndexClient; import org.opensearch.sql.spark.flint.FlintIndexMetadata; import org.opensearch.sql.spark.flint.FlintIndexState; import org.opensearch.sql.spark.flint.FlintIndexStateModel; import org.opensearch.sql.spark.flint.FlintIndexStateModelService; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; @ExtendWith(MockitoExtension.class) class FlintIndexOpVacuumTest { @@ -30,16 +33,20 @@ class FlintIndexOpVacuumTest { public static final String DATASOURCE_NAME = "DATASOURCE_NAME"; public static final String LATEST_ID = "LATEST_ID"; public static final String INDEX_NAME = "INDEX_NAME"; + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITH_LATEST_ID = - FlintIndexMetadata.builder().latestId(LATEST_ID).opensearchIndexName(INDEX_NAME).build(); + createFlintIndexMetadataWithLatestId(); + public static final FlintIndexMetadata FLINT_INDEX_METADATA_WITHOUT_LATEST_ID = - FlintIndexMetadata.builder().opensearchIndexName(INDEX_NAME).build(); + createFlintIndexMetadataWithoutLatestId(); + @Mock FlintIndexClient flintIndexClient; @Mock FlintIndexStateModelService flintIndexStateModelService; @Mock EMRServerlessClientFactory emrServerlessClientFactory; @Mock FlintIndexStateModel flintIndexStateModel; @Mock FlintIndexStateModel transitionedFlintIndexStateModel; @Mock AsyncQueryRequestContext asyncQueryRequestContext; + @Mock AsyncQueryScheduler asyncQueryScheduler; RuntimeException testException = new RuntimeException("Test Exception"); @@ -52,7 +59,33 @@ public void setUp() { flintIndexStateModelService, DATASOURCE_NAME, flintIndexClient, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); + } + + private static FlintIndexMetadata createFlintIndexMetadataWithLatestId() { + return FlintIndexMetadata.builder() + .latestId(LATEST_ID) + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); + } + + private static FlintIndexMetadata createFlintIndexMetadataWithoutLatestId() { + return FlintIndexMetadata.builder() + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(new FlintIndexOptions()) + .build(); + } + + private FlintIndexMetadata createFlintIndexMetadataWithExternalScheduler() { + FlintIndexOptions flintIndexOptions = new FlintIndexOptions(); + flintIndexOptions.setOption(FlintIndexOptions.SCHEDULER_MODE, "external"); + + return FlintIndexMetadata.builder() + .opensearchIndexName(INDEX_NAME) + .flintIndexOptions(flintIndexOptions) + .build(); } @Test @@ -207,4 +240,22 @@ public void testApplyHappyPath() { .deleteFlintIndexStateModel(LATEST_ID, DATASOURCE_NAME, asyncQueryRequestContext); verify(flintIndexClient).deleteIndex(INDEX_NAME); } + + @Test + public void testRunOpWithExternalScheduler() { + FlintIndexMetadata flintIndexMetadata = createFlintIndexMetadataWithExternalScheduler(); + flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); + + verify(asyncQueryScheduler).removeJob(INDEX_NAME); + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } + + @Test + public void testRunOpWithoutExternalScheduler() { + FlintIndexMetadata flintIndexMetadata = FLINT_INDEX_METADATA_WITHOUT_LATEST_ID; + flintIndexOpVacuum.runOp(flintIndexMetadata, flintIndexStateModel, asyncQueryRequestContext); + + verify(asyncQueryScheduler, never()).removeJob(INDEX_NAME); + verify(flintIndexClient).deleteIndex(INDEX_NAME); + } } diff --git a/async-query/build.gradle b/async-query/build.gradle index abda6161d3..53fdcbe292 100644 --- a/async-query/build.gradle +++ b/async-query/build.gradle @@ -99,7 +99,7 @@ jacocoTestCoverageVerification { // ignore because XContext IOException 'org.opensearch.sql.spark.execution.statestore.StateStore', 'org.opensearch.sql.spark.rest.*', - 'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser', + 'org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser', 'org.opensearch.sql.spark.transport.model.*' ] limit { diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index c7a66fc6be..9ebde4fe83 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -8,10 +8,11 @@ import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,12 +35,13 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; -import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; -import org.opensearch.threadpool.ThreadPool; +import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner; +import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest; +import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; /** Scheduler class for managing asynchronous query jobs. */ -public class OpenSearchAsyncQueryScheduler { +@RequiredArgsConstructor +public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler { public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler"; private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME = @@ -48,22 +50,14 @@ public class OpenSearchAsyncQueryScheduler { "async-query-scheduler-index-settings.yml"; private static final Logger LOG = LogManager.getLogger(); - private Client client; - private ClusterService clusterService; - - /** Loads job resources, setting up required services and job runner instance. */ - public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) { - this.client = client; - this.clusterService = clusterService; - OpenSearchRefreshIndexJob openSearchRefreshIndexJob = - OpenSearchRefreshIndexJob.getJobRunnerInstance(); - openSearchRefreshIndexJob.setClusterService(clusterService); - openSearchRefreshIndexJob.setThreadPool(threadPool); - openSearchRefreshIndexJob.setClient(client); - } + private final Client client; + private final ClusterService clusterService; + @Override /** Schedules a new job by indexing it into the job index. */ - public void scheduleJob(OpenSearchRefreshIndexJobRequest request) { + public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest); if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { createAsyncQuerySchedulerIndex(); } @@ -92,19 +86,28 @@ public void scheduleJob(OpenSearchRefreshIndexJobRequest request) { } /** Unschedules a job by marking it as disabled and updating its last update time. */ - public void unscheduleJob(String jobId) throws IOException { - assertIndexExists(); - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(jobId) + @Override + public void unscheduleJob(String jobId) { + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(jobId) .enabled(false) .lastUpdateTime(Instant.now()) .build(); - updateJob(request); + try { + updateJob(request); + LOG.info("Unscheduled job for jobId: {}", jobId); + } catch (IllegalStateException | DocumentMissingException e) { + LOG.error("Failed to unschedule job: {}", jobId, e); + } } /** Updates an existing job with new parameters. */ - public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException { + @Override + @SneakyThrows + public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest); assertIndexExists(); UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName()); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -130,6 +133,7 @@ public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOExcepti } /** Removes a job by deleting its document from the index. */ + @Override public void removeJob(String jobId) { assertIndexExists(); DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId); @@ -192,6 +196,6 @@ private void assertIndexExists() { /** Returns the job runner instance for the scheduler. */ public static ScheduledJobRunner getJobRunner() { - return OpenSearchRefreshIndexJob.getJobRunnerInstance(); + return ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); } } diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java deleted file mode 100644 index e465a8790f..0000000000 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJob.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.scheduler.job; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.jobscheduler.spi.JobExecutionContext; -import org.opensearch.jobscheduler.spi.ScheduledJobParameter; -import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.plugins.Plugin; -import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; -import org.opensearch.threadpool.ThreadPool; - -/** - * The job runner class for scheduling refresh index query. - * - *

The job runner should be a singleton class if it uses OpenSearch client or other objects - * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, - * OpenSearch has not invoked plugins' createComponents() method. That is saying the plugin is not - * completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link - * ClusterService} and other objects are not available to plugin and this job runner. - * - *

So we have to move this job runner initialization to {@link Plugin} createComponents() method, - * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler - * plugin. - */ -public class OpenSearchRefreshIndexJob implements ScheduledJobRunner { - - private static final Logger log = LogManager.getLogger(OpenSearchRefreshIndexJob.class); - - public static OpenSearchRefreshIndexJob INSTANCE = new OpenSearchRefreshIndexJob(); - - public static OpenSearchRefreshIndexJob getJobRunnerInstance() { - return INSTANCE; - } - - private ClusterService clusterService; - private ThreadPool threadPool; - private Client client; - - private OpenSearchRefreshIndexJob() { - // Singleton class, use getJobRunnerInstance method instead of constructor - } - - public void setClusterService(ClusterService clusterService) { - this.clusterService = clusterService; - } - - public void setThreadPool(ThreadPool threadPool) { - this.threadPool = threadPool; - } - - public void setClient(Client client) { - this.client = client; - } - - @Override - public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { - if (!(jobParameter instanceof OpenSearchRefreshIndexJobRequest)) { - throw new IllegalStateException( - "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " - + jobParameter.getClass().getCanonicalName()); - } - - if (this.clusterService == null) { - throw new IllegalStateException("ClusterService is not initialized."); - } - - if (this.threadPool == null) { - throw new IllegalStateException("ThreadPool is not initialized."); - } - - if (this.client == null) { - throw new IllegalStateException("Client is not initialized."); - } - - Runnable runnable = - () -> { - doRefresh(jobParameter.getName()); - }; - threadPool.generic().submit(runnable); - } - - void doRefresh(String refreshIndex) { - // TODO: add logic to refresh index - log.info("Scheduled refresh index job on : " + refreshIndex); - } -} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java new file mode 100644 index 0000000000..3652acf295 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java @@ -0,0 +1,116 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.plugins.Plugin; +import org.opensearch.sql.legacy.executor.AsyncRestExecutor; +import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; +import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse; +import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; +import org.opensearch.threadpool.ThreadPool; + +/** + * The job runner class for scheduling async query. + * + *

The job runner should be a singleton class if it uses OpenSearch client or other objects + * passed from OpenSearch. Because when registering the job runner to JobScheduler plugin, + * OpenSearch has not invoked plugins' createComponents() method. That is saying the plugin is not + * completely initialized, and the OpenSearch {@link org.opensearch.client.Client}, {@link + * ClusterService} and other objects are not available to plugin and this job runner. + * + *

So we have to move this job runner initialization to {@link Plugin} createComponents() method, + * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler + * plugin. + */ +public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner { + // Share SQL plugin thread pool + private static final String ASYNC_QUERY_THREAD_POOL_NAME = + AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME; + private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class); + + private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner(); + + public static ScheduledAsyncQueryJobRunner getJobRunnerInstance() { + return INSTANCE; + } + + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + private AsyncQueryExecutorService asyncQueryExecutorService; + + private ScheduledAsyncQueryJobRunner() { + // Singleton class, use getJobRunnerInstance method instead of constructor + } + + /** Loads job resources, setting up required services and job runner instance. */ + public void loadJobResource( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + AsyncQueryExecutorService asyncQueryExecutorService) { + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + this.asyncQueryExecutorService = asyncQueryExecutorService; + } + + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + // Parser will convert jobParameter to ScheduledAsyncQueryJobRequest + if (!(jobParameter instanceof ScheduledAsyncQueryJobRequest)) { + throw new IllegalStateException( + "Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: " + + jobParameter.getClass().getCanonicalName()); + } + + if (this.clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (this.threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + if (this.client == null) { + throw new IllegalStateException("Client is not initialized."); + } + + if (this.asyncQueryExecutorService == null) { + throw new IllegalStateException("AsyncQueryExecutorService is not initialized."); + } + + Runnable runnable = + () -> { + try { + doRefresh((ScheduledAsyncQueryJobRequest) jobParameter); + } catch (Throwable throwable) { + LOGGER.error(throwable); + } + }; + threadPool.executor(ASYNC_QUERY_THREAD_POOL_NAME).submit(runnable); + } + + void doRefresh(ScheduledAsyncQueryJobRequest request) { + LOGGER.info("Scheduled refresh index job on job: " + request.getName()); + CreateAsyncQueryRequest createAsyncQueryRequest = + new CreateAsyncQueryRequest( + request.getScheduledQuery(), request.getDataSource(), request.getQueryLang()); + CreateAsyncQueryResponse createAsyncQueryResponse = + asyncQueryExecutorService.createAsyncQuery( + createAsyncQueryRequest, new NullAsyncQueryRequestContext()); + LOGGER.info("Created async query with queryId: " + createAsyncQueryResponse.getQueryId()); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java deleted file mode 100644 index 7eaa4e2d29..0000000000 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.scheduler.model; - -import java.io.IOException; -import java.time.Instant; -import lombok.Builder; -import org.opensearch.core.xcontent.ToXContent; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.jobscheduler.spi.ScheduledJobParameter; -import org.opensearch.jobscheduler.spi.schedule.Schedule; - -/** Represents a job request to refresh index. */ -@Builder -public class OpenSearchRefreshIndexJobRequest implements ScheduledJobParameter { - // Constant fields for JSON serialization - public static final String JOB_NAME_FIELD = "jobName"; - public static final String JOB_TYPE_FIELD = "jobType"; - public static final String LAST_UPDATE_TIME_FIELD = "lastUpdateTime"; - public static final String LAST_UPDATE_TIME_FIELD_READABLE = "last_update_time_field"; - public static final String SCHEDULE_FIELD = "schedule"; - public static final String ENABLED_TIME_FIELD = "enabledTime"; - public static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field"; - public static final String LOCK_DURATION_SECONDS = "lockDurationSeconds"; - public static final String JITTER = "jitter"; - public static final String ENABLED_FIELD = "enabled"; - - // name is doc id - private final String jobName; - private final String jobType; - private final Schedule schedule; - private final boolean enabled; - private final Instant lastUpdateTime; - private final Instant enabledTime; - private final Long lockDurationSeconds; - private final Double jitter; - - @Override - public String getName() { - return jobName; - } - - public String getJobType() { - return jobType; - } - - @Override - public Schedule getSchedule() { - return schedule; - } - - @Override - public boolean isEnabled() { - return enabled; - } - - @Override - public Instant getLastUpdateTime() { - return lastUpdateTime; - } - - @Override - public Instant getEnabledTime() { - return enabledTime; - } - - @Override - public Long getLockDurationSeconds() { - return lockDurationSeconds; - } - - @Override - public Double getJitter() { - return jitter; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) - throws IOException { - builder.startObject(); - builder.field(JOB_NAME_FIELD, getName()).field(ENABLED_FIELD, isEnabled()); - if (getSchedule() != null) { - builder.field(SCHEDULE_FIELD, getSchedule()); - } - if (getJobType() != null) { - builder.field(JOB_TYPE_FIELD, getJobType()); - } - if (getEnabledTime() != null) { - builder.timeField( - ENABLED_TIME_FIELD, ENABLED_TIME_FIELD_READABLE, getEnabledTime().toEpochMilli()); - } - builder.timeField( - LAST_UPDATE_TIME_FIELD, - LAST_UPDATE_TIME_FIELD_READABLE, - getLastUpdateTime().toEpochMilli()); - if (this.lockDurationSeconds != null) { - builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds); - } - if (this.jitter != null) { - builder.field(JITTER, this.jitter); - } - builder.endObject(); - return builder; - } -} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java new file mode 100644 index 0000000000..9b85a11888 --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import java.io.IOException; +import java.time.Instant; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.scheduler.parser.IntervalScheduleParser; + +/** Represents a job request to refresh index. */ +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest + implements ScheduledJobParameter { + // Constant fields for JSON serialization + public static final String ACCOUNT_ID_FIELD = "accountId"; + public static final String JOB_ID_FIELD = "jobId"; + public static final String DATA_SOURCE_NAME_FIELD = "dataSource"; + public static final String SCHEDULED_QUERY_FIELD = "scheduledQuery"; + public static final String QUERY_LANG_FIELD = "queryLang"; + public static final String LAST_UPDATE_TIME_FIELD = "lastUpdateTime"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String ENABLED_TIME_FIELD = "enabledTime"; + public static final String LOCK_DURATION_SECONDS = "lockDurationSeconds"; + public static final String JITTER = "jitter"; + public static final String ENABLED_FIELD = "enabled"; + private final Schedule schedule; + + @Builder + public ScheduledAsyncQueryJobRequest( + String accountId, + String jobId, + String dataSource, + String scheduledQuery, + LangType queryLang, + Schedule schedule, // Use the OpenSearch Schedule type + boolean enabled, + Instant lastUpdateTime, + Instant enabledTime, + Long lockDurationSeconds, + Double jitter) { + super( + accountId, + jobId, + dataSource, + scheduledQuery, + queryLang, + schedule, + enabled, + lastUpdateTime, + enabledTime, + lockDurationSeconds, + jitter); + this.schedule = schedule; + } + + @Override + public String getName() { + return getJobId(); + } + + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public Instant getLastUpdateTime() { + return lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return enabledTime; + } + + @Override + public Schedule getSchedule() { + return schedule; + } + + @Override + public Long getLockDurationSeconds() { + return lockDurationSeconds; + } + + @Override + public Double getJitter() { + return jitter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) + throws IOException { + builder.startObject(); + if (getAccountId() != null) { + builder.field(ACCOUNT_ID_FIELD, getAccountId()); + } + builder.field(JOB_ID_FIELD, getJobId()).field(ENABLED_FIELD, isEnabled()); + if (getDataSource() != null) { + builder.field(DATA_SOURCE_NAME_FIELD, getDataSource()); + } + if (getScheduledQuery() != null) { + builder.field(SCHEDULED_QUERY_FIELD, getScheduledQuery()); + } + if (getQueryLang() != null) { + builder.field(QUERY_LANG_FIELD, getQueryLang()); + } + if (getSchedule() != null) { + builder.field(SCHEDULE_FIELD, getSchedule()); + } + if (getEnabledTime() != null) { + builder.field(ENABLED_TIME_FIELD, getEnabledTime().toEpochMilli()); + } + builder.field(LAST_UPDATE_TIME_FIELD, getLastUpdateTime().toEpochMilli()); + if (this.lockDurationSeconds != null) { + builder.field(LOCK_DURATION_SECONDS, this.lockDurationSeconds); + } + if (this.jitter != null) { + builder.field(JITTER, this.jitter); + } + builder.endObject(); + return builder; + } + + public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest( + AsyncQuerySchedulerRequest request) { + Instant updateTime = + request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now(); + return ScheduledAsyncQueryJobRequest.builder() + .accountId(request.getAccountId()) + .jobId(request.getJobId()) + .dataSource(request.getDataSource()) + .scheduledQuery(request.getScheduledQuery()) + .queryLang(request.getQueryLang()) + .enabled(request.isEnabled()) + .lastUpdateTime(updateTime) + .enabledTime(request.getEnabledTime()) + .lockDurationSeconds(request.getLockDurationSeconds()) + .jitter(request.getJitter()) + .schedule(IntervalScheduleParser.parse(request.getSchedule(), updateTime)) + .build(); + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java new file mode 100644 index 0000000000..2d5a1b332f --- /dev/null +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java @@ -0,0 +1,100 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.parser; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +/** Parse string raw schedule into job scheduler IntervalSchedule */ +public class IntervalScheduleParser { + + private static final Pattern DURATION_PATTERN = + Pattern.compile( + "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|microseconds?|microsecond|micros?|micros|nanoseconds?|nanos?)$", + Pattern.CASE_INSENSITIVE); + + public static Schedule parse(Object schedule, Instant startTime) { + if (schedule == null) { + return null; + } + + if (schedule instanceof Schedule) { + return (Schedule) schedule; + } + + if (!(schedule instanceof String)) { + throw new IllegalArgumentException("Schedule must be a String object for parsing."); + } + + String intervalStr = ((String) schedule).trim().toLowerCase(); + + Matcher matcher = DURATION_PATTERN.matcher(intervalStr); + if (!matcher.matches()) { + throw new IllegalArgumentException("Invalid interval format: " + intervalStr); + } + + long value = Long.parseLong(matcher.group(1)); + String unitStr = matcher.group(2).toLowerCase(); + + // Convert to a supported unit or directly return an IntervalSchedule + long intervalInMinutes = convertToSupportedUnit(value, unitStr); + + return new IntervalSchedule(startTime, (int) intervalInMinutes, ChronoUnit.MINUTES); + } + + @VisibleForTesting + protected static long convertToSupportedUnit(long value, String unitStr) { + switch (unitStr) { + case "years": + case "year": + throw new IllegalArgumentException("Years cannot be converted to minutes accurately."); + case "months": + case "month": + throw new IllegalArgumentException("Months cannot be converted to minutes accurately."); + case "weeks": + case "week": + return value * 7 * 24 * 60; // Convert weeks to minutes + case "days": + case "day": + return value * 24 * 60; // Convert days to minutes + case "hours": + case "hour": + return value * 60; // Convert hours to minutes + case "minutes": + case "minute": + case "mins": + case "min": + return value; // Already in minutes + case "seconds": + case "second": + case "secs": + case "sec": + return value / 60; // Convert seconds to minutes + case "milliseconds": + case "millisecond": + case "millis": + case "milli": + return value / (60 * 1000); // Convert milliseconds to minutes + case "microseconds": + case "microsecond": + case "micros": + case "micro": + return value / (60 * 1000 * 1000); // Convert microseconds to minutes + case "nanoseconds": + case "nanosecond": + case "nanos": + case "nano": + return value / (60 * 1000 * 1000 * 1000L); // Convert nanoseconds to minutes + default: + throw new IllegalArgumentException("Unsupported time unit: " + unitStr); + } + } +} diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java similarity index 57% rename from async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java rename to async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java index 0422e7c015..9e33ef0248 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchRefreshIndexJobRequestParser.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/OpenSearchScheduleQueryJobRequestParser.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.sql.spark.scheduler; +package org.opensearch.sql.spark.scheduler.parser; import java.io.IOException; import java.time.Instant; @@ -11,9 +11,10 @@ import org.opensearch.core.xcontent.XContentParserUtils; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; -import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; +import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; -public class OpenSearchRefreshIndexJobRequestParser { +public class OpenSearchScheduleQueryJobRequestParser { private static Instant parseInstantValue(XContentParser parser) throws IOException { if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { @@ -28,8 +29,8 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti public static ScheduledJobParser getJobParser() { return (parser, id, jobDocVersion) -> { - OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder = - OpenSearchRefreshIndexJobRequest.builder(); + ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder = + ScheduledAsyncQueryJobRequest.builder(); XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_OBJECT, parser.nextToken(), parser); @@ -37,28 +38,37 @@ public static ScheduledJobParser getJobParser() { String fieldName = parser.currentName(); parser.nextToken(); switch (fieldName) { - case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD: - builder.jobName(parser.text()); + case ScheduledAsyncQueryJobRequest.ACCOUNT_ID_FIELD: + builder.accountId(parser.text()); break; - case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD: - builder.jobType(parser.text()); + case ScheduledAsyncQueryJobRequest.JOB_ID_FIELD: + builder.jobId(parser.text()); break; - case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD: + case ScheduledAsyncQueryJobRequest.DATA_SOURCE_NAME_FIELD: + builder.dataSource(parser.text()); + break; + case ScheduledAsyncQueryJobRequest.SCHEDULED_QUERY_FIELD: + builder.scheduledQuery(parser.text()); + break; + case ScheduledAsyncQueryJobRequest.QUERY_LANG_FIELD: + builder.queryLang(LangType.fromString(parser.text())); + break; + case ScheduledAsyncQueryJobRequest.ENABLED_FIELD: builder.enabled(parser.booleanValue()); break; - case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD: + case ScheduledAsyncQueryJobRequest.ENABLED_TIME_FIELD: builder.enabledTime(parseInstantValue(parser)); break; - case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD: + case ScheduledAsyncQueryJobRequest.LAST_UPDATE_TIME_FIELD: builder.lastUpdateTime(parseInstantValue(parser)); break; - case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD: + case ScheduledAsyncQueryJobRequest.SCHEDULE_FIELD: builder.schedule(ScheduleParser.parse(parser)); break; - case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS: + case ScheduledAsyncQueryJobRequest.LOCK_DURATION_SECONDS: builder.lockDurationSeconds(parser.longValue()); break; - case OpenSearchRefreshIndexJobRequest.JITTER: + case ScheduledAsyncQueryJobRequest.JITTER: builder.jitter(parser.doubleValue()); break; default: diff --git a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java index 9cc69b2fb7..52ffda483c 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/transport/config/AsyncExecutorServiceModule.java @@ -61,6 +61,8 @@ import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; +import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; @RequiredArgsConstructor public class AsyncExecutorServiceModule extends AbstractModule { @@ -136,12 +138,14 @@ public FlintIndexOpFactory flintIndexOpFactory( FlintIndexStateModelService flintIndexStateModelService, FlintIndexClient flintIndexClient, FlintIndexMetadataServiceImpl flintIndexMetadataService, - EMRServerlessClientFactory emrServerlessClientFactory) { + EMRServerlessClientFactory emrServerlessClientFactory, + AsyncQueryScheduler asyncQueryScheduler) { return new FlintIndexOpFactory( flintIndexStateModelService, flintIndexClient, flintIndexMetadataService, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); } @Provides @@ -245,6 +249,14 @@ public SessionConfigSupplier sessionConfigSupplier(Settings settings) { return new OpenSearchSessionConfigSupplier(settings); } + @Provides + @Singleton + public AsyncQueryScheduler asyncQueryScheduler(NodeClient client, ClusterService clusterService) { + OpenSearchAsyncQueryScheduler scheduler = + new OpenSearchAsyncQueryScheduler(client, clusterService); + return scheduler; + } + private void registerStateStoreMetrics(StateStore stateStore) { GaugeMetric activeSessionMetric = new GaugeMetric<>( diff --git a/async-query/src/main/resources/async-query-scheduler-index-mapping.yml b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml index 36bd1b873e..1aa90e8ed8 100644 --- a/async-query/src/main/resources/async-query-scheduler-index-mapping.yml +++ b/async-query/src/main/resources/async-query-scheduler-index-mapping.yml @@ -8,9 +8,15 @@ # Also "dynamic" is set to "false" so that other fields cannot be added. dynamic: false properties: - name: + accountId: type: keyword - jobType: + jobId: + type: keyword + dataSource: + type: keyword + scheduledQuery: + type: text + queryLang: type: keyword lastUpdateTime: type: date diff --git a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java index 641b083d53..9b897d36b4 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryExecutorServiceSpec.java @@ -100,6 +100,8 @@ import org.opensearch.sql.spark.parameter.SparkSubmitParametersBuilderProvider; import org.opensearch.sql.spark.response.JobExecutionResponseReader; import org.opensearch.sql.spark.response.OpenSearchJobExecutionResponseReader; +import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler; +import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; import org.opensearch.sql.storage.DataSourceFactory; import org.opensearch.test.OpenSearchIntegTestCase; @@ -124,6 +126,7 @@ public class AsyncQueryExecutorServiceSpec extends OpenSearchIntegTestCase { protected StateStore stateStore; protected SessionStorageService sessionStorageService; protected StatementStorageService statementStorageService; + protected AsyncQueryScheduler asyncQueryScheduler; protected AsyncQueryRequestContext asyncQueryRequestContext; protected SessionIdProvider sessionIdProvider = new DatasourceEmbeddedSessionIdProvider(); @@ -204,6 +207,7 @@ public void setup() { new OpenSearchSessionStorageService(stateStore, new SessionModelXContentSerializer()); statementStorageService = new OpenSearchStatementStorageService(stateStore, new StatementModelXContentSerializer()); + asyncQueryScheduler = new OpenSearchAsyncQueryScheduler(client, clusterService); } protected FlintIndexOpFactory getFlintIndexOpFactory( @@ -212,7 +216,8 @@ protected FlintIndexOpFactory getFlintIndexOpFactory( flintIndexStateModelService, flintIndexClient, flintIndexMetadataService, - emrServerlessClientFactory); + emrServerlessClientFactory, + asyncQueryScheduler); } @After @@ -298,7 +303,8 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService( flintIndexStateModelService, flintIndexClient, new FlintIndexMetadataServiceImpl(client), - emrServerlessClientFactory), + emrServerlessClientFactory, + asyncQueryScheduler), emrServerlessClientFactory, new OpenSearchMetricsService(), sparkSubmitParametersBuilderProvider); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java index de86f111f3..a4a6eb6471 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQuerySchedulerTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -42,8 +43,7 @@ import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; -import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; -import org.opensearch.threadpool.ThreadPool; +import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; public class OpenSearchAsyncQuerySchedulerTest { @@ -57,9 +57,6 @@ public class OpenSearchAsyncQuerySchedulerTest { @Mock(answer = Answers.RETURNS_DEEP_STUBS) private ClusterService clusterService; - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ThreadPool threadPool; - @Mock private ActionFuture indexResponseActionFuture; @Mock private ActionFuture updateResponseActionFuture; @@ -77,8 +74,7 @@ public class OpenSearchAsyncQuerySchedulerTest { @BeforeEach public void setup() { MockitoAnnotations.openMocks(this); - scheduler = new OpenSearchAsyncQueryScheduler(); - scheduler.loadJobResource(client, clusterService, threadPool); + scheduler = new OpenSearchAsyncQueryScheduler(client, clusterService); } @Test @@ -95,9 +91,9 @@ public void testScheduleJob() { when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse); when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED); - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -119,9 +115,9 @@ public void testScheduleJobWithExistingJob() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) .thenReturn(Boolean.TRUE); - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -148,9 +144,9 @@ public void testScheduleJobWithExceptions() { .thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME)); when(client.index(any(IndexRequest.class))).thenThrow(new RuntimeException("Test exception")); - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -199,14 +195,17 @@ public void testUnscheduleJob() throws IOException { public void testUnscheduleJobWithIndexNotFound() { when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false); - assertThrows(IllegalStateException.class, () -> scheduler.unscheduleJob(TEST_JOB_ID)); + scheduler.unscheduleJob(TEST_JOB_ID); + + // Verify that no update operation was performed + verify(client, never()).update(any(UpdateRequest.class)); } @Test public void testUpdateJob() throws IOException { - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -229,9 +228,9 @@ public void testUpdateJob() throws IOException { @Test public void testUpdateJobWithIndexNotFound() { - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -242,9 +241,9 @@ public void testUpdateJobWithIndexNotFound() { @Test public void testUpdateJobWithExceptions() { - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -351,9 +350,9 @@ public void testCreateAsyncQuerySchedulerIndexFailure() { Mockito.when(createIndexResponseActionFuture.actionGet()) .thenReturn(new CreateIndexResponse(false, false, SCHEDULER_INDEX_NAME)); - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); @@ -367,9 +366,9 @@ public void testCreateAsyncQuerySchedulerIndexFailure() { @Test public void testUpdateJobNotFound() { - OpenSearchRefreshIndexJobRequest request = - OpenSearchRefreshIndexJobRequest.builder() - .jobName(TEST_JOB_ID) + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId(TEST_JOB_ID) .lastUpdateTime(Instant.now()) .build(); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java deleted file mode 100644 index cbf137997e..0000000000 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/OpenSearchRefreshIndexJobTest.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.scheduler.job; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -import java.time.Instant; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.Answers; -import org.mockito.ArgumentCaptor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; -import org.opensearch.client.Client; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.jobscheduler.spi.JobExecutionContext; -import org.opensearch.jobscheduler.spi.ScheduledJobParameter; -import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; -import org.opensearch.threadpool.ThreadPool; - -public class OpenSearchRefreshIndexJobTest { - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ClusterService clusterService; - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ThreadPool threadPool; - - @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private Client client; - - @Mock private JobExecutionContext context; - - private OpenSearchRefreshIndexJob jobRunner; - - private OpenSearchRefreshIndexJob spyJobRunner; - - @BeforeEach - public void setup() { - MockitoAnnotations.openMocks(this); - jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - jobRunner.setClient(null); - jobRunner.setClusterService(null); - jobRunner.setThreadPool(null); - } - - @Test - public void testRunJobWithCorrectParameter() { - spyJobRunner = spy(jobRunner); - spyJobRunner.setClusterService(clusterService); - spyJobRunner.setThreadPool(threadPool); - spyJobRunner.setClient(client); - - OpenSearchRefreshIndexJobRequest jobParameter = - OpenSearchRefreshIndexJobRequest.builder() - .jobName("testJob") - .lastUpdateTime(Instant.now()) - .lockDurationSeconds(10L) - .build(); - - spyJobRunner.runJob(jobParameter, context); - - ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.generic()).submit(captor.capture()); - - Runnable runnable = captor.getValue(); - runnable.run(); - - verify(spyJobRunner).doRefresh(eq(jobParameter.getName())); - } - - @Test - public void testRunJobWithIncorrectParameter() { - jobRunner = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - jobRunner.setClusterService(clusterService); - jobRunner.setThreadPool(threadPool); - jobRunner.setClient(client); - - ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class); - - IllegalStateException exception = - assertThrows( - IllegalStateException.class, - () -> jobRunner.runJob(wrongParameter, context), - "Expected IllegalStateException but no exception was thrown"); - - assertEquals( - "Job parameter is not instance of OpenSearchRefreshIndexJobRequest, type: " - + wrongParameter.getClass().getCanonicalName(), - exception.getMessage()); - } - - @Test - public void testRunJobWithUninitializedServices() { - OpenSearchRefreshIndexJobRequest jobParameter = - OpenSearchRefreshIndexJobRequest.builder() - .jobName("testJob") - .lastUpdateTime(Instant.now()) - .build(); - - IllegalStateException exception = - assertThrows( - IllegalStateException.class, - () -> jobRunner.runJob(jobParameter, context), - "Expected IllegalStateException but no exception was thrown"); - assertEquals("ClusterService is not initialized.", exception.getMessage()); - - jobRunner.setClusterService(clusterService); - - exception = - assertThrows( - IllegalStateException.class, - () -> jobRunner.runJob(jobParameter, context), - "Expected IllegalStateException but no exception was thrown"); - assertEquals("ThreadPool is not initialized.", exception.getMessage()); - - jobRunner.setThreadPool(threadPool); - - exception = - assertThrows( - IllegalStateException.class, - () -> jobRunner.runJob(jobParameter, context), - "Expected IllegalStateException but no exception was thrown"); - assertEquals("Client is not initialized.", exception.getMessage()); - } - - @Test - public void testGetJobRunnerInstanceMultipleCalls() { - OpenSearchRefreshIndexJob instance1 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - OpenSearchRefreshIndexJob instance2 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - OpenSearchRefreshIndexJob instance3 = OpenSearchRefreshIndexJob.getJobRunnerInstance(); - - assertSame(instance1, instance2); - assertSame(instance2, instance3); - } -} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java new file mode 100644 index 0000000000..cba8d43a2a --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java @@ -0,0 +1,210 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.job; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.LogEvent; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Answers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.sql.legacy.executor.AsyncRestExecutor; +import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; +import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; +import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; +import org.opensearch.sql.spark.rest.model.LangType; +import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest; +import org.opensearch.threadpool.ThreadPool; + +public class ScheduledAsyncQueryJobRunnerTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ClusterService clusterService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private ThreadPool threadPool; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private Client client; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private AsyncQueryExecutorService asyncQueryExecutorService; + + @Mock private JobExecutionContext context; + + private ScheduledAsyncQueryJobRunner jobRunner; + + private ScheduledAsyncQueryJobRunner spyJobRunner; + + @BeforeEach + public void setup() { + MockitoAnnotations.openMocks(this); + jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); + jobRunner.loadJobResource(null, null, null, null); + } + + @Test + public void testRunJobWithCorrectParameter() { + spyJobRunner = spy(jobRunner); + spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); + + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId("testJob") + .lastUpdateTime(Instant.now()) + .lockDurationSeconds(10L) + .scheduledQuery("REFRESH INDEX testIndex") + .dataSource("testDataSource") + .queryLang(LangType.SQL) + .build(); + + CreateAsyncQueryRequest createAsyncQueryRequest = + new CreateAsyncQueryRequest( + request.getScheduledQuery(), request.getDataSource(), request.getQueryLang()); + spyJobRunner.runJob(request, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + .submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(spyJobRunner).doRefresh(eq(request)); + verify(asyncQueryExecutorService) + .createAsyncQuery(eq(createAsyncQueryRequest), any(NullAsyncQueryRequestContext.class)); + } + + @Test + public void testRunJobWithIncorrectParameter() { + jobRunner = ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); + jobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); + + ScheduledJobParameter wrongParameter = mock(ScheduledJobParameter.class); + + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(wrongParameter, context), + "Expected IllegalStateException but no exception was thrown"); + + assertEquals( + "Job parameter is not instance of ScheduledAsyncQueryJobRequest, type: " + + wrongParameter.getClass().getCanonicalName(), + exception.getMessage()); + } + + @Test + public void testDoRefreshThrowsException() { + spyJobRunner = spy(jobRunner); + spyJobRunner.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); + + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .jobId("testJob") + .lastUpdateTime(Instant.now()) + .lockDurationSeconds(10L) + .scheduledQuery("REFRESH INDEX testIndex") + .dataSource("testDataSource") + .queryLang(LangType.SQL) + .build(); + + doThrow(new RuntimeException("Test exception")).when(spyJobRunner).doRefresh(request); + + Logger logger = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class); + Appender mockAppender = mock(Appender.class); + when(mockAppender.getName()).thenReturn("MockAppender"); + when(mockAppender.isStarted()).thenReturn(true); + when(mockAppender.isStopped()).thenReturn(false); + ((org.apache.logging.log4j.core.Logger) logger) + .addAppender((org.apache.logging.log4j.core.Appender) mockAppender); + + spyJobRunner.runJob(request, context); + + ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); + verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + .submit(captor.capture()); + + Runnable runnable = captor.getValue(); + runnable.run(); + + verify(spyJobRunner).doRefresh(eq(request)); + verify(mockAppender).append(any(LogEvent.class)); + } + + @Test + public void testRunJobWithUninitializedServices() { + ScheduledAsyncQueryJobRequest jobParameter = + ScheduledAsyncQueryJobRequest.builder() + .jobId("testJob") + .lastUpdateTime(Instant.now()) + .build(); + + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("ClusterService is not initialized.", exception.getMessage()); + + jobRunner.loadJobResource(null, clusterService, null, null); + + exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("ThreadPool is not initialized.", exception.getMessage()); + + jobRunner.loadJobResource(null, clusterService, threadPool, null); + + exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("Client is not initialized.", exception.getMessage()); + + jobRunner.loadJobResource(client, clusterService, threadPool, null); + + exception = + assertThrows( + IllegalStateException.class, + () -> jobRunner.runJob(jobParameter, context), + "Expected IllegalStateException but no exception was thrown"); + assertEquals("AsyncQueryExecutorService is not initialized.", exception.getMessage()); + } + + @Test + public void testGetJobRunnerInstanceMultipleCalls() { + ScheduledAsyncQueryJobRunner instance1 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); + ScheduledAsyncQueryJobRunner instance2 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); + ScheduledAsyncQueryJobRunner instance3 = ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); + + assertSame(instance1, instance2); + assertSame(instance2, instance3); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java deleted file mode 100644 index 108f1acfd5..0000000000 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/OpenSearchRefreshIndexJobRequestTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.sql.spark.scheduler.model; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; - -import java.io.IOException; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import org.junit.jupiter.api.Test; -import org.opensearch.common.xcontent.XContentFactory; -import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; - -public class OpenSearchRefreshIndexJobRequestTest { - - @Test - public void testBuilderAndGetterMethods() { - Instant now = Instant.now(); - IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); - - OpenSearchRefreshIndexJobRequest jobRequest = - OpenSearchRefreshIndexJobRequest.builder() - .jobName("testJob") - .jobType("testType") - .schedule(schedule) - .enabled(true) - .lastUpdateTime(now) - .enabledTime(now) - .lockDurationSeconds(60L) - .jitter(0.1) - .build(); - - assertEquals("testJob", jobRequest.getName()); - assertEquals("testType", jobRequest.getJobType()); - assertEquals(schedule, jobRequest.getSchedule()); - assertTrue(jobRequest.isEnabled()); - assertEquals(now, jobRequest.getLastUpdateTime()); - assertEquals(now, jobRequest.getEnabledTime()); - assertEquals(60L, jobRequest.getLockDurationSeconds()); - assertEquals(0.1, jobRequest.getJitter()); - } - - @Test - public void testToXContent() throws IOException { - Instant now = Instant.now(); - IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); - - OpenSearchRefreshIndexJobRequest jobRequest = - OpenSearchRefreshIndexJobRequest.builder() - .jobName("testJob") - .jobType("testType") - .schedule(schedule) - .enabled(true) - .lastUpdateTime(now) - .enabledTime(now) - .lockDurationSeconds(60L) - .jitter(0.1) - .build(); - - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); - jobRequest.toXContent(builder, EMPTY_PARAMS); - String jsonString = builder.toString(); - - assertTrue(jsonString.contains("\"jobName\" : \"testJob\"")); - assertTrue(jsonString.contains("\"jobType\" : \"testType\"")); - assertTrue(jsonString.contains("\"start_time\" : " + now.toEpochMilli())); - assertTrue(jsonString.contains("\"period\" : 1")); - assertTrue(jsonString.contains("\"unit\" : \"Minutes\"")); - assertTrue(jsonString.contains("\"enabled\" : true")); - assertTrue(jsonString.contains("\"lastUpdateTime\" : " + now.toEpochMilli())); - assertTrue(jsonString.contains("\"enabledTime\" : " + now.toEpochMilli())); - assertTrue(jsonString.contains("\"lockDurationSeconds\" : 60")); - assertTrue(jsonString.contains("\"jitter\" : 0.1")); - } -} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java new file mode 100644 index 0000000000..85d1948dc3 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java @@ -0,0 +1,210 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.model; + +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.junit.jupiter.api.Test; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.sql.spark.rest.model.LangType; + +public class ScheduledAsyncQueryJobRequestTest { + + @Test + public void testBuilderAndGetterMethods() { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + ScheduledAsyncQueryJobRequest jobRequest = + ScheduledAsyncQueryJobRequest.builder() + .accountId("testAccount") + .jobId("testJob") + .dataSource("testDataSource") + .scheduledQuery("SELECT * FROM test") + .queryLang(LangType.SQL) + .schedule(schedule) + .enabled(true) + .lastUpdateTime(now) + .enabledTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + assertEquals("testAccount", jobRequest.getAccountId()); + assertEquals("testJob", jobRequest.getJobId()); + assertEquals("testJob", jobRequest.getName()); + assertEquals("testDataSource", jobRequest.getDataSource()); + assertEquals("SELECT * FROM test", jobRequest.getScheduledQuery()); + assertEquals(LangType.SQL, jobRequest.getQueryLang()); + assertEquals(schedule, jobRequest.getSchedule()); + assertTrue(jobRequest.isEnabled()); + assertEquals(now, jobRequest.getLastUpdateTime()); + assertEquals(now, jobRequest.getEnabledTime()); + assertEquals(60L, jobRequest.getLockDurationSeconds()); + assertEquals(0.1, jobRequest.getJitter()); + } + + @Test + public void testToXContent() throws IOException { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + ScheduledAsyncQueryJobRequest request = + ScheduledAsyncQueryJobRequest.builder() + .accountId("testAccount") + .jobId("testJob") + .dataSource("testDataSource") + .scheduledQuery("SELECT * FROM test") + .queryLang(LangType.SQL) + .schedule(schedule) + .enabled(true) + .enabledTime(now) + .lastUpdateTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); + request.toXContent(builder, EMPTY_PARAMS); + String jsonString = builder.toString(); + + assertTrue(jsonString.contains("\"accountId\" : \"testAccount\"")); + assertTrue(jsonString.contains("\"jobId\" : \"testJob\"")); + assertTrue(jsonString.contains("\"dataSource\" : \"testDataSource\"")); + assertTrue(jsonString.contains("\"scheduledQuery\" : \"SELECT * FROM test\"")); + assertTrue(jsonString.contains("\"queryLang\" : \"SQL\"")); + assertTrue(jsonString.contains("\"start_time\" : " + now.toEpochMilli())); + assertTrue(jsonString.contains("\"period\" : 1")); + assertTrue(jsonString.contains("\"unit\" : \"Minutes\"")); + assertTrue(jsonString.contains("\"enabled\" : true")); + assertTrue(jsonString.contains("\"lastUpdateTime\" : " + now.toEpochMilli())); + assertTrue(jsonString.contains("\"enabledTime\" : " + now.toEpochMilli())); + assertTrue(jsonString.contains("\"lockDurationSeconds\" : 60")); + assertTrue(jsonString.contains("\"jitter\" : 0.1")); + } + + @Test + public void testFromAsyncQuerySchedulerRequest() { + Instant now = Instant.now(); + AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest(); + request.setJobId("testJob"); + request.setAccountId("testAccount"); + request.setDataSource("testDataSource"); + request.setScheduledQuery("SELECT * FROM test"); + request.setQueryLang(LangType.SQL); + request.setSchedule("1 minutes"); + request.setEnabled(true); + request.setLastUpdateTime(now); + request.setLockDurationSeconds(60L); + request.setJitter(0.1); + + ScheduledAsyncQueryJobRequest jobRequest = + ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(request); + + assertEquals("testJob", jobRequest.getJobId()); + assertEquals("testAccount", jobRequest.getAccountId()); + assertEquals("testDataSource", jobRequest.getDataSource()); + assertEquals("SELECT * FROM test", jobRequest.getScheduledQuery()); + assertEquals(LangType.SQL, jobRequest.getQueryLang()); + assertEquals(new IntervalSchedule(now, 1, ChronoUnit.MINUTES), jobRequest.getSchedule()); + assertTrue(jobRequest.isEnabled()); + assertEquals(60L, jobRequest.getLockDurationSeconds()); + assertEquals(0.1, jobRequest.getJitter()); + } + + @Test + public void testFromAsyncQuerySchedulerRequestWithInvalidSchedule() { + AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest(); + request.setJobId("testJob"); + request.setSchedule(new Object()); // Set schedule to a non-String object + + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> { + ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(request); + }); + + assertEquals("Schedule must be a String object for parsing.", exception.getMessage()); + } + + @Test + public void testEqualsAndHashCode() { + Instant now = Instant.now(); + IntervalSchedule schedule = new IntervalSchedule(now, 1, ChronoUnit.MINUTES); + + ScheduledAsyncQueryJobRequest request1 = + ScheduledAsyncQueryJobRequest.builder() + .accountId("testAccount") + .jobId("testJob") + .dataSource("testDataSource") + .scheduledQuery("SELECT * FROM test") + .queryLang(LangType.SQL) + .schedule(schedule) + .enabled(true) + .enabledTime(now) + .lastUpdateTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + // Test toString + String toString = request1.toString(); + assertTrue(toString.contains("accountId=testAccount")); + assertTrue(toString.contains("jobId=testJob")); + assertTrue(toString.contains("dataSource=testDataSource")); + assertTrue(toString.contains("scheduledQuery=SELECT * FROM test")); + assertTrue(toString.contains("queryLang=SQL")); + assertTrue(toString.contains("enabled=true")); + assertTrue(toString.contains("lockDurationSeconds=60")); + assertTrue(toString.contains("jitter=0.1")); + + ScheduledAsyncQueryJobRequest request2 = + ScheduledAsyncQueryJobRequest.builder() + .accountId("testAccount") + .jobId("testJob") + .dataSource("testDataSource") + .scheduledQuery("SELECT * FROM test") + .queryLang(LangType.SQL) + .schedule(schedule) + .enabled(true) + .enabledTime(now) + .lastUpdateTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + assertEquals(request1, request2); + assertEquals(request1.hashCode(), request2.hashCode()); + + ScheduledAsyncQueryJobRequest request3 = + ScheduledAsyncQueryJobRequest.builder() + .accountId("differentAccount") + .jobId("testJob") + .dataSource("testDataSource") + .scheduledQuery("SELECT * FROM test") + .queryLang(LangType.SQL) + .schedule(schedule) + .enabled(true) + .enabledTime(now) + .lastUpdateTime(now) + .lockDurationSeconds(60L) + .jitter(0.1) + .build(); + + assertNotEquals(request1, request3); + assertNotEquals(request1.hashCode(), request3.hashCode()); + } +} diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java new file mode 100644 index 0000000000..b119c345b9 --- /dev/null +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParserTest.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.spark.scheduler.parser; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; + +public class IntervalScheduleParserTest { + + private Instant startTime; + + @BeforeEach + public void setup() { + startTime = Instant.now(); + } + + @Test + public void testParseValidScheduleString() { + verifyParseSchedule(5, "5 minutes"); + } + + @Test + public void testParseValidScheduleStringWithDifferentUnits() { + verifyParseSchedule(120, "2 hours"); + verifyParseSchedule(1440, "1 day"); + verifyParseSchedule(30240, "3 weeks"); + } + + @Test + public void testParseNullSchedule() { + Schedule schedule = IntervalScheduleParser.parse(null, startTime); + assertNull(schedule); + } + + @Test + public void testParseScheduleObject() { + IntervalSchedule expectedSchedule = new IntervalSchedule(startTime, 10, ChronoUnit.MINUTES); + Schedule schedule = IntervalScheduleParser.parse(expectedSchedule, startTime); + assertEquals(expectedSchedule, schedule); + } + + @Test + public void testParseInvalidScheduleString() { + String scheduleStr = "invalid schedule"; + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse(scheduleStr, startTime), + "Expected IllegalArgumentException but no exception was thrown"); + + assertEquals("Invalid interval format: " + scheduleStr.toLowerCase(), exception.getMessage()); + } + + @Test + public void testParseUnsupportedUnits() { + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse("1 year", startTime), + "Expected IllegalArgumentException but no exception was thrown"); + + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse("1 month", startTime), + "Expected IllegalArgumentException but no exception was thrown"); + } + + @Test + public void testParseNonStringSchedule() { + Object nonStringSchedule = 12345; + IllegalArgumentException exception = + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.parse(nonStringSchedule, startTime), + "Expected IllegalArgumentException but no exception was thrown"); + + assertEquals("Schedule must be a String object for parsing.", exception.getMessage()); + } + + @Test + public void testParseScheduleWithNanoseconds() { + verifyParseSchedule(1, "60000000000 nanoseconds"); + } + + @Test + public void testParseScheduleWithMilliseconds() { + verifyParseSchedule(1, "60000 milliseconds"); + } + + @Test + public void testParseScheduleWithMicroseconds() { + verifyParseSchedule(1, "60000000 microseconds"); + } + + @Test + public void testUnsupportedTimeUnit() { + assertThrows( + IllegalArgumentException.class, + () -> IntervalScheduleParser.convertToSupportedUnit(10, "unsupportedunit"), + "Expected IllegalArgumentException but no exception was thrown"); + } + + @Test + public void testParseScheduleWithSeconds() { + verifyParseSchedule(2, "120 seconds"); + } + + private void verifyParseSchedule(int expectedMinutes, String scheduleStr) { + Schedule schedule = IntervalScheduleParser.parse(scheduleStr, startTime); + assertEquals(new IntervalSchedule(startTime, expectedMinutes, ChronoUnit.MINUTES), schedule); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 971ef5e928..560c5edadd 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -96,8 +96,8 @@ import org.opensearch.sql.spark.flint.operation.FlintIndexOpFactory; import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction; import org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler; -import org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser; -import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob; +import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner; +import org.opensearch.sql.spark.scheduler.parser.OpenSearchScheduleQueryJobRequestParser; import org.opensearch.sql.spark.storage.SparkStorageFactory; import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction; import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction; @@ -217,8 +217,6 @@ public Collection createComponents( this.client = (NodeClient) client; this.dataSourceService = createDataSourceService(); dataSourceService.createDataSource(defaultOpenSearchDataSourceMetadata()); - this.asyncQueryScheduler = new OpenSearchAsyncQueryScheduler(); - this.asyncQueryScheduler.loadJobResource(client, clusterService, threadPool); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); LocalClusterState.state().setClient(client); @@ -247,11 +245,13 @@ public Collection createComponents( dataSourceService, injector.getInstance(FlintIndexMetadataServiceImpl.class), injector.getInstance(FlintIndexOpFactory.class)); + AsyncQueryExecutorService asyncQueryExecutorService = + injector.getInstance(AsyncQueryExecutorService.class); + ScheduledAsyncQueryJobRunner.getJobRunnerInstance() + .loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); + return ImmutableList.of( - dataSourceService, - injector.getInstance(AsyncQueryExecutorService.class), - clusterManagerEventListener, - pluginSettings); + dataSourceService, asyncQueryExecutorService, clusterManagerEventListener, pluginSettings); } @Override @@ -266,12 +266,12 @@ public String getJobIndex() { @Override public ScheduledJobRunner getJobRunner() { - return OpenSearchRefreshIndexJob.getJobRunnerInstance(); + return ScheduledAsyncQueryJobRunner.getJobRunnerInstance(); } @Override public ScheduledJobParser getJobParser() { - return OpenSearchRefreshIndexJobRequestParser.getJobParser(); + return OpenSearchScheduleQueryJobRequestParser.getJobParser(); } @Override @@ -342,6 +342,9 @@ public Collection getSystemIndexDescriptors(Settings sett systemIndexDescriptors.add( new SystemIndexDescriptor( SPARK_REQUEST_BUFFER_INDEX_NAME + "*", "SQL Spark Request Buffer index pattern")); + systemIndexDescriptors.add( + new SystemIndexDescriptor( + OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, "SQL Scheduler job index")); return systemIndexDescriptors; } }