Skip to content

Commit

Permalink
Extend scheduler interface for Multitenancy (opensearch-project#3014)
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored Sep 16, 2024
1 parent 2506468 commit 37188bd
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 69 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integ-tests-with-security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ jobs:
- name: Upload test reports
if: ${{ always() }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.os }}-${{ matrix.java }}
Expand Down Expand Up @@ -79,7 +79,7 @@ jobs:

- name: Upload test reports
if: ${{ always() }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.os }}-${{ matrix.java }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sql-pitest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
- name: Upload test reports
if: always()
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: test-reports-${{ matrix.entry.java }}
path: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/sql-test-and-build-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,15 +73,15 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload Artifacts
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: opensearch-sql-ubuntu-latest-${{ matrix.java }}
path: opensearch-sql-builds

- name: Upload test reports
if: ${{ always() }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-ubuntu-latest-${{ matrix.java }}
Expand Down Expand Up @@ -135,15 +135,15 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}

- name: Upload Artifacts
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: opensearch-sql-${{ matrix.entry.os }}-${{ matrix.entry.java }}
path: opensearch-sql-builds

- name: Upload test reports
if: ${{ always() && matrix.entry.os == 'ubuntu-latest' }}
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
continue-on-error: true
with:
name: test-reports-${{ matrix.entry.os }}-${{ matrix.entry.java }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sql-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ jobs:
- name: Upload test reports
if: always()
uses: actions/upload-artifact@v2
uses: actions/upload-artifact@v4
with:
name: test-reports-${{ matrix.entry.java }}
path: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ void runOp(
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(
flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Scheduler interface for scheduling asynchronous query jobs. */
Expand All @@ -13,10 +19,13 @@ public interface AsyncQueryScheduler {
* task
*
* @param asyncQuerySchedulerRequest The request containing job configuration details
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if a job with the same name already exists
* @throws RuntimeException if there's an error during job creation
*/
void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Updates an existing job with new parameters. This method modifies the configuration of an
Expand All @@ -26,10 +35,13 @@ public interface AsyncQueryScheduler {
* scheduled job - Updating resource allocations for a job
*
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be updated doesn't exist
* @throws RuntimeException if there's an error during the update process
*/
void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Unschedules a job by marking it as disabled and updating its last update time. This method is
Expand All @@ -41,8 +53,11 @@ public interface AsyncQueryScheduler {
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
void unscheduleJob(String jobId);
void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
Expand All @@ -52,6 +67,9 @@ public interface AsyncQueryScheduler {
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
* @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
void removeJob(String jobId);
void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@

import java.time.Instant;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.opensearch.sql.spark.rest.model.LangType;

/** Represents a job request for a scheduled task. */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AsyncQuerySchedulerRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);

verify(asyncQueryScheduler).unscheduleJob(indexName);
verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
}

@Test
Expand Down Expand Up @@ -318,8 +318,7 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

verify(asyncQueryScheduler).unscheduleJob(indexName);

verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -35,6 +36,7 @@
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
Expand All @@ -55,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {

@Override
/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void scheduleJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
Expand Down Expand Up @@ -87,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Unschedules a job by marking it as disabled and updating its last update time. */
@Override
public void unscheduleJob(String jobId) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
try {
updateJob(request);
AsyncQuerySchedulerRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request, asyncQueryRequestContext);
LOG.info("Unscheduled job for jobId: {}", jobId);
} catch (IllegalStateException | DocumentMissingException e) {
LOG.error("Failed to unschedule job: {}", jobId, e);
Expand All @@ -105,7 +112,9 @@ public void unscheduleJob(String jobId) {
/** Updates an existing job with new parameters. */
@Override
@SneakyThrows
public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
public void updateJob(
AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
AsyncQueryRequestContext asyncQueryRequestContext) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
assertIndexExists();
Expand Down Expand Up @@ -134,8 +143,11 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Removes a job by deleting its document from the index. */
@Override
public void removeJob(String jobId) {
public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
assertIndexExists();
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class ScheduledAsyncQueryJobRequest extends AsyncQuerySchedulerRequest
public static final String ENABLED_FIELD = "enabled";
private final Schedule schedule;

@Builder
@Builder(builderMethodName = "scheduledAsyncQueryJobRequestBuilder")
public ScheduledAsyncQueryJobRequest(
String accountId,
String jobId,
Expand Down Expand Up @@ -139,7 +139,7 @@ public static ScheduledAsyncQueryJobRequest fromAsyncQuerySchedulerRequest(
AsyncQuerySchedulerRequest request) {
Instant updateTime =
request.getLastUpdateTime() != null ? request.getLastUpdateTime() : Instant.now();
return ScheduledAsyncQueryJobRequest.builder()
return ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder()
.accountId(request.getAccountId())
.jobId(request.getJobId())
.dataSource(request.getDataSource())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private static Instant parseInstantValue(XContentParser parser) throws IOExcepti
public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
ScheduledAsyncQueryJobRequest.ScheduledAsyncQueryJobRequestBuilder builder =
ScheduledAsyncQueryJobRequest.builder();
ScheduledAsyncQueryJobRequest.scheduledAsyncQueryJobRequestBuilder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

Expand Down
Loading

0 comments on commit 37188bd

Please sign in to comment.