Feat (Core): Implement abandoned job detection and recovery (#30710)
This pull request refactors and improves the job queue management
system.
The changes focus on removing the job polling mechanism, enhancing job
validation, and adding an abandoned job detector. The most important
changes are listed below:

### Removal of Job Polling Mechanism:

* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java`](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL13-L24):
Removed the `pollJobUpdatesIntervalMilliseconds` parameter and its
associated methods.
[[1]](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL13-L24)
[[2]](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL36-L44)
* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java`](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L19-L23):
Removed the default polling interval configuration and its usage.
[[1]](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L19-L23)
[[2]](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L33-R28)
* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L121-R122):
Removed the polling scheduler and related methods.
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L121-R122)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L477-L498)

### Addition of Abandoned Job Detector:

* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R112):
Added `AbandonedJobDetector` as a dependency and integrated it into the
job queue lifecycle methods.
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R112)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L142-R151)
[[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L170-L179)
[[4]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L221-R209)
[[5]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R230-L243)
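
The detector's implementation is not visible in this rendering of the commit, so the following is only a rough sketch of how abandoned-job detection can work in general: a running job is treated as abandoned when its last update is older than a threshold. `TrackedJob`, `StaleJobScanner`, and the threshold value are hypothetical names chosen for illustration and are not taken from the dotCMS code.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a job record; this is NOT the dotCMS Job class.
class TrackedJob {
    final String id;
    final boolean running;
    final LocalDateTime lastUpdated;

    TrackedJob(String id, boolean running, LocalDateTime lastUpdated) {
        this.id = id;
        this.running = running;
        this.lastUpdated = lastUpdated;
    }
}

// Hypothetical scanner (an assumption, not the real AbandonedJobDetector):
// flags running jobs that have not been updated within the threshold.
class StaleJobScanner {
    private final Duration threshold;

    StaleJobScanner(Duration threshold) {
        this.threshold = threshold;
    }

    List<TrackedJob> findAbandoned(List<TrackedJob> jobs, LocalDateTime now) {
        // Any running job last updated before this instant counts as abandoned.
        LocalDateTime cutoff = now.minus(threshold);
        List<TrackedJob> abandoned = new ArrayList<>();
        for (TrackedJob job : jobs) {
            if (job.running && job.lastUpdated.isBefore(cutoff)) {
                abandoned.add(job);
            }
        }
        return abandoned;
    }
}
```

In a design like this, each job returned by `findAbandoned` could then be reported through an event such as `JobAbandonedEvent` and handed to whatever recovery policy the queue applies.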

### Enhancements to Job Validation:

* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303):
Added validation for job parameters if the processor implements
`Validator` and handled validation exceptions.
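
As a minimal sketch of the opt-in validation described above: parameters are checked only when the processor also implements `Validator`, and a validation failure is wrapped and reported instead of the job being accepted. The interface shapes, `JobSubmitter`, `JobValidationException`, and `ImportProcessor` are assumptions made for illustration; the real dotCMS signatures may differ.

```java
import java.util.Map;

// Hypothetical shapes: the real dotCMS JobProcessor and Validator signatures may differ.
interface JobProcessor {
    void process(Map<String, Object> parameters);
}

interface Validator {
    // Throws if the parameters are not acceptable for this processor.
    void validate(Map<String, Object> parameters);
}

// Hypothetical exception used to report rejected job parameters.
class JobValidationException extends RuntimeException {
    JobValidationException(String message, Throwable cause) {
        super(message, cause);
    }
}

class JobSubmitter {
    // Validates only when the processor opts in by implementing Validator.
    static void submit(JobProcessor processor, Map<String, Object> parameters) {
        if (processor instanceof Validator) {
            try {
                ((Validator) processor).validate(parameters);
            } catch (RuntimeException e) {
                throw new JobValidationException(
                        "Invalid job parameters: " + e.getMessage(), e);
            }
        }
        // Stands in for adding the job to the queue.
        processor.process(parameters);
    }
}

// Example processor that requires a "path" parameter.
class ImportProcessor implements JobProcessor, Validator {
    boolean processed = false;

    @Override
    public void validate(Map<String, Object> parameters) {
        if (!parameters.containsKey("path")) {
            throw new IllegalArgumentException("missing 'path'");
        }
    }

    @Override
    public void process(Map<String, Object> parameters) {
        processed = true;
    }
}
```

Processors that do not implement `Validator` skip the check entirely in this sketch, which keeps validation optional per processor.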

### Event Handling Improvements:

* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303):
Replaced direct event firing with `JobUtil.sendEvents` for better event
management.
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303)
[[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L543-R526)
[[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L668-R649)
[[4]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L721-R703)
[[5]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L743-R724)

### Miscellaneous Changes:

* [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L764-R743):
Adjusted the job progress update interval from 2 to 3 seconds.
jgambarios authored Nov 21, 2024
1 parent 5082547 commit ab2ddd4
Showing 34 changed files with 1,144 additions and 530 deletions.
@@ -10,18 +10,13 @@ public class JobQueueConfig {
      */
     private final int threadPoolSize;
 
-    // The interval in milliseconds to poll for job updates
-    private final int pollJobUpdatesIntervalMilliseconds;
-
     /**
      * Constructs a new JobQueueConfig
      *
      * @param threadPoolSize The number of threads to use for job processing.
-     * @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds to poll for job updates.
      */
-    public JobQueueConfig(int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) {
+    public JobQueueConfig(int threadPoolSize) {
         this.threadPoolSize = threadPoolSize;
-        this.pollJobUpdatesIntervalMilliseconds = pollJobUpdatesIntervalMilliseconds;
     }
 
     /**
@@ -33,13 +28,4 @@ public int getThreadPoolSize() {
         return threadPoolSize;
     }
 
-    /**
-     * Gets the interval in milliseconds to poll for job updates.
-     *
-     * @return The interval in milliseconds to poll for job updates.
-     */
-    public int getPollJobUpdatesIntervalMilliseconds() {
-        return pollJobUpdatesIntervalMilliseconds;
-    }
-
 }
@@ -16,11 +16,6 @@ public class JobQueueConfigProducer {
             "JOB_QUEUE_THREAD_POOL_SIZE", 10
     );
 
-    // The interval in milliseconds to poll for job updates.
-    static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty(
-            "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 3000
-    );
-
     /**
      * Produces a JobQueueConfig object. This method is called by the CDI container to create a
      * JobQueueConfig instance when it is necessary for dependency injection.
@@ -30,8 +25,7 @@ public class JobQueueConfigProducer {
     @Produces
     public JobQueueConfig produceJobQueueConfig() {
         return new JobQueueConfig(
-                DEFAULT_THREAD_POOL_SIZE,
-                DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS
+                DEFAULT_THREAD_POOL_SIZE
         );
     }
 

Large diffs are not rendered by default.

This file was deleted.

@@ -0,0 +1,39 @@
+package com.dotcms.jobs.business.api.events;
+
+import com.dotcms.jobs.business.job.Job;
+import java.time.LocalDateTime;
+
+/**
+ * Event fired when an abandoned job is detected.
+ */
+public class JobAbandonedEvent implements JobEvent {
+
+    private final Job job;
+    private final LocalDateTime detectedAt;
+
+    /**
+     * Constructs a new JobAbandonedEvent.
+     *
+     * @param job The job.
+     * @param detectedAt The timestamp when the abandoned job was detected.
+     */
+    public JobAbandonedEvent(Job job, LocalDateTime detectedAt) {
+        this.job = job;
+        this.detectedAt = detectedAt;
+    }
+
+    /**
+     * @return The abandoned job.
+     */
+    public Job getJob() {
+        return job;
+    }
+
+    /**
+     * @return The timestamp when the abandoned job was detected.
+     */
+    public LocalDateTime getDetectedAt() {
+        return detectedAt;
+    }
+
+}
@@ -6,7 +6,7 @@
 /**
  * Event fired when there is a request to cancel a job.
  */
-public class JobCancelRequestEvent {
+public class JobCancelRequestEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime canceledOn;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job is canceled.
  */
-public class JobCanceledEvent {
+public class JobCanceledEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime canceledAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job is being canceled.
  */
-public class JobCancellingEvent {
+public class JobCancellingEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime canceledAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job completes successfully.
  */
-public class JobCompletedEvent {
+public class JobCompletedEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime completedAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a new job is created and added to the queue.
  */
-public class JobCreatedEvent {
+public class JobCreatedEvent implements JobEvent {
 
     private final String jobId;
     private final String queueName;
@@ -0,0 +1,9 @@
+package com.dotcms.jobs.business.api.events;
+
+/**
+ * Base marker interface for all job-related events in the job queue system. All specific job event
+ * types (e.g., JobStartedEvent, JobCompletedEvent, etc.) should implement this interface.
+ */
+public interface JobEvent {
+
+}
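
One thing a shared marker interface makes possible is a single send path for every event type. The sketch below illustrates that idea only; `EventDispatcher` and the simplified event classes are hypothetical stand-ins so the example compiles on its own, and they do not reflect the actual `JobUtil.sendEvents` signature or behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Local stand-ins so the sketch is self-contained; the real event classes carry a Job.
interface JobEvent {}

class JobStartedEvent implements JobEvent {
    final String jobId;

    JobStartedEvent(String jobId) {
        this.jobId = jobId;
    }
}

class JobCompletedEvent implements JobEvent {
    final String jobId;

    JobCompletedEvent(String jobId) {
        this.jobId = jobId;
    }
}

// Hypothetical dispatcher: one send method accepts any JobEvent implementation.
class EventDispatcher {
    private final List<Consumer<JobEvent>> listeners = new ArrayList<>();

    void register(Consumer<JobEvent> listener) {
        listeners.add(listener);
    }

    void send(JobEvent event) {
        // Deliver the event to every registered listener in registration order.
        for (Consumer<JobEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}
```

Without the common supertype, a helper like this would need either one overload per event class or an `Object` parameter, which gives up compile-time checking.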
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job fails during processing.
  */
-public class JobFailedEvent {
+public class JobFailedEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime failedAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job's progress is updated.
  */
-public class JobProgressUpdatedEvent {
+public class JobProgressUpdatedEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime updatedAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job is removed from the queue because failed and is not retryable.
  */
-public class JobRemovedFromQueueEvent {
+public class JobRemovedFromQueueEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime removedAt;
@@ -6,7 +6,7 @@
 /**
  * Event fired when a job starts processing.
  */
-public class JobStartedEvent {
+public class JobStartedEvent implements JobEvent {
 
     private final Job job;
     private final LocalDateTime startedAt;