Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#30367 Refactor job system and enhance SSE monitoring. #30816

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.api.events.JobWatcher;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
Expand Down Expand Up @@ -128,6 +129,16 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
*/
JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of successful jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of successful jobs and pagination information.
* @throws DotDataException if there's an error fetching the jobs
*/
JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of canceled jobs
*
Expand All @@ -148,6 +159,16 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
*/
JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException;

/**
* Retrieves a list of abandoned jobs
*
* @param page The page number
* @param pageSize The number of jobs per page
* @return A result object containing the list of abandoned jobs and pagination information.
* @throws DotDataException if there's an error fetching the jobs
*/
JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws DotDataException;

/**
* Cancels a job.
*
Expand All @@ -161,8 +182,24 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize)
*
* @param jobId The ID of the job to watch
* @param watcher The consumer to be notified of job updates
* @return A JobWatcher instance representing the registered watcher
*/
JobWatcher watchJob(String jobId, Consumer<Job> watcher);

/**
* Removes a watcher for a specific job.
*
* @param jobId The ID of the job to unwatch
* @param watcher The watcher to remove
*/
void removeJobWatcher(String jobId, JobWatcher watcher);

/**
* Removes all watchers for a specific job.
*
* @param jobId The ID of the job
*/
void watchJob(String jobId, Consumer<Job> watcher);
void removeAllJobWatchers(String jobId);

/**
* Sets a retry strategy for a specific queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
import com.dotcms.business.CloseDBIfOpened;
import com.dotcms.business.WrapInTransaction;
import com.dotcms.jobs.business.api.events.JobCancelRequestEvent;
import com.dotcms.jobs.business.api.events.JobCanceledEvent;
import com.dotcms.jobs.business.api.events.JobCancellingEvent;
import com.dotcms.jobs.business.api.events.JobCompletedEvent;
import com.dotcms.jobs.business.api.events.JobCreatedEvent;
import com.dotcms.jobs.business.api.events.JobFailedEvent;
import com.dotcms.jobs.business.api.events.JobProgressUpdatedEvent;
import com.dotcms.jobs.business.api.events.JobRemovedFromQueueEvent;
import com.dotcms.jobs.business.api.events.JobStartedEvent;
import com.dotcms.jobs.business.api.events.JobWatcher;
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.detector.AbandonedJobDetector;
import com.dotcms.jobs.business.error.CircuitBreaker;
Expand Down Expand Up @@ -360,6 +359,16 @@ public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDat
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws DotDataException {
jgambarios marked this conversation as resolved.
Show resolved Hide resolved
try {
return jobQueue.getSuccessfulJobs(page, pageSize);
jgambarios marked this conversation as resolved.
Show resolved Hide resolved
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching successful jobs", e);
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException {
Expand All @@ -380,12 +389,22 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataEx
}
}

@CloseDBIfOpened
@Override
public JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws DotDataException {
try {
return jobQueue.getAbandonedJobs(page, pageSize);
} catch (JobQueueDataException e) {
throw new DotDataException("Error fetching abandoned jobs", e);
}
}

@Override
public void cancelJob(final String jobId) throws DotDataException {

final Job job = getJob(jobId);

if (job.state() == JobState.PENDING || job.state() == JobState.RUNNING) {
if (isInCancelableState(job)) {
handleJobCancelRequest(job);
} else {
Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. "
Expand All @@ -408,9 +427,7 @@ void onCancelRequestJob(final JobCancelRequestEvent event) {
try {

final var job = getJob(event.getJob().id());
if (job.state() == JobState.PENDING
|| job.state() == JobState.RUNNING
|| job.state() == JobState.CANCEL_REQUESTED) {
if (isInCancelableState(job) || job.state() == JobState.CANCEL_REQUESTED) {

final Optional<JobProcessor> instance = getInstance(job.id());
if (instance.isPresent()) {
Expand Down Expand Up @@ -439,8 +456,18 @@ void onCancelRequestJob(final JobCancelRequestEvent event) {
}

@Override
public void watchJob(final String jobId, final Consumer<Job> watcher) {
realTimeJobMonitor.registerWatcher(jobId, watcher);
public JobWatcher watchJob(final String jobId, final Consumer<Job> watcher) {
return realTimeJobMonitor.registerWatcher(jobId, watcher);
}

@Override
public void removeJobWatcher(final String jobId, final JobWatcher watcher) {
realTimeJobMonitor.removeWatcher(jobId, watcher);
}

@Override
public void removeAllJobWatchers(final String jobId) {
realTimeJobMonitor.removeAllWatchers(jobId);
}

@Override
Expand Down Expand Up @@ -689,21 +716,24 @@ private boolean isReadyForRetry(Job job) throws DotDataException {

/**
* Handles a failed job that cannot be retried. This method logs a warning about the
* non-retryable job and removes it from the active queue.
* non-retryable job, removes it from the active queue, and marks it as failed permanently.
*
* @param job The failed job that cannot be retried.
*/
private void handleNonRetryableFailedJob(final Job job) throws DotDataException {

Logger.warn(this, "Job " + job.id() + " has failed and cannot be retried.");

try {
jobQueue.removeJobFromQueue(job.id());
// Send the job removed from queue events
JobUtil.sendEvents(job, JobRemovedFromQueueEvent::new);
} catch (JobQueueDataException e) {
throw new DotDataException("Error removing failed job", e);
Job finishedJob = job.markAsFailedPermanently();
if (job.state() == JobState.ABANDONED) {
finishedJob = job.markAsAbandonedPermanently();
}

// Giving the job a final state
updateJobStatus(finishedJob);

// Send the job completion events
JobUtil.sendEvents(finishedJob, JobCompletedEvent::new);
}

/**
Expand Down Expand Up @@ -839,17 +869,17 @@ private void handleJobCompletion(final Job job, final JobProcessor processor)
final float progress = getJobProgress(job);

try {

Job completedJob = job.markAsSuccessful(jobResult).withProgress(progress);

if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) {
Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress);
updateJobStatus(canceledJob);
// Send the job canceled events
JobUtil.sendEvents(canceledJob, JobCanceledEvent::new);
} else {
final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress);
updateJobStatus(completedJob);
// Send the job completed events
JobUtil.sendEvents(completedJob, JobCompletedEvent::new);
completedJob = job.markAsCanceled(jobResult).withProgress(progress);
}

updateJobStatus(completedJob);
// Send the job completed events
JobUtil.sendEvents(completedJob, JobCompletedEvent::new);

} catch (JobQueueDataException e) {
final var errorMessage = "Error updating job status";
Logger.error(this, errorMessage, e);
Expand Down Expand Up @@ -1104,6 +1134,18 @@ private int incrementAndResetEmptyQueueCount(
return emptyQueueCount;
}

/**
* Verifies if a job state is in a cancellable state.
*
* @param job The job to check.
* @return {@code true} if the job is in a cancellable state, {@code false} otherwise.
*/
private boolean isInCancelableState(final Job job) {
return job.state() == JobState.PENDING || job.state() == JobState.RUNNING
|| job.state() == JobState.FAILED || job.state() == JobState.ABANDONED
|| job.state() == JobState.ABANDONED_PERMANENTLY;
}

/**
* A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to
* be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut
Expand Down

This file was deleted.

This file was deleted.

Loading
Loading