Skip to content

Commit

Permalink
#29478 Improvements and adding more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 29, 2024
1 parent 6072717 commit 90bc1ef
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,10 @@ public void cancelJob(final String jobId) {

try {

Logger.info(this, "Cancelling job " + jobId);

processor.cancel(job);

Job cancelledJob = job.withState(JobState.CANCELLED);
jobQueue.updateJobStatus(cancelledJob);
notifyJobWatchers(cancelledJob);
Expand Down
39 changes: 22 additions & 17 deletions dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.immutables.value.Value;
import org.immutables.value.Value.Default;

/**
* Abstract interface for an immutable Job class. This interface defines the structure for job
Expand All @@ -24,8 +26,10 @@ public interface AbstractJob {

JobState state();

@Nullable
LocalDateTime createdAt();

@Nullable
LocalDateTime updatedAt();

Optional<LocalDateTime> completedAt();
Expand All @@ -34,25 +38,34 @@ public interface AbstractJob {

Map<String, Object> parameters();

String executionNode();

@Nullable
Throwable lastException();

@Nullable
com.dotcms.jobs.business.error.ErrorDetail errorDetail();

int retryCount();
@Default
default int retryCount() {
return 0;
}

long lastRetryTimestamp();
@Default
default long lastRetryTimestamp() {
return 0;
}

float progress();
@Default
default float progress() {
return 0.0f;
}

/**
* Creates a new Job with an incremented retry count and updated timestamp.
*
* @return A new Job instance with updated retry information.
*/
default Job incrementRetry() {
return AbstractJob.builder().from(this)
return Job.builder().from(this)
.retryCount(retryCount() + 1)
.lastRetryTimestamp(System.currentTimeMillis())
.build();
Expand All @@ -65,7 +78,7 @@ default Job incrementRetry() {
* @return A new Job instance marked as failed.
*/
default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail) {
return AbstractJob.builder().from(this)
return Job.builder().from(this)
.state(JobState.FAILED)
.result(JobResult.ERROR)
.errorDetail(errorDetail)
Expand All @@ -80,7 +93,7 @@ default Job markAsFailed(com.dotcms.jobs.business.error.ErrorDetail errorDetail)
* @return A new Job instance with the updated state.
*/
default Job withState(JobState newState) {
return AbstractJob.builder().from(this)
return Job.builder().from(this)
.state(newState)
.updatedAt(LocalDateTime.now())
.build();
Expand All @@ -92,20 +105,12 @@ default Job withState(JobState newState) {
* @return A new Job instance marked as completed.
*/
default Job markAsCompleted() {
return AbstractJob.builder().from(this)
return Job.builder().from(this)
.state(JobState.COMPLETED)
.result(JobResult.SUCCESS)
.completedAt(Optional.of(LocalDateTime.now()))
.updatedAt(LocalDateTime.now())
.build();
}

class Builder extends Job.Builder {

}

static Builder builder() {
return new Builder();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobProcessingException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobState;
Expand All @@ -35,6 +36,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -811,7 +814,7 @@ public void test_CircuitBreaker_Opens() throws Exception {
}

@Test
public void testCircuitBreakerCloses() throws Exception {
public void test_CircuitBreaker_Closes() throws Exception {

// Create a job that initially fails but then succeeds
Job mockJob = mock(Job.class);
Expand Down Expand Up @@ -879,7 +882,7 @@ public void testCircuitBreakerCloses() throws Exception {
}

@Test
public void testManualCircuitBreakerReset() throws Exception {
public void test_CircuitBreaker_Reset() throws Exception {

// Create a failing job
Job failingJob = mock(Job.class);
Expand Down Expand Up @@ -930,4 +933,114 @@ public void testManualCircuitBreakerReset() throws Exception {
jobQueueManagerAPI.close();
}

@Test
public void test_Job_Cancellation() throws Exception {

class TestJobProcessor implements JobProcessor {

private volatile boolean cancellationRequested = false;
private final CountDownLatch processingStarted = new CountDownLatch(1);
private final CountDownLatch processingCompleted = new CountDownLatch(1);

@Override
public void process(Job job) throws JobProcessingException {
processingStarted.countDown();
try {
while (!cancellationRequested) {
// Simulate some work
Thread.sleep(100);
}
throw new InterruptedException("Job cancelled");
} catch (InterruptedException e) {
processingCompleted.countDown();
throw new JobProcessingException(job.id(), "Job was cancelled", e);
}
}

@Override
public boolean canCancel(Job job) {
return true;
}

@Override
public void cancel(Job job) {
cancellationRequested = true;
}

public boolean awaitProcessingStart(long timeout, TimeUnit unit)
throws InterruptedException {
return processingStarted.await(timeout, unit);
}

public boolean awaitProcessingCompleted(long timeout, TimeUnit unit)
throws InterruptedException {
return processingCompleted.await(timeout, unit);
}
}

// Create a real job
Job job = Job.builder()
.id("job123")
.queueName("testQueue")
.state(JobState.PENDING)
.build();

// Use our TestJobProcessor
TestJobProcessor testJobProcessor = new TestJobProcessor();

// Configure JobQueue
when(mockJobQueue.getJob("job123")).thenReturn(job);
when(mockJobQueue.nextJob()).thenReturn(job).thenReturn(null);

// List to capture job state updates
List<JobState> stateUpdates = new CopyOnWriteArrayList<>();

// Set up a job watcher to capture state updates
jobQueueManagerAPI.watchJob("job123", updatedJob -> {
stateUpdates.add(updatedJob.state());
});

// Register the test processor
jobQueueManagerAPI.registerProcessor("testQueue", testJobProcessor);

// Configure circuit breaker
when(mockCircuitBreaker.allowRequest()).thenReturn(true);

// Start the job queue manager
jobQueueManagerAPI.start();

// Wait for the job to start processing
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS));

// Cancel the job
jobQueueManagerAPI.cancelJob("job123");

// Wait for the job to complete (which should be due to cancellation)
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.until(() -> testJobProcessor.awaitProcessingCompleted(100, TimeUnit.MILLISECONDS));

// Wait for state updates to be captured
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> stateUpdates.size() >= 3);

// Verify the job state transitions
assertFalse("No state updates were captured", stateUpdates.isEmpty());
assertEquals(JobState.PENDING, stateUpdates.get(0), "Initial state should be PENDING");
assertTrue("Job state should have transitioned to RUNNING",
stateUpdates.contains(JobState.RUNNING));
assertEquals(JobState.CANCELLED, stateUpdates.get(stateUpdates.size() - 1),
"Final state should be CANCELLED");

// Verify that the job status was updated in the queue
verify(mockJobQueue, timeout(5000)).
updateJobStatus(argThat(j -> j.state() == JobState.CANCELLED));

// Clean up
jobQueueManagerAPI.close();
}

}

0 comments on commit 90bc1ef

Please sign in to comment.