Skip to content

Commit

Permalink
#29478 Improvements and unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 29, 2024
1 parent 104a851 commit c4303ac
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ private void updateJobProgress(final Job job, final ProgressTracker progressTrac
*/
private void processJobs() {

int emptyQueueCount = 0;

while (!Thread.currentThread().isInterrupted() && !isShuttingDown) {

if (isCircuitBreakerOpen()) {
Expand All @@ -324,14 +326,17 @@ private void processJobs() {

try {

Job job = fetchNextJob();
Job job = jobQueue.nextJob();
if (job != null) {
processJobWithRetry(job);
emptyQueueCount = 0;
} else {
// If no jobs were found, wait for a short time before checking again
Thread.sleep(1000);
// Implement exponential backoff when queue is repeatedly empty
long sleepTime = Math.min(1000 * (long) Math.pow(2, emptyQueueCount), 30000);
Thread.sleep(sleepTime);
emptyQueueCount++;
}

} catch (InterruptedException e) {
Logger.error(this, "Job processing thread interrupted: " + e.getMessage(), e);
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -365,21 +370,6 @@ private boolean isCircuitBreakerOpen() {
return false;
}

/**
* Fetches the next job to be processed, either pending or failed.
*
* @return The next job to be processed, or null if no job is available.
*/
private Job fetchNextJob() {

Job job = jobQueue.nextPendingJob();
if (job == null) {
job = jobQueue.nextFailedJob();
}

return job;
}

/**
* Processes a job, handling retries if necessary. This method determines whether a job should
* be processed immediately, retried later, or handled as a non-retryable failure.
Expand All @@ -389,8 +379,19 @@ private Job fetchNextJob() {
private void processJobWithRetry(final Job job) {

if (job.state() == JobState.FAILED) {

if (canRetry(job)) {
handleFailedJobWithRetry(job);

if (isReadyForRetry(job)) {
Logger.warn(this, "Retrying job " + job.id() + " after failure.");
processJob(job.incrementRetry());
} else {

Logger.debug(this, "Job " + job.id() + " is not ready for retry, "
+ "putting back in queue.");
// Put the job back in the queue for later retry
jobQueue.putJobBackInQueue(job);
}
} else {
handleNonRetryableFailedJob(job);
}
Expand All @@ -400,22 +401,14 @@ private void processJobWithRetry(final Job job) {
}

/**
* Handles a failed job that is eligible for retry. If it's time for the next retry attempt, the
* job is processed; otherwise, it's updated in the queue for a future retry.
*
* @param job The failed job that can be retried.
* Determines whether a job is ready for retry based on its retry strategy.
* @param job The job to check for retry eligibility.
* @return {@code true} if the job is ready for retry, {@code false} otherwise.
*/
private void handleFailedJobWithRetry(final Job job) {

private boolean isReadyForRetry(Job job) {
long now = System.currentTimeMillis();
long nextRetryTime = job.lastRetryTimestamp() + nextRetryDelay(job);
if (now >= nextRetryTime) {
processJob(job);
} else {
Job updatedJob = job.withState(JobState.PENDING);
jobQueue.updateJobStatus(updatedJob); // Put the job back in the queue for later retry
notifyJobWatchers(updatedJob);
}
return now >= nextRetryTime;
}

/**
Expand Down Expand Up @@ -491,29 +484,20 @@ private void processJob(final Job job) {
.processingStage("Processor selection")
.timestamp(LocalDateTime.now())
.build();
Job failedJob = job.markAsFailed(errorDetail);
jobQueue.updateJobStatus(failedJob);

notifyJobWatchers(failedJob);
handleJobFailure(job, errorDetail);
}
}

/**
* Handles the failure of a job, including retry logic.
* Handles the failure of a job
*
* @param job The job that failed.
* @param errorDetail The details of the error that caused the failure.
*/
private void handleJobFailure(final Job job, final ErrorDetail errorDetail) {

Job updatedJob;
if (canRetry(job)) {
updatedJob = job.incrementRetry().markAsFailed(errorDetail);
} else {
updatedJob = job.markAsFailed(errorDetail);
}
jobQueue.updateJobStatus(updatedJob);
notifyJobWatchers(updatedJob);
final Job failedJob = job.markAsFailed(errorDetail);
jobQueue.updateJobStatus(failedJob);
notifyJobWatchers(failedJob);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@ List<Job> getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateT
void updateJobStatus(Job job);

/**
* Retrieves the next pending job.
* Puts a job back in the queue for retry.
*
* @return The next pending job, or null if no pending jobs are available.
* @param job The job to retry.
*/
Job nextPendingJob();
void putJobBackInQueue(Job job);

/**
* Retrieves the next failed job.
* Retrieves the next job in the queue.
*
* @return The next failed job, or null if no failed jobs are available.
* @return The next job in the queue, or null if the queue is empty.
*/
Job nextFailedJob();
Job nextJob();

/**
* Updates the progress of a job.
Expand Down
4 changes: 3 additions & 1 deletion dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.dotcms.integritycheckers.ContentPageIntegrityCheckerTest;
import com.dotcms.integritycheckers.HostIntegrityCheckerTest;
import com.dotcms.integritycheckers.IntegrityUtilTest;
import com.dotcms.jobs.business.api.JobQueueAPITest;
import com.dotcms.junit.MainBaseSuite;
import com.dotcms.mail.MailAPIImplTest;
import com.dotcms.publisher.bundle.business.BundleAPITest;
Expand Down Expand Up @@ -311,7 +312,8 @@
Task240530AddDotAIPortletToLayoutTest.class,
EmbeddingContentListenerTest.class,
Task240606AddVariableColumnToWorkflowTest.class,
OpenAIContentPromptActionletTest.class
OpenAIContentPromptActionletTest.class,
JobQueueAPITest.class
})

public class MainSuite2b {
Expand Down
Loading

0 comments on commit c4303ac

Please sign in to comment.