Skip to content

Commit

Permalink
#30545 Changed job manager initialization to the InitServlet, also, c…
Browse files Browse the repository at this point in the history
…hanged the processors discovery from Jandex to CDI.
  • Loading branch information
jgambarios committed Dec 9, 2024
1 parent 8c6f422 commit 085eb37
Show file tree
Hide file tree
Showing 15 changed files with 183 additions and 271 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.util.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;

/**
* Discovers all classes that implement the JobProcessor interface using CDI.
*/
@ApplicationScoped
public class JobProcessorDiscovery {

private final BeanManager beanManager;

@Inject
public JobProcessorDiscovery(BeanManager beanManager) {
this.beanManager = beanManager;
}

/**
* Default constructor required by CDI.
*/
public JobProcessorDiscovery() {
this.beanManager = null;
}

/**
* Discovers all classes that implement the JobProcessor interface using CDI. Does not create
* instances, only finds the classes.
*
* @return A list of classes that implement the JobProcessor interface.
*/
public List<Class<? extends JobProcessor>> discoverJobProcessors() {

List<Class<? extends JobProcessor>> processors = new ArrayList<>();

try {
Set<Bean<?>> beans = beanManager.getBeans(JobProcessor.class, Any.Literal.INSTANCE);

for (Bean<?> bean : beans) {
Class<?> beanClass = bean.getBeanClass();

if (JobProcessor.class.isAssignableFrom(beanClass)) {

// Validate that the bean is in the correct scope
validateScope(bean);

processors.add((Class<? extends JobProcessor>) beanClass);
Logger.debug(this, "Discovered JobProcessor: " + beanClass.getName());
}
}
} catch (Exception e) {
var errorMessage = "Error discovering JobProcessors";
Logger.error(this, errorMessage, e);
throw new DotRuntimeException(errorMessage, e);
}

if (processors.isEmpty()) {
Logger.warn(this, "No JobProcessors were discovered");
}

return processors;
}

/**
* Validates that the scope of the bean is correct for a JobProcessor.
*
* @param bean The bean to validate.
*/
private void validateScope(Bean<?> bean) {
Class<?> scope = bean.getScope();
if (scope != Dependent.class) {
throw new DotRuntimeException(
"JobProcessor " + bean.getBeanClass().getName() +
" must use @Dependent scope, found: " + scope.getName());
}
}

}
Original file line number Diff line number Diff line change
@@ -1,32 +1,44 @@
package com.dotcms.jobs.business.api;

