diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java index 8d0abb5aeb..992d9979d7 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -41,6 +41,7 @@ import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest; import org.opensearch.threadpool.ThreadPool; +/** Scheduler class for managing asynchronous query jobs. */ public class OpenSearchAsyncQueryScheduler { public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler"; public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler"; @@ -52,6 +53,7 @@ public class OpenSearchAsyncQueryScheduler { private Client client; private ClusterService clusterService; + /** Loads job resources, setting up required services and job runner instance. */ public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) { this.client = client; this.clusterService = clusterService; @@ -62,6 +64,7 @@ public void loadJobResource(Client client, ClusterService clusterService, Thread openSearchRefreshIndexJob.setClient(client); } + /** Schedules a new job by indexing it into the job index. */ public void scheduleJob(OpenSearchRefreshIndexJobRequest request) { if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { createAsyncQuerySchedulerIndex(); @@ -90,6 +93,7 @@ public void scheduleJob(OpenSearchRefreshIndexJobRequest request) { } } + /** Unschedules a job by marking it as disabled and updating its last update time. */ public void unscheduleJob(String jobId) throws IOException { if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { throw new IllegalArgumentException("Job index does not exist."); @@ -103,6 +107,7 @@ public void unscheduleJob(String jobId) throws IOException { updateJob(request); } + /** Updates an existing job with new parameters. */ public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException { if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { throw new IllegalArgumentException("Job index does not exist."); @@ -123,6 +128,7 @@ public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOExcepti } } + /** Removes a job by deleting its document from the index. */ public void removeJob(String jobId) { if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) { throw new IllegalArgumentException("Job index does not exist."); @@ -142,6 +148,7 @@ public void removeJob(String jobId) { } } + /** Creates the async query scheduler index with specified mappings and settings. */ private void createAsyncQuerySchedulerIndex() { try { InputStream mappingFileStream = @@ -177,10 +184,12 @@ private void createAsyncQuerySchedulerIndex() { } } + /** Returns the job runner instance for the scheduler. */ public static ScheduledJobRunner getJobRunner() { return OpenSearchRefreshIndexJob.getJobRunnerInstance(); } + /** Returns the job parser instance for the scheduler. */ public static ScheduledJobParser getJobParser() { return (parser, id, jobDocVersion) -> { OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder = diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/exceptions/AsyncQuerySchedulerException.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/exceptions/AsyncQuerySchedulerException.java index 2b4f15f4fc..c5cafa03bc 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/exceptions/AsyncQuerySchedulerException.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/exceptions/AsyncQuerySchedulerException.java @@ -7,6 +7,7 @@ package org.opensearch.sql.spark.scheduler.exceptions; +/** Exception class for handling errors related to the asynchronous query scheduler. */ public class AsyncQuerySchedulerException extends RuntimeException { public AsyncQuerySchedulerException(String message) { super(message);