diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java index 9bfaa637063f..cdfe2b29d287 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java @@ -10,18 +10,13 @@ public class JobQueueConfig { */ private final int threadPoolSize; - // The interval in milliseconds to poll for job updates - private final int pollJobUpdatesIntervalMilliseconds; - /** * Constructs a new JobQueueConfig * * @param threadPoolSize The number of threads to use for job processing. - * @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds to poll for job updates. */ - public JobQueueConfig(int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) { + public JobQueueConfig(int threadPoolSize) { this.threadPoolSize = threadPoolSize; - this.pollJobUpdatesIntervalMilliseconds = pollJobUpdatesIntervalMilliseconds; } /** @@ -33,13 +28,4 @@ public int getThreadPoolSize() { return threadPoolSize; } - /** - * Gets the interval in milliseconds to poll for job updates. - * - * @return The interval in milliseconds to poll for job updates. - */ - public int getPollJobUpdatesIntervalMilliseconds() { - return pollJobUpdatesIntervalMilliseconds; - } - } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java index 58aa8a9869a3..c63d2616dcc6 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java @@ -16,11 +16,6 @@ public class JobQueueConfigProducer { "JOB_QUEUE_THREAD_POOL_SIZE", 10 ); - // The interval in milliseconds to poll for job updates. 
- static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty( - "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 3000 - ); - /** * Produces a JobQueueConfig object. This method is called by the CDI container to create a * JobQueueConfig instance when it is necessary for dependency injection. @@ -30,8 +25,7 @@ public class JobQueueConfigProducer { @Produces public JobQueueConfig produceJobQueueConfig() { return new JobQueueConfig( - DEFAULT_THREAD_POOL_SIZE, - DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS + DEFAULT_THREAD_POOL_SIZE ); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 0c74f378d4e6..b63796ce3c7f 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -1,10 +1,7 @@ package com.dotcms.jobs.business.api; -import com.dotcms.api.system.event.Payload; -import com.dotcms.api.system.event.SystemEventType; import com.dotcms.business.CloseDBIfOpened; import com.dotcms.business.WrapInTransaction; -import com.dotcms.jobs.business.api.events.EventProducer; import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.JobCanceledEvent; import com.dotcms.jobs.business.api.events.JobCancellingEvent; @@ -15,10 +12,12 @@ import com.dotcms.jobs.business.api.events.JobRemovedFromQueueEvent; import com.dotcms.jobs.business.api.events.JobStartedEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; +import com.dotcms.jobs.business.detector.AbandonedJobDetector; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; +import 
com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; @@ -30,10 +29,12 @@ import com.dotcms.jobs.business.processor.DefaultRetryStrategy; import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; +import com.dotcms.jobs.business.processor.Validator; import com.dotcms.jobs.business.queue.JobQueue; import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.system.event.local.model.EventSubscriber; import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DoesNotExistException; @@ -41,11 +42,9 @@ import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; -import io.vavr.control.Try; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -110,6 +109,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final CircuitBreaker circuitBreaker; private final JobQueue jobQueue; + private final AbandonedJobDetector abandonedJobDetector; private final Map> processors; private final Map processorInstancesByJobId; private final int threadPoolSize; @@ -118,11 +118,8 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final RetryStrategy defaultRetryStrategy; private final RetryPolicyProcessor retryPolicyProcessor; - private final ScheduledExecutorService pollJobUpdatesScheduler; - private LocalDateTime lastPollJobUpdateTime = LocalDateTime.now(); - private final 
RealTimeJobMonitor realTimeJobMonitor; - private final EventProducer eventProducer; + private final JobProcessorFactory jobProcessorFactory; // Cap to prevent overflow @@ -139,14 +136,9 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { * @param circuitBreaker The CircuitBreaker implementation for fault tolerance. * @param defaultRetryStrategy The default retry strategy to use for failed jobs. * @param realTimeJobMonitor The RealTimeJobMonitor for handling real-time job updates. - * @param eventProducer The EventProducer for firing job-related events. - *

- * This constructor performs the following initializations: - * - Sets up the job queue and related configurations. - * - Initializes thread pool and job processors. - * - Sets up the circuit breaker and retry strategies. - * - Configures the job update polling mechanism. - * - Initializes event handlers for various job state changes. + * @param jobProcessorFactory The JobProcessorFactory for creating job processors instances. + * @param retryPolicyProcessor The RetryPolicyProcessor for processing retry policies. + * @param abandonedJobDetector The AbandonedJobDetector for detecting abandoned jobs. */ @Inject public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, @@ -154,9 +146,9 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, CircuitBreaker circuitBreaker, @DefaultRetryStrategy RetryStrategy defaultRetryStrategy, RealTimeJobMonitor realTimeJobMonitor, - EventProducer eventProducer, JobProcessorFactory jobProcessorFactory, - RetryPolicyProcessor retryPolicyProcessor) { + RetryPolicyProcessor retryPolicyProcessor, + AbandonedJobDetector abandonedJobDetector) { this.jobQueue = jobQueue; this.threadPoolSize = jobQueueConfig.getThreadPoolSize(); @@ -167,16 +159,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, this.circuitBreaker = circuitBreaker; this.jobProcessorFactory = jobProcessorFactory; this.retryPolicyProcessor = retryPolicyProcessor; - - this.pollJobUpdatesScheduler = Executors.newSingleThreadScheduledExecutor(); - pollJobUpdatesScheduler.scheduleAtFixedRate( - this::pollJobUpdates, 0, - jobQueueConfig.getPollJobUpdatesIntervalMilliseconds(), TimeUnit.MILLISECONDS - ); - - // Events + this.abandonedJobDetector = abandonedJobDetector; this.realTimeJobMonitor = realTimeJobMonitor; - this.eventProducer = eventProducer; APILocator.getLocalSystemEventsAPI().subscribe( JobCancelRequestEvent.class, @@ -218,7 +202,11 @@ public void start() { }); } - Logger.info(this, "JobQueue has been 
successfully started."); + // Start the abandoned job detector + abandonedJobDetector.start(); + + Logger.info(this, + "JobQueue and abandoned job detector have been successfully started."); } else { Logger.warn(this, "Attempt to start JobQueue that is already running. Ignoring." @@ -239,8 +227,11 @@ public void close() throws Exception { isShuttingDown = true; Logger.info(this, "Closing JobQueue and stopping all job processing."); + // Stop the abandoned job detector + abandonedJobDetector.close(); + + // Close existing services closeExecutorService(executorService); - closeExecutorService(pollJobUpdatesScheduler); isShuttingDown = false; isClosed = true; @@ -278,6 +269,8 @@ public Map> getQueueNames() { @Override public String createJob(final String queueName, final Map parameters) throws JobProcessorNotFoundException, DotDataException { + + // Validate a processor is registered for the queue final Class clazz = processors.get(queueName); if (null == clazz) { final var error = new JobProcessorNotFoundException(queueName); @@ -287,14 +280,27 @@ public String createJob(final String queueName, final Map parame //first attempt instantiating the processor, cuz if we cant no use to create an entry in the db final var processor = newProcessorInstance(queueName); - // now that we know we can instantiate the processor, we can add it to the map of instances - // But first we need the job id + + // Validate job parameters if processor implements Validator + if (processor instanceof Validator) { + try { + ((Validator) processor).validate(parameters); + } catch (JobValidationException e) { + Logger.error(this, "Job validation failed: " + e.getMessage(), e); + throw e; + } + } + try { + + // now that we know we can instantiate the processor, we can add it to the map of instances + // But first we need the job id final String jobId = jobQueue.createJob(queueName, parameters); addInstanceRef(jobId, processor); - eventProducer.getEvent(JobCreatedEvent.class).fire( - new 
JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters) - ); + + // Send the job created events + JobUtil.sendEvents( + new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)); return jobId; } catch (JobQueueException e) { throw new DotDataException("Error creating job", e); @@ -374,7 +380,6 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataEx } } - @WrapInTransaction @Override public void cancelJob(final String jobId) throws DotDataException { @@ -398,7 +403,6 @@ public void cancelJob(final String jobId) throws DotDataException { * @param event The event that triggers the job cancellation request. */ @VisibleForTesting - @WrapInTransaction void onCancelRequestJob(final JobCancelRequestEvent event) { try { @@ -474,28 +478,6 @@ public RetryStrategy getDefaultRetryStrategy() { return this.defaultRetryStrategy; } - /** - * Polls the job queue for updates to watched jobs and notifies their watchers. - */ - @CloseDBIfOpened - private void pollJobUpdates() { - try { - final var watchedJobIds = realTimeJobMonitor.getWatchedJobIds(); - if (watchedJobIds.isEmpty()) { - return; // No jobs are being watched, skip polling - } - - final var currentPollTime = LocalDateTime.now(); - List updatedJobs = jobQueue.getUpdatedJobsSince( - watchedJobIds, lastPollJobUpdateTime - ); - realTimeJobMonitor.updateWatchers(updatedJobs); - lastPollJobUpdateTime = currentPollTime; - } catch (Exception e) { - Logger.error(this, "Error polling job updates: " + e.getMessage(), e);// - } - } - /** * Fetches the state of a job from the job queue using the provided job ID. 
* @@ -521,7 +503,6 @@ private JobState getJobState(final String jobId) throws DotDataException { * @param progressTracker The processor progress tracker * @param previousProgress The previous progress value */ - @WrapInTransaction private float updateJobProgress(final Job job, final ProgressTracker progressTracker, final float previousProgress) throws DotDataException { @@ -540,9 +521,9 @@ private float updateJobProgress(final Job job, final ProgressTracker progressTra Job updatedJob = job.withProgress(progress).withState(latestState); jobQueue.updateJobProgress(job.id(), updatedJob.progress()); - eventProducer.getEvent(JobProgressUpdatedEvent.class).fire( - new JobProgressUpdatedEvent(updatedJob, LocalDateTime.now()) - ); + + // Send the update job progress events + JobUtil.sendEvents(updatedJob, JobProgressUpdatedEvent::new); return progress; } @@ -665,7 +646,7 @@ private boolean isCircuitBreakerOpen() { */ private void processJobWithRetry(final Job job) throws DotDataException { - if (job.state() == JobState.FAILED) { + if (job.state() == JobState.FAILED || job.state() == JobState.ABANDONED) { if (canRetry(job)) { @@ -718,9 +699,8 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException try { jobQueue.removeJobFromQueue(job.id()); - eventProducer.getEvent(JobRemovedFromQueueEvent.class).fire( - new JobRemovedFromQueueEvent(job, LocalDateTime.now()) - ); + // Send the job removed from queue events + JobUtil.sendEvents(job, JobRemovedFromQueueEvent::new); } catch (JobQueueDataException e) { throw new DotDataException("Error removing failed job", e); } @@ -740,9 +720,8 @@ private void processJob(final Job job) throws DotDataException { final ProgressTracker progressTracker = new DefaultProgressTracker(); Job runningJob = job.markAsRunning().withProgressTracker(progressTracker); updateJobStatus(runningJob); - eventProducer.getEvent(JobStartedEvent.class).fire( - new JobStartedEvent(runningJob, LocalDateTime.now()) - ); + // Send the job 
started events + JobUtil.sendEvents(runningJob, JobStartedEvent::new); try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { @@ -761,7 +740,7 @@ private void processJob(final Job job) throws DotDataException { } catch (DotDataException e) { throw new DotRuntimeException("Error updating job progress", e); } - }, 0, 2, TimeUnit.SECONDS + }, 0, 3, TimeUnit.SECONDS ); // Process the job @@ -863,15 +842,13 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) { Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); updateJobStatus(canceledJob); - eventProducer.getEvent(JobCanceledEvent.class).fire( - new JobCanceledEvent(canceledJob, LocalDateTime.now()) - ); + // Send the job canceled events + JobUtil.sendEvents(canceledJob, JobCanceledEvent::new); } else { final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); updateJobStatus(completedJob); - eventProducer.getEvent(JobCompletedEvent.class).fire( - new JobCompletedEvent(completedJob, LocalDateTime.now()) - ); + // Send the job completed events + JobUtil.sendEvents(completedJob, JobCompletedEvent::new); } } catch (JobQueueDataException e) { final var errorMessage = "Error updating job status"; @@ -885,29 +862,13 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) * * @param job The job to cancel. 
*/ - @WrapInTransaction private void handleJobCancelRequest(final Job job) throws DotDataException { Job cancelJob = job.withState(JobState.CANCEL_REQUESTED); updateJobStatus(cancelJob); - // Prepare the cancel request events - final JobCancelRequestEvent cancelRequestEvent = new JobCancelRequestEvent( - cancelJob, LocalDateTime.now() - ); - - // LOCAL event - APILocator.getLocalSystemEventsAPI().notify(cancelRequestEvent); - - // CLUSTER WIDE event - Try.run(() -> APILocator.getSystemEventsAPI() - .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(cancelRequestEvent))) - .onFailure(e -> Logger.error(JobQueueManagerAPIImpl.this, e.getMessage())); - - // CDI event - eventProducer.getEvent(JobCancelRequestEvent.class).fire( - cancelRequestEvent - ); + // Send the cancel request events + JobUtil.sendEvents(cancelJob, JobCancelRequestEvent::new); } /** @@ -916,7 +877,6 @@ private void handleJobCancelRequest(final Job job) throws DotDataException { * @param job The job that was canceled. * @param processor The processor that handled the job. 
*/ - @WrapInTransaction private void handleJobCancelling(final Job job, final JobProcessor processor) { try { @@ -925,9 +885,8 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) { Job cancelJob = job.withState(JobState.CANCELLING); updateJobStatus(cancelJob); - eventProducer.getEvent(JobCancellingEvent.class).fire( - new JobCancellingEvent(cancelJob, LocalDateTime.now()) - ); + // Send the job cancelling events + JobUtil.sendEvents(cancelJob, JobCancellingEvent::new); } catch (DotDataException e) { final var error = new JobCancellationException(job.id(), e); Logger.error(this, error); @@ -997,9 +956,8 @@ private void handleJobFailure(final Job job, final JobProcessor processor, final Job failedJob = job.markAsFailed(jobResult).withProgress(progress); updateJobStatus(failedJob); - eventProducer.getEvent(JobFailedEvent.class).fire( - new JobFailedEvent(failedJob, LocalDateTime.now()) - ); + // Send the job failed events + JobUtil.sendEvents(failedJob, JobFailedEvent::new); try { // Put the job back in the queue for later retry @@ -1018,7 +976,6 @@ private void handleJobFailure(final Job job, final JobProcessor processor, * @param job The job to update. * @throws DotDataException if there's an error updating the job status. 
*/ - @WrapInTransaction private void updateJobStatus(final Job job) throws DotDataException { try { jobQueue.updateJobStatus(job); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java deleted file mode 100644 index f6f22e3570f5..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.dotcms.jobs.business.api.events; - -import java.lang.annotation.Annotation; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Event; -import javax.enterprise.inject.spi.BeanManager; -import javax.inject.Inject; - -/** - * A producer class for CDI events. This class provides a centralized way to obtain event objects - * for firing events within the application. - * - *

<p>This class is application scoped, ensuring a single instance is used throughout - * the application's lifecycle.</p>
- */ -@ApplicationScoped -public class EventProducer { - - private BeanManager beanManager; - - public EventProducer() { - // Default constructor for CDI - } - - /** - * Constructs a new EventProducer. - * - * @param beanManager The CDI BeanManager, injected by the container. - */ - @Inject - public EventProducer(BeanManager beanManager) { - this.beanManager = beanManager; - } - - /** - * Retrieves an Event object for the specified event type and qualifiers. - * - *

<p>This method allows for type-safe event firing. It uses the BeanManager to - * create an Event object that can be used to fire events of the specified type.</p> - * - * <p>Usage example:</p> - * <pre>
-     * EventProducer producer = ...;
-     * Event<MyCustomEvent> event = producer.getEvent(MyCustomEvent.class);
-     * event.fire(new MyCustomEvent(...));
-     * </pre>
- * - * @param The type of the event. - * @param eventType The Class object representing the event type. - * @param qualifiers Optional qualifiers for the event. - * @return An Event object that can be used to fire events of the specified type. - */ - public Event getEvent(Class eventType, Annotation... qualifiers) { - return beanManager.getEvent().select(eventType, qualifiers); - } - -} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java new file mode 100644 index 000000000000..8e4efacaeb71 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java @@ -0,0 +1,39 @@ +package com.dotcms.jobs.business.api.events; + +import com.dotcms.jobs.business.job.Job; +import java.time.LocalDateTime; + +/** + * Event fired when an abandoned job is detected. + */ +public class JobAbandonedEvent implements JobEvent { + + private final Job job; + private final LocalDateTime detectedAt; + + /** + * Constructs a new JobAbandonedEvent. + * + * @param job The job. + * @param detectedAt The timestamp when the abandoned job was detected. + */ + public JobAbandonedEvent(Job job, LocalDateTime detectedAt) { + this.job = job; + this.detectedAt = detectedAt; + } + + /** + * @return The abandoned job. + */ + public Job getJob() { + return job; + } + + /** + * @return The timestamp when the abandoned job was detected. 
+ */ + public LocalDateTime getDetectedAt() { + return detectedAt; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java index 9554b1e8f573..1c06e69052bc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java @@ -6,7 +6,7 @@ /** * Event fired when there is a request to cancel a job. */ -public class JobCancelRequestEvent { +public class JobCancelRequestEvent implements JobEvent { private final Job job; private final LocalDateTime canceledOn; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java index 9a9896604aa6..55046c416556 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is canceled. */ -public class JobCanceledEvent { +public class JobCanceledEvent implements JobEvent { private final Job job; private final LocalDateTime canceledAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java index 20877a890387..32b3c10d2b5c 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is being canceled. 
*/ -public class JobCancellingEvent { +public class JobCancellingEvent implements JobEvent { private final Job job; private final LocalDateTime canceledAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java index 7c86c3940d6d..03460fd2072d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job completes successfully. */ -public class JobCompletedEvent { +public class JobCompletedEvent implements JobEvent { private final Job job; private final LocalDateTime completedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java index d7da52d2f5f8..e70000e684e9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a new job is created and added to the queue. */ -public class JobCreatedEvent { +public class JobCreatedEvent implements JobEvent { private final String jobId; private final String queueName; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java new file mode 100644 index 000000000000..f26488361ae7 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java @@ -0,0 +1,9 @@ +package com.dotcms.jobs.business.api.events; + +/** + * Base marker interface for all job-related events in the job queue system. All specific job event + * types (e.g., JobStartedEvent, JobCompletedEvent, etc.) should implement this interface. 
+ */ +public interface JobEvent { + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java index 8d098c1eae43..7df87cf1555a 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job fails during processing. */ -public class JobFailedEvent { +public class JobFailedEvent implements JobEvent { private final Job job; private final LocalDateTime failedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java index e859284ac09f..3ed6bd142912 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job's progress is updated. */ -public class JobProgressUpdatedEvent { +public class JobProgressUpdatedEvent implements JobEvent { private final Job job; private final LocalDateTime updatedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java index 0aa08dcaa886..b93504e600ec 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is removed from the queue because failed and is not retryable. 
*/ -public class JobRemovedFromQueueEvent { +public class JobRemovedFromQueueEvent implements JobEvent { private final Job job; private final LocalDateTime removedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java index dc357c6822d6..f3c2b0edeadc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job starts processing. */ -public class JobStartedEvent { +public class JobStartedEvent implements JobEvent { private final Job job; private final LocalDateTime startedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index 4c761158fa0e..ef33514f30d0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -2,6 +2,8 @@ import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; +import com.dotcms.system.event.local.model.EventSubscriber; +import com.dotmarketing.business.APILocator; import com.dotmarketing.util.Logger; import java.util.Arrays; import java.util.List; @@ -12,8 +14,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Observes; /** * Manages real-time monitoring of jobs in the system. 
This class provides functionality to register @@ -67,6 +69,60 @@ public class RealTimeJobMonitor { private final Map> jobWatchers = new ConcurrentHashMap<>(); + public RealTimeJobMonitor() { + // Default constructor required for CDI + } + + @PostConstruct + protected void init() { + + APILocator.getLocalSystemEventsAPI().subscribe( + JobProgressUpdatedEvent.class, + (EventSubscriber) this::onJobProgressUpdated + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobFailedEvent.class, + (EventSubscriber) this::onJobFailed + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobRemovedFromQueueEvent.class, + (EventSubscriber) this::onJobRemovedFromQueueEvent + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCompletedEvent.class, + (EventSubscriber) this::onJobCompleted + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCanceledEvent.class, + (EventSubscriber) this::onJobCanceled + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancellingEvent.class, + (EventSubscriber) this::onJobCancelling + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancelRequestEvent.class, + (EventSubscriber) this::onJobCancelRequest + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobStartedEvent.class, + (EventSubscriber) this::onJobStarted + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobAbandonedEvent.class, + (EventSubscriber) this::onAbandonedJob + ); + + } + /** * Registers a watcher for a specific job with optional filtering of updates. The watcher will * be notified of job updates that match the provided filter predicate. If no filter is provided @@ -237,7 +293,7 @@ private void removeWatcher(String jobId) { * * @param event The JobStartedEvent. 
*/ - public void onJobStarted(@Observes JobStartedEvent event) { + public void onJobStarted(JobStartedEvent event) { updateWatchers(event.getJob()); } @@ -246,7 +302,16 @@ public void onJobStarted(@Observes JobStartedEvent event) { * * @param event The JobCancelRequestEvent. */ - public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { + public void onJobCancelRequest(JobCancelRequestEvent event) { + updateWatchers(event.getJob()); + } + + /** + * Handles the abandoned job event. + * + * @param event The JobAbandonedEvent. + */ + public void onAbandonedJob(JobAbandonedEvent event) { updateWatchers(event.getJob()); } @@ -255,7 +320,7 @@ public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { * * @param event The JobCancellingEvent. */ - public void onJobCancelling(@Observes JobCancellingEvent event) { + public void onJobCancelling(JobCancellingEvent event) { updateWatchers(event.getJob()); } @@ -264,7 +329,7 @@ public void onJobCancelling(@Observes JobCancellingEvent event) { * * @param event The JobCanceledEvent. */ - public void onJobCanceled(@Observes JobCanceledEvent event) { + public void onJobCanceled(JobCanceledEvent event) { updateWatchers(event.getJob()); removeWatcher(event.getJob().id()); } @@ -274,7 +339,7 @@ public void onJobCanceled(@Observes JobCanceledEvent event) { * * @param event The JobCompletedEvent. */ - public void onJobCompleted(@Observes JobCompletedEvent event) { + public void onJobCompleted(JobCompletedEvent event) { updateWatchers(event.getJob()); removeWatcher(event.getJob().id()); } @@ -284,7 +349,7 @@ public void onJobCompleted(@Observes JobCompletedEvent event) { * * @param event The JobRemovedFromQueueEvent. 
*/ - public void onJobRemovedFromQueueEvent(@Observes JobRemovedFromQueueEvent event) { + public void onJobRemovedFromQueueEvent(JobRemovedFromQueueEvent event) { removeWatcher(event.getJob().id()); } @@ -293,7 +358,7 @@ public void onJobRemovedFromQueueEvent(@Observes JobRemovedFromQueueEvent event) * * @param event The JobFailedEvent. */ - public void onJobFailed(@Observes JobFailedEvent event) { + public void onJobFailed(JobFailedEvent event) { updateWatchers(event.getJob()); } @@ -302,7 +367,7 @@ public void onJobFailed(@Observes JobFailedEvent event) { * * @param event The JobProgressUpdatedEvent. */ - public void onJobProgressUpdated(@Observes JobProgressUpdatedEvent event) { + public void onJobProgressUpdated(JobProgressUpdatedEvent event) { updateWatchers(event.getJob()); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java new file mode 100644 index 000000000000..2d9224ce9ef0 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java @@ -0,0 +1,189 @@ +package com.dotcms.jobs.business.detector; + +import com.dotcms.jobs.business.api.events.JobAbandonedEvent; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.queue.JobQueue; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; +import com.dotcms.jobs.business.util.JobUtil; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.util.Logger; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +/** + * Detects and handles abandoned jobs in the job queue system. 
A job is considered abandoned if it + * remains in certain states (RUNNING, CANCELLING, CANCEL_REQUESTED) without updates for longer than + * a configured threshold. + *

+ * Key features: + *

    + *
  • Periodically scans for jobs that haven't been updated within the abandonment threshold
  • + *
  • Marks abandoned jobs and puts them back in queue for retry
  • + *
  • Publishes events when abandoned jobs are detected
  • + *
  • Configurable detection interval and abandonment threshold
  • + *
+ */ +@ApplicationScoped +public class AbandonedJobDetector implements AutoCloseable { + + private static final JobState[] ABANDONMENT_CHECK_STATES = { + JobState.RUNNING, + JobState.CANCELLING, + JobState.CANCEL_REQUESTED + }; + + private final JobQueue jobQueue; + + private final ScheduledExecutorService executor; + + private final long detectionIntervalMinutes; + private final long abandonmentThresholdMinutes; + + private volatile boolean isRunning = false; + + // Required for CDI proxy + protected AbandonedJobDetector() { + this.jobQueue = null; + this.executor = null; + this.detectionIntervalMinutes = 0; + this.abandonmentThresholdMinutes = 0; + } + + /** + * Creates a new abandoned job detector with the specified configuration. + * + * @param config Configuration containing detection interval and threshold + * @param jobQueue The job queue to monitor for abandoned jobs + */ + @Inject + public AbandonedJobDetector(AbandonedJobDetectorConfig config, JobQueue jobQueue) { + + this.jobQueue = jobQueue; + this.executor = Executors.newSingleThreadScheduledExecutor(); + + // Load configuration + this.detectionIntervalMinutes = config.getDetectionIntervalMinutes(); + this.abandonmentThresholdMinutes = config.getAbandonmentThresholdMinutes(); + } + + /** + * Starts the abandoned job detection process if not already running. Schedules periodic checks + * based on the configured detection interval. + */ + public void start() { + + if (!isRunning) { + isRunning = true; + executor.scheduleWithFixedDelay( + this::detectAbandonedJobs, + detectionIntervalMinutes, + detectionIntervalMinutes, + TimeUnit.MINUTES + ); + Logger.info(this, String.format( + "Abandoned job detector started. Checking every %d minutes for jobs not " + + "updated in %d minutes", + detectionIntervalMinutes, abandonmentThresholdMinutes)); + } + } + + /** + * Checks for and handles abandoned jobs. + *

+ * Abandoned jobs are: + *

    + *
  1. Marked as abandoned in the job queue
  2. + *
  3. Logged as a warning
  4. + *
  5. Put back in queue for retry
  6. + *
  7. Generate a JobAbandonedEvent
  8. + *
+ */ + private void detectAbandonedJobs() { + + try { + + Optional abandonedJob; + while ((abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(abandonmentThresholdMinutes), + ABANDONMENT_CHECK_STATES)).isPresent()) { + + processAbandonedJob(abandonedJob.get()); + } + } catch (Exception e) { + final var errorMessage = "Error detecting abandoned jobs"; + Logger.error(this, errorMessage, e); + throw new DotRuntimeException(errorMessage, e); + } + } + + /** + * Processes an abandoned job by logging the event, putting it back in the queue for retry, and + * generating a JobAbandonedEvent. + */ + private void processAbandonedJob(final Job abandonedJob) throws JobQueueDataException { + + // Log the event + Logger.warn(this, String.format( + "Abandoned job found - Job ID: %s, Queue: %s, Last Updated: %s", + abandonedJob.id(), + abandonedJob.queueName(), + abandonedJob.updatedAt().orElse(null) + )); + + // Put the job back in the queue for later retry + putJobBackInQueue(abandonedJob); + + // Send events + JobUtil.sendEvents(abandonedJob, JobAbandonedEvent::new); + } + + /** + * Places an abandoned job back in the queue for retry. + * + * @param abandonedJob The job that was detected as abandoned + * @throws JobQueueDataException if there is an error re-queueing the job + */ + private void putJobBackInQueue(final Job abandonedJob) throws JobQueueDataException { + + // Put the job back in the queue for later retry + jobQueue.putJobBackInQueue(abandonedJob); + + Logger.info(this, + String.format("Abandoned job %s queued for retry", abandonedJob.id())); + } + + /** + * Stops the abandoned job detector and cleans up resources. Attempts graceful shutdown, forcing + * shutdown if tasks don't complete within 60 seconds. 
+ */ + @Override + public void close() { + if (isRunning) { + isRunning = false; + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + Logger.info(this, "Abandoned job detector stopped"); + } + } + + /** + * @return true if the detector is currently running, false otherwise + */ + public boolean isRunning() { + return isRunning; + } + +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java new file mode 100644 index 000000000000..46b67ca4847d --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java @@ -0,0 +1,50 @@ +package com.dotcms.jobs.business.detector; + +/** + * Configuration settings for the abandoned job detection system. This class holds the timing + * parameters that control how and when jobs are determined to be abandoned. + */ +public class AbandonedJobDetectorConfig { + + private final long detectionIntervalMinutes; + private final long abandonmentThresholdMinutes; + + // Required for CDI proxy + protected AbandonedJobDetectorConfig() { + this.detectionIntervalMinutes = 0; + this.abandonmentThresholdMinutes = 0; + } + + /** + * Constructs a new configuration with the specified timing parameters. 
+ * + * @param detectionIntervalMinutes How frequently to check for abandoned jobs, in minutes + * @param abandonmentThresholdMinutes How long a job must be inactive before being considered + * abandoned, in minutes + */ + public AbandonedJobDetectorConfig( + long detectionIntervalMinutes, + long abandonmentThresholdMinutes) { + this.detectionIntervalMinutes = detectionIntervalMinutes; + this.abandonmentThresholdMinutes = abandonmentThresholdMinutes; + } + + /** + * Gets the interval between abandoned job detection runs. + * + * @return The detection interval in minutes + */ + public long getDetectionIntervalMinutes() { + return detectionIntervalMinutes; + } + + /** + * Gets the time threshold after which an inactive job is considered abandoned. + * + * @return The abandonment threshold in minutes + */ + public long getAbandonmentThresholdMinutes() { + return abandonmentThresholdMinutes; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java new file mode 100644 index 000000000000..cfb38bf07f3e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java @@ -0,0 +1,49 @@ +package com.dotcms.jobs.business.detector; + +import com.dotmarketing.util.Config; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Produces; + +/** + * Produces configuration for the abandoned job detector system using application properties. This + * producer creates a singleton configuration object that determines the intervals for checking + * abandoned jobs and the threshold for considering a job abandoned. + *

+ * Configuration is read from the following properties: + *

    + *
  • JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES: How often to check for abandoned jobs (default: 5)
  • + *
  • JOB_ABANDONMENT_THRESHOLD_MINUTES: How long before a non-updating job is considered abandoned (default: 30)
  • + *
+ */ +@ApplicationScoped +public class AbandonedJobDetectorConfigProducer { + + /** + * The default interval in minutes between checks for abandoned jobs. + */ + static final int DEFAULT_JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES = Config.getIntProperty( + "JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", 5 + ); + + /** + * The default time in minutes after which an inactive job is considered abandoned. + */ + static final int DEFAULT_JOB_ABANDONMENT_THRESHOLD_MINUTES = Config.getIntProperty( + "JOB_ABANDONMENT_THRESHOLD_MINUTES", 30 + ); + + /** + * Produces a configuration object for the abandoned job detector. + * + * @return A new configuration object with the current property values + */ + @ApplicationScoped + @Produces + public AbandonedJobDetectorConfig produceAbandonedJobDetectorConfig() { + return new AbandonedJobDetectorConfig( + DEFAULT_JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES, + DEFAULT_JOB_ABANDONMENT_THRESHOLD_MINUTES + ); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java new file mode 100644 index 000000000000..3fcd5a7ed84e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java @@ -0,0 +1,37 @@ +package com.dotcms.jobs.business.error; + +/** + * Exception thrown when a job is detected to be abandoned in the job queue system. A job is + * considered abandoned when it remains in an active state without updates beyond the configured + * abandonment threshold. + */ +public class JobAbandonedException extends RuntimeException { + + /** + * Creates a new JobAbandonedException with the specified message. + * + * @param message Details about why the job was considered abandoned + */ + public JobAbandonedException(String message) { + super(message); + } + + /** + * Creates a new JobAbandonedException with a message and underlying cause. 
+ * + * @param message Details about why the job was considered abandoned + * @param cause The underlying exception that led to the job being abandoned + */ + public JobAbandonedException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates a new JobAbandonedException with an underlying cause. + * + * @param cause The underlying exception that led to the job being abandoned + */ + public JobAbandonedException(Throwable cause) { + super(cause); + } +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java index b656c8f5b79b..79a335e57433 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java @@ -7,6 +7,16 @@ */ public class JobProcessingException extends RuntimeException { + /** + * Constructs a new JobProcessingException with the specified message and cause. + * + * @param message A description of the error + * @param cause The underlying cause of the error (can be null) + */ + public JobProcessingException(String message, Throwable cause) { + super(message, cause); + } + /** * Constructs a new JobProcessingException with the specified job ID, reason, and cause. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java index 00dca669a109..2f871e0be580 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java @@ -7,6 +7,25 @@ */ public class JobValidationException extends RuntimeException { + /** + * Constructs a new JobValidationException with the specified message and cause. 
+ * + * @param message A description of the validation failure + * @param cause The underlying cause of the validation failure (can be null) + */ + public JobValidationException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new JobValidationException with the specified message + * + * @param message A description of the validation failure + */ + public JobValidationException(String message) { + super(message); + } + /** * Constructs a new JobValidationException with the specified job ID, reason, and cause. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index 590458a07f2f..faf8d5745bec 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -87,6 +87,21 @@ default Job markAsFailed(final JobResult result) { .build(); } + /** + * Creates a new Job marked as abandoned with the result details. + * + * @param result The result details of the abandoned job. + * @return A new Job instance marked as abandoned. + */ + default Job markAsAbandoned(final JobResult result) { + return Job.builder().from(this) + .state(JobState.ABANDONED) + .result(result) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + /** * Creates a new Job marked as running. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java index c13b8a6d3587..f3e7ebdc0880 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java @@ -25,6 +25,11 @@ public enum JobState { */ FAILED, + /** + * The job was abandoned before it could complete. + */ + ABANDONED, + /** * The job is waiting to be canceled. 
*/ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java new file mode 100644 index 000000000000..29712d5ab328 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java @@ -0,0 +1,20 @@ +package com.dotcms.jobs.business.processor; + +import com.dotcms.jobs.business.error.JobValidationException; +import java.util.Map; + +/** + * Interface for validating job parameters before creation. Processors can implement this interface + * to provide validation logic that will be executed before a job is created. + */ +public interface Validator { + + /** + * Validates the job parameters before job creation. + * + * @param parameters The parameters to validate + * @throws JobValidationException if the parameters are invalid + */ + void validate(Map parameters) throws JobValidationException; + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index 3c8b446f8b39..04420a9bf468 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -11,6 +11,7 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.NoRetryPolicy; import com.dotcms.jobs.business.processor.Queue; +import com.dotcms.jobs.business.processor.Validator; import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.repackage.com.csvreader.CsvReader; import com.dotcms.rest.api.v1.temp.DotTempFile; @@ -51,9 +52,10 @@ * functionality to import content from CSV files, with support for both preview and publish * operations, as well as multilingual content handling. * - *

The processor implements both {@link JobProcessor} and {@link Cancellable} interfaces to - * provide job processing and cancellation capabilities. It's annotated with {@link Queue} to - * specify the queue name and {@link ExponentialBackoffRetryPolicy} to define retry behavior.

+ *

The processor implements the {@link JobProcessor}, {@link Cancellable} and {@link Validator} + * interfaces to provide job processing, cancellation and parameter validation capabilities. It's annotated with + * {@link Queue} to specify the queue name and {@link NoRetryPolicy} to define + * retry behavior.

* *

Key features:

*
    @@ -66,12 +68,13 @@ * * @see JobProcessor * @see Cancellable + * @see Validator * @see Queue * @see ExponentialBackoffRetryPolicy */ @Queue("importContentlets") @NoRetryPolicy -public class ImportContentletsProcessor implements JobProcessor, Cancellable { +public class ImportContentletsProcessor implements JobProcessor, Validator, Cancellable { private static final String PARAMETER_LANGUAGE = "language"; private static final String PARAMETER_FIELDS = "fields"; @@ -120,7 +123,7 @@ public void process(final Job job) throws JobProcessingException { final User user; try { - user = getUser(job); + user = getUser(job.parameters()); } catch (Exception e) { Logger.error(this, "Error retrieving user", e); throw new JobProcessingException(job.id(), "Error retrieving user", e); @@ -136,9 +139,6 @@ public void process(final Job job) throws JobProcessingException { throw new JobValidationException(job.id(), "Unable to retrieve the import file."); } - // Validate the job has the required data - validate(job); - final var fileToImport = tempFile.get().file; final long totalLines = totalLines(job, fileToImport); @@ -163,6 +163,55 @@ public void process(final Job job) throws JobProcessingException { } } + /** + * Validates the job parameters and content type. Performs security checks to prevent + * unauthorized host imports. 
+ * + * @param parameters The parameters to validate + * @throws JobValidationException if validation fails + */ + @Override + public void validate(final Map parameters) throws JobValidationException { + + // Validating the language (will throw an exception if it doesn't) + final Language language = findLanguage(parameters); + + if (getContentType(parameters) != null && getContentType(parameters).isEmpty()) { + final var errorMessage = "A Content Type id or variable is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(errorMessage); + } else if (getWorkflowActionId(parameters) != null + && getWorkflowActionId(parameters).isEmpty()) { + final var errorMessage = "A Workflow Action id is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(errorMessage); + } else if (language == null && getFields(parameters).length == 0) { + final var errorMessage = + "A key identifying the different Language versions of the same " + + "content must be defined when importing multilingual files."; + Logger.error(this, errorMessage); + throw new JobValidationException(errorMessage); + } + + try { + + // Make sure the content type exist (will throw an exception if it doesn't) + final var contentTypeFound = findContentType(parameters); + + // Security measure to prevent invalid attempts to import a host. + final ContentType hostContentType = APILocator.getContentTypeAPI( + APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); + final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); + if (isHost) { + final var errorMessage = "Invalid attempt to import a host."; + Logger.error(this, errorMessage); + throw new JobValidationException(errorMessage); + } + } catch (DotSecurityException | DotDataException e) { + throw new JobProcessingException("Error validating content type", e); + } + } + /** * Handles cancellation requests for the import operation. 
When called, it marks the operation * for cancellation. @@ -262,10 +311,10 @@ private Map> processImport(final boolean preview, final Job final var currentSiteId = getSiteIdentifier(job); final var currentSiteName = getSiteName(job); - final var contentType = findContentType(job); - final var fields = getFields(job); - final var language = findLanguage(job); - final var workflowActionId = getWorkflowActionId(job); + final var contentType = findContentType(job.parameters()); + final var fields = getFields(job.parameters()); + final var language = findLanguage(job.parameters()); + final var workflowActionId = getWorkflowActionId(job.parameters()); final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); final var importId = jobIdToLong(job.id()); @@ -300,13 +349,14 @@ private String getCommand(final Job job) { /** * Retrieve the user from the job parameters * - * @param job input job + * @param parameters job parameters * @return the user from the job parameters * @throws DotDataException if an error occurs during the user retrieval * @throws DotSecurityException if we don't have the necessary permissions to retrieve the user */ - private User getUser(final Job job) throws DotDataException, DotSecurityException { - final var userId = (String) job.parameters().get(PARAMETER_USER_ID); + private User getUser(final Map parameters) + throws DotDataException, DotSecurityException { + final var userId = (String) parameters.get(PARAMETER_USER_ID); return APILocator.getUserAPI().loadUserById(userId); } @@ -333,53 +383,53 @@ private String getSiteName(final Job job) { /** * Retrieves the content type from the job parameters. 
* - * @param job The job containing the parameters + * @param parameters job parameters * @return The content type string, or null if not present in parameters */ - private String getContentType(final Job job) { - return (String) job.parameters().get(PARAMETER_CONTENT_TYPE); + private String getContentType(final Map parameters) { + return (String) parameters.get(PARAMETER_CONTENT_TYPE); } /** * Retrieves the workflow action ID from the job parameters. * - * @param job The job containing the parameters + * @param parameters job parameters * @return The workflow action ID string, or null if not present in parameters */ - private String getWorkflowActionId(final Job job) { - return (String) job.parameters().get(PARAMETER_WORKFLOW_ACTION_ID); + private String getWorkflowActionId(final Map parameters) { + return (String) parameters.get(PARAMETER_WORKFLOW_ACTION_ID); } /** * Retrieves the language from the job parameters. * - * @param job The job containing the parameters + * @param parameters job parameters * @return An optional containing the language string, or an empty optional if not present */ - private Optional getLanguage(final Job job) { + private Optional getLanguage(final Map parameters) { - if (!job.parameters().containsKey(PARAMETER_LANGUAGE) - || job.parameters().get(PARAMETER_LANGUAGE) == null) { + if (!parameters.containsKey(PARAMETER_LANGUAGE) + || parameters.get(PARAMETER_LANGUAGE) == null) { return Optional.empty(); } - return Optional.of((String) job.parameters().get(PARAMETER_LANGUAGE)); + return Optional.of((String) parameters.get(PARAMETER_LANGUAGE)); } /** * Retrieves the fields array from the job parameters. 
* - * @param job The job containing the parameters + * @param parameters job parameters * @return An array of field strings, or an empty array if no fields are specified */ - public String[] getFields(final Job job) { + public String[] getFields(final Map parameters) { - if (!job.parameters().containsKey(PARAMETER_FIELDS) - || job.parameters().get(PARAMETER_FIELDS) == null) { + if (!parameters.containsKey(PARAMETER_FIELDS) + || parameters.get(PARAMETER_FIELDS) == null) { return new String[0]; } - final var fields = job.parameters().get(PARAMETER_FIELDS); + final var fields = parameters.get(PARAMETER_FIELDS); if (fields instanceof List) { return ((List) fields).toArray(new String[0]); } @@ -387,54 +437,6 @@ public String[] getFields(final Job job) { return (String[]) fields; } - /** - * Validates the job parameters and content type. Performs security checks to prevent - * unauthorized host imports. - * - * @param job The job to validate - * @throws JobValidationException if validation fails - * @throws JobProcessingException if an error occurs during content type validation - */ - private void validate(final Job job) { - - // Validating the language (will throw an exception if it doesn't) - final Language language = findLanguage(job); - - if (getContentType(job) != null && getContentType(job).isEmpty()) { - final var errorMessage = "A Content Type id or variable is required"; - Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } else if (getWorkflowActionId(job) != null && getWorkflowActionId(job).isEmpty()) { - final var errorMessage = "A Workflow Action id is required"; - Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } else if (language == null && getFields(job).length == 0) { - final var errorMessage = - "A key identifying the different Language versions of the same " - + "content must be defined when importing multilingual files."; - 
Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - - try { - - // Make sure the content type exist (will throw an exception if it doesn't) - final var contentTypeFound = findContentType(job); - - // Security measure to prevent invalid attempts to import a host. - final ContentType hostContentType = APILocator.getContentTypeAPI( - APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); - final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); - if (isHost) { - final var errorMessage = "Invalid attempt to import a host."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - } catch (DotSecurityException | DotDataException e) { - throw new JobProcessingException(job.id(), "Error validating content type", e); - } - } - /** * Utility method to convert a job ID to a long value for internal processing. Uses FarmHash for * efficient hash generation and distribution. @@ -579,23 +581,23 @@ private void validateLanguageColumns(Job job, int languageCodeColumn, int countr /** * Retrieves the existing content type based on an id or variable. * - * @param job The current import job. + * @param parameters job parameters * @return The existing content type if found, otherwise fails with an exception. * @throws DotSecurityException If there are security restrictions preventing the evaluation. 
*/ - private ContentType findContentType(final Job job) + private ContentType findContentType(final Map parameters) throws DotSecurityException { - final var contentTypeIdOrVar = getContentType(job); + final var contentTypeIdOrVar = getContentType(parameters); final User user; // Retrieving the user requesting the import try { - user = getUser(job); + user = getUser(parameters); } catch (DotDataException e) { final var errorMessage = "Error retrieving user."; Logger.error(this.getClass(), errorMessage); - throw new JobProcessingException(job.id(), errorMessage, e); + throw new JobProcessingException(errorMessage, e); } try { @@ -606,26 +608,26 @@ private ContentType findContentType(final Job job) "Content Type [%s] not found.", contentTypeIdOrVar ); Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); + throw new JobValidationException(errorMessage); } catch (DotDataException e) { final var errorMessage = String.format( "Error finding Content Type [%s].", contentTypeIdOrVar ); Logger.error(this.getClass(), errorMessage); - throw new JobProcessingException(job.id(), errorMessage, e); + throw new JobProcessingException(errorMessage, e); } } /** * Retrieves the existing language based on an id or ISO code. * - * @param job The current import job. + * @param parameters job parameters * @return The existing language if found, otherwise fails with an exception. 
*/ - private Language findLanguage(final Job job) { + private Language findLanguage(final Map parameters) { // Read the language from the job parameters - final var languageIsoOrIdOptional = getLanguage(job); + final var languageIsoOrIdOptional = getLanguage(parameters); if (languageIsoOrIdOptional.isEmpty()) { return null; } @@ -652,7 +654,7 @@ private Language findLanguage(final Job job) { "Language [%s] not found.", languageIsoOrId ); Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); + throw new JobValidationException(errorMessage); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index 0e792c1424fc..9b2a043c00a9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -7,9 +7,11 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -166,6 +168,18 @@ List getUpdatedJobsSince(Set jobIds, LocalDateTime since) */ Job nextJob() throws JobQueueDataException, JobLockingException; + /** + * Detects and marks jobs as abandoned if they haven't been updated within the specified + * threshold. + * + * @param threshold The time duration after which a job is considered abandoned + * @param inStates The states to check for abandoned jobs + * @return An Optional containing the abandoned job if one was found and marked, an empty Optional otherwise + * @throws JobQueueDataException if there's a data storage error + */ + Optional detectAndMarkAbandoned(Duration threshold, JobState... inStates) + throws JobQueueDataException; + /** * Updates the progress of a job.
* diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index 10298c44e578..72ca197b4e03 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -1,7 +1,11 @@ package com.dotcms.jobs.business.queue; +import com.dotcms.business.CloseDBIfOpened; +import com.dotcms.business.WrapInTransaction; +import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; +import com.dotcms.jobs.business.job.JobResult; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.queue.error.JobLockingException; import com.dotcms.jobs.business.queue.error.JobNotFoundException; @@ -22,11 +26,13 @@ import com.github.jonpeterson.jackson.module.versioning.VersioningModule; import io.vavr.Lazy; import java.sql.Timestamp; +import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -77,6 +83,24 @@ public class PostgresJobQueue implements JobQueue { + "ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) " + "RETURNING *"; + private static final String DETECT_AND_MARK_ABANDONED_WITH_LOCK_QUERY = + "WITH active_states AS (" + + " SELECT unnest(ARRAY[$??$]) as state" + + + "), abandoned_jobs AS (" + + " SELECT j.*, j.result as existing_result" + + " FROM job j" + + " INNER JOIN active_states a ON j.state = a.state::text" + + " WHERE j.updated_at < ?" + + " ORDER BY j.updated_at ASC" + + " LIMIT 1" + + " FOR UPDATE SKIP LOCKED" + + ") " + + "UPDATE job " + + "SET state = ?, updated_at = ? 
" + + "WHERE id IN (SELECT id FROM abandoned_jobs) " + + "RETURNING *"; + private static final String GET_ACTIVE_JOBS_QUERY_FOR_QUEUE = "WITH total AS (SELECT COUNT(*) AS total_count " + " FROM job WHERE queue_name = ? AND state IN (?, ?) " + @@ -171,6 +195,7 @@ public class PostgresJobQueue implements JobQueue { return mapper; }); + @WrapInTransaction @Override public String createJob(final String queueName, final Map parameters) throws JobQueueException { @@ -223,6 +248,7 @@ public String createJob(final String queueName, final Map parame } } + @CloseDBIfOpened @Override public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataException { @@ -256,6 +282,7 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE } } + @CloseDBIfOpened @Override public JobState getJobState(final String jobId) throws JobNotFoundException, JobQueueDataException { @@ -274,6 +301,7 @@ public JobState getJobState(final String jobId) return job.state(); } + @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws JobQueueDataException { @@ -300,6 +328,7 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page, } } + @CloseDBIfOpened @Override public JobPaginatedResult getCompletedJobs(final String queueName, final LocalDateTime startDate, @@ -329,6 +358,7 @@ public JobPaginatedResult getCompletedJobs(final String queueName, } } + @CloseDBIfOpened @Override public JobPaginatedResult getJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -346,6 +376,7 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -372,6 +403,7 @@ public JobPaginatedResult getActiveJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult 
getCompletedJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -396,6 +428,7 @@ public JobPaginatedResult getCompletedJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -420,6 +453,7 @@ public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getFailedJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -444,6 +478,7 @@ public JobPaginatedResult getFailedJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public void updateJobStatus(final Job job) throws JobQueueDataException { @@ -489,9 +524,10 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { }).orElse(null)); historyDc.loadResult(); - // Remove from job_queue if completed, failed, or canceled + // Remove from job_queue if completed, failed, abandoned or canceled if (job.state() == JobState.COMPLETED || job.state() == JobState.FAILED + || job.state() == JobState.ABANDONED || job.state() == JobState.CANCELED) { removeJobFromQueue(job.id()); } @@ -506,6 +542,7 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { } } + @CloseDBIfOpened @Override public List getUpdatedJobsSince(final Set jobIds, final LocalDateTime since) throws JobQueueDataException { @@ -528,6 +565,7 @@ public List getUpdatedJobsSince(final Set jobIds, final LocalDateTi } } + @CloseDBIfOpened @Override public void putJobBackInQueue(final Job job) throws JobQueueDataException { @@ -550,6 +588,7 @@ public void putJobBackInQueue(final Job job) throws JobQueueDataException { } } + @CloseDBIfOpened @Override public Job nextJob() throws JobQueueDataException, JobLockingException { @@ -580,6 +619,60 @@ public Job nextJob() throws JobQueueDataException, JobLockingException { } } + @CloseDBIfOpened + @Override + public Optional 
detectAndMarkAbandoned(final Duration threshold, final JobState... inStates) + throws JobQueueDataException { + + try { + + String parameters = String.join(", ", Collections.nCopies(inStates.length, "?")); + + var query = DETECT_AND_MARK_ABANDONED_WITH_LOCK_QUERY + .replace(REPLACE_TOKEN_PARAMETERS, parameters); + + LocalDateTime thresholdTime = LocalDateTime.now().minus(threshold); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + for (JobState state : inStates) { + dc.addParam(state.name()); + } + dc.addParam(Timestamp.valueOf(thresholdTime)); + dc.addParam(JobState.ABANDONED.name()); + dc.addParam(Timestamp.valueOf(LocalDateTime.now())); + + List> results = dc.loadObjectResults(); + if (!results.isEmpty()) { + final var foundAbandonedJob = DBJobTransformer.toJob(results.get(0)); + + // Create error detail for abandoned job + final ErrorDetail errorDetail = ErrorDetail.builder() + .message("Job abandoned due to no updates within " + + threshold.toMinutes() + " minutes") + .exceptionClass("com.dotcms.jobs.business.error.JobAbandonedException") + .timestamp(LocalDateTime.now()) + .processingStage("Abandoned Job Detection") + .stackTrace("Job exceeded inactivity threshold of " + + threshold.toMinutes() + " minutes") + .build(); + final JobResult jobResult = JobResult.builder().errorDetail(errorDetail).build(); + + final Job abandonedJob = foundAbandonedJob.markAsAbandoned(jobResult); + updateJobStatus(abandonedJob); + + return Optional.of(abandonedJob); + } + + return Optional.empty(); + } catch (DotDataException e) { + final var errorMessage = "Database error while detecting abandoned jobs"; + Logger.error(this, errorMessage, e); + throw new JobQueueDataException(errorMessage, e); + } + } + + @CloseDBIfOpened @Override public void updateJobProgress(final String jobId, final float progress) throws JobQueueDataException { @@ -601,6 +694,7 @@ public void updateJobProgress(final String jobId, final float progress) } } + @CloseDBIfOpened @Override public void 
removeJobFromQueue(final String jobId) throws JobQueueDataException { @@ -615,6 +709,7 @@ public void removeJobFromQueue(final String jobId) throws JobQueueDataException } } + @CloseDBIfOpened @Override public boolean hasJobBeenInState(final String jobId, final JobState... states) throws JobQueueDataException { diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java index c19e19056153..126d493b8f60 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java @@ -1,6 +1,9 @@ package com.dotcms.jobs.business.util; +import com.dotcms.api.system.event.Payload; +import com.dotcms.api.system.event.SystemEventType; import com.dotcms.api.web.HttpServletRequestThreadLocal; +import com.dotcms.jobs.business.api.events.JobEvent; import com.dotcms.jobs.business.job.Job; import com.dotcms.mock.request.FakeHttpRequest; import com.dotcms.mock.request.MockHeaderRequest; @@ -12,11 +15,14 @@ import com.dotmarketing.util.UtilMethods; import com.dotmarketing.util.WebKeys; import com.liferay.portal.model.User; +import io.vavr.control.Try; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import javax.servlet.http.HttpServletRequest; /** @@ -120,4 +126,38 @@ public static float roundedProgress(final float progress) { return Math.round(roundedProgress * 1000f) / 1000f; } + /** + * Helper method to send both local and cluster-wide events for a job state change + * + * @param job The job that triggered the event + * @param eventFactory Factory function to create the specific event type + * @param The type of event being created (must extend JobEvent) + */ + public static void sendEvents( + final Job job, + final BiFunction eventFactory) { + + // Create the event + final 
T event = eventFactory.apply(job, LocalDateTime.now()); + + // Send the event notifications + sendEvents(event); + } + + /** + * Helper method to send both local and cluster-wide notifications for a job event. + * + * @param event The event to send (must implement the JobEvent interface) + */ + public static void sendEvents(final T event) { + + // LOCAL event + APILocator.getLocalSystemEventsAPI().notify(event); + + // CLUSTER WIDE event + Try.run(() -> APILocator.getSystemEventsAPI() + .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(event))) + .onFailure(e -> Logger.error(JobUtil.class, e.getMessage())); + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index d2b537e94f03..268ca26b38b4 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -1,9 +1,11 @@ package com.dotcms.rest.api.v1.job; +import com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; +import com.dotcms.rest.exception.mapper.ExceptionMapperUtil; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -25,6 +27,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.glassfish.jersey.media.sse.EventOutput; import org.glassfish.jersey.media.sse.OutboundEvent; import org.glassfish.jersey.media.sse.SseFeature; @@ -54,37 +57,50 @@ public JobQueueResource(WebResource webResource, JobQueueHelper helper, @Path("/{queueName}") @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.APPLICATION_JSON) - public 
ResponseEntityView createJob( + public Response createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { + final var initDataObject = new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final String jobId = helper.createJob(queueName, form, initDataObject.getUser(), request); - return new ResponseEntityView<>(jobId); + + try { + final String jobId = helper.createJob( + queueName, form, initDataObject.getUser(), request); + return Response.ok(new ResponseEntityView<>(jobId)).build(); + } catch (JobValidationException e) { + return ExceptionMapperUtil.createResponse(null, e.getMessage()); + } } @POST @Path("/{queueName}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView createJob( + public Response createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, Map parameters) throws DotDataException { + final var initDataObject = new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final String jobId = helper.createJob( - queueName, parameters, initDataObject.getUser(), request); - return new ResponseEntityView<>(jobId); + + try { + final String jobId = helper.createJob( + queueName, parameters, initDataObject.getUser(), request); + return Response.ok(new ResponseEntityView<>(jobId)).build(); + } catch (JobValidationException e) { + return ExceptionMapperUtil.createResponse(null, e.getMessage()); + } } @GET diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 97c3387fde78..986b2de95b94 
100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -13,15 +13,21 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.business.APILocator; import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,6 +69,9 @@ public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBase static void setUp() throws Exception { // Initialize the test environment IntegrationTestInitService.getInstance().init(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "1"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "2"); } /** @@ -73,10 +82,15 @@ static void setUp() throws Exception { */ @AfterAll void cleanUp() throws Exception { - if(null != jobQueueManagerAPI) { - jobQueueManagerAPI.close(); - } + clearJobs(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "5"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "30"); + + if (null != jobQueueManagerAPI) { + jobQueueManagerAPI.close(); + } } @BeforeEach @@ -450,6 +464,127 @@ void test_CombinedScenarios() throws Exception { }); } + /** + * Tests the abandoned job detection functionality. 
+ * Given Scenario: A job exists in RUNNING state with an old timestamp + * ExpectedResult: The job is detected as abandoned, marked accordingly and retried successfully + */ + @Test + @Order(7) + void test_AbandonedJobDetection() throws Exception { + + final String jobId = UUID.randomUUID().toString(); + final String queueName = "abandonedQueue"; + final Map parameters = Collections.singletonMap("test", "value"); + final String serverId = APILocator.getServerAPI().readServerId(); + final LocalDateTime oldTimestamp = LocalDateTime.now().minusMinutes(5); + + // Create a job directly in the database in RUNNING state to simulate an abandoned job + DotConnect dc = new DotConnect(); + + // Insert into job table + dc.setSQL("INSERT INTO job (id, queue_name, state, parameters, created_at, updated_at, started_at, execution_node) VALUES (?, ?, ?, ?::jsonb, ?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(new ObjectMapper().writeValueAsString(parameters)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(serverId) + .loadResult(); + + // Insert into job_queue table + dc.setSQL("INSERT INTO job_queue (id, queue_name, state, created_at) VALUES (?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Insert initial state into job_history + dc.setSQL("INSERT INTO job_history (id, job_id, state, execution_node, created_at) VALUES (?, ?, ?, ?, ?)") + .addParam(UUID.randomUUID().toString()) + .addParam(jobId) + .addParam(JobState.RUNNING.name()) + .addParam(serverId) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Verify the job was created in RUNNING state + Job initialJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.RUNNING, initialJob.state(), + "Job should be in RUNNING state initially"); 
+ + // Start job queue manager if not started + if (!jobQueueManagerAPI.isStarted()) { + jobQueueManagerAPI.start(); + jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS); + } + + // Register a processor for the abandoned job + jobQueueManagerAPI.registerProcessor(queueName, AbbandonedJobProcessor.class); + + // The job should be marked as abandoned + CountDownLatch latch = new CountDownLatch(1); + jobQueueManagerAPI.watchJob(jobId, job -> { + if (job.state() == JobState.ABANDONED) { + latch.countDown(); + } + }); + + boolean abandoned = latch.await(3, TimeUnit.MINUTES); + assertTrue(abandoned, "Job should be marked as abandoned within timeout period"); + + // Verify the abandoned job state and error details + Job abandonedJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.ABANDONED, abandonedJob.state(), + "Job should be in ABANDONED state"); + assertTrue(abandonedJob.result().isPresent(), + "Abandoned job should have a result"); + assertTrue(abandonedJob.result().get().errorDetail().isPresent(), + "Abandoned job should have error details"); + assertTrue(abandonedJob.result().get().errorDetail().get().message() + .contains("abandoned due to no updates"), + "Error message should indicate abandonment"); + + // Verify the job was put back in queue for retry and completed successfully + Awaitility.await().atMost(15, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Job job = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.COMPLETED, job.state(), + "Job should be in COMPLETED state"); + }); + + // Verify job history contains the state transitions + dc.setSQL("SELECT state FROM job_history WHERE job_id = ? 
ORDER BY created_at") + .addParam(jobId); + List> history = dc.loadObjectResults(); + + assertFalse(history.isEmpty(), "Job should have history records"); + assertEquals(JobState.RUNNING.name(), history.get(0).get("state"), + "First state should be RUNNING"); + assertEquals(JobState.ABANDONED.name(), history.get(1).get("state"), + "Second state should be ABANDONED"); + assertEquals(JobState.RUNNING.name(), history.get(2).get("state"), + "Third state should be RUNNING"); + assertEquals(JobState.COMPLETED.name(), history.get(3).get("state"), + "Latest state should be COMPLETED"); + } + + static class AbbandonedJobProcessor implements JobProcessor { + + @Override + public void process(Job job) { + } + + @Override + public Map getResultMetadata(Job job) { + return Collections.emptyMap(); + } + } + static class ProgressTrackingJobProcessor implements JobProcessor { @Override public void process(Job job) { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index ccfe76f90c60..d9f7d36bea68 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -1,11 +1,9 @@ package com.dotcms.jobs.business.api; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyFloat; -import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; @@ -24,9 +22,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.dotcms.jobs.business.api.events.EventProducer; import 
com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; +import com.dotcms.jobs.business.detector.AbandonedJobDetector; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobCancellationException; @@ -50,9 +48,7 @@ import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,8 +59,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import javax.enterprise.event.Event; import org.awaitility.Awaitility; import org.junit.Before; import org.junit.Test; @@ -149,10 +143,10 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) private JobQueueManagerAPI jobQueueManagerAPI; - private EventProducer eventProducer; - private RetryPolicyProcessor retryPolicyProcessor; + private AbandonedJobDetector abandonedJobDetector; + /** * Factory to create mock JobProcessor instances for testing. * This is how we instruct the JobQueueManagerAPI to use our mock processors. 
@@ -181,19 +175,16 @@ public void setUp() { mockCancellableProcessor = mock(SimpleCancellableJobProcessor.class); mockRetryStrategy = mock(RetryStrategy.class); mockCircuitBreaker = mock(CircuitBreaker.class); - eventProducer = mock(EventProducer.class); retryPolicyProcessor = mock(RetryPolicyProcessor.class); + abandonedJobDetector = mock(AbandonedJobDetector.class); jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, mockCircuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 10 + mockJobQueue, mockCircuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); jobQueueManagerAPI.setRetryStrategy("testQueue", mockRetryStrategy); - - var event = mock(Event.class); - when(eventProducer.getEvent(any())).thenReturn(event); } /** @@ -343,79 +334,6 @@ public void test_close() throws Exception { assertFalse(jobQueueManagerAPI.isStarted()); } - /** - * Method to test: watchJob in JobQueueManagerAPI - * Given Scenario: Valid job ID and watcher are provided - * ExpectedResult: Watcher receives all job state updates correctly - */ - @Test - public void test_watchJob() throws Exception { - - // Create a mock job - String jobId = "job123"; - Job mockJob = mock(Job.class); - when(mockJob.id()).thenReturn(jobId); - when(mockJob.queueName()).thenReturn("testQueue"); - - // Mock JobQueue behavior - when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJob.markAsRunning()).thenReturn(mockJob); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); - - // Make the circuit breaker always allow requests - when(mockCircuitBreaker.allowRequest()).thenReturn(true); - - // Mock JobProcessor behavior - ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - 
when(mockJob.progressTracker()).thenReturn(Optional.ofNullable(mockProgressTracker)); - when(mockJob.progress()).thenReturn(0f); - when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); - - AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - when(mockJob.state()).thenAnswer(inv -> jobState.get()); - - when(mockJob.withState(any())).thenAnswer(inv -> { - jobState.set(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.markAsFailed(any())).thenAnswer(inv -> { - jobState.set(JobState.FAILED); - return mockJob; - }); - - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - - // Create a list to capture job states - List capturedStates = Collections.synchronizedList(new ArrayList<>()); - // Create a test watcher - Consumer testWatcher = job -> { - assertNotNull(job); - assertEquals(jobId, job.id()); - capturedStates.add(job.state()); - }; - - // Start the JobQueueManagerAPI - jobQueueManagerAPI.start(); - - // Register the watcher - jobQueueManagerAPI.watchJob(jobId, testWatcher); - - // Wait for job processing to complete - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> capturedStates.contains(JobState.COMPLETED)); - - // Stop the JobQueueManagerAPI - jobQueueManagerAPI.close(); - } - /** * Method to test: Job retry mechanism in JobQueueManagerAPI * Given Scenario: Job fails on first attempt but succeeds on retry @@ -842,128 +760,6 @@ public void test_Job_NotRetryable() throws Exception { jobQueueManagerAPI.close(); } - /** - * Method to test: Job progress tracking in JobQueueManagerAPI - * Given Scenario: Job with multiple progress updates - * ExpectedResult: Progress is tracked correctly and increases monotonically - */ - @Test - public void test_JobProgressTracking() 
throws Exception { - - // Create a mock job - Job mockJob = mock(Job.class); - when(mockJob.id()).thenReturn("progress-test-job"); - when(mockJob.queueName()).thenReturn("testQueue"); - - AtomicReference jobProgress = new AtomicReference<>(0f); - AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - - when(mockJob.state()).thenAnswer(inv -> jobState.get()); - when(mockJob.withState(any())).thenAnswer(inv -> { - jobState.set(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsRunning()).thenAnswer(inv -> { - jobState.set(JobState.RUNNING); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); - - when(mockJob.progress()).thenAnswer(inv -> jobProgress.get()); - when(mockJob.withProgress(anyFloat())).thenAnswer(inv -> { - jobProgress.set(inv.getArgument(0)); - return mockJob; - }); - - // Set up the job queue to return our mock job - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); - - // Create a real ProgressTracker - ProgressTracker realProgressTracker = new DefaultProgressTracker(); - when(mockJob.progressTracker()).thenReturn(Optional.of(realProgressTracker)); - - // Make the circuit breaker always allow requests - when(mockCircuitBreaker.allowRequest()).thenReturn(true); - - // List to store progress updates - List progressUpdates = Collections.synchronizedList(new ArrayList<>()); - - // Configure the mockJobProcessor to update progress - doAnswer(inv -> { - - for (int i = 0; i <= 10; i++) { - float progress = i / 10f; - realProgressTracker.updateProgress(progress); - // Simulate the effect of updateJobProgress - jobProgress.set(progress); - - // Simulate work - long startTime = System.currentTimeMillis(); - Awaitility.await() - .atMost(3, TimeUnit.SECONDS) - 
.pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> System.currentTimeMillis() - startTime >= 50); - } - - Job job = inv.getArgument(0); - job.markAsCompleted(any()); - return null; - }).when(mockJobProcessor).process(any()); - - // Set up a job watcher to capture progress updates - jobQueueManagerAPI.watchJob("progress-test-job", job -> { - progressUpdates.add(job.progress()); - }); - - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - - // Start the job queue - jobQueueManagerAPI.start(); - - // Wait for job processing to complete - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertEquals(JobState.COMPLETED, jobState.get()); - }); - - // Verify that progress was tracked correctly - assertTrue( - "Should have multiple progress updates", - progressUpdates.size() > 1 - ); - assertEquals( - 0.0f, progressUpdates.get(0), 0.01f, - "Initial progress should be 0" - ); - assertEquals( - 1.0f, progressUpdates.get(progressUpdates.size() - 1), 0.01f, - "Final progress should be 1" - ); - - // Verify that progress increased monotonically - for (int i = 1; i < progressUpdates.size(); i++) { - assertTrue("Progress should increase or stay the same", - progressUpdates.get(i) >= progressUpdates.get(i - 1) - ); - } - - // Verify that the job was processed - verify(mockJobProcessor, times(1)).process(any()); - verify(mockJobQueue, times(2)).updateJobStatus(any()); - verify(mockJobQueue, times(2)). 
- updateJobStatus(argThat(job -> job.state() == JobState.COMPLETED)); - - // Stop the job queue - jobQueueManagerAPI.close(); - } - /** * Method to test: Circuit breaker mechanism in JobQueueManagerAPI * Given Scenario: Multiple job failures occur @@ -1004,8 +800,8 @@ public void test_CircuitBreaker_Opens() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1089,8 +885,8 @@ public void test_CircuitBreaker_Closes() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1152,8 +948,8 @@ public void test_CircuitBreaker_Reset() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1296,9 +1092,6 @@ public void test_complex_cancelJob() throws Exception { when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn( mockJob); - 
when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - // Create Mock Job Event JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); when(mockEvent.getJob()).thenReturn(mockJob); @@ -1382,24 +1175,22 @@ public void test_calculateBackoffTime() { * failures. * @param retryStrategy The strategy to use for retrying failed jobs. * @param threadPoolSize The size of the thread pool for job processing. - * @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds for polling job - * updates. * @return A newly created instance of JobQueueManagerAPI. */ private JobQueueManagerAPI newJobQueueManagerAPI(JobQueue jobQueue, CircuitBreaker circuitBreaker, RetryStrategy retryStrategy, - EventProducer eventProducer, JobProcessorFactory jobProcessorFactory, RetryPolicyProcessor retryPolicyProcessor, - int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) { + AbandonedJobDetector abandonedJobDetector, + int threadPoolSize) { final var realTimeJobMonitor = new RealTimeJobMonitor(); return new JobQueueManagerAPIImpl( - jobQueue, new JobQueueConfig(threadPoolSize, pollJobUpdatesIntervalMilliseconds), - circuitBreaker, retryStrategy, realTimeJobMonitor, eventProducer, - jobProcessorFactory, retryPolicyProcessor + jobQueue, new JobQueueConfig(threadPoolSize), + circuitBreaker, retryStrategy, realTimeJobMonitor, + jobProcessorFactory, retryPolicyProcessor, abandonedJobDetector ); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java index e25ee222c34d..9055a905d33c 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java +++ 
b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java @@ -159,7 +159,7 @@ void test_process_preview_invalid_content_type_variable() throws Exception { try { // Process the job in preview mode - processor.process(testJob); + processor.validate(testJob.parameters()); Assertions.fail("A JobValidationException should have been thrown here."); } catch (Exception e) { Assertions.assertInstanceOf(JobValidationException.class, e); @@ -249,8 +249,7 @@ void test_process_preview_invalid_language() throws Exception { ); try { - // Process the job in preview mode - processor.process(testJob); + processor.validate(testJob.parameters()); Assertions.fail("A JobValidationException should have been thrown here."); } catch (Exception e) { Assertions.assertInstanceOf(JobValidationException.class, e); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java index 62d180e3088f..f8fed037402e 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java @@ -19,6 +19,8 @@ import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; +import java.sql.Timestamp; +import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -310,6 +312,139 @@ void test_nextJob() throws Exception { } } + /** + * Method to test: detectAndMarkAbandoned in PostgresJobQueue + * Given Scenario: Multiple threads attempt to process abandoned jobs concurrently + * ExpectedResult: + * - Each job is processed exactly once + * - All jobs are marked as ABANDONED + * - No job is processed more than once (thread safety) 
+ * - All created jobs are processed + */ + @Test + void test_detectAndMarkAbandoned() throws Exception { + + final int NUM_JOBS = 10; + final int NUM_THREADS = 5; + String queueName = "testQueue"; + + // Create jobs + Set createdJobIds = new HashSet<>(); + for (int i = 0; i < NUM_JOBS; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + createdJobIds.add(jobId); + + // Changing the job state to running + final var createdJob = jobQueue.getJob(jobId); + jobQueue.updateJobStatus(createdJob.withState(JobState.RUNNING)); + } + + // Update the updated_at timestamp to be old enough to be considered abandoned + new DotConnect() + .setSQL("UPDATE job SET updated_at = ? WHERE id IN (SELECT id FROM job_queue)") + .addParam(Timestamp.valueOf(LocalDateTime.now().minusMinutes(5))) + .loadResult(); + + // Set to keep track of processed job IDs + Set processedJobIds = Collections.synchronizedSet(new HashSet<>()); + + // Create and start threads + List threads = new ArrayList<>(); + for (int i = 0; i < NUM_THREADS; i++) { + Thread thread = new Thread(() -> { + try { + while (true) { + Optional abandonedJobOptional = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING, JobState.CANCEL_REQUESTED, JobState.CANCELLING); + if (abandonedJobOptional.isEmpty()) { + break; // No more jobs to process + } + + Job abandonedJob = abandonedJobOptional.get(); + // Ensure this job hasn't been processed before + assertTrue(processedJobIds.add(abandonedJob.id()), + "Job " + abandonedJob.id() + " was processed more than once"); + assertEquals(JobState.ABANDONED, abandonedJob.state()); + } + } catch (Exception e) { + fail("Exception in thread: " + e.getMessage()); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify all jobs were processed + assertEquals(NUM_JOBS, processedJobIds.size(), "Not all jobs were processed"); + 
assertEquals(createdJobIds, processedJobIds, "Processed jobs don't match created jobs"); + + // Verify no more jobs are detected + assertTrue(jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING, JobState.CANCEL_REQUESTED, JobState.CANCELLING).isEmpty(), + "There should be no more jobs abandoned"); + + // Verify all jobs are in ABANDONED state + for (String jobId : createdJobIds) { + Job job = jobQueue.getJob(jobId); + assertEquals(JobState.ABANDONED, job.state(), + "Job " + jobId + " is not in ABANDONED state"); + } + } + + /** + * Method to test: detectAndMarkAbandoned in PostgresJobQueue + * Given Scenario: Single job with different threshold values + * ExpectedResult: + * - Job is not marked as abandoned when threshold is higher than its idle time + * - Job is marked as abandoned when threshold is lower than its idle time + * - Job state transitions correctly to ABANDONED when threshold is met + */ + @Test + void test_detectAndMarkAbandoned_threshold() throws Exception { + + // Create a job + String queueName = "testQueue"; + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + + // Change state to running + Job job = jobQueue.getJob(jobId); + jobQueue.updateJobStatus(job.withState(JobState.RUNNING)); + + // Set the updated_at to 2 minutes ago + new DotConnect() + .setSQL("UPDATE job SET updated_at = ? 
WHERE id = ?") + .addParam(Timestamp.valueOf(LocalDateTime.now().minusMinutes(2))) + .addParam(jobId) + .loadResult(); + + // First check: Using 3 minutes threshold - Should NOT be considered abandoned + Optional abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(3), + JobState.RUNNING + ); + assertTrue(abandonedJob.isEmpty(), "Job should not be considered abandoned yet"); + + // Second check: Using 1 minute threshold - Should be considered abandoned + abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING + ); + assertTrue(abandonedJob.isPresent(), "Job should be considered abandoned"); + assertEquals(jobId, abandonedJob.get().id(), "Wrong job was marked as abandoned"); + assertEquals(JobState.ABANDONED, abandonedJob.get().state(), "Job should be in ABANDONED state"); + + // Verify the job state in database + Job finalJob = jobQueue.getJob(jobId); + assertEquals(JobState.ABANDONED, finalJob.state(), "Job state in database should be ABANDONED"); + } + /** * Method to test: getJob in PostgresJobQueue with non-existent ID * Given Scenario: Attempt to retrieve a job with a non-existent ID