diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java similarity index 88% rename from dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java rename to dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index b7a2bbc03fd0..bdbcb9e96fa9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -1,5 +1,6 @@ package com.dotcms.jobs.business.api; +import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.ProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; @@ -14,7 +15,7 @@ * Defines the contract for interacting with the job queue system. This interface provides methods * for managing jobs, processors, and the overall state of the job queue. */ -public interface JobQueueAPI extends AutoCloseable { +public interface JobQueueManagerAPI extends AutoCloseable { /** * Starts the job queue manager, initializing the thread pool for job processing. @@ -108,16 +109,10 @@ String createJob(String queueName, Map parameters) void setRetryStrategy(String queueName, RetryStrategy retryStrategy); /** - * Manually resets the CircuitBreaker. This should be called with caution, typically after - * addressing the underlying issues causing failures. - */ - void resetCircuitBreaker(); - - /** - * Provides information about the current state of the CircuitBreaker. + * Retrieves the CircuitBreaker instance. * - * @return A string representation of the CircuitBreaker's current status + * @return The CircuitBreaker instance */ - String getCircuitBreakerStatus(); + CircuitBreaker getCircuitBreaker(); } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java similarity index 89% rename from dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java rename to dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index fe74cded93b4..12b77ef753ab 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -15,7 +15,6 @@ import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; import java.time.LocalDateTime; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -38,36 +37,36 @@ * JobQueue jobQueue = new PostgresJobQueue(); * * // Create and start the job queue manager - * JobQueueAPIImpl jobQueueAPI = new JobQueueAPIImpl(jobQueue, 5); // 5 threads + * JobQueueManagerAPIImpl jobQueueManagerAPI = new JobQueueManagerAPIImpl(jobQueue, 5); // 5 threads * * //(Optional) Set up a retry strategy for content import jobs * RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy(5000, 300000, 2.0, 3); * contentImportRetryStrategy.addRetryableException(IOException.class); - * jobQueueAPI.setRetryStrategy("contentImport", contentImportRetryStrategy); + * jobQueueManagerAPI.setRetryStrategy("contentImport", contentImportRetryStrategy); * * // Register job processors - * jobQueueAPI.registerProcessor("contentImport", new ContentImportJobProcessor()); + * jobQueueManagerAPI.registerProcessor("contentImport", new ContentImportJobProcessor()); * * // Start the job queue manager - * jobQueueAPI.start(); + * jobQueueManagerAPI.start(); * * // Create a content import job (dummy example) * Map jobParameters = new HashMap<>(); * jobParameters.put("filePath", "/path/to/import/file.csv"); * jobParameters.put("contentType", "Article"); - * String jobId = jobQueueAPI.createJob("contentImport", jobParameters); + * String jobId = jobQueueManagerAPI.createJob("contentImport", jobParameters); * * // Optionally, watch the job progress - * jobQueueAPI.watchJob(jobId, job -> { + * jobQueueManagerAPI.watchJob(jobId, job -> { * System.out.println("Job " + job.id() + " progress: " + job.progress() * 100 + "%"); * }); * * // When shutting down the application - * jobQueueAPI.close(); + * jobQueueManagerAPI.close(); * } * } */ -public class JobQueueAPIImpl implements JobQueueAPI { +public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final AtomicBoolean isStarted = new AtomicBoolean(false); private CountDownLatch startLatch; @@ -88,22 +87,22 @@ public class JobQueueAPIImpl implements JobQueueAPI { ); /** - * Constructs a new JobQueueAPIImpl with the default job queue implementation and the default + * Constructs a new JobQueueManagerAPIImpl with the default job queue implementation and the default * number of threads. */ - public JobQueueAPIImpl() { + public JobQueueManagerAPIImpl() { // TODO: Use a job queue implementation this(null, DEFAULT_THREAD_POOL_SIZE); } /** - * Constructs a new JobQueueAPIImpl. + * Constructs a new JobQueueManagerAPIImpl. * * @param jobQueue The JobQueue implementation to use. * @param threadPoolSize The number of threads to use for job processing. */ @VisibleForTesting - public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) { + public JobQueueManagerAPIImpl(JobQueue jobQueue, int threadPoolSize) { this.jobQueue = jobQueue; this.threadPoolSize = threadPoolSize; this.processors = new ConcurrentHashMap<>(); @@ -117,6 +116,26 @@ public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) { ); // 5 failures within 1 minute } + /** + * Constructs a new JobQueueManagerAPIImpl. + * + * @param jobQueue The JobQueue implementation to use. + * @param threadPoolSize The number of threads to use for job processing. + * @param circuitBreaker The CircuitBreaker implementation to use. + */ + @VisibleForTesting + public JobQueueManagerAPIImpl(JobQueue jobQueue, int threadPoolSize, CircuitBreaker circuitBreaker) { + this.jobQueue = jobQueue; + this.threadPoolSize = threadPoolSize; + this.processors = new ConcurrentHashMap<>(); + this.jobWatchers = new ConcurrentHashMap<>(); + this.retryStrategies = new ConcurrentHashMap<>(); + this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy( + 1000, 60000, 2.0, 5 + ); + this.circuitBreaker = circuitBreaker; + } + @Override public boolean isStarted() { return isStarted.get() && !isClosed; @@ -203,7 +222,7 @@ public String createJob(final String queueName, final Map parame if (!processors.containsKey(queueName)) { final var error = new ProcessorNotFoundException(queueName); - Logger.error(JobQueueAPIImpl.class, error); + Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } @@ -237,17 +256,17 @@ public void cancelJob(final String jobId) { notifyJobWatchers(cancelledJob); } catch (Exception e) { final var error = new JobCancellationException(jobId, e.getMessage()); - Logger.error(JobQueueAPIImpl.class, error); + Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } } else { final var error = new JobCancellationException(jobId, "Job cannot be cancelled"); - Logger.error(JobQueueAPIImpl.class, error); + Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } } else { final var error = new JobCancellationException(jobId, "Job not found"); - Logger.error(JobQueueAPIImpl.class, error); + Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } } @@ -265,19 +284,9 @@ public void setRetryStrategy(final String queueName, final RetryStrategy retrySt } @Override - public void resetCircuitBreaker() { - Logger.info(this, "Manually resetting CircuitBreaker"); - circuitBreaker.reset(); - } - - @Override - public String getCircuitBreakerStatus() { - return String.format("CircuitBreaker - Open: %b, Failure Count: %d, Last Failure: %s", - circuitBreaker.isOpen(), - circuitBreaker.getFailureCount(), - circuitBreaker.getLastFailureTime() > 0 - ? new Date(circuitBreaker.getLastFailureTime()).toString() - : "N/A"); + @VisibleForTesting + public CircuitBreaker getCircuitBreaker() { + return circuitBreaker; } /** @@ -345,7 +354,7 @@ private void processJobs() { Thread.currentThread().interrupt(); } catch (Exception e) { Logger.error(this, "Unexpected error in job processing loop: " + e.getMessage(), e); - circuitBreaker.recordFailure(); + getCircuitBreaker().recordFailure(); } } } @@ -357,7 +366,7 @@ private void processJobs() { */ private boolean isCircuitBreakerOpen() { - if (!circuitBreaker.allowRequest()) { + if (!getCircuitBreaker().allowRequest()) { Logger.warn(this, "Circuit breaker is open. Pausing job processing for a while."); @@ -493,9 +502,13 @@ private void processJob(final Job job) { * @param errorDetail The details of the error that caused the failure. */ private void handleJobFailure(final Job job, final ErrorDetail errorDetail) { + final Job failedJob = job.markAsFailed(errorDetail); jobQueue.updateJobStatus(failedJob); notifyJobWatchers(failedJob); + + // Record the failure in the circuit breaker + getCircuitBreaker().recordFailure(); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/CircuitBreaker.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/CircuitBreaker.java index 0070905c8450..a667f8cb462d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/CircuitBreaker.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/CircuitBreaker.java @@ -1,5 +1,7 @@ package com.dotcms.jobs.business.error; +import com.dotmarketing.util.Logger; + /** * Implements the Circuit Breaker pattern to prevent repeated failures in a system. It helps to * avoid cascading failures by temporarily disabling operations that are likely to fail. @@ -63,6 +65,9 @@ public synchronized void recordFailure() { * Manually resets the circuit breaker to a closed state. */ public synchronized void reset() { + + Logger.info(this, "Manually resetting CircuitBreaker"); + isOpen = false; failureCount = 0; lastFailureTime = 0; diff --git a/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java b/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java index 4e410730d0fd..43ce31ee6895 100644 --- a/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java +++ b/dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java @@ -47,7 +47,7 @@ import com.dotcms.integritycheckers.ContentPageIntegrityCheckerTest; import com.dotcms.integritycheckers.HostIntegrityCheckerTest; import com.dotcms.integritycheckers.IntegrityUtilTest; -import com.dotcms.jobs.business.api.JobQueueAPITest; +import com.dotcms.jobs.business.api.JobQueueManagerAPITest; import com.dotcms.junit.MainBaseSuite; import com.dotcms.mail.MailAPIImplTest; import com.dotcms.publisher.bundle.business.BundleAPITest; @@ -313,7 +313,7 @@ EmbeddingContentListenerTest.class, Task240606AddVariableColumnToWorkflowTest.class, OpenAIContentPromptActionletTest.class, - JobQueueAPITest.class + JobQueueManagerAPITest.class }) public class MainSuite2b { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java similarity index 70% rename from dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java rename to dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 5d1625c22a46..a77345a9fae3 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; @@ -45,7 +46,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.InOrder; -public class JobQueueAPITest { +public class JobQueueManagerAPITest { private JobQueue mockJobQueue; @@ -53,7 +54,9 @@ public class JobQueueAPITest { private RetryStrategy mockRetryStrategy; - private JobQueueAPIImpl jobQueueAPI; + private CircuitBreaker mockCircuitBreaker; + + private JobQueueManagerAPIImpl jobQueueManagerAPI; @Before public void setUp() { @@ -61,10 +64,11 @@ public void setUp() { mockJobQueue = mock(JobQueue.class); mockJobProcessor = mock(JobProcessor.class); mockRetryStrategy = mock(RetryStrategy.class); + mockCircuitBreaker = mock(CircuitBreaker.class); - jobQueueAPI = new JobQueueAPIImpl(mockJobQueue, 1); - jobQueueAPI.registerProcessor("testQueue", mockJobProcessor); - jobQueueAPI.setRetryStrategy("testQueue", mockRetryStrategy); + jobQueueManagerAPI = new JobQueueManagerAPIImpl(mockJobQueue, 1, mockCircuitBreaker); + jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + jobQueueManagerAPI.setRetryStrategy("testQueue", mockRetryStrategy); } @Test @@ -74,7 +78,7 @@ public void test_createJob() { when(mockJobQueue.addJob(anyString(), anyMap())).thenReturn("job123"); // Creating a job - String jobId = jobQueueAPI.createJob("testQueue", parameters); + String jobId = jobQueueManagerAPI.createJob("testQueue", parameters); assertEquals("job123", jobId); verify(mockJobQueue).addJob("testQueue", parameters); @@ -87,7 +91,7 @@ public void test_getJob() { when(mockJobQueue.getJob("job123")).thenReturn(mockJob); // Getting a job - Job result = jobQueueAPI.getJob("job123"); + Job result = jobQueueManagerAPI.getJob("job123"); assertEquals(mockJob, result); verify(mockJobQueue).getJob("job123"); @@ -105,7 +109,7 @@ public void test_getJobs() { when(mockJobQueue.getJobs(1, 10)).thenReturn(expectedJobs); // Call the method under test - List actualJobs = jobQueueAPI.getJobs(1, 10); + List actualJobs = jobQueueManagerAPI.getJobs(1, 10); // Verify the results assertEquals(expectedJobs, actualJobs); @@ -123,7 +127,7 @@ public void testCancelJob() { when(mockJobProcessor.canCancel(mockJob)).thenReturn(true); - jobQueueAPI.cancelJob("job123"); + jobQueueManagerAPI.cancelJob("job123"); verify(mockJobProcessor).cancel(mockJob); verify(mockJobQueue).updateJobStatus(any(Job.class)); @@ -132,12 +136,15 @@ public void testCancelJob() { @Test public void test_start() throws InterruptedException { - assertFalse(jobQueueAPI.isStarted()); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + + assertFalse(jobQueueManagerAPI.isStarted()); - jobQueueAPI.start(); + jobQueueManagerAPI.start(); - assertTrue(jobQueueAPI.isStarted()); - assertTrue(jobQueueAPI.awaitStart(5, TimeUnit.SECONDS)); + assertTrue(jobQueueManagerAPI.isStarted()); + assertTrue(jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS)); // Verify that jobs are being processed verify(mockJobQueue, timeout(5000).atLeastOnce()).nextJob(); @@ -146,10 +153,10 @@ public void test_start() throws InterruptedException { @Test public void test_close() throws Exception { - // Start the JobQueueAPI - jobQueueAPI.start(); - assertTrue(jobQueueAPI.isStarted()); - assertTrue(jobQueueAPI.awaitStart(5, TimeUnit.SECONDS)); + // Start the JobQueueManagerAPI + jobQueueManagerAPI.start(); + assertTrue(jobQueueManagerAPI.isStarted()); + assertTrue(jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS)); AtomicInteger jobCheckCount = new AtomicInteger(0); when(mockJobQueue.nextJob()).thenAnswer(invocation -> { @@ -157,14 +164,14 @@ public void test_close() throws Exception { return null; }); - // Close the JobQueueAPI - jobQueueAPI.close(); + // Close the JobQueueManagerAPI + jobQueueManagerAPI.close(); - // Wait for the JobQueueAPI to be fully stopped + // Wait for the JobQueueManagerAPI to be fully stopped Awaitility.await() .atMost(10, TimeUnit.SECONDS) .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> !jobQueueAPI.isStarted()); + .until(() -> !jobQueueManagerAPI.isStarted()); // Verify that no more jobs are being processed Awaitility.await() @@ -187,13 +194,13 @@ public void test_close() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> verify(mockJobProcessor, never()).process(any(Job.class))); - // Verify that we can't start the JobQueueAPIImpl again after closing - jobQueueAPI.start(); - assertFalse(jobQueueAPI.isStarted()); + // Verify that we can't start the JobQueueManagerAPIImpl again after closing + jobQueueManagerAPI.start(); + assertFalse(jobQueueManagerAPI.isStarted()); // Verify that close() can be called multiple times without error - jobQueueAPI.close(); - assertFalse(jobQueueAPI.isStarted()); + jobQueueManagerAPI.close(); + assertFalse(jobQueueManagerAPI.isStarted()); } @Test @@ -223,6 +230,9 @@ public void test_watchJob() throws Exception { when(mockJobQueue.getJob(jobId)).thenReturn(initialJob); when(mockJobQueue.nextJob()).thenReturn(initialJob).thenReturn(null); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Mock JobProcessor behavior ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); @@ -239,11 +249,11 @@ public void test_watchJob() throws Exception { capturedStates.add(job.state()); }; - // Start the JobQueueAPI - jobQueueAPI.start(); + // Start the JobQueueManagerAPI + jobQueueManagerAPI.start(); // Register the watcher - jobQueueAPI.watchJob(jobId, testWatcher); + jobQueueManagerAPI.watchJob(jobId, testWatcher); // Wait for job processing to complete Awaitility.await() @@ -261,8 +271,8 @@ public void test_watchJob() throws Exception { assertTrue(capturedStates.contains(JobState.RUNNING)); assertTrue(capturedStates.contains(JobState.COMPLETED)); - // Stop the JobQueueAPI - jobQueueAPI.close(); + // Stop the JobQueueManagerAPI + jobQueueManagerAPI.close(); } @Test @@ -299,6 +309,9 @@ public void test_JobRetry_single_retry() throws Exception { // Set up the job queue to return our mock job twice (for initial attempt and retry) when(mockJobQueue.nextJob()).thenReturn(mockJob, mockJob, null); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Configure retry strategy when(mockRetryStrategy.shouldRetry(any(), any())).thenReturn(true); when(mockRetryStrategy.nextRetryDelay(any())).thenReturn(0L); // Immediate retry @@ -321,7 +334,7 @@ public void test_JobRetry_single_retry() throws Exception { }).when(mockJobProcessor).process(any()); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -340,7 +353,7 @@ public void test_JobRetry_single_retry() throws Exception { assertEquals(1, retryCount.get()); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); } @Test @@ -386,6 +399,9 @@ public void test_JobRetry_retry_twice() throws Exception { when(mockRetryStrategy.shouldRetry(any(), any())).thenReturn(true); when(mockRetryStrategy.nextRetryDelay(any())).thenReturn(100L); // Non-zero delay + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); @@ -405,7 +421,7 @@ public void test_JobRetry_retry_twice() throws Exception { }).when(mockJobProcessor).process(any()); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -434,7 +450,7 @@ public void test_JobRetry_retry_twice() throws Exception { verify(mockJobQueue, atLeast(3)).nextJob(); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); } @Test @@ -476,6 +492,9 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { inv -> retryCount.get() < maxRetries); when(mockRetryStrategy.nextRetryDelay(any())).thenReturn(0L); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); @@ -484,7 +503,7 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { doThrow(new RuntimeException("Simulated failure")).when(mockJobProcessor).process(any()); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -502,7 +521,7 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { verify(mockJobQueue, times(1)).removeJob(mockJob.id()); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); } @Test @@ -513,6 +532,9 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { when(mockJob.id()).thenReturn("job123"); when(mockJob.queueName()).thenReturn("testQueue"); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + AtomicReference jobState = new AtomicReference<>(JobState.PENDING); when(mockJob.state()).thenAnswer(inv -> jobState.get()); @@ -542,7 +564,7 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { }).when(mockJobProcessor).process(any()); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -559,7 +581,7 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { argThat(job -> job.state() == JobState.COMPLETED)); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); } @Test @@ -588,6 +610,9 @@ public void test_Job_NotRetryable() throws Exception { // Configure retry strategy to not retry when(mockRetryStrategy.shouldRetry(any(), any())).thenReturn(false); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Configure progress tracker ProgressTracker mockProgressTracker = mock(ProgressTracker.class); when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); @@ -598,7 +623,7 @@ public void test_Job_NotRetryable() throws Exception { doThrow(new RuntimeException("Non-retryable error")).when(mockJobProcessor).process(any()); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -620,7 +645,7 @@ public void test_Job_NotRetryable() throws Exception { assertEquals("Non-retryable error", capturedErrorDetail.exception().getMessage()); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); } @Test @@ -658,6 +683,9 @@ public void test_JobProgressTracking() throws Exception { ProgressTracker realProgressTracker = new DefaultProgressTracker(); when(mockJobProcessor.progressTracker(any())).thenReturn(realProgressTracker); + // Make the circuit breaker always allow requests + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // List to store progress updates List progressUpdates = Collections.synchronizedList(new ArrayList<>()); @@ -678,12 +706,12 @@ public void test_JobProgressTracking() throws Exception { }).when(mockJobProcessor).process(any()); // Set up a job watcher to capture progress updates - jobQueueAPI.watchJob("progress-test-job", job -> { + jobQueueManagerAPI.watchJob("progress-test-job", job -> { progressUpdates.add(job.progress()); }); // Start the job queue - jobQueueAPI.start(); + jobQueueManagerAPI.start(); // Wait for job processing to complete Awaitility.await().atMost(10, TimeUnit.SECONDS) @@ -720,7 +748,186 @@ public void test_JobProgressTracking() throws Exception { updateJobStatus(argThat(job -> job.state() == JobState.COMPLETED)); // Stop the job queue - jobQueueAPI.close(); + jobQueueManagerAPI.close(); + } + + @Test + public void test_CircuitBreaker_Opens() throws Exception { + + // Create a job that always fails + Job failingJob = mock(Job.class); + when(failingJob.id()).thenReturn("job123"); + when(failingJob.queueName()).thenReturn("testQueue"); + + // Set up the job queue to return the failing job a limited number of times + AtomicInteger jobCount = new AtomicInteger(0); + when(mockJobQueue.nextJob()).thenAnswer(invocation -> { + if (jobCount.getAndIncrement() < 10) { // Limit to 10 jobs + return failingJob; + } + return null; + }); + when(mockJobQueue.getJob(anyString())).thenReturn(failingJob); + when(failingJob.withState(any())).thenReturn(failingJob); + when(failingJob.markAsFailed(any())).thenReturn(failingJob); + + // Configure progress tracker + ProgressTracker mockProgressTracker = mock(ProgressTracker.class); + when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(failingJob.progress()).thenReturn(0f); + when(failingJob.withProgress(anyFloat())).thenReturn(failingJob); + + // Configure the processor to always throw an exception + doThrow(new RuntimeException("Simulated failure")).when(mockJobProcessor).process(any()); + + // Create a real CircuitBreaker with a low threshold for testing + CircuitBreaker circuitBreaker = new CircuitBreaker(5, 60000); + + // Create JobQueueManagerAPIImpl with the real CircuitBreaker + jobQueueManagerAPI = new JobQueueManagerAPIImpl(mockJobQueue, 1, circuitBreaker); + jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + + // Start the job queue + jobQueueManagerAPI.start(); + + // Wait for the circuit breaker to open (should happen after 5 failures) + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> !circuitBreaker.allowRequest()); + + // Verify that the correct number of jobs were processed before the circuit opened + verify(mockJobProcessor, times(5)).process(any()); + + // Verify that no more jobs are processed while the circuit is open + Thread.sleep(1000); // Wait a bit to ensure no more processing attempts + verify(mockJobProcessor, times(5)).process(any()); + + // Verify the final circuit breaker status + assertTrue("Circuit breaker should be open", !circuitBreaker.allowRequest()); + assertEquals(5, circuitBreaker.getFailureCount(), "Failure count should be 5"); + + jobQueueManagerAPI.close(); + } + + @Test + public void testCircuitBreakerCloses() throws Exception { + + // Create a job that initially fails but then succeeds + Job mockJob = mock(Job.class); + when(mockJob.id()).thenReturn("job123"); + when(mockJob.queueName()).thenReturn("testQueue"); + + // Set up the job queue to return our job + when(mockJobQueue.nextJob()).thenReturn(mockJob); + when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); + when(mockJob.withState(any())).thenReturn(mockJob); + when(mockJob.markAsFailed(any())).thenReturn(mockJob); + + // Configure progress tracker + ProgressTracker mockProgressTracker = mock(ProgressTracker.class); + when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); + + // Configure the processor to fail 5 times and then succeed + AtomicInteger processCount = new AtomicInteger(0); + doAnswer(inv -> { + if (processCount.getAndIncrement() < 5) { + throw new RuntimeException("Simulated failure"); + } + + Job processingJob = inv.getArgument(0); + processingJob.markAsCompleted(); + return null; + }).when(mockJobProcessor).process(any()); + + AtomicReference jobState = new AtomicReference<>(JobState.PENDING); + when(mockJob.markAsCompleted()).thenAnswer(inv -> { + jobState.set(JobState.COMPLETED); + return mockJob; + }); + + // Create a real CircuitBreaker with a low threshold for testing + CircuitBreaker circuitBreaker = new CircuitBreaker(5, + 1000); // Short reset timeout for testing + + // Create JobQueueManagerAPIImpl with the real CircuitBreaker + jobQueueManagerAPI = new JobQueueManagerAPIImpl(mockJobQueue, 1, circuitBreaker); + jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + + // Start the job queue + jobQueueManagerAPI.start(); + + // Wait for the circuit breaker to open + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertFalse(circuitBreaker.allowRequest())); + + // Wait for the circuit breaker to close + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertTrue(circuitBreaker.allowRequest()); + assertEquals(JobState.COMPLETED, jobState.get()); + }); + + verify(mockJobProcessor, atLeast(6)).process(any()); + + jobQueueManagerAPI.close(); + } + + @Test + public void testManualCircuitBreakerReset() throws Exception { + + // Create a failing job + Job failingJob = mock(Job.class); + when(failingJob.id()).thenReturn("job123"); + when(failingJob.queueName()).thenReturn("testQueue"); + + // Set up the job queue to return the failing job + when(mockJobQueue.nextJob()).thenReturn(failingJob); + when(mockJobQueue.getJob(anyString())).thenReturn(failingJob); + when(failingJob.withState(any())).thenReturn(failingJob); + when(failingJob.markAsFailed(any())).thenReturn(failingJob); + + // Configure progress tracker + ProgressTracker mockProgressTracker = mock(ProgressTracker.class); + when(mockJobProcessor.progressTracker(any())).thenReturn(mockProgressTracker); + when(failingJob.progress()).thenReturn(0f); + when(failingJob.withProgress(anyFloat())).thenReturn(failingJob); + + // Configure the processor to always throw an exception + doThrow(new RuntimeException("Simulated failure")).when(mockJobProcessor).process(any()); + + // Create a real CircuitBreaker with a low threshold for testing + CircuitBreaker circuitBreaker = new CircuitBreaker(5, 60000); + + // Create JobQueueManagerAPIImpl with the real CircuitBreaker + jobQueueManagerAPI = new JobQueueManagerAPIImpl(mockJobQueue, 1, circuitBreaker); + jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + + // Start the job queue + jobQueueManagerAPI.start(); + + // Wait for the circuit breaker to open + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> !circuitBreaker.allowRequest()); + + // Verify that the circuit breaker is open + assertFalse("Circuit breaker should be open", circuitBreaker.allowRequest()); + + // Manually reset the circuit breaker + circuitBreaker.reset(); + + // Verify that the circuit breaker is now closed + assertTrue("Circuit breaker should be closed after reset", circuitBreaker.allowRequest()); + assertEquals(0, circuitBreaker.getFailureCount(), "Failure count should be reset to 0"); + + jobQueueManagerAPI.close(); } }