diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorDiscovery.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorDiscovery.java new file mode 100644 index 000000000000..c6e79bb18cb3 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorDiscovery.java @@ -0,0 +1,88 @@ +package com.dotcms.jobs.business.api; + +import com.dotcms.jobs.business.processor.JobProcessor; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.util.Logger; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.context.Dependent; +import javax.enterprise.inject.Any; +import javax.enterprise.inject.spi.Bean; +import javax.enterprise.inject.spi.BeanManager; +import javax.inject.Inject; + +/** + * Discovers all classes that implement the JobProcessor interface using CDI. + */ +@ApplicationScoped +public class JobProcessorDiscovery { + + private final BeanManager beanManager; + + @Inject + public JobProcessorDiscovery(BeanManager beanManager) { + this.beanManager = beanManager; + } + + /** + * Default constructor required by CDI. + */ + public JobProcessorDiscovery() { + this.beanManager = null; + } + + /** + * Discovers all classes that implement the JobProcessor interface using CDI. Does not create + * instances, only finds the classes. + * + * @return A list of classes that implement the JobProcessor interface. + */ + public List> discoverJobProcessors() { + + List> processors = new ArrayList<>(); + + try { + Set> beans = beanManager.getBeans(JobProcessor.class, Any.Literal.INSTANCE); + + for (Bean bean : beans) { + Class beanClass = bean.getBeanClass(); + + if (JobProcessor.class.isAssignableFrom(beanClass)) { + + // Validate that the bean is in the correct scope + validateScope(bean); + + processors.add((Class) beanClass); + Logger.debug(this, "Discovered JobProcessor: " + beanClass.getName()); + } + } + } catch (Exception e) { + var errorMessage = "Error discovering JobProcessors"; + Logger.error(this, errorMessage, e); + throw new DotRuntimeException(errorMessage, e); + } + + if (processors.isEmpty()) { + Logger.warn(this, "No JobProcessors were discovered"); + } + + return processors; + } + + /** + * Validates that the scope of the bean is correct for a JobProcessor. + * + * @param bean The bean to validate. + */ + private void validateScope(Bean bean) { + Class scope = bean.getScope(); + if (scope != Dependent.class) { + throw new DotRuntimeException( + "JobProcessor " + bean.getBeanClass().getName() + + " must use @Dependent scope, found: " + scope.getName()); + } + } + +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorFactory.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorFactory.java index 71855f94d630..6e13408ce894 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorFactory.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorFactory.java @@ -1,32 +1,44 @@ package com.dotcms.jobs.business.api; +import com.dotcms.cdi.CDIUtils; import com.dotcms.jobs.business.error.JobProcessorInstantiationException; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotmarketing.util.Logger; +import java.util.Optional; import javax.enterprise.context.ApplicationScoped; @ApplicationScoped public class JobProcessorFactory { - public JobProcessorFactory() { - // Default constructor for CDI - } - /** * Creates a new instance of the specified job processor class. + * First attempts to get the processor from CDI context, falls back to direct instantiation if that fails. * * @param processorClass The class of the job processor to create. - * @return An optional containing the new job processor instance, or an empty optional if the - * processor could not be created. + * @return A new job processor instance + * @throws JobProcessorInstantiationException if creation fails through both methods */ - JobProcessor newInstance( - Class processorClass) { + JobProcessor newInstance(Class processorClass) { + + Optional cdiInstance = CDIUtils.getBean(processorClass); + + if (cdiInstance.isPresent()) { + return cdiInstance.get(); + } + + // If CDI fails, try direct instantiation + return createInstance(processorClass); + } + + /** + * Creates a new instance using reflection. + */ + private JobProcessor createInstance(Class processorClass) { try { return processorClass.getDeclaredConstructor().newInstance(); } catch (Exception e) { - Logger.error(this, "Error creating job processor", e); + Logger.error(this, "Error creating job processor via reflection", e); throw new JobProcessorInstantiationException(processorClass, e); } } - -} +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java deleted file mode 100644 index 701f2f06b6af..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobProcessorScanner.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.dotcms.jobs.business.api; - -import com.dotcms.jobs.business.processor.JobProcessor; -import com.dotmarketing.util.Logger; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import javax.enterprise.context.ApplicationScoped; -import org.jboss.jandex.ClassInfo; -import org.jboss.jandex.DotName; -import org.jboss.jandex.Index; -import org.jboss.jandex.IndexReader; - -/** - * Scans the classpath for classes that implement the JobProcessor interface. - * This class uses Jandex to scan the classpath for classes that implement the JobProcessor interface. - */ -@ApplicationScoped -public class JobProcessorScanner { - - /** - * Discovers all classes that implement the JobProcessor interface. - * @return A list of classes that implement the JobProcessor interface. - */ - public List> discoverJobProcessors() { - List> jobProcessors = new ArrayList<>(); - try { - - Index index = getJandexIndex(); - DotName jobProcessorInterface = DotName.createSimple(JobProcessor.class.getName()); - - Collection implementors = index.getAllKnownImplementors(jobProcessorInterface); - - for (ClassInfo classInfo : implementors) { - String className = classInfo.name().toString(); - - Class clazz = Class.forName(className); - if (JobProcessor.class.isAssignableFrom(clazz)) { - jobProcessors.add((Class) clazz); - } - } - - } catch (IOException | ClassNotFoundException e) { - Logger.error(JobProcessorScanner.class, "Error discovering JobProcessors", e); - - } - return jobProcessors; - } - - /** - * Reads the Jandex index file. - * @return The Jandex index. - * @throws IOException If the Jandex index file cannot be read. - */ - private Index getJandexIndex() throws IOException { - InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx"); - if (input == null) { - throw new IOException("Jandex index not found"); - } - IndexReader reader = new IndexReader(input); - return reader.read(); - } - -} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 0fcb72c595f3..a4ad9347c4a5 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -51,13 +51,20 @@ public interface JobQueueManagerAPI { */ void close() throws Exception; + /** + * Registers a job processor + * + * @param processor The job processor to register + */ + void registerProcessor(Class processor); + /** * Registers a job processor for a specific queue. * * @param queueName The name of the queue * @param processor The job processor to register */ - void registerProcessor(final String queueName, final Class processor); + void registerProcessor(String queueName, Class processor); /** * Retrieves the job processors for all registered queues. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index e789c70bd919..7fea2b488969 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -28,6 +28,7 @@ import com.dotcms.jobs.business.processor.DefaultRetryStrategy; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; +import com.dotcms.jobs.business.processor.Queue; import com.dotcms.jobs.business.processor.Validator; import com.dotcms.jobs.business.queue.JobQueue; import com.dotcms.jobs.business.queue.error.JobNotFoundException; @@ -35,6 +36,7 @@ import com.dotcms.jobs.business.queue.error.JobQueueException; import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.system.event.local.model.EventSubscriber; +import com.dotcms.util.AnnotationUtils; import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DoesNotExistException; import com.dotmarketing.exception.DotDataException; @@ -45,6 +47,7 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -127,17 +130,19 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private static final int EMPTY_QUEUE_RESET_THRESHOLD = Integer.MAX_VALUE - 1000; /** - * Constructs a new JobQueueManagerAPIImpl. - * This constructor initializes the job queue manager with all necessary dependencies and configurations. + * Constructs a new JobQueueManagerAPIImpl. This constructor initializes the job queue manager + * with all necessary dependencies and configurations. * * @param jobQueue The JobQueue implementation to use for managing jobs. - * @param jobQueueConfig The JobQueueConfig implementation providing configuration settings. + * @param jobQueueConfig The JobQueueConfig implementation providing configuration + * settings. * @param circuitBreaker The CircuitBreaker implementation for fault tolerance. * @param defaultRetryStrategy The default retry strategy to use for failed jobs. * @param realTimeJobMonitor The RealTimeJobMonitor for handling real-time job updates. * @param jobProcessorFactory The JobProcessorFactory for creating job processors instances. * @param retryPolicyProcessor The RetryPolicyProcessor for processing retry policies. * @param abandonedJobDetector The AbandonedJobDetector for detecting abandoned jobs. + * @param discovery The JobProcessorDiscovery for discovering job processors. */ @Inject public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, @@ -147,7 +152,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, RealTimeJobMonitor realTimeJobMonitor, JobProcessorFactory jobProcessorFactory, RetryPolicyProcessor retryPolicyProcessor, - AbandonedJobDetector abandonedJobDetector) { + AbandonedJobDetector abandonedJobDetector, + JobProcessorDiscovery discovery) { this.jobQueue = jobQueue; this.threadPoolSize = jobQueueConfig.getThreadPoolSize(); @@ -161,6 +167,9 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, this.abandonedJobDetector = abandonedJobDetector; this.realTimeJobMonitor = realTimeJobMonitor; + // Register discovered processors by CDI + discovery.discoverJobProcessors().forEach(this::registerProcessor); + APILocator.getLocalSystemEventsAPI().subscribe( JobCancelRequestEvent.class, (EventSubscriber) this::onCancelRequestJob @@ -242,8 +251,19 @@ public void close() throws Exception { } } + @Override + public void registerProcessor(final Class processor) { + + Queue queue = AnnotationUtils.getBeanAnnotation(processor, Queue.class); + String queueName = Objects.nonNull(queue) ? queue.value() : processor.getName(); + registerProcessor(queueName, processor); + } + @Override public void registerProcessor(final String queueName, final Class processor) { + + Logger.info(this, "Registering JobProcessor: " + processor.getName()); + final Class jobProcessor = processors.get(queueName); if (null != jobProcessor) { Logger.warn(this, String.format( @@ -860,7 +880,6 @@ private void removeInstanceRef(final String jobId) { * @param job The job that completed. * @param processor The processor that handled the job. */ - @WrapInTransaction private void handleJobCompletion(final Job job, final JobProcessor processor) throws DotDataException { @@ -937,7 +956,6 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) { * @param exception The exception that caused the failure. * @param processingStage The stage of processing where the failure occurred. */ - @WrapInTransaction private void handleJobFailure(final Job job, final JobProcessor processor, final Exception exception, final String processingStage) throws DotDataException { @@ -961,7 +979,6 @@ private void handleJobFailure(final Job job, final JobProcessor processor, * @param errorMessage The error message to include in the job result. * @param processingStage The stage of processing where the failure occurred. */ - @WrapInTransaction private void handleJobFailure(final Job job, final JobProcessor processor, final Exception exception, final String errorMessage, final String processingStage) throws DotDataException { diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java index ae34091b2bbd..dbe4bce45963 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/FailSuccessJob.java @@ -5,8 +5,10 @@ import com.dotcms.jobs.business.processor.Queue; import com.dotmarketing.exception.DotRuntimeException; import java.util.Map; +import javax.enterprise.context.Dependent; @Queue("failSuccess") +@Dependent public class FailSuccessJob implements JobProcessor { @Override diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index cdfa600a319f..96a99f2e2f35 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -45,6 +45,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; +import javax.enterprise.context.Dependent; /** * Processor implementation for handling content import operations in dotCMS. This class provides @@ -73,6 +74,7 @@ */ @Queue("importContentlets") @NoRetryPolicy +@Dependent public class ImportContentletsProcessor implements JobProcessor, Validator, Cancellable { private static final String PARAMETER_LANGUAGE = "language"; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java index 1fc4b9708984..5065c41bd5dd 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/LargeFileReader.java @@ -15,12 +15,14 @@ import java.io.FileReader; import java.util.Map; import java.util.Optional; +import javax.enterprise.context.Dependent; /** * This class reads a large file and prints the content to the log. * It is here for the sole purpose of demonstrating the job queue system. */ @Queue("demo") +@Dependent public class LargeFileReader implements JobProcessor, Cancellable { public static final int LOG_EVERY_LINES = 1; diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/JobQueueManagerHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/JobQueueManagerHelper.java deleted file mode 100644 index 38f104aed278..000000000000 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/JobQueueManagerHelper.java +++ /dev/null @@ -1,121 +0,0 @@ -package com.dotcms.rest.api.v1; - -import com.dotcms.jobs.business.api.JobProcessorScanner; -import com.dotcms.jobs.business.api.JobQueueManagerAPI; -import com.dotcms.jobs.business.processor.JobProcessor; -import com.dotcms.jobs.business.processor.Queue; -import com.dotcms.util.AnnotationUtils; -import com.dotmarketing.util.Logger; - -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import java.lang.reflect.Constructor; -import java.util.List; -import java.util.Objects; - -/** - * Helper class for managing job queue processors in the JobQueueManagerAPI. - *

- * This class is responsible for discovering job processors, registering them with - * the JobQueueManagerAPI, and shutting down the JobQueueManagerAPI when needed. - */ -@ApplicationScoped -public class JobQueueManagerHelper { - - private final JobQueueManagerAPI jobQueueManagerAPI; - private final JobProcessorScanner scanner; - - /** - * Constructor that injects the {@link JobProcessorScanner} and {@link JobQueueManagerAPI}. - * - * @param scanner The JobProcessorScanner to discover job processors - * @param jobQueueManagerAPI The JobQueueManagerAPI instance to register processors with - */ - @Inject - public JobQueueManagerHelper(final JobProcessorScanner scanner, final JobQueueManagerAPI jobQueueManagerAPI) { - this.scanner = scanner; - this.jobQueueManagerAPI = jobQueueManagerAPI; - } - - /** - * Default constructor required by CDI. - */ - public JobQueueManagerHelper() { - this.scanner = null; - this.jobQueueManagerAPI = null; - } - - /** - * Registers all discovered job processors with the JobQueueManagerAPI. - * If the JobQueueManagerAPI is not started, it starts the API before registering the processors. - */ - public void registerProcessors() { - if (!jobQueueManagerAPI.isStarted()) { - jobQueueManagerAPI.start(); - Logger.info(this.getClass(), "JobQueueManagerAPI started"); - } - - List> processors = scanner.discoverJobProcessors(); - processors.forEach(processor -> { - try { - if (!testInstantiation(processor)) { - return; - } - Logger.info(this.getClass(), "Registering JobProcessor: " + processor.getName()); - registerProcessor(processor); - } catch (Exception e) { - Logger.error(this.getClass(), "Unable to register JobProcessor ", e); - } - }); - } - - /** - * Tests whether a given job processor can be instantiated by attempting to - * create an instance of the processor using its default constructor. - * - * @param processor The processor class to test for instantiation - * @return true if the processor can be instantiated, false otherwise - */ - private boolean testInstantiation(final Class processor) { - try { - Constructor declaredConstructor = processor.getDeclaredConstructor(); - declaredConstructor.newInstance(); - return true; - } catch (Exception e) { - Logger.error(this.getClass(), String.format(" JobProcessor [%s] cannot be instantiated and will be ignored.", processor.getName()), e); - } - return false; - } - - /** - * Registers a job processor with the JobQueueManagerAPI using the queue name specified - * in the {@link Queue} annotation, if present. If no annotation is found, the processor's - * class name is used as the queue name. - * - * @param processor the processor class to register - */ - private void registerProcessor(final Class processor) { - Queue queue = AnnotationUtils.getBeanAnnotation(processor, Queue.class); - if (Objects.nonNull(queue)) { - jobQueueManagerAPI.registerProcessor(queue.value(), processor); - } else { - jobQueueManagerAPI.registerProcessor(processor.getName(), processor); - } - } - - /** - * Shuts down the JobQueueManagerAPI if it is currently started. - * If the JobQueueManagerAPI is started, it attempts to close it gracefully. - * In case of an error during the shutdown process, the error is logged. - */ - public void shutdown() { - if (jobQueueManagerAPI.isStarted()) { - try { - jobQueueManagerAPI.close(); - Logger.info(this.getClass(), "JobQueueManagerAPI successfully closed"); - } catch (Exception e) { - Logger.error(this.getClass(), e.getMessage(), e); - } - } - } -} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/content/dotimport/ContentImportHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/content/dotimport/ContentImportHelper.java index 8487fdbe5a79..ac376e1d7c6a 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/content/dotimport/ContentImportHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/content/dotimport/ContentImportHelper.java @@ -2,7 +2,6 @@ import com.dotcms.jobs.business.api.JobQueueManagerAPI; import com.dotcms.jobs.business.job.Job; -import com.dotcms.rest.api.v1.JobQueueManagerHelper; import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotmarketing.business.APILocator; import com.dotmarketing.business.web.WebAPILocator; @@ -11,14 +10,11 @@ import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import com.liferay.portal.model.User; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; +import java.util.HashMap; +import java.util.Map; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; -import java.util.HashMap; -import java.util.Map; /** * Helper class for managing content import operations in the dotCMS application. @@ -32,18 +28,15 @@ public class ContentImportHelper { private final JobQueueManagerAPI jobQueueManagerAPI; - private final JobQueueManagerHelper jobQueueManagerHelper; /** * Constructor for dependency injection. * * @param jobQueueManagerAPI The API for managing job queues. - * @param jobQueueManagerHelper Helper for job queue management. */ @Inject - public ContentImportHelper(final JobQueueManagerAPI jobQueueManagerAPI, final JobQueueManagerHelper jobQueueManagerHelper) { + public ContentImportHelper(final JobQueueManagerAPI jobQueueManagerAPI) { this.jobQueueManagerAPI = jobQueueManagerAPI; - this.jobQueueManagerHelper = jobQueueManagerHelper; } /** @@ -51,23 +44,6 @@ public ContentImportHelper(final JobQueueManagerAPI jobQueueManagerAPI, final Jo */ public ContentImportHelper() { this.jobQueueManagerAPI = null; - this.jobQueueManagerHelper = null; - } - - /** - * Initializes the helper by registering job processors during application startup. - */ - @PostConstruct - public void onInit() { - jobQueueManagerHelper.registerProcessors(); - } - - /** - * Cleans up resources and shuts down the helper during application shutdown. - */ - @PreDestroy - public void onDestroy() { - jobQueueManagerHelper.shutdown(); } /** diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index c3e86f797c71..a3368e5b28ea 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -8,8 +8,6 @@ import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.jobs.business.job.JobState; -import com.dotcms.jobs.business.processor.JobProcessor; -import com.dotcms.rest.api.v1.JobQueueManagerHelper; import com.dotcms.rest.api.v1.temp.DotTempFile; import com.dotcms.rest.api.v1.temp.TempFileAPI; import com.dotmarketing.business.APILocator; @@ -18,7 +16,6 @@ import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.annotations.VisibleForTesting; import com.liferay.portal.model.User; import java.io.InputStream; import java.time.format.DateTimeFormatter; @@ -26,8 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Consumer; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; @@ -40,36 +35,14 @@ public class JobQueueHelper { private JobQueueManagerAPI jobQueueManagerAPI; - private JobQueueManagerHelper jobQueueManagerHelper; public JobQueueHelper() { //default constructor Mandatory for CDI } @Inject - public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI, JobQueueManagerHelper jobQueueManagerHelper) { + public JobQueueHelper(JobQueueManagerAPI jobQueueManagerAPI) { this.jobQueueManagerAPI = jobQueueManagerAPI; - this.jobQueueManagerHelper = jobQueueManagerHelper; - } - - /** - * Registers a processor - * @param queueName The name of the queue - * @param processor Class of the processor - */ - @VisibleForTesting - void registerProcessor(final String queueName, final Class processor){ - jobQueueManagerAPI.registerProcessor(queueName, processor); - } - - @PostConstruct - public void onInit() { - jobQueueManagerHelper.registerProcessors(); - } - - @PreDestroy - public void onDestroy() { - jobQueueManagerHelper.shutdown(); } /** diff --git a/dotCMS/src/main/java/com/dotmarketing/listeners/ContextLifecycleListener.java b/dotCMS/src/main/java/com/dotmarketing/listeners/ContextLifecycleListener.java index 0cf3775d0d0c..e3624c919066 100644 --- a/dotCMS/src/main/java/com/dotmarketing/listeners/ContextLifecycleListener.java +++ b/dotCMS/src/main/java/com/dotmarketing/listeners/ContextLifecycleListener.java @@ -5,6 +5,7 @@ import com.dotcms.enterprise.LicenseUtil; import com.dotcms.rest.api.v1.system.websocket.SystemEventsWebSocketEndPoint; import com.dotcms.util.AsciiArt; +import com.dotmarketing.business.APILocator; import com.dotmarketing.business.CacheLocator; import com.dotmarketing.common.reindex.ReindexThread; import com.dotmarketing.quartz.QuartzUtils; @@ -49,6 +50,9 @@ public void contextDestroyed(ServletContextEvent arg0) { Try.run(() -> ReindexThread.stopThread()) .onFailure(e -> Logger.warn(ContextLifecycleListener.class, "Shutdown : " + e.getMessage())); + Try.run(() -> APILocator.getJobQueueManagerAPI().close()) + .onFailure(e -> Logger.warn(ContextLifecycleListener.class, "Shutdown : " + e.getMessage())); + Logger.info(this, "Shutdown : Finished."); } diff --git a/dotCMS/src/main/java/com/dotmarketing/servlets/InitServlet.java b/dotCMS/src/main/java/com/dotmarketing/servlets/InitServlet.java index d974640ceb15..3b3362d2424a 100644 --- a/dotCMS/src/main/java/com/dotmarketing/servlets/InitServlet.java +++ b/dotCMS/src/main/java/com/dotmarketing/servlets/InitServlet.java @@ -241,6 +241,9 @@ public void init(ServletConfig config) throws ServletException { // Starting the re-indexation thread ReindexThread.startThread(); + + // Start the job queue manager + APILocator.getJobQueueManagerAPI().start(); // Tell the world we are started up System.setProperty(WebKeys.DOTCMS_STARTED_UP, "true"); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index 09d25aabb0ad..828fa0f99c15 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -147,6 +147,8 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) private AbandonedJobDetector abandonedJobDetector; + private JobProcessorDiscovery jobProcessorDiscovery; + /** * Factory to create mock JobProcessor instances for testing. * This is how we instruct the JobQueueManagerAPI to use our mock processors. @@ -177,10 +179,12 @@ public void setUp() { mockCircuitBreaker = mock(CircuitBreaker.class); retryPolicyProcessor = mock(RetryPolicyProcessor.class); abandonedJobDetector = mock(AbandonedJobDetector.class); + jobProcessorDiscovery = mock(JobProcessorDiscovery.class); jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, mockCircuitBreaker, mockRetryStrategy, jobProcessorFactory, - retryPolicyProcessor, abandonedJobDetector, 1 + retryPolicyProcessor, abandonedJobDetector, + jobProcessorDiscovery, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -803,7 +807,8 @@ public void test_CircuitBreaker_Opens() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, - retryPolicyProcessor, abandonedJobDetector, 1 + retryPolicyProcessor, abandonedJobDetector, + jobProcessorDiscovery, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -888,7 +893,8 @@ public void test_CircuitBreaker_Closes() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, - retryPolicyProcessor, abandonedJobDetector, 1 + retryPolicyProcessor, abandonedJobDetector, + jobProcessorDiscovery, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -951,7 +957,8 @@ public void test_CircuitBreaker_Reset() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, - retryPolicyProcessor, abandonedJobDetector, 1 + retryPolicyProcessor, abandonedJobDetector, + jobProcessorDiscovery, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1185,6 +1192,7 @@ private JobQueueManagerAPI newJobQueueManagerAPI(JobQueue jobQueue, JobProcessorFactory jobProcessorFactory, RetryPolicyProcessor retryPolicyProcessor, AbandonedJobDetector abandonedJobDetector, + JobProcessorDiscovery jobProcessorDiscovery, int threadPoolSize) { final var realTimeJobMonitor = new RealTimeJobMonitor(); @@ -1192,7 +1200,8 @@ private JobQueueManagerAPI newJobQueueManagerAPI(JobQueue jobQueue, return new JobQueueManagerAPIImpl( jobQueue, new JobQueueConfig(threadPoolSize), circuitBreaker, retryStrategy, realTimeJobMonitor, - jobProcessorFactory, retryPolicyProcessor, abandonedJobDetector + jobProcessorFactory, retryPolicyProcessor, abandonedJobDetector, + jobProcessorDiscovery ); } diff --git a/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java index 144f22a8822b..5b65e2bb2569 100644 --- a/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/rest/api/v1/job/JobQueueHelperIntegrationTest.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.dotcms.jobs.business.api.JobQueueManagerAPI; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotmarketing.exception.DoesNotExistException; @@ -36,6 +37,9 @@ public class JobQueueHelperIntegrationTest extends com.dotcms.Junit5WeldBaseTes @Inject JobQueueHelper jobQueueHelper; + @Inject + JobQueueManagerAPI jobQueueManagerAPI; + /** * Test with no parameters in the JobParams creating a job * Given scenario: create a job with no parameters and valid queue name @@ -46,7 +50,7 @@ public class JobQueueHelperIntegrationTest extends com.dotcms.Junit5WeldBaseTes @Test void testEmptyParams() throws DotDataException, JsonProcessingException { - jobQueueHelper.registerProcessor("demoQueue", DemoJobProcessor.class); + jobQueueManagerAPI.registerProcessor("demoQueue", DemoJobProcessor.class); final var jobParams = new JobParams(); final var user = mock(User.class); @@ -74,7 +78,7 @@ void testEmptyParams() throws DotDataException, JsonProcessingException { @Test void testCreateJobWithNoParameters() throws DotDataException { - jobQueueHelper.registerProcessor("demoQueue", DemoJobProcessor.class); + jobQueueManagerAPI.registerProcessor("demoQueue", DemoJobProcessor.class); final var user = mock(User.class); when(user.getUserId()).thenReturn("dotcms.org.1"); @@ -130,7 +134,7 @@ public Map getResultMetadata(Job job) { */ @Test void testWithValidParamsAndQueueName() throws DotDataException, JsonProcessingException { - jobQueueHelper.registerProcessor("demoQueue", DemoJobProcessor.class); + jobQueueManagerAPI.registerProcessor("demoQueue", DemoJobProcessor.class); final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); @@ -157,7 +161,7 @@ void testWithValidParamsAndQueueName() throws DotDataException, JsonProcessingEx */ @Test void testIsWatchable() throws DotDataException, JsonProcessingException { - jobQueueHelper.registerProcessor("testQueue", DemoJobProcessor.class); + jobQueueManagerAPI.registerProcessor("testQueue", DemoJobProcessor.class); final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}"); @@ -181,7 +185,7 @@ void testIsWatchable() throws DotDataException, JsonProcessingException { */ @Test void testGetStatusInfo() throws DotDataException, JsonProcessingException { - jobQueueHelper.registerProcessor("testQueue", DemoJobProcessor.class); + jobQueueManagerAPI.registerProcessor("testQueue", DemoJobProcessor.class); final JobParams jobParams = new JobParams(); jobParams.setJsonParams("{}");