Skip to content

Commit

Permalink
#29478 Improvements and adding more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 29, 2024
1 parent 551733a commit 485d513
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -306,8 +305,12 @@ private void notifyJobWatchers(final Job job) {
*/
private void updateJobProgress(final Job job, final ProgressTracker progressTracker) {
if (job != null) {
jobQueue.updateJobProgress(job.id(), progressTracker.progress());
notifyJobWatchers(job);

float progress = progressTracker.progress();
Job updatedJob = job.withProgress(progress);

jobQueue.updateJobProgress(job.id(), updatedJob.progress());
notifyJobWatchers(updatedJob);
}
}

Expand Down Expand Up @@ -439,12 +442,7 @@ private void processJob(final Job job) {

try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) {

final ProgressTracker progressTracker;
if (processor.progressTracker(runningJob) != null) {
progressTracker = processor.progressTracker(runningJob);
} else {
progressTracker = new DefaultProgressTracker();
}
final ProgressTracker progressTracker = processor.progressTracker(runningJob);

// Start a separate thread to periodically update and persist progress
ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public interface AbstractJob {

long lastRetryTimestamp();

float progress();

/**
* Creates a new Job with an incremented retry count and updated timestamp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyMap;
Expand All @@ -23,6 +24,7 @@
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.DefaultProgressTracker;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.queue.JobQueue;
Expand Down Expand Up @@ -224,6 +226,8 @@ public void test_watchJob() throws Exception {
// Mock JobProcessor behavior
ProgressTracker mockProgressTracker = mock(ProgressTracker.class);
when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker);
when(runningJob.progress()).thenReturn(0f);
when(runningJob.withProgress(anyFloat())).thenReturn(runningJob);

// Create a list to capture job states
List<JobState> capturedStates = Collections.synchronizedList(new ArrayList<>());
Expand Down Expand Up @@ -302,6 +306,8 @@ public void test_JobRetry_single_retry() throws Exception {
// Configure progress tracker
ProgressTracker mockProgressTracker = mock(ProgressTracker.class);
when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker);
when(mockJob.progress()).thenReturn(0f);
when(mockJob.withProgress(anyFloat())).thenReturn(mockJob);

// Configure job processor to fail on first attempt, succeed on second
AtomicInteger processAttempts = new AtomicInteger(0);
Expand Down Expand Up @@ -383,6 +389,8 @@ public void test_JobRetry_retry_twice() throws Exception {
// Configure progress tracker
ProgressTracker mockProgressTracker = mock(ProgressTracker.class);
when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker);
when(mockJob.progress()).thenReturn(0f);
when(mockJob.withProgress(anyFloat())).thenReturn(mockJob);

// Configure job processor to fail twice, succeed on third attempt
AtomicInteger processAttempts = new AtomicInteger(0);
Expand Down Expand Up @@ -523,6 +531,8 @@ public void test_Job_SucceedsFirstAttempt() throws Exception {
// Configure progress tracker
ProgressTracker mockProgressTracker = mock(ProgressTracker.class);
when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker);
when(mockJob.progress()).thenReturn(0f);
when(mockJob.withProgress(anyFloat())).thenReturn(mockJob);

// Configure job processor to succeed
doAnswer(inv -> {
Expand Down Expand Up @@ -581,6 +591,8 @@ public void test_Job_NotRetryable() throws Exception {
// Configure progress tracker
ProgressTracker mockProgressTracker = mock(ProgressTracker.class);
when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker);
when(mockJob.progress()).thenReturn(0f);
when(mockJob.withProgress(anyFloat())).thenReturn(mockJob);

// Configure job processor to fail
doThrow(new RuntimeException("Non-retryable error")).when(mockJobProcessor).process(any());
Expand Down Expand Up @@ -611,4 +623,104 @@ public void test_Job_NotRetryable() throws Exception {
jobQueueAPI.close();
}

@Test
public void test_JobProgressTracking() throws Exception {

// Create a mock job
Job mockJob = mock(Job.class);
when(mockJob.id()).thenReturn("progress-test-job");
when(mockJob.queueName()).thenReturn("testQueue");

AtomicReference<Float> jobProgress = new AtomicReference<>(0f);
AtomicReference<JobState> jobState = new AtomicReference<>(JobState.PENDING);

when(mockJob.state()).thenAnswer(inv -> jobState.get());
when(mockJob.withState(any())).thenAnswer(inv -> {
jobState.set(inv.getArgument(0));
return mockJob;
});
when(mockJob.markAsCompleted()).thenAnswer(inv -> {
jobState.set(JobState.COMPLETED);
return mockJob;
});

when(mockJob.progress()).thenAnswer(inv -> jobProgress.get());
when(mockJob.withProgress(anyFloat())).thenAnswer(inv -> {
jobProgress.set(inv.getArgument(0));
return mockJob;
});

// Set up the job queue to return our mock job
when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null);
when(mockJobQueue.getJob(anyString())).thenReturn(mockJob);

// Create a real ProgressTracker
ProgressTracker realProgressTracker = new DefaultProgressTracker();
when(mockJobProcessor.progressTracker(any())).thenReturn(realProgressTracker);

// List to store progress updates
List<Float> progressUpdates = Collections.synchronizedList(new ArrayList<>());

// Configure the mockJobProcessor to update progress
doAnswer(inv -> {

for (int i = 0; i <= 10; i++) {
float progress = i / 10f;
realProgressTracker.updateProgress(progress);
// Simulate the effect of updateJobProgress
jobProgress.set(progress);
Thread.sleep(50); // Simulate work
}

Job job = inv.getArgument(0);
job.markAsCompleted();
return null;
}).when(mockJobProcessor).process(any());

// Set up a job watcher to capture progress updates
jobQueueAPI.watchJob("progress-test-job", job -> {
progressUpdates.add(job.progress());
});

// Start the job queue
jobQueueAPI.start();

// Wait for job processing to complete
Awaitility.await().atMost(10, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
assertEquals(JobState.COMPLETED, jobState.get());
});

// Verify that progress was tracked correctly
assertTrue(
"Should have multiple progress updates",
progressUpdates.size() > 1
);
assertEquals(
0.0f, progressUpdates.get(0), 0.01f,
"Initial progress should be 0"
);
assertEquals(
1.0f, progressUpdates.get(progressUpdates.size() - 1), 0.01f,
"Final progress should be 1"
);

// Verify that progress increased monotonically
for (int i = 1; i < progressUpdates.size(); i++) {
assertTrue("Progress should increase or stay the same",
progressUpdates.get(i) >= progressUpdates.get(i - 1)
);
}

// Verify that the job was processed
verify(mockJobProcessor, times(1)).process(any());
verify(mockJobQueue, times(2)).updateJobStatus(any());
verify(mockJobQueue, times(2)).
updateJobStatus(argThat(job -> job.state() == JobState.COMPLETED));

// Stop the job queue
jobQueueAPI.close();
}

}

0 comments on commit 485d513

Please sign in to comment.