diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 86f5b24ca77d..8ada82da14c0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -7,7 +7,6 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.queue.JobQueue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotmarketing.exception.DotDataException; import java.util.Map; import java.util.Optional; @@ -94,10 +93,10 @@ String createJob(String queueName, Map parameters) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException; + throws DotDataException; /** * Retrieves a list of jobs. @@ -115,9 +114,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of active jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getActiveJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of completed jobs @@ -125,9 +124,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of completed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of canceled jobs @@ -135,9 +134,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of canceled jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getCanceledJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException; /** * Retrieves a list of failed jobs @@ -145,9 +144,9 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * @param page The page number * @param pageSize The number of jobs per page * @return A result object containing the list of failed jobs and pagination information. - * @throws JobQueueDataException if there's an error fetching the jobs + * @throws DotDataException if there's an error fetching the jobs */ - JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException; + JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException; /** * Cancels a job. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 0eff5bd96b1c..40fea5bf13f5 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -316,11 +316,11 @@ public Job getJob(final String jobId) throws DotDataException { @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) - throws JobQueueDataException { + throws DotDataException { try { return jobQueue.getActiveJobs(queueName, page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @@ -336,45 +336,41 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot @CloseDBIfOpened @Override - public JobPaginatedResult getActiveJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getActiveJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching active jobs", e); + throw new DotDataException("Error fetching active jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCompletedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCompletedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching completed jobs", e); + throw new DotDataException("Error fetching completed jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getCanceledJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getCanceledJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching canceled jobs", e); + throw new DotDataException("Error fetching canceled jobs", e); } } @CloseDBIfOpened @Override - public JobPaginatedResult getFailedJobs(int page, int pageSize) - throws JobQueueDataException { + public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException { try { return jobQueue.getFailedJobs(page, pageSize); } catch (JobQueueDataException e) { - throw new JobQueueDataException("Error fetching failed jobs", e); + throw new DotDataException("Error fetching failed jobs", e); } } @@ -401,8 +397,9 @@ public void cancelJob(final String jobId) throws DotDataException { * * @param event The event that triggers the job cancellation request. */ + @VisibleForTesting @WrapInTransaction - private void onCancelRequestJob(final JobCancelRequestEvent event) { + void onCancelRequestJob(final JobCancelRequestEvent event) { try { @@ -862,19 +859,24 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) final float progress = getJobProgress(job); - final var latestState = getJobState(job.id()); - if (latestState == JobState.CANCELLING) { - Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); - updateJobStatus(canceledJob); - eventProducer.getEvent(JobCanceledEvent.class).fire( - new JobCanceledEvent(canceledJob, LocalDateTime.now()) - ); - } else { - final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); - updateJobStatus(completedJob); - eventProducer.getEvent(JobCompletedEvent.class).fire( - new JobCompletedEvent(completedJob, LocalDateTime.now()) - ); + try { + if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCELLING)) { + Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); + updateJobStatus(canceledJob); + eventProducer.getEvent(JobCanceledEvent.class).fire( + new JobCanceledEvent(canceledJob, LocalDateTime.now()) + ); + } else { + final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); + updateJobStatus(completedJob); + eventProducer.getEvent(JobCompletedEvent.class).fire( + new JobCompletedEvent(completedJob, LocalDateTime.now()) + ); + } + } catch (JobQueueDataException e) { + final var errorMessage = "Error updating job status"; + Logger.error(this, errorMessage, e); + throw new DotDataException(errorMessage, e); } } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index 518dd20eb9e7..534986fb7b37 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -10,7 +10,6 @@ import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.Queue; -import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; @@ -247,7 +246,7 @@ void watchJob(String jobId, Consumer watcher) { JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(queueName, page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -279,7 +278,7 @@ JobPaginatedResult getJobs(int page, int pageSize) { JobPaginatedResult getActiveJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getActiveJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching active jobs", e); } return JobPaginatedResult.builder().build(); @@ -295,7 +294,7 @@ JobPaginatedResult getActiveJobs(int page, int pageSize) { JobPaginatedResult getCompletedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCompletedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching completed jobs", e); } return JobPaginatedResult.builder().build(); @@ -311,7 +310,7 @@ JobPaginatedResult getCompletedJobs(int page, int pageSize) { JobPaginatedResult getCanceledJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getCanceledJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching canceled jobs", e); } return JobPaginatedResult.builder().build(); @@ -327,7 +326,7 @@ JobPaginatedResult getCanceledJobs(int page, int pageSize) { JobPaginatedResult getFailedJobs(int page, int pageSize) { try { return jobQueueManagerAPI.getFailedJobs(page, pageSize); - } catch (JobQueueDataException e) { + } catch (DotDataException e) { Logger.error(this.getClass(), "Error fetching failed jobs", e); } return JobPaginatedResult.builder().build(); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 210028bdee69..97c3387fde78 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -28,7 +28,6 @@ import javax.inject.Inject; import org.awaitility.Awaitility; import org.jboss.weld.junit5.EnableWeld; -import org.jboss.weld.junit5.WeldJunit5Extension; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -38,7 +37,6 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.TestMethodOrder; -import org.junit.jupiter.api.extension.ExtendWith; /** * Integration tests for the JobQueueManagerAPI. @@ -50,6 +48,8 @@ @TestInstance(Lifecycle.PER_CLASS) public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBaseTest { + private static int attempts = 0; + @Inject JobQueueManagerAPI jobQueueManagerAPI; @@ -85,6 +85,9 @@ void reset() { if(null != jobQueueManagerAPI) { jobQueueManagerAPI.getCircuitBreaker().reset(); } + + // Reset retry attempts + attempts = 0; } /** @@ -185,8 +188,6 @@ void test_JobRetry() throws Exception { }); } - - /** * Method to test: Job failure handling in JobQueueManagerAPI * Given Scenario: A job is created that is designed to fail @@ -473,7 +474,6 @@ public Map getResultMetadata(Job job) { static class RetryingJobProcessor implements JobProcessor { public static final int MAX_RETRIES = 3; - private int attempts = 0; public RetryingJobProcessor() { // needed for instantiation purposes diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 8525440886d3..a0af91379c4c 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -13,9 +13,11 @@ import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -23,6 +25,7 @@ import static org.mockito.Mockito.when; import com.dotcms.jobs.business.api.events.EventProducer; +import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; @@ -43,6 +46,8 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.system.event.local.business.LocalSystemEventsAPI; +import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; import java.util.ArrayList; @@ -65,6 +70,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; +import org.mockito.MockedStatic; public class JobQueueManagerAPITest { @@ -1182,116 +1188,169 @@ public void test_CircuitBreaker_Reset() throws Exception { public void test_simple_cancelJob2() throws DotDataException, JobQueueException, JobCancellationException { - // Set up the job queue manager to return our mock cancellable processor - final String testQueue = "CancellableTestQueue"; + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { - // Create a mock cancellable processor - final String jobIdIn = "job2"; - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); - // Create a mock job - Job mockJob = mock(Job.class); - when(mockJobQueue.getJob(jobIdIn)).thenReturn(mockJob); - when(mockJob.queueName()).thenReturn(testQueue); - when(mockJob.id()).thenReturn(jobIdIn); - when(mockJob.withState(any())).thenReturn(mockJob); + // Set up the job queue manager to return our mock cancellable processor + final String testQueue = "CancellableTestQueue"; + final String jobIdIn = "job2"; - jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Create a job so we can cancel it - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); - assertEquals(jobIdIn, jobIdOut); - // Perform the cancellation - jobQueueManagerAPI.cancelJob(jobIdOut); - // Verify that the cancel method was called on our mock processor - verify(mockCancellableProcessor).cancel(mockJob); - } + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobIdIn); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); - /** - * Method to test: Job cancellation in JobQueueManagerAPI - * Given Scenario: Running job is canceled - * ExpectedResult: Job is successfully canceled and its state transitions are correct - */ - @Test - public void test_complex_cancelJob() throws Exception { + // Mock job queue operations + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobIdIn); + when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); + doNothing().when(mockJobQueue).updateJobStatus(any(Job.class)); - // Create a mock job - Job mockJob = mock(Job.class); - final String jobId = "job5644"; - when(mockJob.id()).thenReturn(jobId); - final String testQueue = "myTestQueue"; - when(mockJob.queueName()).thenReturn(testQueue); + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); - // Configure JobQueue - when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCELLING))).thenReturn(true); - when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); - // List to capture job state updates - List stateUpdates = new CopyOnWriteArrayList<>(); - - when(mockJob.withState(any())).thenAnswer(inv -> { - stateUpdates.add(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsRunning()).thenAnswer(inv -> { - stateUpdates.add(JobState.RUNNING); - return mockJob; - }); - when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.CANCELED); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.markAsFailed(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.FAILED); - return mockJob; - }); - when(mockJob.progress()).thenReturn(0f); - when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); + jobQueueManagerAPI.registerProcessor(testQueue, SimpleCancellableJobProcessor.class); - // Register the test processor - jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + // Create a job so we can cancel it + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + assertEquals(jobIdIn, jobIdOut); - // Configure circuit breaker - when(mockCircuitBreaker.allowRequest()).thenReturn(true); + // Perform the cancellation + jobQueueManagerAPI.cancelJob(jobIdOut); - // Start the job queue manager - jobQueueManagerAPI.start(); + // Verify that the cancel method was called on our mock processor + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + verify(mockCancellableProcessor).cancel(mockJob); + }); - final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + } + } - final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); - assertTrue(instance.isPresent()); - // Use our TestJobProcessor - final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + /** + * Method to test: Job cancellation in JobQueueManagerAPI + * Given Scenario: Running job is canceled + * ExpectedResult: Job is successfully canceled and its state transitions are correct + */ + @Test + public void test_complex_cancelJob() throws Exception { - // Wait for the job to start processing - Awaitility.await() - .atMost(5, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); + try (MockedStatic apiLocator = mockStatic(APILocator.class)) { + + final String testQueue = "myTestQueue"; + final String jobId = "job5644"; + + // Create a mock job + Job mockJob = mock(Job.class); + when(mockJob.queueName()).thenReturn(testQueue); + when(mockJob.id()).thenReturn(jobId); + when(mockJob.state()).thenReturn(JobState.RUNNING); + when(mockJob.withState(any(JobState.class))).thenReturn(mockJob); + + // Configure JobQueue + when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); + when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); + when(mockJobQueue.hasJobBeenInState(any(), eq(JobState.CANCELLING))).thenReturn(true); + when(mockJobQueue.createJob(anyString(), anyMap())).thenReturn(jobId); + + // List to capture job state updates + List stateUpdates = new CopyOnWriteArrayList<>(); + + when(mockJob.withState(any())).thenAnswer(inv -> { + stateUpdates.add(inv.getArgument(0)); + return mockJob; + }); + when(mockJob.markAsRunning()).thenAnswer(inv -> { + stateUpdates.add(JobState.RUNNING); + return mockJob; + }); + when(mockJob.markAsCanceled(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.CANCELED); + return mockJob; + }); + when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.COMPLETED); + return mockJob; + }); + when(mockJob.markAsFailed(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.FAILED); + return mockJob; + }); + when(mockJob.progress()).thenReturn(0f); + when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); + when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn( + mockJob); + + when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) + .thenAnswer(invocation -> Collections.singletonList(mockJob)); + + // Create Mock Job Event + JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); + when(mockEvent.getJob()).thenReturn(mockJob); + + // Mock system events + LocalSystemEventsAPI localSystemEventsAPI = mock(LocalSystemEventsAPI.class); + apiLocator.when(APILocator::getLocalSystemEventsAPI).thenReturn(localSystemEventsAPI); + + // Handle the event notification + doAnswer(invocation -> { + JobCancelRequestEvent event = invocation.getArgument(0); + ((JobQueueManagerAPIImpl) jobQueueManagerAPI).onCancelRequestJob(event); + return null; + }).when(localSystemEventsAPI).notify(any(JobCancelRequestEvent.class)); + + // Register the test processor + jobQueueManagerAPI.registerProcessor(testQueue, ComplexCancellableJobProcessor.class); + + // Configure circuit breaker + when(mockCircuitBreaker.allowRequest()).thenReturn(true); + + // Start the job queue manager + jobQueueManagerAPI.start(); + + final String jobIdOut = jobQueueManagerAPI.createJob(testQueue, Map.of()); + + final Optional instance = jobQueueManagerAPI.getInstance(jobIdOut); + assertTrue(instance.isPresent()); + // Use our TestJobProcessor + final ComplexCancellableJobProcessor testJobProcessor = (ComplexCancellableJobProcessor) instance.get(); + + // Wait for the job to start processing + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingStart(100, TimeUnit.MILLISECONDS)); - // Cancel the job - jobQueueManagerAPI.cancelJob(jobId); + // Cancel the job + jobQueueManagerAPI.cancelJob(jobId); - // Wait for the job to complete (which should be due to cancellation) - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .until(() -> testJobProcessor.awaitProcessingCompleted(100, TimeUnit.MILLISECONDS)); + // Wait for the job to complete (which should be due to cancellation) + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .until(() -> testJobProcessor.awaitProcessingCompleted(100, + TimeUnit.MILLISECONDS)); - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> stateUpdates.contains(JobState.CANCELED)); + Awaitility.await() + .atMost(10, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> stateUpdates.contains(JobState.CANCELED)); - // Clean up - jobQueueManagerAPI.close(); + // Clean up + jobQueueManagerAPI.close(); + } } /**