diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java index df3ffeacdb4c..fe74cded93b4 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java @@ -8,7 +8,6 @@ import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; -import com.dotcms.jobs.business.processor.DefaultProgressTracker; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; @@ -306,8 +305,12 @@ private void notifyJobWatchers(final Job job) { */ private void updateJobProgress(final Job job, final ProgressTracker progressTracker) { if (job != null) { - jobQueue.updateJobProgress(job.id(), progressTracker.progress()); - notifyJobWatchers(job); + + float progress = progressTracker.progress(); + Job updatedJob = job.withProgress(progress); + + jobQueue.updateJobProgress(job.id(), updatedJob.progress()); + notifyJobWatchers(updatedJob); } } @@ -439,12 +442,7 @@ private void processJob(final Job job) { try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { - final ProgressTracker progressTracker; - if (processor.progressTracker(runningJob) != null) { - progressTracker = processor.progressTracker(runningJob); - } else { - progressTracker = new DefaultProgressTracker(); - } + final ProgressTracker progressTracker = processor.progressTracker(runningJob); // Start a separate thread to periodically update and persist progress ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService(); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index 1316d8f1cae1..6e05a28b13c5 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -44,6 +44,8 @@ public interface AbstractJob { long lastRetryTimestamp(); + float progress(); + /** * Creates a new Job with an incremented retry count and updated timestamp. * diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java index 088b2d4287b4..5d1625c22a46 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java @@ -4,6 +4,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyFloat; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyMap; @@ -23,6 +24,7 @@ import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.processor.DefaultProgressTracker; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.jobs.business.queue.JobQueue; @@ -224,6 +226,8 @@ public void test_watchJob() throws Exception { // Mock JobProcessor behavior ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(runningJob.progress()).thenReturn(0f); + when(runningJob.withProgress(anyFloat())).thenReturn(runningJob); // Create a list to capture job states List capturedStates = Collections.synchronizedList(new ArrayList<>()); @@ -302,6 +306,8 @@ public void test_JobRetry_single_retry() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); // Configure job processor to fail on first attempt, succeed on second AtomicInteger processAttempts = new AtomicInteger(0); @@ -383,6 +389,8 @@ public void test_JobRetry_retry_twice() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); // Configure job processor to fail twice, succeed on third attempt AtomicInteger processAttempts = new AtomicInteger(0); @@ -523,6 +531,8 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); // Configure job processor to succeed doAnswer(inv -> { @@ -581,6 +591,8 @@ public void test_Job_NotRetryable() throws Exception { // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); // Configure job processor to fail doThrow(new RuntimeException("Non-retryable error")).when(mockJobProcessor).process(any()); @@ -611,4 +623,104 @@ public void test_Job_NotRetryable() throws Exception { jobQueueAPI.close(); } + @Test + public void test_JobProgressTracking() throws Exception { + + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.id()).thenReturn("progress-test-job"); + when(mockJob.queueName()).thenReturn("testQueue"); + + AtomicReference jobProgress = new AtomicReference<>(0f); + AtomicReference jobState = new AtomicReference<>(JobState.PENDING); + + when(mockJob.state()).thenAnswer(inv -> jobState.get()); + when(mockJob.withState(any())).thenAnswer(inv -> { + jobState.set(inv.getArgument(0)); + return mockJob; + }); + when(mockJob.markAsCompleted()).thenAnswer(inv -> { + jobState.set(JobState.COMPLETED); + return mockJob; + }); + + when(mockJob.progress()).thenAnswer(inv -> jobProgress.get()); + when(mockJob.withProgress(anyFloat())).thenAnswer(inv -> { + jobProgress.set(inv.getArgument(0)); + return mockJob; + }); + + // Set up the job queue to return our mock job + when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); + when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); + + // Create a real ProgressTracker + ProgressTracker realProgressTracker = new DefaultProgressTracker(); + when(mockJobProcessor.progressTracker(any())).thenReturn(realProgressTracker); + + // List to store progress updates + List progressUpdates = Collections.synchronizedList(new ArrayList<>()); + + // Configure the mockJobProcessor to update progress + doAnswer(inv -> { + + for (int i = 0; i <= 10; i++) { + float progress = i / 10f; + realProgressTracker.updateProgress(progress); + // Simulate the effect of updateJobProgress + jobProgress.set(progress); + Thread.sleep(50); // Simulate work + } + + Job job = inv.getArgument(0); + job.markAsCompleted(); + return null; + }).when(mockJobProcessor).process(any()); + + // Set up a job watcher to capture progress updates + jobQueueAPI.watchJob("progress-test-job", job -> { + progressUpdates.add(job.progress()); + }); + + // Start the job queue + jobQueueAPI.start(); + + // Wait for job processing to complete + Awaitility.await().atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertEquals(JobState.COMPLETED, jobState.get()); + }); + + // Verify that progress was tracked correctly + assertTrue( + "Should have multiple progress updates", + progressUpdates.size() > 1 + ); + assertEquals( + 0.0f, progressUpdates.get(0), 0.01f, + "Initial progress should be 0" + ); + assertEquals( + 1.0f, progressUpdates.get(progressUpdates.size() - 1), 0.01f, + "Final progress should be 1" + ); + + // Verify that progress increased monotonically + for (int i = 1; i < progressUpdates.size(); i++) { + assertTrue("Progress should increase or stay the same", + progressUpdates.get(i) >= progressUpdates.get(i - 1) + ); + } + + // Verify that the job was processed + verify(mockJobProcessor, times(1)).process(any()); + verify(mockJobQueue, times(2)).updateJobStatus(any()); + verify(mockJobQueue, times(2)). + updateJobStatus(argThat(job -> job.state() == JobState.COMPLETED)); + + // Stop the job queue + jobQueueAPI.close(); + } + }