diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 958e206224c8..d7e0311f1240 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -10,6 +10,7 @@ import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotmarketing.exception.DotDataException; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -138,6 +139,13 @@ String createJob(String queueName, Map parameters) */ void setRetryStrategy(String queueName, RetryStrategy retryStrategy); + /** + * Retrieves the retry strategy for a specific queue. + * @param jobId The ID of the job + * @return The processor instance, or an empty optional if not found + */ + Optional getInstance(final String jobId); + /** * @return The CircuitBreaker instance */ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 2bddb7bc72b4..edffb1d6770e 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -14,6 +14,7 @@ import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; +import com.dotcms.jobs.business.error.JobProcessorInstantiationException; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; @@ -38,6 +39,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -247,14 +249,20 @@ public Map> getQueueNames() { public String createJob(final String queueName, final Map parameters) throws JobProcessorNotFoundException, DotDataException { final String queueNameLower = queueName.toLowerCase(); - if (!processors.containsKey(queueNameLower)) { + final Class clazz = processors.get(queueNameLower); + if (null == clazz) { final var error = new JobProcessorNotFoundException(queueName); Logger.error(JobQueueManagerAPIImpl.class, error); throw error; } + //first attempt instantiating the processor, cuz if we cant no use to create an entry in the db + final var processor = newInstanceOfProcessor(queueNameLower).orElseThrow(); + // now that we know we can instantiate the processor, we can add it to the map of instances + // But first we need the job id try { - String jobId = jobQueue.createJob(queueNameLower, parameters); + final String jobId = jobQueue.createJob(queueNameLower, parameters); + addInstanceRef(jobId, processor); eventProducer.getEvent(JobCreatedEvent.class).fire( new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters) ); @@ -359,6 +367,12 @@ public void setRetryStrategy(final String queueName, final RetryStrategy retrySt retryStrategies.put(queueName, retryStrategy); } + @Override + @VisibleForTesting + public Optional getInstance(final String jobId) { + return Optional.ofNullable(processorInstancesByJobId.get(jobId)); + } + @Override @VisibleForTesting public CircuitBreaker getCircuitBreaker() { @@ -459,7 +473,7 @@ private void processJobs() { try { - boolean jobProcessed = processNextJob(); + final boolean jobProcessed = processNextJob(); if (jobProcessed) { emptyQueueCount = 0; } else { @@ -620,11 +634,8 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException */ private void processJob(final Job job) throws DotDataException { - Class processorClass = processors.get(job.queueName()); - if (processorClass != null) { - - final JobProcessor processor = newJobProcessorInstance(job, - processorClass); + final JobProcessor processor = processorInstancesByJobId.get(job.id()); + if (processor != null) { final ProgressTracker progressTracker = new DefaultProgressTracker(); Job runningJob = job.markAsRunning().withProgressTracker(progressTracker); @@ -669,8 +680,6 @@ private void processJob(final Job job) throws DotDataException { handleJobFailure( runningJob, processor, e, e.getMessage(), "Job execution" ); - } finally { - processorInstancesByJobId.remove(job.id()); } } else { @@ -682,22 +691,40 @@ private void processJob(final Job job) throws DotDataException { } /** - * Instantiate a new JobProcessor instance for a specific job. and store the reference in a map. - * @param job - * @param processorClass - * @return + * Creates a new instance of a JobProcessor for a specific queue. + * @param queueName + * @return An optional containing the new JobProcessor instance, or an empty optional if the processor could not be created. + */ + Optional newInstanceOfProcessor(final String queueName) { + final var processorClass = processors.get(queueName); + if (processorClass != null) { + try { + return Optional.of(processorClass.getDeclaredConstructor().newInstance()); + } catch (Exception e) { + Logger.error(this, "Error creating job processor", e); + throw new JobProcessorInstantiationException(processorClass,e); + } + } + return Optional.empty(); + } + + /** + * Once we're sure a processor can be instantiated, we add it to the map of instances. + * @param jobId The ID of the job + * @param processor The processor to add + * @return The processor instance */ - private JobProcessor newJobProcessorInstance(final Job job, final Class processorClass) { + private void addInstanceRef(final String jobId, final JobProcessor processor) { //Get an instance and put it in the map - return processorInstancesByJobId.computeIfAbsent( - job.id(), k -> { - try { - return processorClass.getDeclaredConstructor().newInstance(); - } catch (Exception e) { - throw new DotRuntimeException("Error creating job processor", e); - } - } - ); + processorInstancesByJobId.putIfAbsent(jobId, processor); + } + + /** + * Removes a processor instance from the map of instances. + * @param jobId The ID of the job + */ + private void removeInstanceRef(final String jobId) { + processorInstancesByJobId.remove(jobId); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorInstantiationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorInstantiationException.java new file mode 100644 index 000000000000..3e243a2e045f --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessorInstantiationException.java @@ -0,0 +1,20 @@ +package com.dotcms.jobs.business.error; + +import com.dotcms.jobs.business.processor.JobProcessor; + +/** + * Exception thrown when an error occurs while attempting to instantiate a new JobProcessor. + */ +public class JobProcessorInstantiationException extends RuntimeException{ + + /** + * Constructs a new JobProcessorInstantiationException with the specified processor class and cause. + * + * @param processorClass The class of the JobProcessor that could not be instantiated + * @param cause The underlying cause of the error (can be null) + */ + public JobProcessorInstantiationException(Class processorClass, Throwable cause) { + super("Failed to instantiate a new JobProcessor out of the provided class: " + processorClass.getName(), cause); + } + +} diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 56a6f9e852d4..4d8f542feb1f 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +49,7 @@ public class JobQueueManagerAPIIntegrationTest { */ @BeforeAll static void setUp() throws Exception { - + System.out.println("JobQueueManagerAPIIntegrationTest.setUp"); // Initialize the test environment IntegrationTestInitService.getInstance().init(); @@ -63,15 +64,20 @@ static void setUp() throws Exception { */ @AfterAll static void cleanUp() throws Exception { - - jobQueueManagerAPI.close(); + System.out.println("JobQueueManagerAPIIntegrationTest.cleanUp"); + if(null != jobQueueManagerAPI) { + jobQueueManagerAPI.close(); + } clearJobs(); } @BeforeEach void reset() { + System.out.println("JobQueueManagerAPIIntegrationTest.reset"); // Reset circuit breaker - jobQueueManagerAPI.getCircuitBreaker().reset(); + if(null != jobQueueManagerAPI) { + jobQueueManagerAPI.getCircuitBreaker().reset(); + } } /** @@ -81,9 +87,9 @@ void reset() { */ @Test void test_CreateAndProcessJob() throws Exception { - + System.out.println("JobQueueManagerAPIIntegrationTest.test_CreateAndProcessJob"); // Register a test processor - jobQueueManagerAPI.registerProcessor("testQueue", new TestJobProcessor()); + jobQueueManagerAPI.registerProcessor("testQueue", TestJobProcessor.class); // Start the JobQueueManagerAPI if (!jobQueueManagerAPI.isStarted()) { @@ -126,8 +132,8 @@ void test_CreateAndProcessJob() throws Exception { */ @Test void test_FailingJob() throws Exception { - - jobQueueManagerAPI.registerProcessor("failingQueue", new FailingJobProcessor()); + System.out.println("JobQueueManagerAPIIntegrationTest.test_FailingJob"); + jobQueueManagerAPI.registerProcessor("failingQueue", FailingJobProcessor.class); RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy( 5000, 300000, 2.0, 0 ); @@ -174,9 +180,8 @@ void test_FailingJob() throws Exception { */ @Test void test_CancelJob() throws Exception { - - CancellableJobProcessor processor = new CancellableJobProcessor(); - jobQueueManagerAPI.registerProcessor("cancellableQueue", processor); + System.out.println("JobQueueManagerAPIIntegrationTest.test_CancelJob"); + jobQueueManagerAPI.registerProcessor("cancellableQueue", CancellableJobProcessor.class); if (!jobQueueManagerAPI.isStarted()) { jobQueueManagerAPI.start(); @@ -184,7 +189,12 @@ void test_CancelJob() throws Exception { } Map parameters = new HashMap<>(); - String jobId = jobQueueManagerAPI.createJob("cancellableQueue", parameters); + final String jobId = jobQueueManagerAPI.createJob("cancellableQueue", parameters); + + //Get the instance of the job processor immediately after creating the job cuz once it gets cancelled, it will be removed from the map + final Optional instance = jobQueueManagerAPI.getInstance(jobId); + assertTrue(instance.isPresent(),()->"Should have been able to create an instance of the job processor"); + final CancellableJobProcessor processor = (CancellableJobProcessor)instance.get(); Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> { @@ -212,6 +222,7 @@ void test_CancelJob() throws Exception { Job job = jobQueueManagerAPI.getJob(jobId); assertEquals(JobState.CANCELED, job.state(), "Job should be in CANCELED state"); + assertTrue(processor.wasCanceled(), "Job processor should have been canceled"); }); @@ -226,10 +237,9 @@ void test_CancelJob() throws Exception { */ @Test void test_JobRetry() throws Exception { - - int maxRetries = 3; - RetryingJobProcessor processor = new RetryingJobProcessor(maxRetries); - jobQueueManagerAPI.registerProcessor("retryQueue", processor); + System.out.println("JobQueueManagerAPIIntegrationTest.test_JobRetry"); + final int maxRetries = RetryingJobProcessor.MAX_RETRIES; + jobQueueManagerAPI.registerProcessor("retryQueue", RetryingJobProcessor.class); RetryStrategy retryStrategy = new ExponentialBackoffRetryStrategy( 100, 1000, 2.0, maxRetries @@ -243,6 +253,9 @@ void test_JobRetry() throws Exception { Map parameters = new HashMap<>(); String jobId = jobQueueManagerAPI.createJob("retryQueue", parameters); + final Optional instance = jobQueueManagerAPI.getInstance(jobId); + assertTrue(instance.isPresent(),()->"Should be able to create an instance of the job processor"); + RetryingJobProcessor processor = (RetryingJobProcessor)instance.get(); CountDownLatch latch = new CountDownLatch(1); jobQueueManagerAPI.watchJob(jobId, job -> { @@ -274,10 +287,9 @@ void test_JobRetry() throws Exception { */ @Test void test_JobWithProgressTracker() throws Exception { - + System.out.println("JobQueueManagerAPIIntegrationTest.test_JobWithProgressTracker"); // Register a processor that uses progress tracking - ProgressTrackingJobProcessor processor = new ProgressTrackingJobProcessor(); - jobQueueManagerAPI.registerProcessor("progressQueue", processor); + jobQueueManagerAPI.registerProcessor("progressQueue", ProgressTrackingJobProcessor.class); // Start the JobQueueManagerAPI if (!jobQueueManagerAPI.isStarted()) { @@ -336,11 +348,11 @@ void test_JobWithProgressTracker() throws Exception { */ @Test void test_CombinedScenarios() throws Exception { - + System.out.println("JobQueueManagerAPIIntegrationTest.test_CombinedScenarios"); // Register processors for different scenarios - jobQueueManagerAPI.registerProcessor("successQueue", new TestJobProcessor()); - jobQueueManagerAPI.registerProcessor("failQueue", new FailingJobProcessor()); - jobQueueManagerAPI.registerProcessor("cancelQueue", new CancellableJobProcessor()); + jobQueueManagerAPI.registerProcessor("successQueue", TestJobProcessor.class); + jobQueueManagerAPI.registerProcessor("failQueue", FailingJobProcessor.class); + jobQueueManagerAPI.registerProcessor("cancelQueue", CancellableJobProcessor.class); // Set up retry strategy for failing jobs RetryStrategy retryStrategy = new ExponentialBackoffRetryStrategy( @@ -424,7 +436,7 @@ void test_CombinedScenarios() throws Exception { }); } - private static class ProgressTrackingJobProcessor implements JobProcessor { + static class ProgressTrackingJobProcessor implements JobProcessor { @Override public void process(Job job) { ProgressTracker tracker = job.progressTracker().orElseThrow( @@ -445,22 +457,23 @@ public Map getResultMetadata(Job job) { } } - private static class RetryingJobProcessor implements JobProcessor { + static class RetryingJobProcessor implements JobProcessor { - private final int maxRetries; + public static final int MAX_RETRIES = 3; private int attempts = 0; - public RetryingJobProcessor(int maxRetries) { - this.maxRetries = maxRetries; + public RetryingJobProcessor() { + // needed for instantiation purposes } @Override public void process(Job job) { attempts++; - if (attempts <= maxRetries) { + if (attempts <= MAX_RETRIES) { throw new RuntimeException("Simulated failure, attempt " + attempts); } // If we've reached here, we've exceeded maxRetries and the job should succeed + System.out.println("Job succeeded after " + attempts + " attempts"); } @Override @@ -475,7 +488,7 @@ public int getAttempts() { } } - private static class FailingJobProcessor implements JobProcessor { + static class FailingJobProcessor implements JobProcessor { @Override public void process(Job job) { @@ -488,7 +501,7 @@ public Map getResultMetadata(Job job) { } } - private static class CancellableJobProcessor implements JobProcessor, Cancellable { + static class CancellableJobProcessor implements JobProcessor, Cancellable { private final AtomicBoolean canceled = new AtomicBoolean(false); private final AtomicBoolean wasCanceled = new AtomicBoolean(false); @@ -519,7 +532,7 @@ public boolean wasCanceled() { } } - private static class TestJobProcessor implements JobProcessor { + static class TestJobProcessor implements JobProcessor { @Override public void process(Job job) { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 72fdfb7188d0..f4740638990b 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -93,7 +93,7 @@ public void setUp() { 1, 10 ); - jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); jobQueueManagerAPI.setRetryStrategy("testQueue", mockRetryStrategy); var event = mock(Event.class); @@ -912,7 +912,7 @@ public void test_CircuitBreaker_Opens() throws Exception { 1, 1000 ); - jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); // Start the job queue jobQueueManagerAPI.start(); @@ -996,7 +996,7 @@ public void test_CircuitBreaker_Closes() throws Exception { mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, 1, 1000 ); - jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); // Start the job queue jobQueueManagerAPI.start(); @@ -1059,7 +1059,7 @@ public void test_CircuitBreaker_Reset() throws Exception { mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, 1, 1000 ); - jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", mockJobProcessor); // Start the job queue jobQueueManagerAPI.start(); @@ -1119,7 +1119,7 @@ public Map getResultMetadata(Job job) { TestJobProcessor mockCancellableProcessor = mock(TestJobProcessor.class); // Set up the job queue manager to return our mock cancellable processor - jobQueueManagerAPI.registerProcessor("testQueue", mockCancellableProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", mockCancellableProcessor); // Perform the cancellation jobQueueManagerAPI.cancelJob("job123"); @@ -1219,7 +1219,7 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) .thenAnswer(invocation -> Collections.singletonList(mockJob)); // Register the test processor - jobQueueManagerAPI.registerProcessor("testQueue", testJobProcessor); + //jobQueueManagerAPI.registerProcessor("testQueue", testJobProcessor); // Configure circuit breaker when(mockCircuitBreaker.allowRequest()).thenReturn(true);