import com.dotcms.cdi.CDIUtils;
import com.dotcms.jobs.business.error.JobProcessorInstantiationException;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotmarketing.util.Logger;
import java.util.Optional;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class JobProcessorFactory {

public JobProcessorFactory() {
// Default constructor for CDI
}

/**
* Creates a new instance of the specified job processor class.
* First attempts to get the processor from CDI context, falls back to direct instantiation if that fails.
*
* @param processorClass The class of the job processor to create.
* @return An optional containing the new job processor instance, or an empty optional if the
* processor could not be created.
* @return A new job processor instance
* @throws JobProcessorInstantiationException if creation fails through both methods
*/
JobProcessor newInstance(
Class<? extends JobProcessor> processorClass) {
JobProcessor newInstance(Class<? extends JobProcessor> processorClass) {

Optional<? extends JobProcessor> cdiInstance = CDIUtils.getBean(processorClass);

if (cdiInstance.isPresent()) {
return cdiInstance.get();
}

// If CDI fails, try direct instantiation
return createInstance(processorClass);
}

/**
* Creates a new instance using reflection.
*/
private JobProcessor createInstance(Class<? extends JobProcessor> processorClass) {
try {
return processorClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
Logger.error(this, "Error creating job processor", e);
Logger.error(this, "Error creating job processor via reflection", e);
throw new JobProcessorInstantiationException(processorClass, e);
}
}

}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,20 @@ public interface JobQueueManagerAPI {
*/
void close() throws Exception;

/**
* Registers a job processor
*
* @param processor The job processor to register
*/
void registerProcessor(Class<? extends JobProcessor> processor);

/**
* Registers a job processor for a specific queue.
*
* @param queueName The name of the queue
* @param processor The job processor to register
*/
void registerProcessor(final String queueName, final Class<? extends JobProcessor> processor);
void registerProcessor(String queueName, Class<? extends JobProcessor> processor);

/**
* Retrieves the job processors for all registered queues.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import com.dotcms.jobs.business.processor.DefaultRetryStrategy;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.jobs.business.processor.Validator;
import com.dotcms.jobs.business.queue.JobQueue;
import com.dotcms.jobs.business.queue.error.JobNotFoundException;
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
import com.dotcms.jobs.business.queue.error.JobQueueException;
import com.dotcms.jobs.business.util.JobUtil;
import com.dotcms.system.event.local.model.EventSubscriber;
import com.dotcms.util.AnnotationUtils;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.exception.DoesNotExistException;
import com.dotmarketing.exception.DotDataException;
Expand All @@ -45,6 +47,7 @@
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -127,17 +130,19 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {
private static final int EMPTY_QUEUE_RESET_THRESHOLD = Integer.MAX_VALUE - 1000;

/**
* Constructs a new JobQueueManagerAPIImpl.
* This constructor initializes the job queue manager with all necessary dependencies and configurations.
* Constructs a new JobQueueManagerAPIImpl. This constructor initializes the job queue manager
* with all necessary dependencies and configurations.
*
* @param jobQueue The JobQueue implementation to use for managing jobs.
* @param jobQueueConfig The JobQueueConfig implementation providing configuration settings.
* @param jobQueueConfig The JobQueueConfig implementation providing configuration
* settings.
* @param circuitBreaker The CircuitBreaker implementation for fault tolerance.
* @param defaultRetryStrategy The default retry strategy to use for failed jobs.
* @param realTimeJobMonitor The RealTimeJobMonitor for handling real-time job updates.
* @param jobProcessorFactory The JobProcessorFactory for creating job processors instances.
* @param retryPolicyProcessor The RetryPolicyProcessor for processing retry policies.
* @param abandonedJobDetector The AbandonedJobDetector for detecting abandoned jobs.
* @param discovery The JobProcessorDiscovery for discovering job processors.
*/
@Inject
public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
Expand All @@ -147,7 +152,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
RealTimeJobMonitor realTimeJobMonitor,
JobProcessorFactory jobProcessorFactory,
RetryPolicyProcessor retryPolicyProcessor,
AbandonedJobDetector abandonedJobDetector) {
AbandonedJobDetector abandonedJobDetector,
JobProcessorDiscovery discovery) {

this.jobQueue = jobQueue;
this.threadPoolSize = jobQueueConfig.getThreadPoolSize();
Expand All @@ -161,6 +167,9 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue,
this.abandonedJobDetector = abandonedJobDetector;
this.realTimeJobMonitor = realTimeJobMonitor;

// Register discovered processors by CDI
discovery.discoverJobProcessors().forEach(this::registerProcessor);

APILocator.getLocalSystemEventsAPI().subscribe(
JobCancelRequestEvent.class,
(EventSubscriber<JobCancelRequestEvent>) this::onCancelRequestJob
Expand Down Expand Up @@ -242,8 +251,19 @@ public void close() throws Exception {
}
}

@Override
public void registerProcessor(final Class<? extends JobProcessor> processor) {

Queue queue = AnnotationUtils.getBeanAnnotation(processor, Queue.class);
String queueName = Objects.nonNull(queue) ? queue.value() : processor.getName();
registerProcessor(queueName, processor);
}

@Override
public void registerProcessor(final String queueName, final Class<? extends JobProcessor> processor) {

Logger.info(this, "Registering JobProcessor: " + processor.getName());

final Class<? extends JobProcessor> jobProcessor = processors.get(queueName);
if (null != jobProcessor) {
Logger.warn(this, String.format(
Expand Down Expand Up @@ -860,7 +880,6 @@ private void removeInstanceRef(final String jobId) {
* @param job The job that completed.
* @param processor The processor that handled the job.
*/
@WrapInTransaction
private void handleJobCompletion(final Job job, final JobProcessor processor)
throws DotDataException {

Expand Down Expand Up @@ -937,7 +956,6 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) {
* @param exception The exception that caused the failure.
* @param processingStage The stage of processing where the failure occurred.
*/
@WrapInTransaction
private void handleJobFailure(final Job job, final JobProcessor processor,
final Exception exception, final String processingStage) throws DotDataException {

Expand All @@ -961,7 +979,6 @@ private void handleJobFailure(final Job job, final JobProcessor processor,
* @param errorMessage The error message to include in the job result.
* @param processingStage The stage of processing where the failure occurred.
*/
@WrapInTransaction
private void handleJobFailure(final Job job, final JobProcessor processor,
final Exception exception, final String errorMessage, final String processingStage)
throws DotDataException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import com.dotcms.jobs.business.processor.Queue;
import com.dotmarketing.exception.DotRuntimeException;
import java.util.Map;
import javax.enterprise.context.Dependent;

@Queue("failSuccess")
@Dependent
public class FailSuccessJob implements JobProcessor {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;
import javax.enterprise.context.Dependent;

/**
* Processor implementation for handling content import operations in dotCMS. This class provides
Expand Down Expand Up @@ -73,6 +74,7 @@
*/
@Queue("importContentlets")
@NoRetryPolicy
@Dependent
public class ImportContentletsProcessor implements JobProcessor, Validator, Cancellable {

private static final String PARAMETER_LANGUAGE = "language";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import java.io.FileReader;
import java.util.Map;
import java.util.Optional;
import javax.enterprise.context.Dependent;

/**
* This class reads a large file and prints the content to the log.
* It is here for the sole purpose of demonstrating the job queue system.
*/
@Queue("demo")
@Dependent
public class LargeFileReader implements JobProcessor, Cancellable {

public static final int LOG_EVERY_LINES = 1;
Expand Down
Loading

0 comments on commit 085eb37

Please sign in to comment.