diff --git a/build.gradle b/build.gradle index 1e97167f..cb55e177 100644 --- a/build.gradle +++ b/build.gradle @@ -152,6 +152,8 @@ repositories { dependencies { implementation project(path: ":${rootProject.name}-spi", configuration: 'shadow') + implementation group: 'com.google.guava', name: 'guava', version:'31.0.1-jre' + implementation group: 'com.google.guava', name: 'failureaccess', version:'1.0.1' javaRestTestImplementation project.sourceSets.main.runtimeClasspath } @@ -252,7 +254,8 @@ task integTestRemote(type: RestIntegTestTask) { systemProperty "security", System.getProperty("security") systemProperty "user", System.getProperty("user") systemProperty "password", System.getProperty("password") - + systemProperty 'tests.rest.cluster', 'localhost:9200' + systemProperty 'tests.clustername', 'opensearch-job-scheduler-cluster' if (System.getProperty("tests.rest.cluster") != null) { filter { includeTestsMatching "org.opensearch.jobscheduler.*RestIT" diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobDocVersion.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobDocVersion.java index 39fedfd0..9d6e8187 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobDocVersion.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobDocVersion.java @@ -8,12 +8,17 @@ */ package org.opensearch.jobscheduler.spi; +import java.io.IOException; import java.util.Locale; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; + /** * Structure to represent scheduled job document version. JobScheduler use this to determine this job */ -public class JobDocVersion implements Comparable { +public class JobDocVersion implements Comparable, Writeable { private final long primaryTerm; private final long seqNo; private final long version; @@ -24,6 +29,19 @@ public JobDocVersion(long primaryTerm, long seqNo, long version) { this.version = version; } + public JobDocVersion(StreamInput in) throws IOException { + this.primaryTerm = in.readLong(); + this.seqNo = in.readLong(); + this.version = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(this.primaryTerm); + out.writeLong(this.seqNo); + out.writeLong(this.version); + } + public long getPrimaryTerm() { return primaryTerm; } diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java index 42e5b5b6..cc9b3d35 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/JobExecutionContext.java @@ -8,11 +8,15 @@ */ package org.opensearch.jobscheduler.spi; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; import org.opensearch.jobscheduler.spi.utils.LockService; +import java.io.IOException; import java.time.Instant; -public class JobExecutionContext { +public class JobExecutionContext implements Writeable { private final Instant expectedExecutionTime; private final JobDocVersion jobVersion; private final LockService lockService; @@ -33,6 +37,22 @@ public JobExecutionContext( this.jobId = jobId; } + public JobExecutionContext(StreamInput in) throws IOException { + this.expectedExecutionTime = in.readInstant(); + this.jobVersion = new JobDocVersion(in); + this.lockService = null; + 
this.jobIndexName = in.readString(); + this.jobId = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInstant(this.expectedExecutionTime); + this.jobVersion.writeTo(out); + out.writeString(this.jobIndexName); + out.writeString(this.jobId); + } + public Instant getExpectedExecutionTime() { return this.expectedExecutionTime; } @@ -52,4 +72,5 @@ public String getJobIndexName() { public String getJobId() { return this.jobId; } + } diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java index f241400d..808dc33a 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/LockModel.java @@ -8,12 +8,14 @@ */ package org.opensearch.jobscheduler.spi; -import org.opensearch.common.Strings; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchException; +import org.opensearch.common.bytes.BytesReference; import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentParserUtils; -import org.opensearch.common.xcontent.XContentType; +import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.index.seqno.SequenceNumbers; import java.io.IOException; @@ -30,6 +32,13 @@ public final class LockModel implements ToXContentObject { public static final String LOCK_DURATION = "lock_duration_seconds"; public static final String RELEASED = "released"; + // Rest Fields + public static final String GET_LOCK_ACTION = "get_lock_action"; + public static final String SEQUENCE_NUMBER = "seq_no"; + public static final String PRIMARY_TERM = "primary_term"; + public static final String LOCK_ID = "lock_id"; + public static final String LOCK_MODEL = "lock_model"; + private final String lockId; private final String jobIndexName; private final String jobId; @@ -174,7 +183,23 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa @Override public String toString() { - return Strings.toString(XContentType.JSON, this, false, true); + try { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.humanReadable(true); + this.toXContent(builder, EMPTY_PARAMS); + return BytesReference.bytes(builder).utf8ToString(); + } catch (IOException e) { + try { + XContentBuilder builder = JsonXContent.contentBuilder(); + builder.startObject(); + builder.field("error", "error building toString out of XContent: " + e.getMessage()); + builder.field("stack_trace", ExceptionsHelper.stackTrace(e)); + builder.endObject(); + return BytesReference.bytes(builder).utf8ToString(); + } catch (IOException e2) { + throw new OpenSearchException("cannot generate error message for deserialization", e); + } + } } public String getLockId() { diff --git a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java index 888d50d6..96a0b87b 100644 --- a/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java +++ b/spi/src/main/java/org/opensearch/jobscheduler/spi/utils/LockService.java @@ -243,7 +243,7 @@ private void createLock(final LockModel tempLock, ActionListener list } } - private void findLock(final String lockId, ActionListener listener) { + public void findLock(final String lockId, ActionListener listener) { GetRequest 
getRequest = new GetRequest(LOCK_INDEX_NAME).id(lockId); client.get(getRequest, ActionListener.wrap(response -> { if (!response.isExists()) { diff --git a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java index bbfa4d9a..0e671b10 100644 --- a/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java +++ b/src/main/java/org/opensearch/jobscheduler/JobSchedulerPlugin.java @@ -8,6 +8,15 @@ */ package org.opensearch.jobscheduler; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.jobscheduler.rest.action.RestGetJobDetailsAction; +import org.opensearch.jobscheduler.rest.action.RestGetLockAction; +import org.opensearch.jobscheduler.rest.action.RestReleaseLockAction; import org.opensearch.jobscheduler.scheduler.JobScheduler; import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; @@ -23,35 +32,39 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.ParseField; import org.opensearch.common.io.stream.NamedWriteableRegistry; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexModule; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.watcher.ResourceWatcherService; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.ArrayList; import java.util.function.Supplier; -public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin { +import com.google.common.collect.ImmutableList; + +public class JobSchedulerPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin { public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler"; + public static final String JS_BASE_URI = "/_plugins/_job_scheduler"; private static final Logger log = LogManager.getLogger(JobSchedulerPlugin.class); @@ -61,6 +74,8 @@ public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin { private Map indexToJobProviders; private Set indicesToListen; + private JobDetailsService jobDetailsService; + public JobSchedulerPlugin() { this.indicesToListen = new HashSet<>(); this.indexToJobProviders = new HashMap<>(); @@ -81,6 +96,7 @@ public Collection createComponents( Supplier repositoriesServiceSupplier ) { 
this.lockService = new LockService(client, clusterService); + this.jobDetailsService = new JobDetailsService(client, clusterService, this.indicesToListen, this.indexToJobProviders); this.scheduler = new JobScheduler(threadPool, this.lockService); this.sweeper = initSweeper( environment.settings(), @@ -89,7 +105,8 @@ public Collection createComponents( threadPool, xContentRegistry, this.scheduler, - this.lockService + this.lockService, + this.jobDetailsService ); clusterService.addListener(this.sweeper); clusterService.addLifecycleListener(this.sweeper); @@ -135,6 +152,10 @@ public List> getExecutorBuilders(Settings settings) { @Override public void onIndexModule(IndexModule indexModule) { + if (indexModule.getIndex().getName().equals(JobDetailsService.JOB_DETAILS_INDEX_NAME)) { + indexModule.addIndexOperationListener(this.jobDetailsService); + log.info("JobDetailsService started listening to operations on index {}", JobDetailsService.JOB_DETAILS_INDEX_NAME); + } if (this.indicesToListen.contains(indexModule.getIndex().getName())) { indexModule.addIndexOperationListener(this.sweeper); log.info("JobSweeper started listening to operations on index {}", indexModule.getIndex().getName()); @@ -181,8 +202,36 @@ private JobSweeper initSweeper( ThreadPool threadPool, NamedXContentRegistry registry, JobScheduler scheduler, - LockService lockService + LockService lockService, + JobDetailsService jobDetailsService ) { - return new JobSweeper(settings, client, clusterService, threadPool, registry, this.indexToJobProviders, scheduler, lockService); + return new JobSweeper( + settings, + client, + clusterService, + threadPool, + registry, + this.indexToJobProviders, + scheduler, + lockService, + jobDetailsService + ); } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + RestGetJobDetailsAction restGetJobDetailsAction = new RestGetJobDetailsAction(jobDetailsService); + RestGetLockAction restGetLockAction = new RestGetLockAction(lockService); + RestReleaseLockAction restReleaseLockAction = new RestReleaseLockAction(lockService); + return ImmutableList.of(restGetJobDetailsAction, restGetLockAction, restReleaseLockAction); + } + } diff --git a/src/main/java/org/opensearch/jobscheduler/model/ExtensionJobParameter.java b/src/main/java/org/opensearch/jobscheduler/model/ExtensionJobParameter.java new file mode 100644 index 00000000..c3574e71 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/model/ExtensionJobParameter.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
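For reference, a minimal sketch of how the Writeable support added to JobDocVersion above can be exercised via a BytesStreamOutput round trip; the version numbers and the example class name are illustrative assumptions, not part of the change:

```java
// Illustrative round trip of the new Writeable support on JobDocVersion;
// the numeric values are made-up examples.
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.jobscheduler.spi.JobDocVersion;

public class JobDocVersionStreamExample {
    public static void main(String[] args) throws Exception {
        JobDocVersion original = new JobDocVersion(1L, 7L, 3L); // primaryTerm, seqNo, version

        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out); // writes primaryTerm, seqNo, version as longs
            try (StreamInput in = out.bytes().streamInput()) {
                JobDocVersion copy = new JobDocVersion(in); // reads them back in the same order
                assert copy.getPrimaryTerm() == original.getPrimaryTerm();
            }
        }
    }
}
```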
+ */ +package org.opensearch.jobscheduler.model; + +import java.io.IOException; +import java.time.Instant; + +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.CronSchedule; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; + +/** + * A {@link Writeable} ScheduledJobParameter used to transport job parameters between OpenSearch and Extensions + * + */ +public class ExtensionJobParameter implements ScheduledJobParameter, Writeable { + + /** + * Enum for Schedule types used to indicate which Schedule constructor to use to read from/write to the stream. Job schedules can be set via cron expression or interval. + */ + public enum ScheduleType { + CRON, + INTERVAL + } + + public static final String NAME_FIELD = "name"; + public static final String SCHEDULE_FIELD = "schedule"; + public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; + public static final String ENABLED_TIME_FIELD = "enabled_time"; + public static final String IS_ENABLED_FIELD = "enabled"; + public static final String LOCK_DURATION_SECONDS_FIELD = "lock_duration_seconds"; + public static final String JITTER_FIELD = "jitter"; + + private String jobName; + private Schedule schedule; + private Instant lastUpdateTime; + private Instant enabledTime; + private boolean isEnabled; + + @Nullable + private Long lockDurationSeconds; + + @Nullable + private Double jitter; + + public ExtensionJobParameter( + String jobName, + Schedule schedule, + Instant lastUpdateTime, + Instant enabledTime, + boolean isEnabled, + Long lockDurationSeconds, + Double jitter + ) { + this.jobName = jobName; + this.schedule = schedule; + this.lastUpdateTime = lastUpdateTime; + this.enabledTime = enabledTime; + this.isEnabled = isEnabled; + this.lockDurationSeconds = lockDurationSeconds; + this.jitter = jitter; + } + + public ExtensionJobParameter(ScheduledJobParameter jobParameter) { + + // Convert job Parameter into writeable ExtensionJobParameter + this.jobName = jobParameter.getName(); + this.schedule = jobParameter.getSchedule(); + this.lastUpdateTime = jobParameter.getLastUpdateTime(); + this.enabledTime = jobParameter.getEnabledTime(); + this.isEnabled = jobParameter.isEnabled(); + this.lockDurationSeconds = jobParameter.getLockDurationSeconds(); + if (jobParameter.getJitter() != null) { + this.jitter = jobParameter.getJitter(); + } else { + this.jitter = 0.0; + } + } + + public ExtensionJobParameter(StreamInput in) throws IOException { + this.jobName = in.readString(); + if (in.readEnum(ExtensionJobParameter.ScheduleType.class) == ScheduleType.CRON) { + this.schedule = new CronSchedule(in); + } else { + this.schedule = new IntervalSchedule(in); + } + this.lastUpdateTime = in.readInstant(); + this.enabledTime = in.readInstant(); + this.isEnabled = in.readBoolean(); + this.lockDurationSeconds = in.readOptionalLong(); + this.jitter = in.readOptionalDouble(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.jobName); + if (this.schedule instanceof CronSchedule) { + out.writeEnum(ScheduleType.CRON); + } else { + out.writeEnum(ScheduleType.INTERVAL); 
+ } + this.schedule.writeTo(out); + out.writeInstant(this.lastUpdateTime); + out.writeInstant(this.enabledTime); + out.writeBoolean(this.isEnabled); + out.writeOptionalLong(this.lockDurationSeconds); + out.writeOptionalDouble(this.jitter); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(NAME_FIELD, this.jobName) + .field(SCHEDULE_FIELD, this.schedule) + .field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli()) + .field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli()) + .field(IS_ENABLED_FIELD, isEnabled); + if (this.lockDurationSeconds != null) { + builder.field(LOCK_DURATION_SECONDS_FIELD, this.lockDurationSeconds); + } + if (this.jitter != null) { + builder.field(JITTER_FIELD, this.jitter); + } + builder.endObject(); + return builder; + } + + @Override + public String getName() { + return this.jobName; + } + + @Override + public Instant getLastUpdateTime() { + return this.lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return this.enabledTime; + } + + @Override + public Schedule getSchedule() { + return this.schedule; + } + + @Override + public boolean isEnabled() { + return this.isEnabled; + } + + @Override + public Long getLockDurationSeconds() { + return this.lockDurationSeconds; + } + + @Override + public Double getJitter() { + return this.jitter; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java b/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java new file mode 100644 index 00000000..889d4924 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/model/JobDetails.java @@ -0,0 +1,219 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.model; + +import org.opensearch.common.Nullable; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; + +/** + * This model class stores the job details of the extension. + */ +public class JobDetails implements ToXContentObject { + + /** + * jobIndex from the extension. + */ + private String jobIndex; + + /** + * jobType from the extension. + */ + private String jobType; + + /** + * jobParser action to trigger the response back to the extension. + */ + private String jobParameterAction; + + /** + * jobRunner action to trigger the response back to the extension. 
+ */ + private String jobRunnerAction; + + /** + * extension unique ID + */ + private String extensionUniqueId; + + public static final String DOCUMENT_ID = "document_id"; + public static final String JOB_INDEX = "job_index"; + public static final String JOB_TYPE = "job_type"; + public static final String JOB_PARAMETER_ACTION = "job_parser_action"; + public static final String JOB_RUNNER_ACTION = "job_runner_action"; + public static final String EXTENSION_UNIQUE_ID = "extension_unique_id"; + + public JobDetails() {} + + public JobDetails(String jobIndex, String jobType, String jobParameterAction, String jobRunnerAction, String extensionUniqueId) { + this.jobIndex = jobIndex; + this.jobType = jobType; + this.jobParameterAction = jobParameterAction; + this.jobRunnerAction = jobRunnerAction; + this.extensionUniqueId = extensionUniqueId; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + XContentBuilder xContentBuilder = builder.startObject(); + if (jobIndex != null) { + xContentBuilder.field(JOB_INDEX, jobIndex); + } + if (jobType != null) { + xContentBuilder.field(JOB_TYPE, jobType); + } + if (jobParameterAction != null) { + xContentBuilder.field(JOB_PARAMETER_ACTION, jobParameterAction); + } + if (jobRunnerAction != null) { + xContentBuilder.field(JOB_RUNNER_ACTION, jobRunnerAction); + } + if (extensionUniqueId != null) { + xContentBuilder.field(EXTENSION_UNIQUE_ID, extensionUniqueId); + } + return xContentBuilder.endObject(); + } + + public static JobDetails parse(XContentParser parser) throws IOException { + String jobIndex = null; + String jobType = null; + String jobParameterAction = null; + String jobRunnerAction = null; + String extensionUniqueId = null; + + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case JOB_INDEX: + jobIndex = parser.text(); + break; + case JOB_TYPE: + jobType = parser.text(); + break; + case JOB_PARAMETER_ACTION: + jobParameterAction = parser.text(); + break; + case JOB_RUNNER_ACTION: + jobRunnerAction = parser.text(); + break; + case EXTENSION_UNIQUE_ID: + extensionUniqueId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + } + + return new JobDetails(jobIndex, jobType, jobParameterAction, jobRunnerAction, extensionUniqueId); + } + + public JobDetails(final JobDetails copyJobDetails) { + this( + copyJobDetails.jobIndex, + copyJobDetails.jobType, + copyJobDetails.jobParameterAction, + copyJobDetails.jobRunnerAction, + copyJobDetails.extensionUniqueId + ); + } + + @Nullable + public String getJobIndex() { + return jobIndex; + } + + public void setJobIndex(String jobIndex) { + this.jobIndex = jobIndex; + } + + @Nullable + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + @Nullable + public String getJobParameterAction() { + return jobParameterAction; + } + + public void setJobParameterAction(String jobParameterAction) { + this.jobParameterAction = jobParameterAction; + } + + @Nullable + public String getJobRunnerAction() { + return jobRunnerAction; + } + + public void setJobRunnerAction(String jobRunnerAction) { + this.jobRunnerAction = jobRunnerAction; + } + + @Nullable + public String getExtensionUniqueId() { + return extensionUniqueId; + } + + public void setExtensionUniqueId(String 
extensionUniqueId) { + this.extensionUniqueId = extensionUniqueId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + JobDetails that = (JobDetails) o; + return Objects.equals(jobIndex, that.jobIndex) + && Objects.equals(jobType, that.jobType) + && Objects.equals(jobParameterAction, that.jobParameterAction) + && Objects.equals(jobRunnerAction, that.jobRunnerAction) + && Objects.equals(extensionUniqueId, that.extensionUniqueId); + } + + @Override + public int hashCode() { + return Objects.hash(jobIndex, jobType, jobParameterAction, jobRunnerAction, extensionUniqueId); + } + + @Override + public String toString() { + return "JobDetails{" + + "jobIndex='" + + jobIndex + + '\'' + + ", jobType='" + + jobType + + '\'' + + ", jobParameterAction='" + + jobParameterAction + + '\'' + + ", jobRunnerAction='" + + jobRunnerAction + + '\'' + + ", extensionUniqueId='" + + extensionUniqueId + + '\'' + + '}'; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsAction.java b/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsAction.java new file mode 100644 index 00000000..94b33b46 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsAction.java @@ -0,0 +1,156 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.JobSchedulerPlugin; + +import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest; + +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; +import org.opensearch.rest.BytesRestResponse; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.collect.ImmutableList; +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestRequest.Method.PUT; + +/** + * This class consists of the REST handler to GET job details from extensions. 
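As a quick illustration of the JobDetails model defined above, the sketch below serializes an instance to JSON and parses it back; all field values are assumed examples, and the parser must be advanced onto the opening START_OBJECT before JobDetails.parse is called:

```java
// Illustrative only: round-trips the JobDetails model through XContent.
// The field values here are made-up examples.
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.jobscheduler.model.JobDetails;

public class JobDetailsXContentExample {
    public static void main(String[] args) throws Exception {
        JobDetails original = new JobDetails(
            ".sample-job-index",        // job index (example value)
            "sample-job-type",          // job type (example value)
            "sample-parameter-action",  // job parameter action (example value)
            "sample-runner-action",     // job runner action (example value)
            "sample-extension"          // extension unique id (example value)
        );

        // Serialize to JSON
        XContentBuilder builder = JsonXContent.contentBuilder();
        original.toXContent(builder, ToXContent.EMPTY_PARAMS);
        String json = BytesReference.bytes(builder).utf8ToString();

        // Parse it back; JobDetails.parse expects the parser to already be on START_OBJECT
        try (XContentParser parser = JsonXContent.jsonXContent.createParser(
                NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, json)) {
            parser.nextToken();
            JobDetails roundTripped = JobDetails.parse(parser);
            assert original.equals(roundTripped);
        }
    }
}
```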
+ */ +public class RestGetJobDetailsAction extends BaseRestHandler { + + public static final String GET_JOB_DETAILS_ACTION = "get_job_details_action"; + + private final Logger logger = LogManager.getLogger(RestGetJobDetailsAction.class); + + public JobDetailsService jobDetailsService; + + public RestGetJobDetailsAction(final JobDetailsService jobDetailsService) { + this.jobDetailsService = jobDetailsService; + } + + @Override + public String getName() { + return GET_JOB_DETAILS_ACTION; + } + + @Override + public List routes() { + return ImmutableList.of( + // New Job Details Entry Request + new Route(PUT, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details")), + // Update Job Details Entry Request + new Route( + PUT, + String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_job_details", GetJobDetailsRequest.DOCUMENT_ID) + ) + + ); + } + + @VisibleForTesting + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + XContentParser parser = restRequest.contentParser(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + GetJobDetailsRequest getJobDetailsRequest = GetJobDetailsRequest.parse(parser); + String documentId = restRequest.param(GetJobDetailsRequest.DOCUMENT_ID); + String jobIndex = getJobDetailsRequest.getJobIndex(); + String jobType = getJobDetailsRequest.getJobType(); + String jobParameterAction = getJobDetailsRequest.getJobParameterAction(); + String jobRunnerAction = getJobDetailsRequest.getJobRunnerAction(); + String extensionUniqueId = getJobDetailsRequest.getExtensionUniqueId(); + + CompletableFuture inProgressFuture = new CompletableFuture<>(); + + jobDetailsService.processJobDetails( + documentId, + jobIndex, + jobType, + jobParameterAction, + jobRunnerAction, + extensionUniqueId, + new ActionListener<>() { + @Override + public void onResponse(String indexedDocumentId) { + // Set document Id + inProgressFuture.complete(indexedDocumentId); + } + + @Override + public void onFailure(Exception e) { + logger.info("could not process job index", e); + inProgressFuture.completeExceptionally(e); + } + } + ); + + try { + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Get Job Details timed out ", e); + } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + + return channel -> { + String jobDetailsResponseHolder = null; + try { + jobDetailsResponseHolder = inProgressFuture.get(); + } catch (Exception e) { + logger.error("Exception occured in get job details ", e); + } + XContentBuilder builder = channel.newBuilder(); + RestStatus restStatus = RestStatus.OK; + String restResponseString = jobDetailsResponseHolder != null ? 
"success" : "failed"; + BytesRestResponse bytesRestResponse; + try { + builder.startObject(); + builder.field("response", restResponseString); + if (restResponseString.equals("success")) { + builder.field(GetJobDetailsRequest.DOCUMENT_ID, jobDetailsResponseHolder); + } else { + restStatus = RestStatus.INTERNAL_SERVER_ERROR; + } + builder.endObject(); + bytesRestResponse = new BytesRestResponse(restStatus, builder); + } finally { + builder.close(); + } + + channel.sendResponse(bytesRestResponse); + }; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetLockAction.java b/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetLockAction.java new file mode 100644 index 00000000..92fde4f2 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/action/RestGetLockAction.java @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.transport.AcquireLockResponse; +import org.opensearch.jobscheduler.transport.AcquireLockRequest; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; + +import java.io.IOException; +import java.util.List; +import java.util.Locale; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import com.google.common.collect.ImmutableList; +import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestRequest.Method.GET; + +import static org.opensearch.jobscheduler.spi.LockModel.GET_LOCK_ACTION; + +/** + * This class consists of the REST handler to GET a lock model for extensions + */ +public class RestGetLockAction extends BaseRestHandler { + private final Logger logger = LogManager.getLogger(RestGetLockAction.class); + + public LockService lockService; + + public RestGetLockAction(final LockService lockService) { + this.lockService = lockService; + } + + @Override + public String getName() { + return GET_LOCK_ACTION; + } + + @Override + public List routes() { + return ImmutableList.of(new Route(GET, String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock"))); + } + + @VisibleForTesting + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + XContentParser parser = restRequest.contentParser(); + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + + // Deserialize acquire lock request + AcquireLockRequest acquireLockRequest = 
AcquireLockRequest.parse(parser); + String jobId = acquireLockRequest.getJobId(); + String jobIndexName = acquireLockRequest.getJobIndexName(); + long lockDurationSeconds = acquireLockRequest.getLockDurationSeconds(); + + // Process acquire lock request + CompletableFuture inProgressFuture = new CompletableFuture<>(); + lockService.acquireLockWithId(jobIndexName, lockDurationSeconds, jobId, ActionListener.wrap(lockModel -> { + inProgressFuture.complete(lockModel); + }, exception -> { + logger.error("Could not acquire lock with ID : " + jobId, exception); + inProgressFuture.completeExceptionally(exception); + })); + + try { + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Acquiring lock timed out ", e); + } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + + return channel -> { + BytesRestResponse bytesRestResponse; + LockModel lockModelResponseHolder = null; + try { + lockModelResponseHolder = inProgressFuture.get(); + } catch (Exception e) { + logger.error("Exception occured in acquiring lock ", e); + } + try (XContentBuilder builder = channel.newBuilder()) { + // Prepare response + RestStatus restStatus = RestStatus.OK; + String restResponseString = lockModelResponseHolder != null ? "success" : "failed"; + if (restResponseString.equals("success")) { + // Create Response + AcquireLockResponse acquireLockResponse = new AcquireLockResponse( + lockModelResponseHolder, + LockModel.generateLockId(jobIndexName, jobId), + lockModelResponseHolder.getSeqNo(), + lockModelResponseHolder.getPrimaryTerm() + ); + acquireLockResponse.toXContent(builder, ToXContent.EMPTY_PARAMS); + + } else { + restStatus = RestStatus.INTERNAL_SERVER_ERROR; + } + bytesRestResponse = new BytesRestResponse(restStatus, builder); + channel.sendResponse(bytesRestResponse); + } + }; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockAction.java b/src/main/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockAction.java new file mode 100644 index 00000000..62e3668d --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockAction.java @@ -0,0 +1,146 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler.rest.action; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import static org.opensearch.rest.RestRequest.Method.PUT; +import org.opensearch.rest.RestStatus; + +public class RestReleaseLockAction extends BaseRestHandler { + + public static final String RELEASE_LOCK_ACTION = "release_lock_action"; + private final Logger logger = LogManager.getLogger(RestReleaseLockAction.class); + + private LockService lockService; + + public RestReleaseLockAction(LockService lockService) { + this.lockService = lockService; + } + + @Override + public String getName() { + return RELEASE_LOCK_ACTION; + } + + @Override + public List routes() { + return ImmutableList.of( + new Route(PUT, String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID)) + ); + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException { + String lockId = restRequest.param(LockModel.LOCK_ID); + if (lockId == null || lockId.isEmpty()) { + throw new IOException("lockId cannot be null or empty"); + } + CompletableFuture releaseLockInProgressFuture = new CompletableFuture<>(); + if (!lockService.lockIndexExist()) { + releaseLockInProgressFuture.complete(false); + } else { + CompletableFuture findInProgressFuture = new CompletableFuture<>(); + lockService.findLock(lockId, ActionListener.wrap(lock -> { findInProgressFuture.complete(lock); }, exception -> { + logger.error("Could not find lock model with lockId " + lockId, exception); + findInProgressFuture.completeExceptionally(exception); + })); + + LockModel releaseLock; + try { + releaseLock = findInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).get(); + } catch (CompletionException | InterruptedException | ExecutionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error(" Finding lock timed out ", e); + } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + + if (releaseLock != null) { + lockService.release(releaseLock, new ActionListener<>() { + @Override + public void onResponse(Boolean response) { + releaseLockInProgressFuture.complete(response); + } + + @Override + public void onFailure(Exception e) { + logger.error("Releasing lock failed with an exception", e); + releaseLockInProgressFuture.completeExceptionally(e); + } + }); + + try { + 
releaseLockInProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Release lock timed out ", e); + } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } + } + } + return channel -> { + boolean releaseResponse = false; + try { + releaseResponse = releaseLockInProgressFuture.get(); + } catch (Exception e) { + logger.error("Exception occured in releasing lock ", e); + } + XContentBuilder builder = channel.newBuilder(); + RestStatus restStatus = RestStatus.OK; + String restResponseString = releaseResponse ? "success" : "failed"; + BytesRestResponse bytesRestResponse; + try { + builder.startObject(); + builder.field("release-lock", restResponseString); + if (restResponseString.equals("failed")) { + restStatus = RestStatus.INTERNAL_SERVER_ERROR; + } + builder.endObject(); + bytesRestResponse = new BytesRestResponse(restStatus, builder); + } finally { + builder.close(); + } + + channel.sendResponse(bytesRestResponse); + }; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/action/VisibleForTesting.java b/src/main/java/org/opensearch/jobscheduler/rest/action/VisibleForTesting.java new file mode 100644 index 00000000..3fadc99a --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/action/VisibleForTesting.java @@ -0,0 +1,12 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest.action; + +public @interface VisibleForTesting { +} diff --git a/src/main/java/org/opensearch/jobscheduler/rest/request/GetJobDetailsRequest.java b/src/main/java/org/opensearch/jobscheduler/rest/request/GetJobDetailsRequest.java new file mode 100644 index 00000000..8fc8a46d --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/rest/request/GetJobDetailsRequest.java @@ -0,0 +1,170 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler.rest.request; + +import java.util.Objects; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentParserUtils; + +import java.io.IOException; + +/** + * Get Job Details Request Model class + */ +public class GetJobDetailsRequest extends ActionRequest { + + private static String documentId; + + private static String jobIndex; + + private static String jobType; + + private static String jobParameterAction; + + private static String jobRunnerAction; + + private static String extensionUniqueId; + + public static final String DOCUMENT_ID = "document_id"; + public static final String JOB_INDEX = "job_index"; + public static final String JOB_TYPE = "job_type"; + public static final String EXTENSION_UNIQUE_ID = "extension_unique_id"; + public static final String JOB_PARAMETER_ACTION = "job_parameter_action"; + public static final String JOB_RUNNER_ACTION = "job_runner_action"; + + public GetJobDetailsRequest(StreamInput in) throws IOException { + super(in); + documentId = in.readOptionalString(); + jobIndex = in.readString(); + jobType = in.readString(); + jobParameterAction = in.readString(); + jobRunnerAction = in.readString(); + extensionUniqueId = in.readString(); + + } + + public GetJobDetailsRequest( + String documentId, + String jobIndex, + String jobType, + String jobParameterAction, + String jobRunnerAction, + String extensionUniqueId + ) { + super(); + this.documentId = documentId; + this.jobIndex = Objects.requireNonNull(jobIndex); + this.jobType = Objects.requireNonNull(jobType); + this.jobParameterAction = Objects.requireNonNull(jobParameterAction); + this.jobRunnerAction = Objects.requireNonNull(jobRunnerAction); + this.extensionUniqueId = Objects.requireNonNull(extensionUniqueId); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(documentId); + out.writeString(jobIndex); + out.writeString(jobType); + out.writeString(jobParameterAction); + out.writeString(jobRunnerAction); + out.writeString(extensionUniqueId); + } + + public String getDocumentId() { + return documentId; + } + + public void setDocumentId(String documentId) { + this.documentId = documentId; + } + + public String getJobIndex() { + return jobIndex; + } + + public void setJobIndex(String jobIndex) { + this.jobIndex = jobIndex; + } + + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getJobParameterAction() { + return jobParameterAction; + } + + public void setJobParameterAction(String jobParameterAction) { + this.jobParameterAction = jobParameterAction; + } + + public String getJobRunnerAction() { + return jobRunnerAction; + } + + public void setJobRunnerAction(String jobRunnerAction) { + this.jobRunnerAction = jobRunnerAction; + } + + public String getExtensionUniqueId() { + return extensionUniqueId; + } + + public void setExtensionUniqueId(String extensionUniqueId) { + this.extensionUniqueId = extensionUniqueId; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public static GetJobDetailsRequest parse(XContentParser parser) throws IOException { + + 
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case DOCUMENT_ID: + documentId = parser.textOrNull(); + break; + case JOB_INDEX: + jobIndex = parser.text(); + break; + case JOB_TYPE: + jobType = parser.text(); + break; + case JOB_PARAMETER_ACTION: + jobParameterAction = parser.text(); + break; + case JOB_RUNNER_ACTION: + jobRunnerAction = parser.text(); + break; + case EXTENSION_UNIQUE_ID: + extensionUniqueId = parser.text(); + break; + default: + parser.skipChildren(); + break; + } + + } + return new GetJobDetailsRequest(documentId, jobIndex, jobType, jobParameterAction, jobRunnerAction, extensionUniqueId); + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java b/src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java index 0f3dc27a..593aee54 100644 --- a/src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java +++ b/src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java @@ -16,6 +16,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.jobscheduler.spi.JobDocVersion; import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.jobscheduler.utils.JobDetailsService; import org.opensearch.jobscheduler.utils.VisibleForTesting; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -84,6 +85,7 @@ public class JobSweeper extends LifecycleListener implements IndexingOperationLi private ConcurrentHashMap> sweptJobs; private JobScheduler scheduler; private LockService lockService; + private JobDetailsService jobDetailsService; private volatile long lastFullSweepTimeNano; @@ -103,7 +105,8 @@ public JobSweeper( NamedXContentRegistry registry, Map indexToProviders, JobScheduler scheduler, - LockService lockService + LockService lockService, + JobDetailsService jobDetailsService ) { this.client = client; this.clusterService = clusterService; @@ -112,6 +115,7 @@ public JobSweeper( this.indexToProviders = indexToProviders; this.scheduler = scheduler; this.lockService = lockService; + this.jobDetailsService = jobDetailsService; this.lastFullSweepTimeNano = System.nanoTime(); this.loadSettings(settings); diff --git a/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockRequest.java new file mode 100644 index 00000000..5efe5aa7 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockRequest.java @@ -0,0 +1,137 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license.
+ */ +package org.opensearch.jobscheduler.transport; + +import java.io.IOException; +import java.util.Objects; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentParserUtils; + +/** + * Request from extensions to acquire a lock for scheduled job execution + */ +public class AcquireLockRequest extends ActionRequest implements ToXContentObject { + + /** + * the id of the job + */ + private final String jobId; + + /** + * the name of the job index + */ + private final String jobIndexName; + + /** + * the duration for which this lock will be acquired + */ + private final long lockDurationSeconds; + + public static final String JOB_ID = "job_id"; + public static final String JOB_INDEX_NAME = "job_index_name"; + public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; + + /** + * Instantiates a new AcquireLockRequest + * + * @param jobId the id of the job in which the lock will be given to + * @param jobIndexName the name of the job index + * @param lockDurationSeconds the duration for which this lock will be acquired + */ + public AcquireLockRequest(String jobId, String jobIndexName, long lockDurationSeconds) { + super(); + this.jobId = Objects.requireNonNull(jobId); + this.jobIndexName = Objects.requireNonNull(jobIndexName); + this.lockDurationSeconds = Objects.requireNonNull(lockDurationSeconds); + } + + /** + * Instantiates a new AcquireLockRequest from {@link StreamInput} + * + * @param in is the byte stream input used to de-serialize the message. + * @throws IOException IOException when message de-serialization fails. 
+ */ + public AcquireLockRequest(StreamInput in) throws IOException { + super(in); + this.jobId = in.readString(); + this.jobIndexName = in.readString(); + this.lockDurationSeconds = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.jobId); + out.writeString(this.jobIndexName); + out.writeLong(this.lockDurationSeconds); + } + + public String getJobId() { + return this.jobId; + } + + public String getJobIndexName() { + return this.jobIndexName; + } + + public long getLockDurationSeconds() { + return this.lockDurationSeconds; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public static AcquireLockRequest parse(XContentParser parser) throws IOException { + + String jobId = null; + String jobIndexName = null; + Long lockDurationSeconds = null; + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + String fieldName = parser.currentName(); + parser.nextToken(); + + switch (fieldName) { + case JOB_ID: + jobId = parser.text(); + break; + case JOB_INDEX_NAME: + jobIndexName = parser.text(); + break; + case LOCK_DURATION_SECONDS: + lockDurationSeconds = parser.longValue(); + break; + default: + parser.skipChildren(); + break; + } + } + return new AcquireLockRequest(jobId, jobIndexName, lockDurationSeconds); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(JOB_ID, jobId); + builder.field(JOB_INDEX_NAME, jobIndexName); + builder.field(LOCK_DURATION_SECONDS, lockDurationSeconds); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockResponse.java new file mode 100644 index 00000000..c0f92a84 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/AcquireLockResponse.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler.transport; + +import java.io.IOException; + +import org.opensearch.common.xcontent.XContentParserUtils; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.spi.LockModel; + +import static java.util.Objects.requireNonNull; + +/** + * Response class used to facilitate serialization/deserialization of the GetLock response + */ +public class AcquireLockResponse implements ToXContentObject { + private final LockModel lock; + private final String lockId; + private final long seqNo; + private final long primaryTerm; + + public AcquireLockResponse(final LockModel lock, final String lockId, final long seqNo, final long primaryTerm) { + this.lock = lock; + this.lockId = lockId; + this.seqNo = seqNo; + this.primaryTerm = primaryTerm; + } + + public LockModel getLock() { + return this.lock; + } + + public String getLockId() { + return this.lockId; + } + + public long getSeqNo() { + return this.seqNo; + } + + public long getPrimaryTerm() { + return this.primaryTerm; + } + + public static AcquireLockResponse parse(final XContentParser parser) throws IOException { + LockModel lock = null; + String lockId = null; + Long seqNo = null; + Long primaryTerm = null; + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + while (!XContentParser.Token.END_OBJECT.equals(parser.nextToken())) { + String fieldName = parser.currentName(); + parser.nextToken(); + switch (fieldName) { + case LockModel.LOCK_ID: + lockId = parser.text(); + break; + case LockModel.SEQUENCE_NUMBER: + seqNo = parser.longValue(); + break; + case LockModel.PRIMARY_TERM: + primaryTerm = parser.longValue(); + break; + case LockModel.LOCK_MODEL: + lock = LockModel.parse(parser, seqNo, primaryTerm); + break; + default: + throw new IllegalArgumentException("Unknown field " + fieldName); + } + } + return new AcquireLockResponse( + requireNonNull(lock, "LockModel cannot be null"), + requireNonNull(lockId, "LockId cannot be null"), + requireNonNull(seqNo, "Sequence Number cannot be null"), + requireNonNull(primaryTerm, "Primary Term cannot be null") + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(LockModel.LOCK_ID, lockId); + builder.field(LockModel.SEQUENCE_NUMBER, seqNo); + builder.field(LockModel.PRIMARY_TERM, primaryTerm); + builder.field(LockModel.LOCK_MODEL, lock); + builder.endObject(); + return builder; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java new file mode 100644 index 00000000..37e60ea2 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/ExtensionJobActionRequest.java @@ -0,0 +1,69 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
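To illustrate the acquire-lock flow above: the body sent to GET /_plugins/_job_scheduler/_lock is simply the XContent form of AcquireLockRequest, and the handler's JSON reply can be read back with AcquireLockResponse.parse. A minimal request-side sketch, with example values only:

```java
// Illustrative sketch: build the JSON body the _lock endpoint expects.
// Job id, index name, and duration are made-up example values.
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.jobscheduler.transport.AcquireLockRequest;

public class AcquireLockRequestBodyExample {
    public static void main(String[] args) throws Exception {
        AcquireLockRequest request = new AcquireLockRequest(
            "sample-job-id",     // job_id (example)
            ".sample-job-index", // job_index_name (example)
            300L                 // lock_duration_seconds (example)
        );
        XContentBuilder body = JsonXContent.contentBuilder();
        request.toXContent(body, ToXContent.EMPTY_PARAMS);
        // {"job_id":"sample-job-id","job_index_name":".sample-job-index","lock_duration_seconds":300}
        System.out.println(BytesReference.bytes(body).utf8ToString());
    }
}
```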
+ */ +package org.opensearch.jobscheduler.transport.request; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import com.google.protobuf.ByteString; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.io.stream.Writeable; +import org.opensearch.extensions.action.ExtensionActionRequest; + +/** + * Request to extensions to invoke a job action, converts request params to a byte array + * + */ +public class ExtensionJobActionRequest extends ExtensionActionRequest { + + public static final byte UNIT_SEPARATOR = (byte) '\u001F'; + + /** + * Instantiates a new ExtensionJobActionRequest + * + * @param extensionActionName the extension action to invoke + * @param actionParams the request object holding the action parameters + * @throws IOException if serialization fails + */ + public ExtensionJobActionRequest(String extensionActionName, T actionParams) throws IOException { + super(extensionActionName, convertParamsToByteString(actionParams)); + } + + /** + * Converts an object of type T that extends {@link Writeable} into a byte array and prepends the fully qualified request class name bytes + * + * @param a class that extends writeable + * @param actionParams the action parameters to be serialized + * @throws IOException if serialization fails + * @return the byte array of the parameters + */ + private static ByteString convertParamsToByteString(T actionParams) throws IOException { + + // Write inner request to output stream and convert to byte array + BytesStreamOutput out = new BytesStreamOutput(); + actionParams.writeTo(out); + out.flush(); + byte[] requestBytes = BytesReference.toBytes(out.bytes()); + + // Convert fully qualifed class name to byte array + byte[] requestClassBytes = actionParams.getClass().getName().getBytes(StandardCharsets.UTF_8); + + // Generate ExtensionActionRequest responseByte array + byte[] proxyRequestBytes = ByteBuffer.allocate(requestClassBytes.length + 1 + requestBytes.length) + .put(requestClassBytes) + .put(ExtensionJobActionRequest.UNIT_SEPARATOR) + .put(requestBytes) + .array(); + + return ByteString.copyFrom(proxyRequestBytes); + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java new file mode 100644 index 00000000..326cccc1 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobParameterRequest.java @@ -0,0 +1,119 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler.transport.request; + +import java.io.IOException; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.spi.JobDocVersion; + +/** + * Request to extensions to parse a ScheduledJobParameter + */ +public class JobParameterRequest extends ActionRequest { + + /** + * accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action + */ + private final String accessToken; + + /** + * jobSource is the index entry bytes reference from the registered job index + */ + private final BytesReference jobSource; + + /** + * id is the job Id + */ + private final String id; + + /** + * jobDocVersion is the metadata regarding this particular registered job + */ + private final JobDocVersion jobDocVersion; + + /** + * Instantiates a new Job Parameter Request + * + * @param accessToken the user identity/access token that will be validated prior to triggering an extension action + * @param jobParser the parser object to extract the jobSource {@link BytesReference} from + * @param id the job id + * @param jobDocVersion the job document version + * @throws IOException IOException when message de-serialization fails. + */ + public JobParameterRequest(String accessToken, XContentParser jobParser, String id, JobDocVersion jobDocVersion) throws IOException { + + // Extract jobSource bytesRef from xContentParser + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.copyCurrentStructure(jobParser); + this.jobSource = BytesReference.bytes(builder); + this.accessToken = accessToken; + this.id = id; + this.jobDocVersion = jobDocVersion; + } + + /** + * Instantiates a new Job Parameter Request from {@link StreamInput} + * + * @param in is the byte stream input used to de-serialize the message. + * @throws IOException IOException when message de-serialization fails. + */ + public JobParameterRequest(StreamInput in) throws IOException { + this.accessToken = in.readString(); + this.jobSource = in.readBytesReference(); + this.id = in.readString(); + this.jobDocVersion = new JobDocVersion(in); + } + + /** + * Instantiates a new Job Parameter Request by wrapping the given byte array within a {@link StreamInput} + * + * @param requestParams in bytes array used to de-serialize the message. + * @throws IOException when message de-serialization fails.
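+ * <p>Illustrative round-trip sketch (it assumes only that the byte array holds exactly the output of {@code writeTo}; the variable names are illustrative, not part of this change): + * <pre> + * BytesStreamOutput out = new BytesStreamOutput(); + * original.writeTo(out); + * JobParameterRequest copy = new JobParameterRequest(BytesReference.toBytes(out.bytes())); + * </pre>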
+ */ + public JobParameterRequest(byte[] requestParams) throws IOException { + this(StreamInput.wrap(requestParams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.accessToken); + out.writeBytesReference(this.jobSource); + out.writeString(this.id); + this.jobDocVersion.writeTo(out); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getAccessToken() { + return this.accessToken; + } + + public BytesReference getJobSource() { + return this.jobSource; + } + + public String getId() { + return this.id; + } + + public JobDocVersion getJobDocVersion() { + return this.jobDocVersion; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java new file mode 100644 index 00000000..99bd5be9 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/request/JobRunnerRequest.java @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport.request; + +import java.io.IOException; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.jobscheduler.spi.JobExecutionContext; + +/** + * Request to extensions to invoke their ScheduledJobRunner implementation + * + */ +public class JobRunnerRequest extends ActionRequest { + + /** + * accessToken is the placeholder for the user Identity/access token to be used to perform validation prior to invoking the extension action + */ + private final String accessToken; + + /** + * jobParameterDocumentId is job index entry id + */ + private final String jobParameterDocumentId; + + /** + * jobExecutionContext holds the metadata to configure a job execution + */ + private final JobExecutionContext jobExecutionContext; + + /** + * Instantiates a new Job Runner Request + * + * @param accessToken the access token of this request + * @param jobParameterDocumentId the document id of the job parameter + * @param jobExecutionContext the context used to facilitate a job run + */ + public JobRunnerRequest(String accessToken, String jobParameterDocumentId, JobExecutionContext jobExecutionContext) { + this.accessToken = accessToken; + this.jobParameterDocumentId = jobParameterDocumentId; + this.jobExecutionContext = jobExecutionContext; + } + + /** + * Instantiates a new Job Runner Request from {@link StreamInput} + * + * @param in is the byte stream input used to de-serialize the message. + * @throws IOException IOException when message de-serialization fails. + */ + public JobRunnerRequest(StreamInput in) throws IOException { + this.accessToken = in.readString(); + this.jobParameterDocumentId = in.readString(); + this.jobExecutionContext = new JobExecutionContext(in); + } + + /** + * Instantiates a new Job Runner Request by wrapping the given byte array within a {@link StreamInput} + * + * @param requestParams in bytes array used to de-serialize the message. + * @throws IOException when message de-serialization fails. 
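+ * <p>For context, the serialized form is produced on the Job Scheduler side roughly as in the proxy call sketched below (mirroring {@code JobDetailsService} in this change; the variable names are illustrative): + * <pre> + * JobRunnerRequest request = new JobRunnerRequest("placeholder", documentId, context); + * client.execute(ExtensionProxyAction.INSTANCE, + *     new ExtensionJobActionRequest(jobRunnerActionName, request), listener); + * </pre>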
+ */ + public JobRunnerRequest(byte[] requestParams) throws IOException { + this(StreamInput.wrap(requestParams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.accessToken); + out.writeString(this.jobParameterDocumentId); + this.jobExecutionContext.writeTo(out); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getAccessToken() { + return this.accessToken; + } + + public String getJobParameterDocumentId() { + return this.jobParameterDocumentId; + } + + public JobExecutionContext getJobExecutionContext() { + return this.jobExecutionContext; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java new file mode 100644 index 00000000..38a94439 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/response/JobParameterResponse.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport.response; + +import java.io.IOException; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.jobscheduler.model.ExtensionJobParameter; + +/** + * Response from extensions to parse a ScheduledJobParameter + */ +public class JobParameterResponse extends ActionResponse { + + /** + * jobParameter is job index entry intended to be used to validate prior to job execution + */ + private final ExtensionJobParameter jobParameter; + + /** + * Instantiates a new Job Parameter Response + * + * @param jobParameter the job parameter parsed from the extension + */ + public JobParameterResponse(ExtensionJobParameter jobParameter) { + this.jobParameter = jobParameter; + } + + /** + * Instantiates a new Job Parameter Response from {@link StreamInput} + * + * @param in is the byte stream input used to de-serialize the message. + * @throws IOException IOException when message de-serialization fails. + */ + public JobParameterResponse(StreamInput in) throws IOException { + this.jobParameter = new ExtensionJobParameter(in); + } + + /** + * Instantiates a new Job Parameter Response by wrapping the given byte array within a {@link StreamInput} + * + * @param responseParams in bytes array used to de-serialize the message. + * @throws IOException when message de-serialization fails. 
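+ * <p>In this change the byte array comes from the proxy action response ({@code JobDetailsService} calls {@code new JobParameterResponse(response.getResponseBytes())}), and it is assumed to carry a serialized {@code ExtensionJobParameter}.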
+ */ + public JobParameterResponse(byte[] responseParams) throws IOException { + this(StreamInput.wrap(responseParams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + this.jobParameter.writeTo(out); + } + + public ExtensionJobParameter getJobParameter() { + return this.jobParameter; + } +} diff --git a/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java b/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java new file mode 100644 index 00000000..7af48973 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/transport/response/JobRunnerResponse.java @@ -0,0 +1,66 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.transport.response; + +import java.io.IOException; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +/** + * Response from extensions indicating the status of the ScheduledJobRunner invocation + * + */ +public class JobRunnerResponse extends ActionResponse { + + /** + * jobRunnerStatus indicates if the extension job runner has been executed + */ + private final boolean jobRunnerStatus; + + /** + * Instantiates a new Job Runner Response + * + * @param jobRunnerStatus the run status of the extension job runner + */ + public JobRunnerResponse(boolean jobRunnerStatus) { + this.jobRunnerStatus = jobRunnerStatus; + } + + /** + * Instantiates a new Job Runner Response from {@link StreamInput} + * + * @param in is the byte stream input used to de-serialize the message. + * @throws IOException IOException when message de-serialization fails. + */ + public JobRunnerResponse(StreamInput in) throws IOException { + this.jobRunnerStatus = in.readBoolean(); + } + + /** + * Instantiates a new Job Runner Response by wrapping the given byte array within a {@link StreamInput} + * + * @param responseParams in bytes array used to de-serialize the message. + * @throws IOException when message de-serialization fails. + */ + public JobRunnerResponse(byte[] responseParams) throws IOException { + this(StreamInput.wrap(responseParams)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(this.jobRunnerStatus); + } + + public boolean getJobRunnerStatus() { + return this.jobRunnerStatus; + } + +} diff --git a/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java new file mode 100644 index 00000000..c1ed8269 --- /dev/null +++ b/src/main/java/org/opensearch/jobscheduler/utils/JobDetailsService.java @@ -0,0 +1,524 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler.utils; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteResponse; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.extensions.action.ExtensionProxyAction; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.engine.DocumentMissingException; +import org.opensearch.index.engine.Engine; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.shard.IndexingOperationListener; +import org.opensearch.index.shard.ShardId; +import org.opensearch.jobscheduler.ScheduledJobProvider; +import org.opensearch.jobscheduler.model.ExtensionJobParameter; +import org.opensearch.jobscheduler.model.JobDetails; +import org.opensearch.jobscheduler.spi.JobDocVersion; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.transport.request.ExtensionJobActionRequest; +import org.opensearch.jobscheduler.transport.request.JobParameterRequest; +import org.opensearch.jobscheduler.transport.response.JobParameterResponse; +import org.opensearch.jobscheduler.transport.request.JobRunnerRequest; +import org.opensearch.jobscheduler.transport.response.JobRunnerResponse; + +import java.nio.charset.StandardCharsets; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.Map; + +public class JobDetailsService implements IndexingOperationListener { + + private static final Logger logger = LogManager.getLogger(JobDetailsService.class); + public static final String JOB_DETAILS_INDEX_NAME = ".opensearch-job-scheduler-job-details"; + private static final String PLUGINS_JOB_DETAILS_MAPPING_FILE = "/mappings/opensearch_job_scheduler_job_details.json"; + + public static Long TIME_OUT_FOR_REQUEST = 15L; + private final Client client; + private final ClusterService clusterService; + private Set indicesToListen; + private Map indexToJobProviders; + private static final ConcurrentMap indexToJobDetails = IndexToJobDetails.getInstance(); + + public JobDetailsService( + final Client client, + final ClusterService clusterService, + Set 
indicesToListen, + Map indexToJobProviders + ) { + this.client = client; + this.clusterService = clusterService; + this.indicesToListen = indicesToListen; + this.indexToJobProviders = indexToJobProviders; + } + + public static ConcurrentMap getIndexToJobDetails() { + return JobDetailsService.indexToJobDetails; + } + + public Map getIndexToJobProviders() { + return this.indexToJobProviders; + } + + public boolean jobDetailsIndexExist() { + return clusterService.state().routingTable().hasIndex(JOB_DETAILS_INDEX_NAME); + } + + private void updateIndicesToListen(String jobIndexName) { + this.indicesToListen.add(jobIndexName); + } + + /** + * Creates a proxy ScheduledJobProvider that facilitates callbacks between extensions and JobScheduler + * + * @param documentId the document Id of the extension job index entry + * @param jobDetails the extension job information + */ + void updateIndexToJobProviders(String documentId, JobDetails jobDetails) { + + String extensionJobIndex = jobDetails.getJobIndex(); + String extensionJobType = jobDetails.getJobType(); + String extensionUniqueId = jobDetails.getExtensionUniqueId(); + + // Create proxy callback objects + ScheduledJobParser extensionJobParser = createProxyScheduledJobParser(extensionUniqueId, jobDetails.getJobParameterAction()); + ScheduledJobRunner extensionJobRunner = createProxyScheduledJobRunner( + documentId, + extensionUniqueId, + jobDetails.getJobRunnerAction() + ); + + // Update indexToJobProviders + this.indexToJobProviders.put( + extensionJobIndex, + new ScheduledJobProvider(extensionJobType, extensionJobIndex, extensionJobParser, extensionJobRunner) + ); + } + + /** + * Adds a new entry into the indexToJobDetails using the document Id as the key, registers the index name to indicesToListen, and registers the ScheduledJobProvider + * + * @param documentId the unique Id for the job details + * @param jobDetails the jobDetails to register + */ + void updateIndexToJobDetails(String documentId, JobDetails jobDetails) { + // Register new JobDetails entry + indexToJobDetails.put(documentId, jobDetails); + updateIndicesToListen(jobDetails.getJobIndex()); + updateIndexToJobProviders(documentId, jobDetails); + } + + /** + * Creates a proxy ScheduledJobParser that triggers an extension's jobParameter action + * + * @param extensionUniqueId the extension to trigger the job parameter action + * @param extensionJobParameterAction the job parameter action name + */ + private ScheduledJobParser createProxyScheduledJobParser(String extensionUniqueId, String extensionJobParameterAction) { + return new ScheduledJobParser() { + + @Override + public ScheduledJobParameter parse(XContentParser xContentParser, String id, JobDocVersion jobDocVersion) throws IOException { + + logger.info("Sending ScheduledJobParameter parse request to extension : " + extensionUniqueId); + + final ExtensionJobParameter[] extensionJobParameterHolder = new ExtensionJobParameter[1]; + CompletableFuture inProgressFuture = new CompletableFuture<>(); + + // TODO : Replace the placeholder with the provided access token from the inital job detials request + + // Prepare JobParameterRequest + JobParameterRequest jobParamRequest = new JobParameterRequest("placeholder", xContentParser, id, jobDocVersion); + + // Invoke extension job parameter action and return ScheduledJobParameter + client.execute( + ExtensionProxyAction.INSTANCE, + new ExtensionJobActionRequest(extensionJobParameterAction, jobParamRequest), + ActionListener.wrap(response -> { + + // Extract response bytes and generate 
the parsed job parameter + JobParameterResponse jobParameterResponse = new JobParameterResponse(response.getResponseBytes()); + extensionJobParameterHolder[0] = jobParameterResponse.getJobParameter(); + inProgressFuture.complete(extensionJobParameterHolder); + + }, exception -> { + + logger.error("Could not parse job parameter", exception); + inProgressFuture.completeExceptionally(exception); + + }) + ); + + // Stall execution until request completes or times out + try { + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Request timed out with an exception ", e); + } + } catch (Exception e) { + logger.error("Could not parse ScheduledJobParameter due to exception ", e); + } + + return extensionJobParameterHolder[0]; + } + }; + + } + + /** + * Creates a proxy ScheduledJobRunner that triggers an extension's jobRunner action + * + * @param documentId the document Id of the extension job index entry + * @param extensionUniqueId the extension to trigger the job runner action + * @param extensionJobRunnerAction the job runner action name + */ + private ScheduledJobRunner createProxyScheduledJobRunner(String documentId, String extensionUniqueId, String extensionJobRunnerAction) { + return new ScheduledJobRunner() { + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + + logger.info("Sending ScheduledJobRunner runJob request to extension : " + extensionUniqueId); + + final Boolean[] extensionJobRunnerStatus = new Boolean[1]; + CompletableFuture inProgressFuture = new CompletableFuture<>(); + + try { + // TODO : Replace the placeholder with the provided access token from the inital job detials request + + // Prepare JobRunnerRequest + JobRunnerRequest jobRunnerRequest = new JobRunnerRequest("placeholder", documentId, context); + + // Invoke extension job runner action + client.execute( + ExtensionProxyAction.INSTANCE, + new ExtensionJobActionRequest(extensionJobRunnerAction, jobRunnerRequest), + ActionListener.wrap(response -> { + + // Extract response bytes into a streamInput and set the extensionJobParameter + JobRunnerResponse jobRunnerResponse = new JobRunnerResponse(response.getResponseBytes()); + extensionJobRunnerStatus[0] = jobRunnerResponse.getJobRunnerStatus(); + inProgressFuture.complete(extensionJobRunnerStatus); + + }, exception -> { + + logger.error("Failed to run job due to exception ", exception); + inProgressFuture.completeExceptionally(exception); + + }) + ); + + // Stall execution until request completes or times out + inProgressFuture.orTimeout(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS).join(); + + } catch (IOException e) { + logger.error("Failed to create JobRunnerRequest", e); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.error("Request timed out with an exception ", e); + } + } catch (Exception e) { + logger.error("Could not run extension job due to exception ", e); + } + + // log extension job runner status + logger.info("Job Runner Status for extension " + extensionUniqueId + " : " + extensionJobRunnerStatus[0]); + } + }; + } + + @Override + public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) { + + // Determine if index operation was successful + if (result.getResultType().equals(Engine.Result.Type.FAILURE)) { + logger.info("Job Details Registration failed for extension {} on index {}", 
index.id(), shardId.getIndexName()); + return; + } + + // Generate parser using bytesRef from index + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, index.source().utf8ToString()); + parser.nextToken(); + updateIndexToJobDetails(index.id(), JobDetails.parse(parser)); + } catch (IOException e) { + logger.error("IOException occurred creating job details for extension id " + index.id(), e); + } + } + + /** + * + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details index if it was created + * or else null. + */ + @VisibleForTesting + void createJobDetailsIndex(ActionListener listener) { + if (jobDetailsIndexExist()) { + listener.onResponse(true); + } else { + CreateIndexRequest request = new CreateIndexRequest(JOB_DETAILS_INDEX_NAME).mapping(jobDetailsMapping()); + client.admin() + .indices() + .create(request, ActionListener.wrap(response -> listener.onResponse(response.isAcknowledged()), exception -> { + if (exception instanceof ResourceAlreadyExistsException + || exception.getCause() instanceof ResourceAlreadyExistsException) { + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + } + + /** + * Attempts to process job details with a specified documentId. If the job details does not exist it attempts to create the job details document. + * If the job details document exists, it will try to update the job details. + * + * @param documentId a nullable document Id + * @param jobIndexName a non-null job index name. + * @param jobTypeName a non-null job type name. + * @param jobParameterActionName a non-null job parameter action name. + * @param jobRunnerActionName a non-null job runner action name. + * @param extensionUniqueId the extension Id + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was processed + * or else null. 
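+ * <p>Hypothetical call sketch (literal values and listener lambdas are illustrative only): + * <pre> + * jobDetailsService.processJobDetails(null, "my-job-index", "my-job-type", + *     "my-parameter-action", "my-runner-action", "my-extension-id", + *     ActionListener.wrap(docId -> logger.info("registered job details doc {}", docId), + *         e -> logger.error("job details registration failed", e))); + * </pre>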
+ */ + public void processJobDetails( + final String documentId, + final String jobIndexName, + final String jobTypeName, + final String jobParameterActionName, + final String jobRunnerActionName, + final String extensionUniqueId, + ActionListener listener + ) { + // Validate job detail params + if (jobIndexName == null + || jobIndexName.isEmpty() + || jobTypeName == null + || jobTypeName.isEmpty() + || jobParameterActionName == null + || jobParameterActionName.isEmpty() + || jobRunnerActionName == null + || jobRunnerActionName.isEmpty() + || extensionUniqueId == null + || extensionUniqueId.isEmpty()) { + listener.onFailure( + new IllegalArgumentException( + "JobIndexName, JobTypeName, JobParameterActionName, JobRunnerActionName, Extension Unique Id must not be null or empty" + ) + ); + } else { + // Ensure job details index has been created + createJobDetailsIndex(ActionListener.wrap(created -> { + if (created) { + try { + // Update entry request + if (documentId != null) { + // Recover entry via documentId + findJobDetails(documentId, ActionListener.wrap(existingJobDetails -> { + JobDetails updateJobDetails = new JobDetails(existingJobDetails); + + // Set updated fields + updateJobDetails.setJobIndex(jobIndexName); + updateJobDetails.setJobType(jobTypeName); + updateJobDetails.setJobParameterAction(jobParameterActionName); + updateJobDetails.setJobRunnerAction(jobRunnerActionName); + + // Send update Request + updateJobDetails(documentId, updateJobDetails, listener); + }, listener::onFailure)); + } else { + // Create JobDetails from params + JobDetails tempJobDetails = new JobDetails( + jobIndexName, + jobTypeName, + jobParameterActionName, + jobRunnerActionName, + extensionUniqueId + ); + + // Index new Job Details entry + logger.info( + "Creating job details for extension unique id " + extensionUniqueId + " : " + tempJobDetails.toString() + ); + createJobDetails(tempJobDetails, listener); + } + } catch (VersionConflictEngineException e) { + logger.debug("could not process job index for extensionUniqueId " + extensionUniqueId, e.getMessage()); + listener.onResponse(null); + } + } else { + listener.onResponse(null); + } + }, listener::onFailure)); + } + } + + /** + * Create Job details entry + * @param tempJobDetails new job details object that need to be inserted as document in the index= + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was created + * or else null. 
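+ * <p>Note: the document id is generated by OpenSearch at index time; assuming this service is registered as an indexing operation listener on the job details index (as {@code postIndex} above suggests), that generated id becomes the key under which {@code indexToJobDetails} registers the entry.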
+ */ + private void createJobDetails(final JobDetails tempJobDetails, ActionListener listener) { + try { + // Create index request, document Id will be randomly generated + final IndexRequest request = new IndexRequest(JOB_DETAILS_INDEX_NAME).source( + tempJobDetails.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS) + ).setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO).setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM).create(true); + + // Index Job Details + client.index(request, ActionListener.wrap(response -> { listener.onResponse(response.getId()); }, exception -> { + if (exception instanceof IOException) { + logger.error("IOException occurred creating job details", exception); + } + listener.onResponse(null); + })); + } catch (IOException e) { + logger.error("IOException occurred creating job details", e); + listener.onResponse(null); + } + } + + /** + * Find Job details for a particular document Id + * @param documentId unique id for Job Details document + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was found + * or else null. + */ + private void findJobDetails(final String documentId, ActionListener listener) { + GetRequest getRequest = new GetRequest(JOB_DETAILS_INDEX_NAME).id(documentId); + client.get(getRequest, ActionListener.wrap(response -> { + if (!response.isExists()) { + listener.onResponse(null); + } else { + try { + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getSourceAsString()); + parser.nextToken(); + listener.onResponse(JobDetails.parse(parser)); + } catch (IOException e) { + logger.error("IOException occurred finding JobDetails for documentId " + documentId, e); + listener.onResponse(null); + } + } + }, exception -> { + logger.error("Exception occurred finding job details for documentId " + documentId, exception); + listener.onFailure(exception); + })); + } + + /** + * Delete job details to a corresponding document Id + * @param documentId unique id to find and delete the job details document in the index + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was deleted + * or else null. + */ + public void deleteJobDetails(final String documentId, ActionListener listener) { + DeleteRequest deleteRequest = new DeleteRequest(JOB_DETAILS_INDEX_NAME).id(documentId); + client.delete(deleteRequest, ActionListener.wrap(response -> { + listener.onResponse( + response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND + ); + }, exception -> { + if (exception instanceof IndexNotFoundException || exception.getCause() instanceof IndexNotFoundException) { + logger.debug("Index is not found to delete job details for document id. {} " + documentId, exception.getMessage()); + listener.onResponse(true); + } else { + listener.onFailure(exception); + } + })); + } + + /** + * Update Job details to a corresponding documentId + * @param updateJobDetails update job details object entry + * @param documentId unique id to find and update the corresponding document mapped to it + * @param listener an {@code ActionListener} that has onResponse and onFailure that is used to return the job details if it was updated + * or else null. 
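+ * <p>Note: as written, update failures (version conflicts, missing documents, IO errors) are logged and reported to the caller as a {@code null} response rather than through {@code onFailure}.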
+ */ + private void updateJobDetails(final String documentId, final JobDetails updateJobDetails, ActionListener listener) { + try { + UpdateRequest updateRequest = new UpdateRequest().index(JOB_DETAILS_INDEX_NAME) + .id(documentId) + .doc(updateJobDetails.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true); + + client.update(updateRequest, ActionListener.wrap(response -> listener.onResponse(response.getId()), exception -> { + if (exception instanceof VersionConflictEngineException) { + logger.debug("could not update job details for documentId " + documentId, exception.getMessage()); + } + if (exception instanceof DocumentMissingException) { + logger.debug("Document is deleted. This happens if the job details is already removed {}", exception.getMessage()); + } + if (exception instanceof IOException) { + logger.error("IOException occurred in updating job details.", exception); + } + listener.onResponse(null); + })); + } catch (IOException e) { + logger.error("IOException occurred updating job details for documentId " + documentId, e); + listener.onResponse(null); + } + } + + private String jobDetailsMapping() { + try { + InputStream in = JobDetailsService.class.getResourceAsStream(PLUGINS_JOB_DETAILS_MAPPING_FILE); + StringBuilder stringBuilder = new StringBuilder(); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + for (String line; (line = bufferedReader.readLine()) != null;) { + stringBuilder.append(line); + } + return stringBuilder.toString(); + } catch (IOException e) { + throw new IllegalArgumentException("JobDetails Mapping cannot be read correctly."); + } + } + + private static class IndexToJobDetails { + private static final ConcurrentMap indexToJobDetails = new ConcurrentHashMap<>(); + + public static ConcurrentMap getInstance() { + return IndexToJobDetails.indexToJobDetails; + } + } +} diff --git a/src/main/resources/mappings/opensearch_job_scheduler_job_details.json b/src/main/resources/mappings/opensearch_job_scheduler_job_details.json new file mode 100644 index 00000000..e975bec0 --- /dev/null +++ b/src/main/resources/mappings/opensearch_job_scheduler_job_details.json @@ -0,0 +1,20 @@ +{ + "dynamic": "false", + "properties": { + "job_index_name": { + "type": "keyword" + }, + "job_type_name": { + "type": "keyword" + }, + "job_parser_action": { + "type": "keyword" + }, + "job_runner_action": { + "type": "keyword" + }, + "extension_unique_id": { + "type": "keyword" + } + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java b/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java new file mode 100644 index 00000000..d204f26c --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/ODFERestTestCase.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ +package org.opensearch.jobscheduler; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.http.Header; +import org.apache.http.HttpHost; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.message.BasicHeader; +import org.apache.http.ssl.SSLContextBuilder; +import org.junit.After; +import org.opensearch.client.Request; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.RestClientBuilder; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.test.rest.OpenSearchRestTestCase; + +public abstract class ODFERestTestCase extends OpenSearchRestTestCase { + + private static String localhostName = "localhost"; + private static int port = 9200; + + protected boolean isHttps() { + boolean isHttps = Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false); + if (isHttps) { + // currently only external cluster is supported for security enabled testing + if (!Optional.ofNullable(System.getProperty("tests.rest.cluster")).isPresent()) { + throw new RuntimeException("cluster url should be provided for security enabled testing"); + } + } + + return isHttps; + } + + @Override + protected String getProtocol() { + return isHttps() ? 
"https" : "http"; + } + + @Override + protected Settings restAdminSettings() { + return Settings.builder().put("strictDeprecationMode", false).put("http.port", 9200).build(); + } + + @Override + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + boolean strictDeprecationMode = settings.getAsBoolean("strictDeprecationMode", true); + RestClientBuilder builder = RestClient.builder(hosts); + if (isHttps()) { + configureHttpsClient(builder, settings); + builder.setStrictDeprecationMode(strictDeprecationMode); + return builder.build(); + } else { + configureClient(builder, settings); + builder.setStrictDeprecationMode(strictDeprecationMode); + return builder.build(); + } + + } + + @SuppressWarnings("unchecked") + @After + protected void wipeAllODFEIndices() throws IOException { + Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all")); + XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue()); + try ( + XContentParser parser = xContentType.xContent() + .createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + response.getEntity().getContent() + ) + ) { + XContentParser.Token token = parser.nextToken(); + List> parserList = null; + if (token == XContentParser.Token.START_ARRAY) { + parserList = parser.listOrderedMap().stream().map(obj -> (Map) obj).collect(Collectors.toList()); + } else { + parserList = Collections.singletonList(parser.mapOrdered()); + } + + for (Map index : parserList) { + String indexName = (String) index.get("index"); + if (indexName != null && !".opendistro_security".equals(indexName)) { + adminClient().performRequest(new Request("DELETE", "/" + indexName)); + } + } + } + } + + protected static void configureHttpsClient(RestClientBuilder builder, Settings settings) throws IOException { + Map headers = ThreadContext.buildDefaultHeaders(settings); + Header[] defaultHeaders = new Header[headers.size()]; + int i = 0; + for (Map.Entry entry : headers.entrySet()) { + defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue()); + } + builder.setDefaultHeaders(defaultHeaders); + builder.setHttpClientConfigCallback(httpClientBuilder -> { + String userName = Optional.ofNullable(System.getProperty("user")) + .orElseThrow(() -> new RuntimeException("user name is missing")); + String password = Optional.ofNullable(System.getProperty("password")) + .orElseThrow(() -> new RuntimeException("password is missing")); + CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials( + new AuthScope(new HttpHost(localhostName, port)), + new UsernamePasswordCredentials(userName, password) + ); + try { + return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) + // disable the certificate since our testing cluster just uses the default security configuration + .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE) + .setSSLContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); + final TimeValue socketTimeout = TimeValue.parseTimeValue( + socketTimeoutString == null ? 
"60s" : socketTimeoutString, + CLIENT_SOCKET_TIMEOUT + ); + builder.setRequestConfigCallback(conf -> conf.setSocketTimeout(Math.toIntExact(socketTimeout.getMillis()))); + if (settings.hasValue(CLIENT_PATH_PREFIX)) { + builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX)); + } + } + + /** + * wipeAllIndices won't work since it cannot delete security index. Use wipeAllODFEIndices instead. + */ + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/TestHelpers.java b/src/test/java/org/opensearch/jobscheduler/TestHelpers.java new file mode 100644 index 00000000..76ca9b8c --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/TestHelpers.java @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.http.entity.ContentType; +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.entity.StringEntity; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.WarningsHandler; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; + +public class TestHelpers { + + public static final String GET_JOB_DETAILS_BASE_URI = "/_plugins/_job_scheduler/_job_details"; + public static final String GET_LOCK_BASE_URI = "/_plugins/_job_scheduler/_lock"; + public static final String RELEASE_LOCK_BASE_URI = "/_plugins/_job_scheduler/_release_lock"; + + public static String xContentBuilderToString(XContentBuilder builder) { + return BytesReference.bytes(builder).utf8ToString(); + } + + public static String toJsonString(ToXContentObject object) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder(); + return TestHelpers.xContentBuilderToString(object.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } + + public static HttpEntity toHttpEntity(ToXContentObject object) throws IOException { + return new StringEntity(toJsonString(object), ContentType.APPLICATION_JSON); + } + + public static Response makeRequest( + RestClient client, + String method, + String endpoint, + Map params, + HttpEntity entity, + List
headers + ) throws IOException { + return makeRequest(client, method, endpoint, params, entity, headers, false); + } + + public static HttpEntity toHttpEntity(String jsonString) throws IOException { + return new StringEntity(jsonString, ContentType.APPLICATION_JSON); + } + + public static Response makeRequest( + RestClient client, + String method, + String endpoint, + Map params, + HttpEntity entity, + List
headers, + boolean strictDeprecationMode + ) throws IOException { + Request request = new Request(method, endpoint); + + RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); + if (headers != null) { + headers.forEach(header -> options.addHeader(header.getName(), header.getValue())); + } + options.setWarningsHandler(strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE); + request.setOptions(options.build()); + + if (params != null) { + params.entrySet().forEach(it -> request.addParameter(it.getKey(), it.getValue())); + } + if (entity != null) { + request.setEntity(entity); + } + return client.performRequest(request); + } + + public static String generateAcquireLockRequestBody(String jobIndexName, String jobId) { + return "{\"job_id\":\"" + jobId + "\",\"job_index_name\":\"" + jobIndexName + "\",\"lock_duration_seconds\":\"30.0\"}"; + } + + public static String generateExpectedLockId(String jobIndexName, String jobId) { + return jobIndexName + "-" + jobId; + } + +} diff --git a/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java b/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java new file mode 100644 index 00000000..4b294ac1 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/multinode/GetJobDetailsMultiNodeRestIT.java @@ -0,0 +1,68 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.multinode; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.opensearch.client.Response; +import org.opensearch.jobscheduler.ODFERestTestCase; +import org.opensearch.jobscheduler.TestHelpers; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class GetJobDetailsMultiNodeRestIT extends ODFERestTestCase { + + private static final String initialRequestBody = + "{\"job_index\":\"intial_job_index\",\"job_type\":\"intial_job_type\",\"job_parameter_action\":\"intial_job_parameter_action\",\"job_runner_action\":\"intial_job_runner_action\",\"extension_unique_id\":\"extension_unique_id\"}"; + private static final String updatedRequestBody = + "{\"job_index\":\"updated_job_index\",\"job_type\":\"updated_job_type\",\"job_parameter_action\":\"updated_job_parameter_action\",\"job_runner_action\":\"updated_job_runner_action\",\"extension_unique_id\":\"extension_unique_id\"}"; + + /** + * The below test performs a get index api on a multinode cluster. Internally, the cluster redirects the request to either of the node. + * After getting successful response, the get job type api is triggered for 100 times. From the response of get job type, job index is retrieved and is being compared with get index api response. + * Both response should be equal. 
+ * @throws Exception + */ + public void testGetJobDetailsRestAPI() throws Exception { + + // Send intial request + Response response = TestHelpers.makeRequest( + client(), + "PUT", + TestHelpers.GET_JOB_DETAILS_BASE_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity(initialRequestBody), + null + ); + + String expectedDocumentId = validateResponseAndGetDocumentId(entityAsMap(response)); + + // Submit 100 update requests + for (int i = 0; i < 100; i++) { + Response updateResponse = TestHelpers.makeRequest( + client(), + "PUT", + TestHelpers.GET_JOB_DETAILS_BASE_URI, + ImmutableMap.of(GetJobDetailsRequest.DOCUMENT_ID, expectedDocumentId), + TestHelpers.toHttpEntity(updatedRequestBody), + null + ); + + String documentId = validateResponseAndGetDocumentId(entityAsMap(updateResponse)); + assertEquals(expectedDocumentId, documentId); + } + } + + private String validateResponseAndGetDocumentId(Map responseMap) { + assertEquals("success", responseMap.get("response")); + return (String) responseMap.get(GetJobDetailsRequest.DOCUMENT_ID); + } + +} diff --git a/src/test/java/org/opensearch/jobscheduler/multinode/GetLockMultiNodeRestIT.java b/src/test/java/org/opensearch/jobscheduler/multinode/GetLockMultiNodeRestIT.java new file mode 100644 index 00000000..986dfead --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/multinode/GetLockMultiNodeRestIT.java @@ -0,0 +1,86 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.multinode; + +import com.google.common.collect.ImmutableMap; + +import java.io.IOException; + +import org.junit.Before; +import org.opensearch.client.Response; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.jobscheduler.ODFERestTestCase; +import org.opensearch.jobscheduler.TestHelpers; +import org.opensearch.jobscheduler.transport.AcquireLockResponse; +import org.opensearch.test.OpenSearchIntegTestCase; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class GetLockMultiNodeRestIT extends ODFERestTestCase { + + private String initialJobId; + private String initialJobIndexName; + private Response initialGetLockResponse; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.initialJobId = "testJobId"; + this.initialJobIndexName = "testJobIndexName"; + // Send initial request to ensure lock index has been created + this.initialGetLockResponse = TestHelpers.makeRequest( + client(), + "GET", + TestHelpers.GET_LOCK_BASE_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity(TestHelpers.generateAcquireLockRequestBody(this.initialJobIndexName, this.initialJobId)), + null + ); + } + + public void testGetLockRestAPI() throws Exception { + + String initialLockId = validateResponseAndGetLockId(initialGetLockResponse); + assertEquals(TestHelpers.generateExpectedLockId(initialJobIndexName, initialJobId), initialLockId); + + // Submit 10 requests to generate new lock models for different job indexes + for (int i = 0; i < 10; i++) { + Response getLockResponse = TestHelpers.makeRequest( + client(), + "GET", + TestHelpers.GET_LOCK_BASE_URI, + ImmutableMap.of(), + 
TestHelpers.toHttpEntity(TestHelpers.generateAcquireLockRequestBody(String.valueOf(i), String.valueOf(i))), + null + ); + + String lockId = validateResponseAndGetLockId(getLockResponse); + + assertEquals(TestHelpers.generateExpectedLockId(String.valueOf(i), String.valueOf(i)), lockId); + } + } + + private String validateResponseAndGetLockId(Response response) throws IOException { + + XContentParser parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.getEntity().getContent()); + + AcquireLockResponse acquireLockResponse = AcquireLockResponse.parse(parser); + + // Validate response map fields + assertNotNull(acquireLockResponse.getLockId()); + assertNotNull(acquireLockResponse.getSeqNo()); + assertNotNull(acquireLockResponse.getPrimaryTerm()); + assertNotNull(acquireLockResponse.getLock()); + + return acquireLockResponse.getLockId(); + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/multinode/ReleaseLockActionMultiNodeRestIT.java b/src/test/java/org/opensearch/jobscheduler/multinode/ReleaseLockActionMultiNodeRestIT.java new file mode 100644 index 00000000..99fca6f1 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/multinode/ReleaseLockActionMultiNodeRestIT.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.multinode; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.Before; +import org.opensearch.client.Response; +import org.opensearch.jobscheduler.ODFERestTestCase; +import org.opensearch.jobscheduler.TestHelpers; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.test.OpenSearchIntegTestCase; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2) +public class ReleaseLockActionMultiNodeRestIT extends ODFERestTestCase { + private Response initialGetLockResponse; + private String initialJobId; + private String initialJobIndexName; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.initialJobId = "testJobId"; + this.initialJobIndexName = "testJobIndexName"; + // Send initial request to ensure lock index has been created + this.initialGetLockResponse = TestHelpers.makeRequest( + client(), + "GET", + TestHelpers.GET_LOCK_BASE_URI, + ImmutableMap.of(), + TestHelpers.toHttpEntity(TestHelpers.generateAcquireLockRequestBody(initialJobIndexName, initialJobId)), + null + ); + } + + public void testReleaseLockRestAPI() throws Exception { + String initialLockId = validateResponseAndGetLockId(entityAsMap(this.initialGetLockResponse)); + assertEquals(TestHelpers.generateExpectedLockId(initialJobIndexName, initialJobId), initialLockId); + Response releaseLockResponse = TestHelpers.makeRequest( + client(), + "PUT", + TestHelpers.RELEASE_LOCK_BASE_URI + "/" + TestHelpers.generateExpectedLockId(initialJobIndexName, initialJobId), + ImmutableMap.of(), + null, + null + ); + assertEquals("success", entityAsMap(releaseLockResponse).get("release-lock")); + } + + private String validateResponseAndGetLockId(Map responseMap) { + return (String) responseMap.get(LockModel.LOCK_ID); + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsActionTests.java 
b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsActionTests.java new file mode 100644 index 00000000..e592427b --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetJobDetailsActionTests.java @@ -0,0 +1,159 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.jobscheduler.rest.action; + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.junit.Before; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.jobscheduler.JobSchedulerPlugin; +import org.opensearch.jobscheduler.rest.request.GetJobDetailsRequest; +import org.opensearch.jobscheduler.utils.JobDetailsService; +import org.opensearch.rest.RestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +public class RestGetJobDetailsActionTests extends OpenSearchTestCase { + + private RestGetJobDetailsAction action; + private JobDetailsService jobDetailsService; + private String getJobDetailsPath; + private String updateJobDetailsPath; + private String jobIndex; + private String jobType; + private String jobRunnerAction; + private String jobParameterAction; + private String extensionUniqueId; + private String requestBody; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.jobDetailsService = Mockito.mock(JobDetailsService.class); + this.action = new RestGetJobDetailsAction(jobDetailsService); + this.getJobDetailsPath = String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_job_details"); + this.updateJobDetailsPath = String.format( + Locale.ROOT, + "%s/%s/{%s}", + JobSchedulerPlugin.JS_BASE_URI, + "_job_details", + GetJobDetailsRequest.DOCUMENT_ID + ); + + this.jobIndex = "sample-index-name"; + this.jobType = "sample-job-type"; + this.jobRunnerAction = "sample-job-runner-action"; + this.jobParameterAction = "sample-job-parameter-action"; + this.extensionUniqueId = "sample-extension"; + this.requestBody = "{\"" + + GetJobDetailsRequest.JOB_INDEX + + "\":\"" + + this.jobIndex + + "\",\"" + + GetJobDetailsRequest.JOB_TYPE + + "\":\"" + + this.jobType + + "\",\"" + + GetJobDetailsRequest.JOB_RUNNER_ACTION + + "\":\"" + + this.jobRunnerAction + + "\",\"" + + GetJobDetailsRequest.JOB_PARAMETER_ACTION + + "\":\"" + + this.jobParameterAction + + "\",\"" + + GetJobDetailsRequest.EXTENSION_UNIQUE_ID + + "\":\"" + + this.extensionUniqueId + + "\"}"; + } + + public void testGetNames() { + String name = action.getName(); + assertEquals(action.GET_JOB_DETAILS_ACTION, name); + } + + public void testGetRoutes() { + List routes = action.routes(); + assertEquals(getJobDetailsPath, routes.get(0).getPath()); + assertEquals(updateJobDetailsPath, routes.get(1).getPath()); + } + + public void testPrepareGetJobDetailsRequest() throws IOException { + + String documentId = null; + Map params = new HashMap<>(); + + 
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(getJobDetailsPath) + .withParams(params) + .withContent(new BytesArray(requestBody), XContentType.JSON) + .build(); + + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + Mockito.doNothing() + .when(jobDetailsService) + .processJobDetails( + documentId, + jobIndex, + jobType, + jobParameterAction, + jobRunnerAction, + extensionUniqueId, + ActionListener.wrap(response -> {}, exception -> {}) + ); + + action.prepareRequest(request, Mockito.mock(NodeClient.class)); + + assertEquals(channel.responses().get(), 0); + assertEquals(channel.errors().get(), 0); + } + + public void testPrepareUpdateJobDetailsRequest() throws IOException { + + String documentId = "document-id"; + Map params = new HashMap<>(); + params.put(GetJobDetailsRequest.DOCUMENT_ID, documentId); + + FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(updateJobDetailsPath) + .withParams(params) + .withContent(new BytesArray(requestBody), XContentType.JSON) + .build(); + + final FakeRestChannel channel = new FakeRestChannel(request, true, 0); + Mockito.doNothing() + .when(jobDetailsService) + .processJobDetails( + documentId, + jobIndex, + jobType, + jobParameterAction, + jobRunnerAction, + extensionUniqueId, + ActionListener.wrap(response -> {}, exception -> {}) + ); + + action.prepareRequest(request, Mockito.mock(NodeClient.class)); + + assertEquals(channel.responses().get(), 0); + assertEquals(channel.errors().get(), 0); + } +} diff --git a/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java new file mode 100644 index 00000000..1cd4c474 --- /dev/null +++ b/src/test/java/org/opensearch/jobscheduler/rest/action/RestGetLockActionTests.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+package org.opensearch.jobscheduler.rest.action;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+import org.junit.Before;
+import org.mockito.Mockito;
+import org.opensearch.client.node.NodeClient;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.bytes.BytesArray;
+import org.opensearch.common.xcontent.LoggingDeprecationHandler;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.common.xcontent.XContentFactory;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.jobscheduler.JobSchedulerPlugin;
+import org.opensearch.jobscheduler.TestHelpers;
+import org.opensearch.jobscheduler.spi.LockModel;
+import org.opensearch.jobscheduler.spi.utils.LockService;
+import org.opensearch.jobscheduler.transport.AcquireLockRequest;
+import org.opensearch.rest.RestHandler;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.rest.RestRequest;
+import org.opensearch.test.rest.FakeRestChannel;
+import org.opensearch.test.rest.FakeRestRequest;
+
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class RestGetLockActionTests extends OpenSearchTestCase {
+
+    private ClusterService clusterService;
+    private LockService lockService;
+    private RestGetLockAction getLockAction;
+    private String getLockPath;
+    private String testJobId;
+    private String testJobIndexName;
+    private long testLockDurationSeconds;
+    private String requestBody;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
+        Mockito.when(this.clusterService.state().routingTable().hasIndex(".opendistro-job-scheduler-lock")).thenReturn(true);
+        this.lockService = new LockService(Mockito.mock(NodeClient.class), clusterService);
+        this.getLockAction = new RestGetLockAction(this.lockService);
+        this.getLockPath = String.format(Locale.ROOT, "%s/%s", JobSchedulerPlugin.JS_BASE_URI, "_lock");
+        this.testJobId = "testJobId";
+        this.testJobIndexName = "testJobIndexName";
+        this.testLockDurationSeconds = 1L;
+        this.requestBody = "{\"job_id\":\""
+            + this.testJobId
+            + "\",\"job_index_name\":\""
+            + this.testJobIndexName
+            + "\",\"lock_duration_seconds\":\""
+            + this.testLockDurationSeconds
+            + "\"}";
+    }
+
+    public void testGetNames() {
+        String name = getLockAction.getName();
+        assertEquals(LockModel.GET_LOCK_ACTION, name);
+    }
+
+    public void testGetRoutes() {
+        List<RestHandler.Route> routes = getLockAction.routes();
+        assertEquals(getLockPath, routes.get(0).getPath());
+    }
+
+    public void testAcquireLockRequest() throws IOException {
+
+        // Create AcquireLockRequest
+        AcquireLockRequest acquireLockRequest = new AcquireLockRequest(testJobId, testJobIndexName, testLockDurationSeconds);
+
+        // Generate Xcontent from request
+        XContentBuilder builder = XContentFactory.jsonBuilder();
+        builder.startObject();
+        builder.field(AcquireLockRequest.JOB_ID, acquireLockRequest.getJobId());
+        builder.field(AcquireLockRequest.JOB_INDEX_NAME, acquireLockRequest.getJobIndexName());
+        builder.field(AcquireLockRequest.LOCK_DURATION_SECONDS, acquireLockRequest.getLockDurationSeconds());
+        builder.endObject();
+
+        // Test request serde logic
+        XContentParser parser = XContentType.JSON.xContent()
+            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, TestHelpers.xContentBuilderToString(builder));
+        parser.nextToken();
+        acquireLockRequest = AcquireLockRequest.parse(parser);
+        assertEquals(this.testJobId, acquireLockRequest.getJobId());
+        assertEquals(this.testJobIndexName, acquireLockRequest.getJobIndexName());
+        assertEquals(this.testLockDurationSeconds, acquireLockRequest.getLockDurationSeconds());
+    }
+
+    public void testPrepareGetLockRequest() throws IOException {
+
+        // Prepare rest request
+        Map<String, String> params = new HashMap<>();
+        FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET)
+            .withPath(this.getLockPath)
+            .withParams(params)
+            .withContent(new BytesArray(this.requestBody), XContentType.JSON)
+            .build();
+
+        final FakeRestChannel channel = new FakeRestChannel(request, true, 0);
+
+        this.getLockAction.prepareRequest(request, Mockito.mock(NodeClient.class));
+        assertEquals(channel.responses().get(), 0);
+        assertEquals(channel.errors().get(), 0);
+    }
+
+}
diff --git a/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java b/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java
new file mode 100644
index 00000000..122e0cab
--- /dev/null
+++ b/src/test/java/org/opensearch/jobscheduler/rest/action/RestReleaseLockActionTests.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.jobscheduler.rest.action;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.junit.Before;
+import org.mockito.Mockito;
+import org.opensearch.client.Client;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.jobscheduler.JobSchedulerPlugin;
+import org.opensearch.jobscheduler.spi.LockModel;
+import org.opensearch.jobscheduler.spi.utils.LockService;
+import org.opensearch.rest.RestHandler;
+import org.opensearch.rest.RestRequest;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.test.rest.FakeRestChannel;
+import org.opensearch.test.rest.FakeRestRequest;
+
+@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
+public class RestReleaseLockActionTests extends OpenSearchTestCase {
+
+    private RestReleaseLockAction restReleaseLockAction;
+
+    private LockService lockService;
+
+    private String releaseLockPath;
+
+    private ClusterService clusterService;
+
+    private Client client;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
+        this.client = Mockito.mock(Client.class);
+        this.lockService = new LockService(client, clusterService);
+        restReleaseLockAction = new RestReleaseLockAction(this.lockService);
+        this.releaseLockPath = String.format(Locale.ROOT, "%s/%s/{%s}", JobSchedulerPlugin.JS_BASE_URI, "_release_lock", LockModel.LOCK_ID);
+
+    }
+
+    public void testGetNames() {
+        String name = restReleaseLockAction.getName();
+        assertEquals(restReleaseLockAction.RELEASE_LOCK_ACTION, name);
+    }
+
+    public void testGetRoutes() {
+        List<RestHandler.Route> routes = restReleaseLockAction.routes();
+        assertEquals(releaseLockPath, routes.get(0).getPath());
+    }
+
+    public void testPrepareReleaseLockRequest() throws IOException {
+        Map<String, String> params = new HashMap<>();
+        params.put(LockModel.LOCK_ID, "lock_id");
+        FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT)
+            .withPath(releaseLockPath)
+            .withParams(params)
+            .build();
+        final FakeRestChannel channel = new FakeRestChannel(request, true, 0);
+        assertEquals(channel.responses().get(), 0);
+        assertEquals(channel.errors().get(), 0);
+    }
+}
diff --git a/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java b/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java
index 769485c4..df357da1 100644
--- a/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java
+++ b/src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java
@@ -16,6 +16,7 @@
 import org.opensearch.jobscheduler.spi.ScheduledJobParser;
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
 import org.opensearch.jobscheduler.spi.utils.LockService;
+import org.opensearch.jobscheduler.utils.JobDetailsService;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.Version;
@@ -76,6 +77,7 @@ public class JobSweeperTests extends OpenSearchAllocationTestCase {
     private ScheduledJobRunner jobRunner;
     private JobSweeper sweeper;
+    private JobDetailsService jobDetailsService;
     private DiscoveryNode discoveryNode;
@@ -88,6 +90,7 @@ public void setup() throws IOException {
         this.scheduler = Mockito.mock(JobScheduler.class);
         this.jobRunner = Mockito.mock(ScheduledJobRunner.class);
         this.jobParser = Mockito.mock(ScheduledJobParser.class);
+        this.jobDetailsService = Mockito.mock(JobDetailsService.class);
         // NamedXContentRegistry.Entry xContentRegistryEntry = new NamedXContentRegistry.Entry(ScheduledJobParameter.class,
         // new ParseField("JOB_TYPE"), this.jobParser);
@@ -124,7 +127,8 @@ public void setup() throws IOException {
             xContentRegistry,
             jobProviderMap,
             scheduler,
-            new LockService(client, clusterService)
+            new LockService(client, clusterService),
+            jobDetailsService
         );
     }
diff --git a/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java
new file mode 100644
index 00000000..f7d9fa0f
--- /dev/null
+++ b/src/test/java/org/opensearch/jobscheduler/utils/JobDetailsServiceIT.java
@@ -0,0 +1,444 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+package org.opensearch.jobscheduler.utils;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Before;
+import org.mockito.Mockito;
+import org.opensearch.action.ActionListener;
+import org.opensearch.cluster.service.ClusterService;
+import org.opensearch.common.bytes.BytesReference;
+import org.opensearch.common.io.stream.BytesStreamInput;
+import org.opensearch.common.io.stream.BytesStreamOutput;
+import org.opensearch.common.io.stream.Writeable;
+import org.opensearch.common.xcontent.LoggingDeprecationHandler;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
+import org.opensearch.core.xcontent.XContentParser;
+import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.extensions.action.ExtensionActionRequest;
+import org.opensearch.extensions.action.ExtensionActionResponse;
+import org.opensearch.jobscheduler.model.ExtensionJobParameter;
+import org.opensearch.jobscheduler.model.JobDetails;
+import org.opensearch.jobscheduler.spi.JobDocVersion;
+import org.opensearch.jobscheduler.spi.JobExecutionContext;
+import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
+import org.opensearch.jobscheduler.spi.utils.LockService;
+import org.opensearch.jobscheduler.transport.request.ExtensionJobActionRequest;
+import org.opensearch.jobscheduler.transport.request.JobParameterRequest;
+import org.opensearch.jobscheduler.transport.response.JobParameterResponse;
+import org.opensearch.jobscheduler.transport.request.JobRunnerRequest;
+import org.opensearch.jobscheduler.transport.response.JobRunnerResponse;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.jobscheduler.ScheduledJobProvider;
+
+public class JobDetailsServiceIT extends OpenSearchIntegTestCase {
+
+    private ClusterService clusterService;
+    private Set<String> indicesToListen;
+    private Map<String, ScheduledJobProvider> indexToJobProviders;
+
+    private String expectedJobIndex;
+    private String expectedJobType;
+    private String expectedJobParamAction;
+    private String expectedJobRunnerAction;
+    private String expectedExtensionUniqueId;
+
+    private String expectedDocumentId;
+    private String updatedJobIndex;
+
+    private ExtensionJobParameter extensionJobParameter;
+
+    @Before
+    public void setup() {
+        this.clusterService = Mockito.mock(ClusterService.class, Mockito.RETURNS_DEEP_STUBS);
+        Mockito.when(this.clusterService.state().routingTable().hasIndex(JobDetailsService.JOB_DETAILS_INDEX_NAME))
+            .thenReturn(false)
+            .thenReturn(true);
+
+        this.indicesToListen = new HashSet<>();
+        this.indexToJobProviders = new HashMap<>();
+
+        this.expectedJobIndex = "sample-job-index";
+        this.expectedJobType = "sample-job-type";
+        this.expectedJobParamAction = "sample-job-parameter";
+        this.expectedJobRunnerAction = "sample-job-runner";
+        this.expectedExtensionUniqueId = "sample-extension";
+
+        this.expectedDocumentId = "sample-document-id";
+        this.updatedJobIndex = "updated-job-index";
+
+        this.extensionJobParameter = new ExtensionJobParameter(
+            "jobName",
+            new IntervalSchedule(Instant.now(), 5, ChronoUnit.MINUTES),
+            Instant.now(),
+            Instant.now(),
+            true,
+            2L,
+            2.0
+        );
+    }
+
+    /**
+     * Finds the index of the specified byte value within the given byte array
+     *
+     * @param bytes the byte array to process
+     * @param value the byte to identify index of
+     * @return the index of the byte value
+     */
+    private int indexOf(byte[] bytes, byte value) {
+        for (int offset = 0; offset < bytes.length; ++offset) {
+            if (bytes[offset] == value) {
+                return offset;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Trims off the fully qualified request class name bytes and null byte from the ExtensionActionRequest requestBytes
+     *
+     * @param requestBytes the request bytes of an ExtensionActionRequest
+     * @return the trimmed array of bytes
+     */
+    private byte[] trimRequestBytes(byte[] requestBytes) {
+        int pos = indexOf(requestBytes, ExtensionJobActionRequest.UNIT_SEPARATOR);
+        return Arrays.copyOfRange(requestBytes, pos + 1, requestBytes.length);
+    }
+
+    /**
+     * Takes in an object of type T that extends {@link Writeable} and converts the writeable fields to a byte array
+     *
+     * @param <T> a class that extends writeable
+     * @param actionParams the action parameters to be serialized
+     * @throws IOException if serialization fails
+     * @return the byte array of the parameters
+     */
+    private static <T extends Writeable> byte[] convertParamsToBytes(T actionParams) throws IOException {
+        // Write all to output stream
+        BytesStreamOutput out = new BytesStreamOutput();
+        actionParams.writeTo(out);
+        out.flush();
+
+        // convert bytes stream to byte array
+        return BytesReference.toBytes(out.bytes());
+    }
+
+    public void testGetJobDetailsSanity() throws ExecutionException, InterruptedException, TimeoutException {
+        CompletableFuture<Boolean> inProgressFuture = new CompletableFuture<>();
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+
+        jobDetailsService.processJobDetails(
+            null,
+            expectedJobIndex,
+            expectedJobType,
+            expectedJobParamAction,
+            expectedJobRunnerAction,
+            expectedExtensionUniqueId,
+            ActionListener.wrap(indexedDocumentId -> {
+
+                // Ensure that indexedDocumentId is not null
+                assertNotNull(indexedDocumentId);
+
+                jobDetailsService.createJobDetailsIndex(ActionListener.wrap(response -> {
+                    assertTrue(response);
+                    inProgressFuture.complete(response);
+                }, exception -> { fail(exception.getMessage()); }));
+
+                jobDetailsService.deleteJobDetails(this.expectedDocumentId, ActionListener.wrap(response -> {
+                    assertTrue(response);
+                    inProgressFuture.complete(response);
+                }, exception -> { fail(exception.getMessage()); }));
+            }, exception -> { fail(exception.getMessage()); })
+        );
+
+        inProgressFuture.get(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
+    }
+
+    public void testUpdateJobDetailsSanity() throws ExecutionException, InterruptedException, TimeoutException {
+        CompletableFuture<String> inProgressFuture = new CompletableFuture<>();
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+
+        // Create initial index request
+        jobDetailsService.processJobDetails(
+            null,
+            expectedJobIndex,
+            expectedJobType,
+            expectedJobParamAction,
+            expectedJobRunnerAction,
+            expectedExtensionUniqueId,
+            ActionListener.wrap(indexedDocumentId -> {
+                assertNotNull(indexedDocumentId);
+
+                // submit update request to change the job index name for the same document Id
+                jobDetailsService.processJobDetails(
+                    indexedDocumentId,
+                    updatedJobIndex,
+                    expectedJobType,
+                    expectedJobParamAction,
+                    expectedJobRunnerAction,
+                    expectedExtensionUniqueId,
+                    ActionListener.wrap(updatedIndexedDocumentId -> {
+
+                        // Ensure that the response document ID matches the initial document ID
+                        assertNotNull(updatedIndexedDocumentId);
+                        assertEquals(indexedDocumentId, updatedIndexedDocumentId);
+                        inProgressFuture.complete(updatedIndexedDocumentId);
+
+                    }, exception -> fail(exception.getMessage()))
+                );
+            }, exception -> fail(exception.getMessage()))
+        );
+
+        inProgressFuture.get(JobDetailsService.TIME_OUT_FOR_REQUEST, TimeUnit.SECONDS);
+    }
+
+    public void testDeleteJobDetailsWithOutDocumentIdCreation() throws ExecutionException, InterruptedException, TimeoutException {
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+        jobDetailsService.deleteJobDetails(expectedDocumentId, ActionListener.wrap(deleted -> {
+            assertTrue("Failed to delete JobDetails.", deleted);
+        }, exception -> { fail(exception.getMessage()); }));
+    }
+
+    public void testDeleteNonExistingJobDetails() throws ExecutionException, InterruptedException, TimeoutException {
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+        jobDetailsService.createJobDetailsIndex(ActionListener.wrap(created -> {
+            if (created) {
+                jobDetailsService.deleteJobDetails(expectedDocumentId, ActionListener.wrap(deleted -> {
+                    assertTrue("Failed to delete job details for documentId.", deleted);
+                }, exception -> fail(exception.getMessage())));
+            } else {
+                fail("Failed to create job details index for extension");
+            }
+
+        }, exception -> fail(exception.getMessage())));
+    }
+
+    public void testUpdateIndexToJobDetails() throws ExecutionException, InterruptedException, TimeoutException {
+
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+        JobDetails jobDetails = new JobDetails(
+            expectedJobIndex,
+            expectedJobType,
+            expectedJobParamAction,
+            expectedJobRunnerAction,
+            expectedExtensionUniqueId
+        );
+
+        // We'll have to invoke updateIndexToJobDetails as jobDetailsService is added as an indexOperationListener
+        // onIndexModule
+        jobDetailsService.updateIndexToJobDetails(expectedDocumentId, jobDetails);
+
+        // Ensure indicesToListen is updated
+        assertTrue(this.indicesToListen.contains(jobDetails.getJobIndex()));
+
+        // Ensure indexToJobDetails is updated
+        JobDetails entry = jobDetailsService.getIndexToJobDetails().get(expectedDocumentId);
+        assertEquals(expectedJobIndex, entry.getJobIndex());
+        assertEquals(expectedJobType, entry.getJobType());
+        assertEquals(expectedJobParamAction, entry.getJobParameterAction());
+        assertEquals(expectedJobRunnerAction, entry.getJobRunnerAction());
+        assertEquals(expectedExtensionUniqueId, entry.getExtensionUniqueId());
+
+    }
+
+    public void testUpdateIndexToJobProviders() {
+        JobDetailsService jobDetailsService = new JobDetailsService(
+            client(),
+            this.clusterService,
+            this.indicesToListen,
+            this.indexToJobProviders
+        );
+        JobDetails jobDetails = new JobDetails(
+            expectedJobIndex,
+            expectedJobType,
+            expectedJobParamAction,
+            expectedJobRunnerAction,
+            expectedExtensionUniqueId
+        );
+
+        // Create job provider for given job details entry
+        jobDetailsService.updateIndexToJobProviders("documentId", jobDetails);
+
+        // Ensure that the indexToJobProviders is updated
+        ScheduledJobProvider provider = jobDetailsService.getIndexToJobProviders().get(jobDetails.getJobIndex());
+        assertEquals(expectedJobIndex, provider.getJobIndexName());
+        assertEquals(expectedJobType, provider.getJobType());
+        assertNotNull(provider.getJobParser());
+        assertNotNull(provider.getJobRunner());
+    }
+
+    private void compareExtensionJobParameters(
+        ExtensionJobParameter extensionJobParameter,
+        ExtensionJobParameter deserializedJobParameter
+    ) {
+        assertEquals(extensionJobParameter.getName(), deserializedJobParameter.getName());
+        assertEquals(extensionJobParameter.getSchedule(), deserializedJobParameter.getSchedule());
+        assertEquals(extensionJobParameter.getLastUpdateTime(), deserializedJobParameter.getLastUpdateTime());
+        assertEquals(extensionJobParameter.getEnabledTime(), deserializedJobParameter.getEnabledTime());
+        assertEquals(extensionJobParameter.isEnabled(), deserializedJobParameter.isEnabled());
+        assertEquals(extensionJobParameter.getLockDurationSeconds(), deserializedJobParameter.getLockDurationSeconds());
+        assertEquals(extensionJobParameter.getJitter(), deserializedJobParameter.getJitter());
+    }
+
+    public void testJobRunnerExtensionJobActionRequest() throws IOException {
+
+        LockService lockService = new LockService(client(), this.clusterService);
+        JobExecutionContext jobExecutionContext = new JobExecutionContext(
+            Instant.now(),
+            new JobDocVersion(0, 0, 0),
+            lockService,
+            "indexName",
+            "id"
+        );
+        String documentId = "documentId";
+
+        // Create JobRunner Request
+        JobRunnerRequest jobRunnerRequest = new JobRunnerRequest("placeholder", documentId, jobExecutionContext);
+        ExtensionActionRequest actionRequest = new ExtensionJobActionRequest("actionName", jobRunnerRequest);
+
+        // Test ExtensionActionRequest deserialization
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            actionRequest.writeTo(out);
+            out.flush();
+            try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) {
+
+                actionRequest = new ExtensionActionRequest(in);
+
+                // Trim request class bytes from requestBytes
+                byte[] trimmedRequestBytes = trimRequestBytes(actionRequest.getRequestBytes().toByteArray());
+
+                // Test deserialization of action request params
+                JobRunnerRequest deserializedRequest = new JobRunnerRequest(trimmedRequestBytes);
+
+                // Test deserialization of extension job parameter document Id
+                String deserializedDocumentId = deserializedRequest.getJobParameterDocumentId();
+                assertEquals(documentId, deserializedDocumentId);
+
+                // Test deserialization of job execution context
+                JobExecutionContext deserializedJobExecutionContext = deserializedRequest.getJobExecutionContext();
+                assertEquals(jobExecutionContext.getJobId(), deserializedJobExecutionContext.getJobId());
+                assertEquals(jobExecutionContext.getJobIndexName(), deserializedJobExecutionContext.getJobIndexName());
+                assertEquals(jobExecutionContext.getExpectedExecutionTime(), deserializedJobExecutionContext.getExpectedExecutionTime());
+                assertEquals(0, jobExecutionContext.getJobVersion().compareTo(deserializedJobExecutionContext.getJobVersion()));
+            }
+        }
+    }
+
+    public void testJobParameterExtensionJobActionRequest() throws IOException {
+
+        String content = "{\"test_field\":\"test\"}";
+        JobDocVersion jobDocVersion = new JobDocVersion(1L, 1L, 1L);
+        XContentParser parser = XContentType.JSON.xContent()
+            .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, content.getBytes());
+
+        // Create JobParameterRequest
+        JobParameterRequest jobParamRequest = new JobParameterRequest("placeholder", parser, "id", jobDocVersion);
+        ExtensionActionRequest actionRequest = new ExtensionJobActionRequest("actionName", jobParamRequest);
+
+        // Test ExtensionActionRequest deserialization
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            actionRequest.writeTo(out);
+            out.flush();
+            try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) {
+                actionRequest = new ExtensionActionRequest(in);
+
+                // Trim request class bytes from requestBytes
+                byte[] trimmedRequestBytes = trimRequestBytes(actionRequest.getRequestBytes().toByteArray());
+
+                // Test deserialization of action request params
+                JobParameterRequest deserializedRequest = new JobParameterRequest(trimmedRequestBytes);
+                assertEquals(jobParamRequest.getId(), deserializedRequest.getId());
+                assertEquals(jobParamRequest.getJobSource(), deserializedRequest.getJobSource());
+
+                // Test deserialization of job doc version
+                assertEquals(0, jobParamRequest.getJobDocVersion().compareTo(deserializedRequest.getJobDocVersion()));
+            }
+        }
+    }
+
+    public void testJobRunnerExtensionActionResponse() throws IOException {
+
+        // Create JobRunnerResponse
+        JobRunnerResponse jobRunnerResponse = new JobRunnerResponse(true);
+        ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobRunnerResponse));
+
+        // Test ExtensionActionResponse deserialization
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            actionResponse.writeTo(out);
+            out.flush();
+            try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) {
+
+                actionResponse = new ExtensionActionResponse(in);
+
+                // Test deserialization of action response params
+                JobRunnerResponse deserializedResponse = new JobRunnerResponse(actionResponse.getResponseBytes());
+                assertEquals(jobRunnerResponse.getJobRunnerStatus(), deserializedResponse.getJobRunnerStatus());
+            }
+        }
+
+    }
+
+    public void testJobParameterExtensionActionResponse() throws IOException {
+
+        // Create JobParameterResponse
+        JobParameterResponse jobParameterResponse = new JobParameterResponse(this.extensionJobParameter);
+        ExtensionActionResponse actionResponse = new ExtensionActionResponse(convertParamsToBytes(jobParameterResponse));
+
+        // Test ExtensionActionResponse deserialization
+        try (BytesStreamOutput out = new BytesStreamOutput()) {
+            actionResponse.writeTo(out);
+            out.flush();
+            try (BytesStreamInput in = new BytesStreamInput(BytesReference.toBytes(out.bytes()))) {
+
+                actionResponse = new ExtensionActionResponse(in);
+
+                // Test deserialization of action response params
+                JobParameterResponse deserializedResponse = new JobParameterResponse(actionResponse.getResponseBytes());
+                compareExtensionJobParameters(this.extensionJobParameter, deserializedResponse.getJobParameter());
+            }
+        }
+    }
+
+}