Skip to content

Commit

Permalink
#29478 Improvements and unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 28, 2024
1 parent 378e1b7 commit 12aee87
Show file tree
Hide file tree
Showing 5 changed files with 447 additions and 47 deletions.
26 changes: 22 additions & 4 deletions dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.ProcessorNotFoundException;

import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.JobProcessor;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
Expand All @@ -21,6 +21,24 @@ public interface JobQueueAPI extends AutoCloseable {
*/
void start();

/**
* Checks if the JobQueueManager has been started.
*
* <p>To wait for startup to complete instead of polling this method, use
* {@link #awaitStart(long, TimeUnit)}.
*
* @return {@code true} if the JobQueueManager has been started, {@code false} otherwise.
*/
boolean isStarted();

/**
* Waits for the JobQueueManager to start up, blocking the calling thread for at most the given
* timeout.
*
* @param timeout The maximum time to wait, measured in {@code unit}.
* @param unit The time unit of the timeout argument.
* @return {@code true} if the JobQueueManager has started, {@code false} if the waiting time
* elapsed before the JobQueueManager started.
* @throws InterruptedException if the current thread is interrupted while waiting.
* @see #isStarted()
*/
boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException;

/**
* Stops all job processing and releases resources. This method should be called when the job
* queue manager is no longer needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand All @@ -20,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -69,6 +71,9 @@
public class JobQueueAPIImpl implements JobQueueAPI {

private final AtomicBoolean isStarted = new AtomicBoolean(false);
private CountDownLatch startLatch;
private volatile boolean isShuttingDown = false;
private volatile boolean isClosed = false;

private final CircuitBreaker circuitBreaker;
private final JobQueue jobQueue;
Expand All @@ -84,7 +89,7 @@ public class JobQueueAPIImpl implements JobQueueAPI {
);

/**
* Constructs a new JobQueueManager with the default job queue implementation and the default
* Constructs a new JobQueueAPIImpl with the default job queue implementation and the default
* number of threads.
*/
public JobQueueAPIImpl() {
Expand All @@ -93,7 +98,7 @@ public JobQueueAPIImpl() {
}

/**
* Constructs a new JobQueueManager.
* Constructs a new JobQueueAPIImpl.
*
* @param jobQueue The JobQueue implementation to use.
* @param threadPoolSize The number of threads to use for job processing.
Expand All @@ -113,53 +118,78 @@ public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) {
); // 5 failures within 1 minute
}

/**
 * Reports whether this instance is currently running: the started flag must be set and the
 * instance must not have been closed.
 *
 * @return {@code true} when started and not closed, {@code false} otherwise.
 */
@Override
public boolean isStarted() {
    final boolean running = isStarted.get();
    // The closed flag is only consulted when the started flag is set.
    return running && !isClosed;
}

/**
 * Waits on the start latch until it reaches zero or the timeout elapses.
 *
 * <p>The latch is only created inside {@code start()}, so it does not exist until
 * {@code start()} has been called. In that case this method returns {@code false} immediately
 * instead of failing with a {@link NullPointerException}.
 *
 * @param timeout The maximum time to wait.
 * @param unit    The time unit of the timeout argument.
 * @return {@code true} if the latch reached zero within the timeout, {@code false} if the
 * timeout elapsed first or {@code start()} has not been called yet.
 * @throws InterruptedException if the current thread is interrupted while waiting.
 */
@Override
public boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException {
    // Read the field once so the null check and the await act on the same latch instance.
    final CountDownLatch latch = startLatch;
    if (latch == null) {
        // start() was never called: there is nothing to wait on yet.
        return false;
    }
    return latch.await(timeout, unit);
}

@Override
public void start() {

if (isClosed) {
Logger.warn(this, "Attempt to start JobQueue that has been closed. Ignoring.");
return;
}

if (isStarted.compareAndSet(false, true)) {

Logger.info(
this, "Starting JobQueueManager with " + threadPoolSize + " threads."
this, "Starting JobQueue with " + threadPoolSize + " threads."
);

startLatch = new CountDownLatch(threadPoolSize);
executorService = Executors.newFixedThreadPool(threadPoolSize);

for (int i = 0; i < threadPoolSize; i++) {
executorService.submit(this::processJobs);
executorService.submit(() -> {
startLatch.countDown();
processJobs();
});
}

Logger.info(this, "JobQueueManager has been successfully started.");
Logger.info(this, "JobQueue has been successfully started.");
} else {
Logger.warn(this,
"Attempt to start JobQueueAPIImpl that is already running. Ignoring."
"Attempt to start JobQueue that is already running. Ignoring."
);
}
}

@Override
public void close() throws Exception {

if (isClosed) {
Logger.warn(this, "JobQueue is already closed. Ignoring.");
return;
}

if (isStarted.compareAndSet(true, false)) {

Logger.info(this, "Closing JobQueueManager and stopping all job processing.");
executorService.shutdown();
isShuttingDown = true;
Logger.info(this, "Closing JobQueue and stopping all job processing.");
executorService.shutdownNow();

try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
Logger.error(this, "ExecutorService did not terminate");
}
Logger.error(this, "ExecutorService did not terminate");
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
Logger.error(this, "Interrupted while waiting for jobs to complete", e);
} finally {
isShuttingDown = false;
}

Logger.info(this, "JobQueueManager has been successfully closed.");
isClosed = true;
Logger.info(this, "JobQueue has been successfully closed.");
} else {
Logger.warn(this,
"Attempt to close JobQueueAPIImpl that is not running. Ignoring."
"Attempt to close JobQueue that is not running. Ignoring."
);
}
}
Expand All @@ -183,18 +213,18 @@ public String createJob(final String queueName, final Map<String, Object> parame

@Override
public Job getJob(final String jobId) {
return jobQueue.job(jobId);
return jobQueue.getJob(jobId);
}

@Override
public List<Job> getJobs(final int page, final int pageSize) {
return jobQueue.jobs(page, pageSize);
return jobQueue.getJobs(page, pageSize);
}

@Override
public void cancelJob(final String jobId) {

Job job = jobQueue.job(jobId);
Job job = jobQueue.getJob(jobId);
if (job != null) {

final var processor = processors.get(job.queueName());
Expand Down Expand Up @@ -226,7 +256,7 @@ public void cancelJob(final String jobId) {
@Override
public void watchJob(final String jobId, final Consumer<Job> watcher) {
jobWatchers.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).add(watcher);
Job currentJob = jobQueue.job(jobId);
Job currentJob = jobQueue.getJob(jobId);
watcher.accept(currentJob);
}

Expand Down Expand Up @@ -286,7 +316,7 @@ private void updateJobProgress(final Job job, final ProgressTracker progressTrac
*/
private void processJobs() {

while (!Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted() && !isShuttingDown) {

if (isCircuitBreakerOpen()) {
continue;
Expand Down Expand Up @@ -318,15 +348,20 @@ private void processJobs() {
* @return true if the circuit breaker is open, false otherwise.
*/
private boolean isCircuitBreakerOpen() {

    // Requests are allowed: the breaker is not open, so there is nothing to wait for.
    if (circuitBreaker.allowRequest()) {
        return false;
    }

    Logger.warn(this, "Circuit breaker is open. Pausing job processing for a while.");

    try {
        // Back off for 5 seconds before the caller checks the breaker again.
        Thread.sleep(5000);
    } catch (InterruptedException interrupted) {
        // Restore the interrupt flag so the calling loop can observe it.
        Thread.currentThread().interrupt();
    }

    return true;
}

Expand All @@ -336,10 +371,12 @@ private boolean isCircuitBreakerOpen() {
* @return The next job to be processed, or null if no job is available.
*/
private Job fetchNextJob() {
    // Pending jobs take priority; failed jobs are only consulted when none is pending.
    final Job pending = jobQueue.nextPendingJob();
    return pending != null ? pending : jobQueue.nextFailedJob();
}

Expand Down Expand Up @@ -375,7 +412,9 @@ private void handleFailedJobWithRetry(final Job job) {
if (now >= nextRetryTime) {
processJob(job);
} else {
jobQueue.updateJobStatus(job); // Put the job back in the queue for later retry
Job updatedJob = job.withState(JobState.PENDING);
jobQueue.updateJobStatus(updatedJob); // Put the job back in the queue for later retry
notifyJobWatchers(updatedJob);
}
}

Expand Down Expand Up @@ -407,10 +446,15 @@ private void processJob(final Job job) {

try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) {

ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
final ProgressTracker progressTracker = processor.progressTracker(runningJob);
final ProgressTracker progressTracker;
if (processor.progressTracker(runningJob) != null) {
progressTracker = processor.progressTracker(runningJob);
} else {
progressTracker = new DefaultProgressTracker();
}

// Start a separate thread to periodically update and persist progress
ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
progressUpdater.scheduleAtFixedRate(() ->
updateJobProgress(runningJob, progressTracker), 0, 1, TimeUnit.SECONDS
);
Expand All @@ -433,6 +477,7 @@ private void processJob(final Job job) {
final var errorDetail = ErrorDetail.builder()
.message("Job processing failed")
.exception(e)
.exceptionClass(e.getClass().getName())
.processingStage("Job execution")
.timestamp(LocalDateTime.now())
.build();
Expand Down Expand Up @@ -463,8 +508,7 @@ private void handleJobFailure(final Job job, final ErrorDetail errorDetail) {

Job updatedJob;
if (canRetry(job)) {
updatedJob = job.incrementRetry().
markAsFailed(errorDetail);
updatedJob = job.incrementRetry().markAsFailed(errorDetail);
} else {
updatedJob = job.markAsFailed(errorDetail);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,6 @@ default Job withState(JobState newState) {
.build();
}

/**
 * Builds a copy of this Job carrying the given result and a refreshed update timestamp.
 *
 * @param newResult The result to store on the copy.
 * @return A new Job instance with the result set and {@code updatedAt} set to the current time.
 */
default Job withResult(JobResult newResult) {
    final Optional<JobResult> wrappedResult = Optional.of(newResult);
    return AbstractJob.builder()
            .from(this)
            .result(wrappedResult)
            .updatedAt(LocalDateTime.now())
            .build();
}

/**
* Creates a new Job marked as completed.
*
Expand All @@ -105,6 +92,7 @@ default Job withResult(JobResult newResult) {
default Job markAsCompleted() {
return AbstractJob.builder().from(this)
.state(JobState.COMPLETED)
.result(JobResult.SUCCESS)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface JobQueue {
* @param jobId The ID of the job to retrieve.
* @return The job with the specified ID, or null if not found.
*/
Job job(String jobId);
Job getJob(String jobId);

/**
* Retrieves a list of active jobs for a specific queue.
Expand All @@ -36,7 +36,7 @@ public interface JobQueue {
* @param pageSize The number of items per page.
* @return A list of active jobs.
*/
List<Job> activeJobs(String queueName, int page, int pageSize);
List<Job> getActiveJobs(String queueName, int page, int pageSize);

/**
* Retrieves a list of completed jobs for a specific queue within a date range.
Expand All @@ -48,7 +48,7 @@ public interface JobQueue {
* @param pageSize The number of items per page.
* @return A list of completed jobs.
*/
List<Job> completedJobs(String queueName, LocalDateTime startDate, LocalDateTime endDate,
List<Job> getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateTime endDate,
int page, int pageSize);

/**
Expand All @@ -58,7 +58,7 @@ List<Job> completedJobs(String queueName, LocalDateTime startDate, LocalDateTime
* @param pageSize The number of items per page.
* @return A list of all jobs.
*/
List<Job> jobs(int page, int pageSize);
List<Job> getJobs(int page, int pageSize);

/**
* Retrieves a list of failed jobs.
Expand All @@ -67,7 +67,7 @@ List<Job> completedJobs(String queueName, LocalDateTime startDate, LocalDateTime
* @param pageSize The number of items per page.
* @return A list of failed jobs.
*/
List<Job> failedJobs(int page, int pageSize);
List<Job> getFailedJobs(int page, int pageSize);

/**
* Updates the status of a job.
Expand Down
Loading

0 comments on commit 12aee87

Please sign in to comment.