diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java index fb785d60d301..b7a2bbc03fd0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java @@ -1,13 +1,13 @@ package com.dotcms.jobs.business.api; -import com.dotcms.jobs.business.processor.JobProcessor; -import com.dotcms.jobs.business.error.RetryStrategy; -import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.ProcessorNotFoundException; - +import com.dotcms.jobs.business.error.RetryStrategy; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.processor.JobProcessor; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -21,6 +21,24 @@ public interface JobQueueAPI extends AutoCloseable { */ void start(); + /** + * Checks if the JobQueueManager has been started. + * + * @return {@code true} if the JobQueueManager has been started, {@code false} otherwise. + */ + boolean isStarted(); + + /** + * Waits for the JobQueueManager to start up. + * + * @param timeout The maximum time to wait. + * @param unit The time unit of the timeout argument. + * @return {@code true} if the JobQueueManager has started, {@code false} if the waiting time + * elapsed before the JobQueueManager started. + * @throws InterruptedException if the current thread is interrupted while waiting. + */ + boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException; + /** * Stops all job processing and releases resources. This method should be called when the job * queue manager is no longer needed. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java index 5fd85887773f..aae579e56769 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java @@ -8,6 +8,7 @@ import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.processor.DefaultProgressTracker; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; @@ -20,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -69,6 +71,9 @@ public class JobQueueAPIImpl implements JobQueueAPI { private final AtomicBoolean isStarted = new AtomicBoolean(false); + private CountDownLatch startLatch; + private volatile boolean isShuttingDown = false; + private volatile boolean isClosed = false; private final CircuitBreaker circuitBreaker; private final JobQueue jobQueue; @@ -84,7 +89,7 @@ public class JobQueueAPIImpl implements JobQueueAPI { ); /** - * Constructs a new JobQueueManager with the default job queue implementation and the default + * Constructs a new JobQueueAPIImpl with the default job queue implementation and the default * number of threads. */ public JobQueueAPIImpl() { @@ -93,7 +98,7 @@ public JobQueueAPIImpl() { } /** - * Constructs a new JobQueueManager. + * Constructs a new JobQueueAPIImpl. * * @param jobQueue The JobQueue implementation to use. * @param threadPoolSize The number of threads to use for job processing. @@ -113,24 +118,44 @@ public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) { ); // 5 failures within 1 minute } + @Override + public boolean isStarted() { + return isStarted.get() && !isClosed; + } + + @Override + public boolean awaitStart(long timeout, TimeUnit unit) throws InterruptedException { + return startLatch.await(timeout, unit); + } + @Override public void start() { + if (isClosed) { + Logger.warn(this, "Attempt to start JobQueue that has been closed. Ignoring."); + return; + } + if (isStarted.compareAndSet(false, true)) { Logger.info( - this, "Starting JobQueueManager with " + threadPoolSize + " threads." + this, "Starting JobQueue with " + threadPoolSize + " threads." ); + startLatch = new CountDownLatch(threadPoolSize); executorService = Executors.newFixedThreadPool(threadPoolSize); + for (int i = 0; i < threadPoolSize; i++) { - executorService.submit(this::processJobs); + executorService.submit(() -> { + startLatch.countDown(); + processJobs(); + }); } - Logger.info(this, "JobQueueManager has been successfully started."); + Logger.info(this, "JobQueue has been successfully started."); } else { Logger.warn(this, - "Attempt to start JobQueueAPIImpl that is already running. Ignoring." + "Attempt to start JobQueue that is already running. Ignoring." ); } } @@ -138,28 +163,33 @@ public void start() { @Override public void close() throws Exception { + if (isClosed) { + Logger.warn(this, "JobQueue is already closed. Ignoring."); + return; + } + if (isStarted.compareAndSet(true, false)) { - Logger.info(this, "Closing JobQueueManager and stopping all job processing."); - executorService.shutdown(); + isShuttingDown = true; + Logger.info(this, "Closing JobQueue and stopping all job processing."); + executorService.shutdownNow(); try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - executorService.shutdownNow(); - if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { - Logger.error(this, "ExecutorService did not terminate"); - } + Logger.error(this, "ExecutorService did not terminate"); } } catch (InterruptedException e) { - executorService.shutdownNow(); Thread.currentThread().interrupt(); Logger.error(this, "Interrupted while waiting for jobs to complete", e); + } finally { + isShuttingDown = false; } - Logger.info(this, "JobQueueManager has been successfully closed."); + isClosed = true; + Logger.info(this, "JobQueue has been successfully closed."); } else { Logger.warn(this, - "Attempt to close JobQueueAPIImpl that is not running. Ignoring." + "Attempt to close JobQueue that is not running. Ignoring." ); } } @@ -183,18 +213,18 @@ public String createJob(final String queueName, final Map parame @Override public Job getJob(final String jobId) { - return jobQueue.job(jobId); + return jobQueue.getJob(jobId); } @Override public List getJobs(final int page, final int pageSize) { - return jobQueue.jobs(page, pageSize); + return jobQueue.getJobs(page, pageSize); } @Override public void cancelJob(final String jobId) { - Job job = jobQueue.job(jobId); + Job job = jobQueue.getJob(jobId); if (job != null) { final var processor = processors.get(job.queueName()); @@ -226,7 +256,7 @@ public void cancelJob(final String jobId) { @Override public void watchJob(final String jobId, final Consumer watcher) { jobWatchers.computeIfAbsent(jobId, k -> new CopyOnWriteArrayList<>()).add(watcher); - Job currentJob = jobQueue.job(jobId); + Job currentJob = jobQueue.getJob(jobId); watcher.accept(currentJob); } @@ -286,7 +316,7 @@ private void updateJobProgress(final Job job, final ProgressTracker progressTrac */ private void processJobs() { - while (!Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted() && !isShuttingDown) { if (isCircuitBreakerOpen()) { continue; @@ -318,15 +348,20 @@ private void processJobs() { * @return true if the circuit breaker is open, false otherwise. */ private boolean isCircuitBreakerOpen() { + if (!circuitBreaker.allowRequest()) { + Logger.warn(this, "Circuit breaker is open. Pausing job processing for a while."); + try { Thread.sleep(5000); // Wait for 5 seconds before checking again } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + return true; } + return false; } @@ -336,10 +371,12 @@ private boolean isCircuitBreakerOpen() { * @return The next job to be processed, or null if no job is available. */ private Job fetchNextJob() { + Job job = jobQueue.nextPendingJob(); if (job == null) { job = jobQueue.nextFailedJob(); } + return job; } @@ -375,7 +412,9 @@ private void handleFailedJobWithRetry(final Job job) { if (now >= nextRetryTime) { processJob(job); } else { - jobQueue.updateJobStatus(job); // Put the job back in the queue for later retry + Job updatedJob = job.withState(JobState.PENDING); + jobQueue.updateJobStatus(updatedJob); // Put the job back in the queue for later retry + notifyJobWatchers(updatedJob); } } @@ -407,10 +446,15 @@ private void processJob(final Job job) { try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { - ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService(); - final ProgressTracker progressTracker = processor.progressTracker(runningJob); + final ProgressTracker progressTracker; + if (processor.progressTracker(runningJob) != null) { + progressTracker = processor.progressTracker(runningJob); + } else { + progressTracker = new DefaultProgressTracker(); + } // Start a separate thread to periodically update and persist progress + ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService(); progressUpdater.scheduleAtFixedRate(() -> updateJobProgress(runningJob, progressTracker), 0, 1, TimeUnit.SECONDS ); @@ -433,6 +477,7 @@ private void processJob(final Job job) { final var errorDetail = ErrorDetail.builder() .message("Job processing failed") .exception(e) + .exceptionClass(e.getClass().getName()) .processingStage("Job execution") .timestamp(LocalDateTime.now()) .build(); @@ -463,8 +508,7 @@ private void handleJobFailure(final Job job, final ErrorDetail errorDetail) { Job updatedJob; if (canRetry(job)) { - updatedJob = job.incrementRetry(). - markAsFailed(errorDetail); + updatedJob = job.incrementRetry().markAsFailed(errorDetail); } else { updatedJob = job.markAsFailed(errorDetail); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index cb9817fa6dfc..1316d8f1cae1 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -84,19 +84,6 @@ default Job withState(JobState newState) { .build(); } - /** - * Creates a new Job with an updated result. - * - * @param newResult The new result to set. - * @return A new Job instance with the updated result. - */ - default Job withResult(JobResult newResult) { - return AbstractJob.builder().from(this) - .result(Optional.of(newResult)) - .updatedAt(LocalDateTime.now()) - .build(); - } - /** * Creates a new Job marked as completed. * @@ -105,6 +92,7 @@ default Job withResult(JobResult newResult) { default Job markAsCompleted() { return AbstractJob.builder().from(this) .state(JobState.COMPLETED) + .result(JobResult.SUCCESS) .completedAt(Optional.of(LocalDateTime.now())) .updatedAt(LocalDateTime.now()) .build(); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index 58fe0419a8ff..079354c90de1 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -26,7 +26,7 @@ public interface JobQueue { * @param jobId The ID of the job to retrieve. * @return The job with the specified ID, or null if not found. */ - Job job(String jobId); + Job getJob(String jobId); /** * Retrieves a list of active jobs for a specific queue. @@ -36,7 +36,7 @@ public interface JobQueue { * @param pageSize The number of items per page. * @return A list of active jobs. */ - List activeJobs(String queueName, int page, int pageSize); + List getActiveJobs(String queueName, int page, int pageSize); /** * Retrieves a list of completed jobs for a specific queue within a date range. @@ -48,7 +48,7 @@ public interface JobQueue { * @param pageSize The number of items per page. * @return A list of completed jobs. */ - List completedJobs(String queueName, LocalDateTime startDate, LocalDateTime endDate, + List getCompletedJobs(String queueName, LocalDateTime startDate, LocalDateTime endDate, int page, int pageSize); /** @@ -58,7 +58,7 @@ List completedJobs(String queueName, LocalDateTime startDate, LocalDateTime * @param pageSize The number of items per page. * @return A list of all jobs. */ - List jobs(int page, int pageSize); + List getJobs(int page, int pageSize); /** * Retrieves a list of failed jobs. @@ -67,7 +67,7 @@ List completedJobs(String queueName, LocalDateTime startDate, LocalDateTime * @param pageSize The number of items per page. * @return A list of failed jobs. */ - List failedJobs(int page, int pageSize); + List getFailedJobs(int page, int pageSize); /** * Updates the status of a job. diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java new file mode 100644 index 000000000000..52c6190cb8e3 --- /dev/null +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java @@ -0,0 +1,350 @@ +package com.dotcms.jobs.business.api; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyMap; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.dotcms.jobs.business.error.RetryStrategy; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotcms.jobs.business.processor.ProgressTracker; +import com.dotcms.jobs.business.queue.JobQueue; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class JobQueueAPITest { + + private JobQueue mockJobQueue; + + private JobProcessor mockJobProcessor; + + private RetryStrategy mockRetryStrategy; + + private JobQueueAPIImpl jobQueueAPI; + + @Before + public void setUp() { + + mockJobQueue = mock(JobQueue.class); + mockJobProcessor = mock(JobProcessor.class); + mockRetryStrategy = mock(RetryStrategy.class); + + jobQueueAPI = new JobQueueAPIImpl(mockJobQueue, 3); + jobQueueAPI.registerProcessor("testQueue", mockJobProcessor); + jobQueueAPI.setRetryStrategy("testQueue", mockRetryStrategy); + } + + @Test + public void test_createJob() { + + Map parameters = new HashMap<>(); + when(mockJobQueue.addJob(anyString(), anyMap())).thenReturn("job123"); + + // Creating a job + String jobId = jobQueueAPI.createJob("testQueue", parameters); + + assertEquals("job123", jobId); + verify(mockJobQueue).addJob("testQueue", parameters); + } + + @Test + public void test_getJob() { + + Job mockJob = mock(Job.class); + when(mockJobQueue.getJob("job123")).thenReturn(mockJob); + + // Getting a job + Job result = jobQueueAPI.getJob("job123"); + + assertEquals(mockJob, result); + verify(mockJobQueue).getJob("job123"); + } + + @Test + public void test_getJobs() { + + // Prepare test data + Job job1 = mock(Job.class); + Job job2 = mock(Job.class); + List expectedJobs = Arrays.asList(job1, job2); + + // Mock the behavior of jobQueue.getJobs + when(mockJobQueue.getJobs(1, 10)).thenReturn(expectedJobs); + + // Call the method under test + List actualJobs = jobQueueAPI.getJobs(1, 10); + + // Verify the results + assertEquals(expectedJobs, actualJobs); + verify(mockJobQueue).getJobs(1, 10); + } + + @Test + public void testCancelJob() { + + Job mockJob = mock(Job.class); + when(mockJobQueue.getJob("job123")).thenReturn(mockJob); + when(mockJob.queueName()).thenReturn("testQueue"); + when(mockJob.id()).thenReturn("job123"); + when(mockJob.withState(any())).thenReturn(mockJob); + + when(mockJobProcessor.canCancel(mockJob)).thenReturn(true); + + jobQueueAPI.cancelJob("job123"); + + verify(mockJobProcessor).cancel(mockJob); + verify(mockJobQueue).updateJobStatus(any(Job.class)); + } + + @Test + public void test_start() throws InterruptedException { + + assertFalse(jobQueueAPI.isStarted()); + + jobQueueAPI.start(); + + assertTrue(jobQueueAPI.isStarted()); + assertTrue(jobQueueAPI.awaitStart(5, TimeUnit.SECONDS)); + + // Verify that jobs are being processed + verify(mockJobQueue, timeout(5000).atLeastOnce()).nextPendingJob(); + } + + @Test + public void test_close() throws Exception { + + // Start the JobQueueAPI + jobQueueAPI.start(); + assertTrue(jobQueueAPI.isStarted()); + assertTrue(jobQueueAPI.awaitStart(5, TimeUnit.SECONDS)); + + AtomicInteger jobCheckCount = new AtomicInteger(0); + when(mockJobQueue.nextPendingJob()).thenAnswer(invocation -> { + jobCheckCount.incrementAndGet(); + return null; + }); + + // Close the JobQueueAPI + jobQueueAPI.close(); + + // Wait for the JobQueueAPI to be fully stopped + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> !jobQueueAPI.isStarted()); + + // Verify that no more jobs are being processed + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> { + int count = jobCheckCount.get(); + Thread.sleep(500); // Wait a bit to see if count increases + return jobCheckCount.get() + == count; // If count hasn't increased, job processing has stopped + }); + + // Try to start a new job and verify it's not processed + Job mockJob = mock(Job.class); + when(mockJobQueue.nextPendingJob()).thenReturn(mockJob); + + // Wait and verify that the job was not processed + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> verify(mockJobProcessor, never()).process(any(Job.class))); + + // Verify that we can't start the JobQueueAPIImpl again after closing + jobQueueAPI.start(); + assertFalse(jobQueueAPI.isStarted()); + + // Verify that close() can be called multiple times without error + jobQueueAPI.close(); + assertFalse(jobQueueAPI.isStarted()); + } + + @Test + public void test_watchJob() throws Exception { + + // Prepare test data + String jobId = "testJobId"; + Job initialJob = mock(Job.class); + when(initialJob.id()).thenReturn(jobId); + when(initialJob.queueName()).thenReturn("testQueue"); + when(initialJob.state()).thenReturn(JobState.PENDING); + + // Mock behavior for job state changes + Job runningJob = mock(Job.class); + when(runningJob.id()).thenReturn(jobId); + when(runningJob.queueName()).thenReturn("testQueue"); + when(runningJob.state()).thenReturn(JobState.RUNNING); + when(initialJob.withState(JobState.RUNNING)).thenReturn(runningJob); + + Job completedJob = mock(Job.class); + when(completedJob.id()).thenReturn(jobId); + when(completedJob.queueName()).thenReturn("testQueue"); + when(completedJob.state()).thenReturn(JobState.COMPLETED); + when(runningJob.markAsCompleted()).thenReturn(completedJob); + + // Mock JobQueue behavior + when(mockJobQueue.getJob(jobId)).thenReturn(initialJob); + when(mockJobQueue.nextPendingJob()).thenReturn(initialJob).thenReturn(null); + + // Mock JobProcessor behavior + ProgressTracker mockProgressTracker = mock(ProgressTracker.class); + when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + + // Create a list to capture job states + List capturedStates = Collections.synchronizedList(new ArrayList<>()); + + // Create a test watcher + Consumer testWatcher = job -> { + assertNotNull(job); + assertEquals(jobId, job.id()); + capturedStates.add(job.state()); + }; + + // Start the JobQueueAPI + jobQueueAPI.start(); + + // Register the watcher + jobQueueAPI.watchJob(jobId, testWatcher); + + // Wait for job processing to complete + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> capturedStates.contains(JobState.COMPLETED)); + + // Verify job processing + verify(mockJobQueue, timeout(5000)).updateJobStatus(runningJob); + verify(mockJobProcessor, timeout(5000)).process(runningJob); + verify(mockJobQueue, timeout(5000)).updateJobStatus(completedJob); + + // Verify watcher received all job states + assertTrue(capturedStates.contains(JobState.PENDING)); + assertTrue(capturedStates.contains(JobState.RUNNING)); + assertTrue(capturedStates.contains(JobState.COMPLETED)); + + // Stop the JobQueueAPI + jobQueueAPI.close(); + } + + @Test + public void test_JobRetry() throws Exception { + + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.id()).thenReturn("job123"); + when(mockJob.queueName()).thenReturn("testQueue"); + + AtomicReference jobState = new AtomicReference<>(JobState.PENDING); + when(mockJob.state()).thenAnswer(inv -> jobState.get()); + + AtomicInteger retryCount = new AtomicInteger(0); + when(mockJob.retryCount()).thenAnswer(inv -> retryCount.get()); + + when(mockJob.withState(any())).thenAnswer(inv -> { + JobState newState = inv.getArgument(0); + System.out.println("Job state changed to: " + newState); + jobState.set(newState); + return mockJob; + }); + when(mockJob.incrementRetry()).thenAnswer(inv -> { + int newRetryCount = retryCount.incrementAndGet(); + System.out.println("Retry count incremented to: " + newRetryCount); + return mockJob; + }); + when(mockJob.markAsCompleted()).thenAnswer(inv -> { + System.out.println("Job marked as completed"); + jobState.set(JobState.COMPLETED); + return mockJob; + }); + when(mockJob.markAsFailed(any())).thenAnswer(inv -> { + System.out.println("Job marked as failed"); + jobState.set(JobState.FAILED); + return mockJob; + }); + + // Set up the job queue to return our mock job twice (for initial attempt and retry) + when(mockJobQueue.nextPendingJob()).thenReturn(mockJob, mockJob, null); + + // Configure retry strategy + when(mockRetryStrategy.shouldRetry(any(), any())).thenReturn(true); + when(mockRetryStrategy.nextRetryDelay(any())).thenReturn(100L); + + // Configure progress tracker + ProgressTracker mockProgressTracker = mock(ProgressTracker.class); + when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + + // Configure job processor to fail on first attempt, succeed on second + AtomicInteger processAttempts = new AtomicInteger(0); + doAnswer(invocation -> { + int attempt = processAttempts.getAndIncrement(); + System.out.println("Processing attempt: " + attempt); + if (attempt == 0) { + throw new RuntimeException("Simulated failure"); + } + Job job = invocation.getArgument(0); + job.markAsCompleted(); + return null; + }).when(mockJobProcessor).process(any()); + + // Start the job queue + jobQueueAPI.start(); + + // Wait for job processing to complete + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + System.out.println("Current job state: " + jobState.get()); + assertEquals(JobState.COMPLETED, jobState.get()); + }); + + // Verify that the job was processed twice + verify(mockJobProcessor, times(2)).process(any()); + + // Verify that the job state was updated correctly + ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobQueue, atLeast(3)).updateJobStatus(jobCaptor.capture()); + + List capturedJobs = jobCaptor.getAllValues(); + Job finalJobState = capturedJobs.get(capturedJobs.size() - 1); + assertNotNull("Final job state should not be null", finalJobState); + assertEquals(JobState.COMPLETED, finalJobState.state()); + assertEquals(1, finalJobState.retryCount()); + + // Verify that the retry strategy was consulted + verify(mockRetryStrategy, times(1)).shouldRetry(any(), any()); + verify(mockRetryStrategy, times(1)).nextRetryDelay(any()); + + // Stop the job queue + jobQueueAPI.close(); + } + +}