Skip to content

Commit

Permalink
#29478 Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 29, 2024
1 parent 90bc1ef commit 77ac1a8
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
* number of threads.
*/
public JobQueueManagerAPIImpl() {
// TODO: Use a job queue implementation
// We don't have yet a JobQueue implementation, an implementation will be developed in a
// later task.
this(null, DEFAULT_THREAD_POOL_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -59,7 +60,7 @@ public class JobQueueManagerAPITest {

private CircuitBreaker mockCircuitBreaker;

private JobQueueManagerAPIImpl jobQueueManagerAPI;
private JobQueueManagerAPI jobQueueManagerAPI;

@Before
public void setUp() {
Expand All @@ -74,6 +75,11 @@ public void setUp() {
jobQueueManagerAPI.setRetryStrategy("testQueue", mockRetryStrategy);
}

/**
* Method to test: createJob in JobQueueManagerAPI
* Given Scenario: Valid queue name and parameters are provided
* ExpectedResult: Job is created successfully and correct job ID is returned
*/
@Test
public void test_createJob() {

Expand All @@ -87,6 +93,11 @@ public void test_createJob() {
verify(mockJobQueue).addJob("testQueue", parameters);
}

/**
* Method to test: getJob in JobQueueManagerAPI
* Given Scenario: Valid job ID is provided
* ExpectedResult: Correct job is retrieved from the job queue
*/
@Test
public void test_getJob() {

Expand All @@ -100,6 +111,11 @@ public void test_getJob() {
verify(mockJobQueue).getJob("job123");
}

/**
* Method to test: getJobs in JobQueueManagerAPI
* Given Scenario: Valid page and pageSize parameters are provided
* ExpectedResult: Correct list of jobs is retrieved from the job queue
*/
@Test
public void test_getJobs() {

Expand All @@ -119,23 +135,11 @@ public void test_getJobs() {
verify(mockJobQueue).getJobs(1, 10);
}

@Test
public void testCancelJob() {

Job mockJob = mock(Job.class);
when(mockJobQueue.getJob("job123")).thenReturn(mockJob);
when(mockJob.queueName()).thenReturn("testQueue");
when(mockJob.id()).thenReturn("job123");
when(mockJob.withState(any())).thenReturn(mockJob);

when(mockJobProcessor.canCancel(mockJob)).thenReturn(true);

jobQueueManagerAPI.cancelJob("job123");

verify(mockJobProcessor).cancel(mockJob);
verify(mockJobQueue).updateJobStatus(any(Job.class));
}

/**
* Method to test: start in JobQueueManagerAPI
* Given Scenario: JobQueueManagerAPI is not started
* ExpectedResult: JobQueueManagerAPI starts successfully and begins processing jobs
*/
@Test
public void test_start() throws InterruptedException {

Expand All @@ -153,6 +157,11 @@ public void test_start() throws InterruptedException {
verify(mockJobQueue, timeout(5000).atLeastOnce()).nextJob();
}

/**
* Method to test: close in JobQueueManagerAPI
* Given Scenario: JobQueueManagerAPI is running
* ExpectedResult: JobQueueManagerAPI stops successfully and no more jobs are processed
*/
@Test
public void test_close() throws Exception {

Expand All @@ -176,16 +185,16 @@ public void test_close() throws Exception {
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> !jobQueueManagerAPI.isStarted());

// Verify that no more jobs are being processed
int currentJobCount = jobCheckCount.get();

// Verify that no more jobs are being processed by waiting for two seconds
long startTime = System.currentTimeMillis();
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.atMost(3, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> {
int count = jobCheckCount.get();
Thread.sleep(500); // Wait a bit to see if count increases
return jobCheckCount.get()
== count; // If count hasn't increased, job processing has stopped
});
.until(() -> System.currentTimeMillis() - startTime >= 2000);

assertEquals(currentJobCount, jobCheckCount.get());

// Try to start a new job and verify it's not processed
Job mockJob = mock(Job.class);
Expand All @@ -206,6 +215,11 @@ public void test_close() throws Exception {
assertFalse(jobQueueManagerAPI.isStarted());
}

/**
* Method to test: watchJob in JobQueueManagerAPI
* Given Scenario: Valid job ID and watcher are provided
* ExpectedResult: Watcher receives all job state updates correctly
*/
@Test
public void test_watchJob() throws Exception {

Expand Down Expand Up @@ -278,6 +292,11 @@ public void test_watchJob() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job retry mechanism in JobQueueManagerAPI
* Given Scenario: Job fails on first attempt but succeeds on retry
* ExpectedResult: Job is retried once and completes successfully
*/
@Test
public void test_JobRetry_single_retry() throws Exception {

Expand Down Expand Up @@ -359,6 +378,11 @@ public void test_JobRetry_single_retry() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job retry mechanism in JobQueueManagerAPI
* Given Scenario: Job fails twice before succeeding on third attempt
* ExpectedResult: Job is retried twice and completes successfully
*/
@Test
public void test_JobRetry_retry_twice() throws Exception {

Expand Down Expand Up @@ -456,6 +480,11 @@ public void test_JobRetry_retry_twice() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job retry mechanism in JobQueueManagerAPI
* Given Scenario: Job fails repeatedly and reaches max retry limit
* ExpectedResult: Job is retried up to max limit and then marked as failed
*/
@Test
public void test_JobRetry_MaxRetryLimit() throws Exception {

Expand Down Expand Up @@ -527,6 +556,11 @@ public void test_JobRetry_MaxRetryLimit() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job processing in JobQueueManagerAPI
* Given Scenario: Job succeeds on first attempt
* ExpectedResult: Job is processed once and completes successfully without retries
*/
@Test
public void test_Job_SucceedsFirstAttempt() throws Exception {

Expand Down Expand Up @@ -587,6 +621,11 @@ public void test_Job_SucceedsFirstAttempt() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job retry mechanism in JobQueueManagerAPI
* Given Scenario: Job fails with a non-retryable error
* ExpectedResult: Job is not retried and is marked as failed
*/
@Test
public void test_Job_NotRetryable() throws Exception {

Expand Down Expand Up @@ -651,6 +690,11 @@ public void test_Job_NotRetryable() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Job progress tracking in JobQueueManagerAPI
* Given Scenario: Job with multiple progress updates
* ExpectedResult: Progress is tracked correctly and increases monotonically
*/
@Test
public void test_JobProgressTracking() throws Exception {

Expand Down Expand Up @@ -700,7 +744,13 @@ public void test_JobProgressTracking() throws Exception {
realProgressTracker.updateProgress(progress);
// Simulate the effect of updateJobProgress
jobProgress.set(progress);
Thread.sleep(50); // Simulate work

// Simulate work
long startTime = System.currentTimeMillis();
Awaitility.await()
.atMost(3, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> System.currentTimeMillis() - startTime >= 50);
}

Job job = inv.getArgument(0);
Expand Down Expand Up @@ -754,6 +804,11 @@ public void test_JobProgressTracking() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Circuit breaker mechanism in JobQueueManagerAPI
* Given Scenario: Multiple job failures occur
* ExpectedResult: Circuit breaker opens after threshold is reached
*/
@Test
public void test_CircuitBreaker_Opens() throws Exception {

Expand Down Expand Up @@ -802,17 +857,27 @@ public void test_CircuitBreaker_Opens() throws Exception {
// Verify that the correct number of jobs were processed before the circuit opened
verify(mockJobProcessor, times(5)).process(any());

// Verify that no more jobs are processed while the circuit is open
Thread.sleep(1000); // Wait a bit to ensure no more processing attempts
// Verify that no more jobs are processed while the circuit is open waiting for two seconds
long startTime = System.currentTimeMillis();
Awaitility.await()
.atMost(3, TimeUnit.SECONDS)
.pollInterval(100, TimeUnit.MILLISECONDS)
.until(() -> System.currentTimeMillis() - startTime >= 2000);

verify(mockJobProcessor, times(5)).process(any());

// Verify the final circuit breaker status
assertTrue("Circuit breaker should be open", !circuitBreaker.allowRequest());
assertFalse("Circuit breaker should be open", circuitBreaker.allowRequest());
assertEquals(5, circuitBreaker.getFailureCount(), "Failure count should be 5");

jobQueueManagerAPI.close();
}

/**
* Method to test: Circuit breaker mechanism in JobQueueManagerAPI
* Given Scenario: Circuit breaker is open and then jobs start succeeding
* ExpectedResult: Circuit breaker closes after successful job completions
*/
@Test
public void test_CircuitBreaker_Closes() throws Exception {

Expand Down Expand Up @@ -881,6 +946,11 @@ public void test_CircuitBreaker_Closes() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: Circuit breaker reset in JobQueueManagerAPI
* Given Scenario: Circuit breaker is open
* ExpectedResult: Circuit breaker closes immediately after manual reset
*/
@Test
public void test_CircuitBreaker_Reset() throws Exception {

Expand Down Expand Up @@ -933,23 +1003,53 @@ public void test_CircuitBreaker_Reset() throws Exception {
jobQueueManagerAPI.close();
}

/**
* Method to test: cancelJob in JobQueueManagerAPI
* Given Scenario: Valid job ID for a cancellable job is provided
* ExpectedResult: Job is successfully cancelled and its status is updated
*/
@Test
public void test_Job_Cancellation() throws Exception {
public void test_simple_cancelJob() {

Job mockJob = mock(Job.class);
when(mockJobQueue.getJob("job123")).thenReturn(mockJob);
when(mockJob.queueName()).thenReturn("testQueue");
when(mockJob.id()).thenReturn("job123");
when(mockJob.withState(any())).thenReturn(mockJob);

when(mockJobProcessor.canCancel(mockJob)).thenReturn(true);

jobQueueManagerAPI.cancelJob("job123");

verify(mockJobProcessor).cancel(mockJob);
verify(mockJobQueue).updateJobStatus(any(Job.class));
}

/**
* Method to test: Job cancellation in JobQueueManagerAPI
* Given Scenario: Running job is cancelled
* ExpectedResult: Job is successfully cancelled and its state transitions are correct
*/
@Test
public void test_complex_cancelJob() throws Exception {

class TestJobProcessor implements JobProcessor {

private volatile boolean cancellationRequested = false;
private final AtomicBoolean cancellationRequested = new AtomicBoolean(false);
private final CountDownLatch processingStarted = new CountDownLatch(1);
private final CountDownLatch processingCompleted = new CountDownLatch(1);

@Override
public void process(Job job) throws JobProcessingException {
processingStarted.countDown();
try {
while (!cancellationRequested) {
// Simulate some work
Thread.sleep(100);
}

// Simulate work and wait for cancellation
Awaitility.await()
.pollInterval(100, TimeUnit.MILLISECONDS)
.atMost(30, TimeUnit.SECONDS)
.until(cancellationRequested::get);

throw new InterruptedException("Job cancelled");
} catch (InterruptedException e) {
processingCompleted.countDown();
Expand All @@ -964,7 +1064,7 @@ public boolean canCancel(Job job) {

@Override
public void cancel(Job job) {
cancellationRequested = true;
cancellationRequested.set(true);
}

public boolean awaitProcessingStart(long timeout, TimeUnit unit)
Expand Down

0 comments on commit 77ac1a8

Please sign in to comment.