From ab2ddd41c164a4629d1e993d995abab8c488a74b Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 20 Nov 2024 18:46:39 -0600 Subject: [PATCH] Feat (Core): Implement abandoned job detection and recovery (#30710) This pull request includes some refactoring and improvements to the job queue management system. The changes primarily focus on removing the job polling mechanism, enhancing job validation, and adding an abandoned job detector. Below are the most important changes: ### Removal of Job Polling Mechanism: * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java`](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL13-L24): Removed the `pollJobUpdatesIntervalMilliseconds` parameter and its associated methods. [[1]](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL13-L24) [[2]](diffhunk://#diff-4c8b70e959cc00105adc8298bf8a977a3c18a6ceb5d55110bda6d023159a5e5aL36-L44) * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java`](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L19-L23): Removed the default polling interval configuration and its usage. [[1]](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L19-L23) [[2]](diffhunk://#diff-1367d122323650421a37296f7a223becc19bb3d6c1d2a965dd52ae6bbd2f5e74L33-R28) * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L121-R122): Removed the polling scheduler and related methods. 
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L121-R122) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L477-L498) ### Addition of Abandoned Job Detector: * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R112): Added `AbandonedJobDetector` as a dependency and integrated it into the job queue lifecycle methods. [[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R112) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L142-R151) [[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L170-L179) [[4]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L221-R209) [[5]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R230-L243) ### Enhancements to Job Validation: * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303): Added validation for job parameters if the processor implements `Validator` and handled validation exceptions. ### Event Handling Improvements: * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303): Replaced direct event firing with `JobUtil.sendEvents` for better event management. 
[[1]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561R283-R303) [[2]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L543-R526) [[3]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L668-R649) [[4]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L721-R703) [[5]](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L743-R724) ### Miscellaneous Changes: * [`dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java`](diffhunk://#diff-c092f8af2f800c0ca84f2c2f4aed0af60d0ed488e20fa32992967e154064f561L764-R743): Adjusted the job progress update interval from 2 to 3 seconds. --- .../jobs/business/api/JobQueueConfig.java | 16 +- .../business/api/JobQueueConfigProducer.java | 8 +- .../business/api/JobQueueManagerAPIImpl.java | 163 +++++------- .../business/api/events/EventProducer.java | 57 ---- .../api/events/JobAbandonedEvent.java | 39 +++ .../api/events/JobCancelRequestEvent.java | 2 +- .../business/api/events/JobCanceledEvent.java | 2 +- .../api/events/JobCancellingEvent.java | 2 +- .../api/events/JobCompletedEvent.java | 2 +- .../business/api/events/JobCreatedEvent.java | 2 +- .../jobs/business/api/events/JobEvent.java | 9 + .../business/api/events/JobFailedEvent.java | 2 +- .../api/events/JobProgressUpdatedEvent.java | 2 +- .../api/events/JobRemovedFromQueueEvent.java | 2 +- .../business/api/events/JobStartedEvent.java | 2 +- .../api/events/RealTimeJobMonitor.java | 83 +++++- .../detector/AbandonedJobDetector.java | 189 ++++++++++++++ .../detector/AbandonedJobDetectorConfig.java | 50 ++++ .../AbandonedJobDetectorConfigProducer.java | 49 ++++ .../business/error/JobAbandonedException.java | 37 +++ .../error/JobProcessingException.java | 10 + .../error/JobValidationException.java | 19 ++ .../dotcms/jobs/business/job/AbstractJob.java | 15 ++ .../dotcms/jobs/business/job/JobState.java | 5 + 
.../jobs/business/processor/Validator.java | 20 ++ .../impl/ImportContentletsProcessor.java | 182 ++++++------- .../dotcms/jobs/business/queue/JobQueue.java | 14 + .../jobs/business/queue/PostgresJobQueue.java | 97 ++++++- .../dotcms/jobs/business/util/JobUtil.java | 40 +++ .../rest/api/v1/job/JobQueueResource.java | 30 ++- .../JobQueueManagerAPIIntegrationTest.java | 141 +++++++++- .../business/api/JobQueueManagerAPITest.java | 243 ++---------------- ...rtContentletsProcessorIntegrationTest.java | 5 +- .../PostgresJobQueueIntegrationTest.java | 135 ++++++++++ 34 files changed, 1144 insertions(+), 530 deletions(-) delete mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java index 9bfaa637063f..cdfe2b29d287 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfig.java @@ -10,18 +10,13 @@ public class JobQueueConfig { */ private final int threadPoolSize; - // The interval in milliseconds to poll for job updates - private final int pollJobUpdatesIntervalMilliseconds; - /** * Constructs a new 
JobQueueConfig * * @param threadPoolSize The number of threads to use for job processing. - * @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds to poll for job updates. */ - public JobQueueConfig(int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) { + public JobQueueConfig(int threadPoolSize) { this.threadPoolSize = threadPoolSize; - this.pollJobUpdatesIntervalMilliseconds = pollJobUpdatesIntervalMilliseconds; } /** @@ -33,13 +28,4 @@ public int getThreadPoolSize() { return threadPoolSize; } - /** - * Gets the interval in milliseconds to poll for job updates. - * - * @return The interval in milliseconds to poll for job updates. - */ - public int getPollJobUpdatesIntervalMilliseconds() { - return pollJobUpdatesIntervalMilliseconds; - } - } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java index 58aa8a9869a3..c63d2616dcc6 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueConfigProducer.java @@ -16,11 +16,6 @@ public class JobQueueConfigProducer { "JOB_QUEUE_THREAD_POOL_SIZE", 10 ); - // The interval in milliseconds to poll for job updates. - static final int DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS = Config.getIntProperty( - "JOB_QUEUE_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS", 3000 - ); - /** * Produces a JobQueueConfig object. This method is called by the CDI container to create a * JobQueueConfig instance when it is necessary for dependency injection. 
@@ -30,8 +25,7 @@ public class JobQueueConfigProducer { @Produces public JobQueueConfig produceJobQueueConfig() { return new JobQueueConfig( - DEFAULT_THREAD_POOL_SIZE, - DEFAULT_POLL_JOB_UPDATES_INTERVAL_MILLISECONDS + DEFAULT_THREAD_POOL_SIZE ); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 0c74f378d4e6..b63796ce3c7f 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -1,10 +1,7 @@ package com.dotcms.jobs.business.api; -import com.dotcms.api.system.event.Payload; -import com.dotcms.api.system.event.SystemEventType; import com.dotcms.business.CloseDBIfOpened; import com.dotcms.business.WrapInTransaction; -import com.dotcms.jobs.business.api.events.EventProducer; import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.JobCanceledEvent; import com.dotcms.jobs.business.api.events.JobCancellingEvent; @@ -15,10 +12,12 @@ import com.dotcms.jobs.business.api.events.JobRemovedFromQueueEvent; import com.dotcms.jobs.business.api.events.JobStartedEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; +import com.dotcms.jobs.business.detector.AbandonedJobDetector; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobCancellationException; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; +import com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.error.RetryPolicyProcessor; import com.dotcms.jobs.business.error.RetryStrategy; import com.dotcms.jobs.business.job.Job; @@ -30,10 +29,12 @@ import com.dotcms.jobs.business.processor.DefaultRetryStrategy; import com.dotcms.jobs.business.processor.JobProcessor; import 
com.dotcms.jobs.business.processor.ProgressTracker; +import com.dotcms.jobs.business.processor.Validator; import com.dotcms.jobs.business.queue.JobQueue; import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.system.event.local.model.EventSubscriber; import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DoesNotExistException; @@ -41,11 +42,9 @@ import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import com.google.common.annotations.VisibleForTesting; -import io.vavr.control.Try; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -110,6 +109,7 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final CircuitBreaker circuitBreaker; private final JobQueue jobQueue; + private final AbandonedJobDetector abandonedJobDetector; private final Map> processors; private final Map processorInstancesByJobId; private final int threadPoolSize; @@ -118,11 +118,8 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { private final RetryStrategy defaultRetryStrategy; private final RetryPolicyProcessor retryPolicyProcessor; - private final ScheduledExecutorService pollJobUpdatesScheduler; - private LocalDateTime lastPollJobUpdateTime = LocalDateTime.now(); - private final RealTimeJobMonitor realTimeJobMonitor; - private final EventProducer eventProducer; + private final JobProcessorFactory jobProcessorFactory; // Cap to prevent overflow @@ -139,14 +136,9 @@ public class JobQueueManagerAPIImpl implements JobQueueManagerAPI { * @param circuitBreaker The CircuitBreaker implementation for fault tolerance. 
* @param defaultRetryStrategy The default retry strategy to use for failed jobs. * @param realTimeJobMonitor The RealTimeJobMonitor for handling real-time job updates. - * @param eventProducer The EventProducer for firing job-related events. - *

- * This constructor performs the following initializations: - * - Sets up the job queue and related configurations. - * - Initializes thread pool and job processors. - * - Sets up the circuit breaker and retry strategies. - * - Configures the job update polling mechanism. - * - Initializes event handlers for various job state changes. + * @param jobProcessorFactory The JobProcessorFactory for creating job processors instances. + * @param retryPolicyProcessor The RetryPolicyProcessor for processing retry policies. + * @param abandonedJobDetector The AbandonedJobDetector for detecting abandoned jobs. */ @Inject public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, @@ -154,9 +146,9 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, CircuitBreaker circuitBreaker, @DefaultRetryStrategy RetryStrategy defaultRetryStrategy, RealTimeJobMonitor realTimeJobMonitor, - EventProducer eventProducer, JobProcessorFactory jobProcessorFactory, - RetryPolicyProcessor retryPolicyProcessor) { + RetryPolicyProcessor retryPolicyProcessor, + AbandonedJobDetector abandonedJobDetector) { this.jobQueue = jobQueue; this.threadPoolSize = jobQueueConfig.getThreadPoolSize(); @@ -167,16 +159,8 @@ public JobQueueManagerAPIImpl(@Named("queueProducer") JobQueue jobQueue, this.circuitBreaker = circuitBreaker; this.jobProcessorFactory = jobProcessorFactory; this.retryPolicyProcessor = retryPolicyProcessor; - - this.pollJobUpdatesScheduler = Executors.newSingleThreadScheduledExecutor(); - pollJobUpdatesScheduler.scheduleAtFixedRate( - this::pollJobUpdates, 0, - jobQueueConfig.getPollJobUpdatesIntervalMilliseconds(), TimeUnit.MILLISECONDS - ); - - // Events + this.abandonedJobDetector = abandonedJobDetector; this.realTimeJobMonitor = realTimeJobMonitor; - this.eventProducer = eventProducer; APILocator.getLocalSystemEventsAPI().subscribe( JobCancelRequestEvent.class, @@ -218,7 +202,11 @@ public void start() { }); } - Logger.info(this, "JobQueue has been 
successfully started."); + // Start the abandoned job detector + abandonedJobDetector.start(); + + Logger.info(this, + "JobQueue and abandoned job detector have been successfully started."); } else { Logger.warn(this, "Attempt to start JobQueue that is already running. Ignoring." @@ -239,8 +227,11 @@ public void close() throws Exception { isShuttingDown = true; Logger.info(this, "Closing JobQueue and stopping all job processing."); + // Stop the abandoned job detector + abandonedJobDetector.close(); + + // Close existing services closeExecutorService(executorService); - closeExecutorService(pollJobUpdatesScheduler); isShuttingDown = false; isClosed = true; @@ -278,6 +269,8 @@ public Map> getQueueNames() { @Override public String createJob(final String queueName, final Map parameters) throws JobProcessorNotFoundException, DotDataException { + + // Validate a processor is registered for the queue final Class clazz = processors.get(queueName); if (null == clazz) { final var error = new JobProcessorNotFoundException(queueName); @@ -287,14 +280,27 @@ public String createJob(final String queueName, final Map parame //first attempt instantiating the processor, cuz if we cant no use to create an entry in the db final var processor = newProcessorInstance(queueName); - // now that we know we can instantiate the processor, we can add it to the map of instances - // But first we need the job id + + // Validate job parameters if processor implements Validator + if (processor instanceof Validator) { + try { + ((Validator) processor).validate(parameters); + } catch (JobValidationException e) { + Logger.error(this, "Job validation failed: " + e.getMessage(), e); + throw e; + } + } + try { + + // now that we know we can instantiate the processor, we can add it to the map of instances + // But first we need the job id final String jobId = jobQueue.createJob(queueName, parameters); addInstanceRef(jobId, processor); - eventProducer.getEvent(JobCreatedEvent.class).fire( - new 
JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters) - ); + + // Send the job created events + JobUtil.sendEvents( + new JobCreatedEvent(jobId, queueName, LocalDateTime.now(), parameters)); return jobId; } catch (JobQueueException e) { throw new DotDataException("Error creating job", e); @@ -374,7 +380,6 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataEx } } - @WrapInTransaction @Override public void cancelJob(final String jobId) throws DotDataException { @@ -398,7 +403,6 @@ public void cancelJob(final String jobId) throws DotDataException { * @param event The event that triggers the job cancellation request. */ @VisibleForTesting - @WrapInTransaction void onCancelRequestJob(final JobCancelRequestEvent event) { try { @@ -474,28 +478,6 @@ public RetryStrategy getDefaultRetryStrategy() { return this.defaultRetryStrategy; } - /** - * Polls the job queue for updates to watched jobs and notifies their watchers. - */ - @CloseDBIfOpened - private void pollJobUpdates() { - try { - final var watchedJobIds = realTimeJobMonitor.getWatchedJobIds(); - if (watchedJobIds.isEmpty()) { - return; // No jobs are being watched, skip polling - } - - final var currentPollTime = LocalDateTime.now(); - List updatedJobs = jobQueue.getUpdatedJobsSince( - watchedJobIds, lastPollJobUpdateTime - ); - realTimeJobMonitor.updateWatchers(updatedJobs); - lastPollJobUpdateTime = currentPollTime; - } catch (Exception e) { - Logger.error(this, "Error polling job updates: " + e.getMessage(), e);// - } - } - /** * Fetches the state of a job from the job queue using the provided job ID. 
* @@ -521,7 +503,6 @@ private JobState getJobState(final String jobId) throws DotDataException { * @param progressTracker The processor progress tracker * @param previousProgress The previous progress value */ - @WrapInTransaction private float updateJobProgress(final Job job, final ProgressTracker progressTracker, final float previousProgress) throws DotDataException { @@ -540,9 +521,9 @@ private float updateJobProgress(final Job job, final ProgressTracker progressTra Job updatedJob = job.withProgress(progress).withState(latestState); jobQueue.updateJobProgress(job.id(), updatedJob.progress()); - eventProducer.getEvent(JobProgressUpdatedEvent.class).fire( - new JobProgressUpdatedEvent(updatedJob, LocalDateTime.now()) - ); + + // Send the update job progress events + JobUtil.sendEvents(updatedJob, JobProgressUpdatedEvent::new); return progress; } @@ -665,7 +646,7 @@ private boolean isCircuitBreakerOpen() { */ private void processJobWithRetry(final Job job) throws DotDataException { - if (job.state() == JobState.FAILED) { + if (job.state() == JobState.FAILED || job.state() == JobState.ABANDONED) { if (canRetry(job)) { @@ -718,9 +699,8 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException try { jobQueue.removeJobFromQueue(job.id()); - eventProducer.getEvent(JobRemovedFromQueueEvent.class).fire( - new JobRemovedFromQueueEvent(job, LocalDateTime.now()) - ); + // Send the job removed from queue events + JobUtil.sendEvents(job, JobRemovedFromQueueEvent::new); } catch (JobQueueDataException e) { throw new DotDataException("Error removing failed job", e); } @@ -740,9 +720,8 @@ private void processJob(final Job job) throws DotDataException { final ProgressTracker progressTracker = new DefaultProgressTracker(); Job runningJob = job.markAsRunning().withProgressTracker(progressTracker); updateJobStatus(runningJob); - eventProducer.getEvent(JobStartedEvent.class).fire( - new JobStartedEvent(runningJob, LocalDateTime.now()) - ); + // Send the job 
started events + JobUtil.sendEvents(runningJob, JobStartedEvent::new); try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { @@ -761,7 +740,7 @@ private void processJob(final Job job) throws DotDataException { } catch (DotDataException e) { throw new DotRuntimeException("Error updating job progress", e); } - }, 0, 2, TimeUnit.SECONDS + }, 0, 3, TimeUnit.SECONDS ); // Process the job @@ -863,15 +842,13 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) { Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); updateJobStatus(canceledJob); - eventProducer.getEvent(JobCanceledEvent.class).fire( - new JobCanceledEvent(canceledJob, LocalDateTime.now()) - ); + // Send the job canceled events + JobUtil.sendEvents(canceledJob, JobCanceledEvent::new); } else { final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); updateJobStatus(completedJob); - eventProducer.getEvent(JobCompletedEvent.class).fire( - new JobCompletedEvent(completedJob, LocalDateTime.now()) - ); + // Send the job completed events + JobUtil.sendEvents(completedJob, JobCompletedEvent::new); } } catch (JobQueueDataException e) { final var errorMessage = "Error updating job status"; @@ -885,29 +862,13 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) * * @param job The job to cancel. 
*/ - @WrapInTransaction private void handleJobCancelRequest(final Job job) throws DotDataException { Job cancelJob = job.withState(JobState.CANCEL_REQUESTED); updateJobStatus(cancelJob); - // Prepare the cancel request events - final JobCancelRequestEvent cancelRequestEvent = new JobCancelRequestEvent( - cancelJob, LocalDateTime.now() - ); - - // LOCAL event - APILocator.getLocalSystemEventsAPI().notify(cancelRequestEvent); - - // CLUSTER WIDE event - Try.run(() -> APILocator.getSystemEventsAPI() - .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(cancelRequestEvent))) - .onFailure(e -> Logger.error(JobQueueManagerAPIImpl.this, e.getMessage())); - - // CDI event - eventProducer.getEvent(JobCancelRequestEvent.class).fire( - cancelRequestEvent - ); + // Send the cancel request events + JobUtil.sendEvents(cancelJob, JobCancelRequestEvent::new); } /** @@ -916,7 +877,6 @@ private void handleJobCancelRequest(final Job job) throws DotDataException { * @param job The job that was canceled. * @param processor The processor that handled the job. 
*/ - @WrapInTransaction private void handleJobCancelling(final Job job, final JobProcessor processor) { try { @@ -925,9 +885,8 @@ private void handleJobCancelling(final Job job, final JobProcessor processor) { Job cancelJob = job.withState(JobState.CANCELLING); updateJobStatus(cancelJob); - eventProducer.getEvent(JobCancellingEvent.class).fire( - new JobCancellingEvent(cancelJob, LocalDateTime.now()) - ); + // Send the job cancelling events + JobUtil.sendEvents(cancelJob, JobCancellingEvent::new); } catch (DotDataException e) { final var error = new JobCancellationException(job.id(), e); Logger.error(this, error); @@ -997,9 +956,8 @@ private void handleJobFailure(final Job job, final JobProcessor processor, final Job failedJob = job.markAsFailed(jobResult).withProgress(progress); updateJobStatus(failedJob); - eventProducer.getEvent(JobFailedEvent.class).fire( - new JobFailedEvent(failedJob, LocalDateTime.now()) - ); + // Send the job failed events + JobUtil.sendEvents(failedJob, JobFailedEvent::new); try { // Put the job back in the queue for later retry @@ -1018,7 +976,6 @@ private void handleJobFailure(final Job job, final JobProcessor processor, * @param job The job to update. * @throws DotDataException if there's an error updating the job status. 
*/ - @WrapInTransaction private void updateJobStatus(final Job job) throws DotDataException { try { jobQueue.updateJobStatus(job); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java deleted file mode 100644 index f6f22e3570f5..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/EventProducer.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.dotcms.jobs.business.api.events; - -import java.lang.annotation.Annotation; -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Event; -import javax.enterprise.inject.spi.BeanManager; -import javax.inject.Inject; - -/** - * A producer class for CDI events. This class provides a centralized way to obtain event objects - * for firing events within the application. - * - *

This class is application scoped, ensuring a single instance is used throughout - * the application's lifecycle.

- */ -@ApplicationScoped -public class EventProducer { - - private BeanManager beanManager; - - public EventProducer() { - // Default constructor for CDI - } - - /** - * Constructs a new EventProducer. - * - * @param beanManager The CDI BeanManager, injected by the container. - */ - @Inject - public EventProducer(BeanManager beanManager) { - this.beanManager = beanManager; - } - - /** - * Retrieves an Event object for the specified event type and qualifiers. - * - *

This method allows for type-safe event firing. It uses the BeanManager to - * create an Event object that can be used to fire events of the specified type.

- * - *

Usage example:

- *
-     * EventProducer producer = ...;
-     * Event event = producer.getEvent(MyCustomEvent.class);
-     * event.fire(new MyCustomEvent(...));
-     * 
- * - * @param The type of the event. - * @param eventType The Class object representing the event type. - * @param qualifiers Optional qualifiers for the event. - * @return An Event object that can be used to fire events of the specified type. - */ - public Event getEvent(Class eventType, Annotation... qualifiers) { - return beanManager.getEvent().select(eventType, qualifiers); - } - -} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java new file mode 100644 index 000000000000..8e4efacaeb71 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobAbandonedEvent.java @@ -0,0 +1,39 @@ +package com.dotcms.jobs.business.api.events; + +import com.dotcms.jobs.business.job.Job; +import java.time.LocalDateTime; + +/** + * Event fired when an abandoned job is detected. + */ +public class JobAbandonedEvent implements JobEvent { + + private final Job job; + private final LocalDateTime detectedAt; + + /** + * Constructs a new JobAbandonedEvent. + * + * @param job The job. + * @param detectedAt The timestamp when the abandoned job was detected. + */ + public JobAbandonedEvent(Job job, LocalDateTime detectedAt) { + this.job = job; + this.detectedAt = detectedAt; + } + + /** + * @return The abandoned job. + */ + public Job getJob() { + return job; + } + + /** + * @return The timestamp when the abandoned job was detected. 
+ */ + public LocalDateTime getDetectedAt() { + return detectedAt; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java index 9554b1e8f573..1c06e69052bc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancelRequestEvent.java @@ -6,7 +6,7 @@ /** * Event fired when there is a request to cancel a job. */ -public class JobCancelRequestEvent { +public class JobCancelRequestEvent implements JobEvent { private final Job job; private final LocalDateTime canceledOn; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java index 9a9896604aa6..55046c416556 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is canceled. */ -public class JobCanceledEvent { +public class JobCanceledEvent implements JobEvent { private final Job job; private final LocalDateTime canceledAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java index 20877a890387..32b3c10d2b5c 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCancellingEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is being canceled. 
*/ -public class JobCancellingEvent { +public class JobCancellingEvent implements JobEvent { private final Job job; private final LocalDateTime canceledAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java index 7c86c3940d6d..03460fd2072d 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCompletedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job completes successfully. */ -public class JobCompletedEvent { +public class JobCompletedEvent implements JobEvent { private final Job job; private final LocalDateTime completedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java index d7da52d2f5f8..e70000e684e9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCreatedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a new job is created and added to the queue. */ -public class JobCreatedEvent { +public class JobCreatedEvent implements JobEvent { private final String jobId; private final String queueName; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java new file mode 100644 index 000000000000..f26488361ae7 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobEvent.java @@ -0,0 +1,9 @@ +package com.dotcms.jobs.business.api.events; + +/** + * Base marker interface for all job-related events in the job queue system. All specific job event + * types (e.g., JobStartedEvent, JobCompletedEvent, etc.) should implement this interface. 
+ */ +public interface JobEvent { + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java index 8d098c1eae43..7df87cf1555a 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobFailedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job fails during processing. */ -public class JobFailedEvent { +public class JobFailedEvent implements JobEvent { private final Job job; private final LocalDateTime failedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java index e859284ac09f..3ed6bd142912 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobProgressUpdatedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job's progress is updated. */ -public class JobProgressUpdatedEvent { +public class JobProgressUpdatedEvent implements JobEvent { private final Job job; private final LocalDateTime updatedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java index 0aa08dcaa886..b93504e600ec 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job is removed from the queue because failed and is not retryable. 
*/ -public class JobRemovedFromQueueEvent { +public class JobRemovedFromQueueEvent implements JobEvent { private final Job job; private final LocalDateTime removedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java index dc357c6822d6..f3c2b0edeadc 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobStartedEvent.java @@ -6,7 +6,7 @@ /** * Event fired when a job starts processing. */ -public class JobStartedEvent { +public class JobStartedEvent implements JobEvent { private final Job job; private final LocalDateTime startedAt; diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index 4c761158fa0e..ef33514f30d0 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -2,6 +2,8 @@ import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobState; +import com.dotcms.system.event.local.model.EventSubscriber; +import com.dotmarketing.business.APILocator; import com.dotmarketing.util.Logger; import java.util.Arrays; import java.util.List; @@ -12,8 +14,8 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; import java.util.function.Predicate; +import javax.annotation.PostConstruct; import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.event.Observes; /** * Manages real-time monitoring of jobs in the system. 
This class provides functionality to register @@ -67,6 +69,60 @@ public class RealTimeJobMonitor { private final Map> jobWatchers = new ConcurrentHashMap<>(); + public RealTimeJobMonitor() { + // Default constructor required for CDI + } + + @PostConstruct + protected void init() { + + APILocator.getLocalSystemEventsAPI().subscribe( + JobProgressUpdatedEvent.class, + (EventSubscriber) this::onJobProgressUpdated + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobFailedEvent.class, + (EventSubscriber) this::onJobFailed + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobRemovedFromQueueEvent.class, + (EventSubscriber) this::onJobRemovedFromQueueEvent + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCompletedEvent.class, + (EventSubscriber) this::onJobCompleted + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCanceledEvent.class, + (EventSubscriber) this::onJobCanceled + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancellingEvent.class, + (EventSubscriber) this::onJobCancelling + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobCancelRequestEvent.class, + (EventSubscriber) this::onJobCancelRequest + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobStartedEvent.class, + (EventSubscriber) this::onJobStarted + ); + + APILocator.getLocalSystemEventsAPI().subscribe( + JobAbandonedEvent.class, + (EventSubscriber) this::onAbandonedJob + ); + + } + /** * Registers a watcher for a specific job with optional filtering of updates. The watcher will * be notified of job updates that match the provided filter predicate. If no filter is provided @@ -237,7 +293,7 @@ private void removeWatcher(String jobId) { * * @param event The JobStartedEvent. 
*/ - public void onJobStarted(@Observes JobStartedEvent event) { + public void onJobStarted(JobStartedEvent event) { updateWatchers(event.getJob()); } @@ -246,7 +302,16 @@ public void onJobStarted(@Observes JobStartedEvent event) { * * @param event The JobCancelRequestEvent. */ - public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { + public void onJobCancelRequest(JobCancelRequestEvent event) { + updateWatchers(event.getJob()); + } + + /** + * Handles the abandoned job event. + * + * @param event The JobAbandonedEvent. + */ + public void onAbandonedJob(JobAbandonedEvent event) { updateWatchers(event.getJob()); } @@ -255,7 +320,7 @@ public void onJobCancelRequest(@Observes JobCancelRequestEvent event) { * * @param event The JobCancellingEvent. */ - public void onJobCancelling(@Observes JobCancellingEvent event) { + public void onJobCancelling(JobCancellingEvent event) { updateWatchers(event.getJob()); } @@ -264,7 +329,7 @@ public void onJobCancelling(@Observes JobCancellingEvent event) { * * @param event The JobCanceledEvent. */ - public void onJobCanceled(@Observes JobCanceledEvent event) { + public void onJobCanceled(JobCanceledEvent event) { updateWatchers(event.getJob()); removeWatcher(event.getJob().id()); } @@ -274,7 +339,7 @@ public void onJobCanceled(@Observes JobCanceledEvent event) { * * @param event The JobCompletedEvent. */ - public void onJobCompleted(@Observes JobCompletedEvent event) { + public void onJobCompleted(JobCompletedEvent event) { updateWatchers(event.getJob()); removeWatcher(event.getJob().id()); } @@ -284,7 +349,7 @@ public void onJobCompleted(@Observes JobCompletedEvent event) { * * @param event The JobRemovedFromQueueEvent. 
*/ - public void onJobRemovedFromQueueEvent(@Observes JobRemovedFromQueueEvent event) { + public void onJobRemovedFromQueueEvent(JobRemovedFromQueueEvent event) { removeWatcher(event.getJob().id()); } @@ -293,7 +358,7 @@ public void onJobRemovedFromQueueEvent(@Observes JobRemovedFromQueueEvent event) * * @param event The JobFailedEvent. */ - public void onJobFailed(@Observes JobFailedEvent event) { + public void onJobFailed(JobFailedEvent event) { updateWatchers(event.getJob()); } @@ -302,7 +367,7 @@ public void onJobFailed(@Observes JobFailedEvent event) { * * @param event The JobProgressUpdatedEvent. */ - public void onJobProgressUpdated(@Observes JobProgressUpdatedEvent event) { + public void onJobProgressUpdated(JobProgressUpdatedEvent event) { updateWatchers(event.getJob()); } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java new file mode 100644 index 000000000000..2d9224ce9ef0 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetector.java @@ -0,0 +1,189 @@ +package com.dotcms.jobs.business.detector; + +import com.dotcms.jobs.business.api.events.JobAbandonedEvent; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.jobs.business.job.JobState; +import com.dotcms.jobs.business.queue.JobQueue; +import com.dotcms.jobs.business.queue.error.JobQueueDataException; +import com.dotcms.jobs.business.util.JobUtil; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.util.Logger; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +/** + * Detects and handles abandoned jobs in the job queue system. 
A job is considered abandoned if it + * remains in certain states (RUNNING, CANCELLING, CANCEL_REQUESTED) without updates for longer than + * a configured threshold. + *

+ * Key features: + *

    + *
  • Periodically scans for jobs that haven't been updated within the abandonment threshold
  • + *
  • Marks abandoned jobs and puts them back in queue for retry
  • + *
  • Publishes events when abandoned jobs are detected
  • + *
  • Configurable detection interval and abandonment threshold
  • + *
+ */ +@ApplicationScoped +public class AbandonedJobDetector implements AutoCloseable { + + private static final JobState[] ABANDONMENT_CHECK_STATES = { + JobState.RUNNING, + JobState.CANCELLING, + JobState.CANCEL_REQUESTED + }; + + private final JobQueue jobQueue; + + private final ScheduledExecutorService executor; + + private final long detectionIntervalMinutes; + private final long abandonmentThresholdMinutes; + + private volatile boolean isRunning = false; + + // Required for CDI proxy + protected AbandonedJobDetector() { + this.jobQueue = null; + this.executor = null; + this.detectionIntervalMinutes = 0; + this.abandonmentThresholdMinutes = 0; + } + + /** + * Creates a new abandoned job detector with the specified configuration. + * + * @param config Configuration containing detection interval and threshold + * @param jobQueue The job queue to monitor for abandoned jobs + */ + @Inject + public AbandonedJobDetector(AbandonedJobDetectorConfig config, JobQueue jobQueue) { + + this.jobQueue = jobQueue; + this.executor = Executors.newSingleThreadScheduledExecutor(); + + // Load configuration + this.detectionIntervalMinutes = config.getDetectionIntervalMinutes(); + this.abandonmentThresholdMinutes = config.getAbandonmentThresholdMinutes(); + } + + /** + * Starts the abandoned job detection process if not already running. Schedules periodic checks + * based on the configured detection interval. + */ + public void start() { + + if (!isRunning) { + isRunning = true; + executor.scheduleWithFixedDelay( + this::detectAbandonedJobs, + detectionIntervalMinutes, + detectionIntervalMinutes, + TimeUnit.MINUTES + ); + Logger.info(this, String.format( + "Abandoned job detector started. Checking every %d minutes for jobs not " + + "updated in %d minutes", + detectionIntervalMinutes, abandonmentThresholdMinutes)); + } + } + + /** + * Checks for and handles abandoned jobs. + *

+ * Abandoned jobs are: + *

    + *
  1. Marked as abandoned in the job queue
  2. + *
  3. Logged as a warning
  4. + *
  5. Put back in queue for retry
  6. + *
  7. Generate a JobAbandonedEvent
  8. + *
+ */ + private void detectAbandonedJobs() { + + try { + + Optional abandonedJob; + while ((abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(abandonmentThresholdMinutes), + ABANDONMENT_CHECK_STATES)).isPresent()) { + + processAbandonedJob(abandonedJob.get()); + } + } catch (Exception e) { + final var errorMessage = "Error detecting abandoned jobs"; + Logger.error(this, errorMessage, e); + // Not rethrown: an escaping exception would cancel every future scheduled run + } + } + + /** + * Processes an abandoned job by logging the event, putting it back in the queue for retry, and + * generating a JobAbandonedEvent. + */ + private void processAbandonedJob(final Job abandonedJob) throws JobQueueDataException { + + // Log the event + Logger.warn(this, String.format( + "Abandoned job found - Job ID: %s, Queue: %s, Last Updated: %s", + abandonedJob.id(), + abandonedJob.queueName(), + abandonedJob.updatedAt().orElse(null) + )); + + // Put the job back in the queue for later retry + putJobBackInQueue(abandonedJob); + + // Send events + JobUtil.sendEvents(abandonedJob, JobAbandonedEvent::new); + } + + /** + * Places an abandoned job back in the queue for retry. + * + * @param abandonedJob The job that was detected as abandoned + * @throws JobQueueDataException if there is an error re-queueing the job + */ + private void putJobBackInQueue(final Job abandonedJob) throws JobQueueDataException { + + // Put the job back in the queue for later retry + jobQueue.putJobBackInQueue(abandonedJob); + + Logger.info(this, + String.format("Abandoned job %s queued for retry", abandonedJob.id())); + } + + /** + * Stops the abandoned job detector and cleans up resources. Attempts graceful shutdown, forcing + * shutdown if tasks don't complete within 60 seconds.
+ */ + @Override + public void close() { + if (isRunning) { + isRunning = false; + executor.shutdown(); + try { + if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + Logger.info(this, "Abandoned job detector stopped"); + } + } + + /** + * @return true if the detector is currently running, false otherwise + */ + public boolean isRunning() { + return isRunning; + } + +} \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java new file mode 100644 index 000000000000..46b67ca4847d --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfig.java @@ -0,0 +1,50 @@ +package com.dotcms.jobs.business.detector; + +/** + * Configuration settings for the abandoned job detection system. This class holds the timing + * parameters that control how and when jobs are determined to be abandoned. + */ +public class AbandonedJobDetectorConfig { + + private final long detectionIntervalMinutes; + private final long abandonmentThresholdMinutes; + + // Required for CDI proxy + protected AbandonedJobDetectorConfig() { + this.detectionIntervalMinutes = 0; + this.abandonmentThresholdMinutes = 0; + } + + /** + * Constructs a new configuration with the specified timing parameters. 
+ * + * @param detectionIntervalMinutes How frequently to check for abandoned jobs, in minutes + * @param abandonmentThresholdMinutes How long a job must be inactive before being considered + * abandoned, in minutes + */ + public AbandonedJobDetectorConfig( + long detectionIntervalMinutes, + long abandonmentThresholdMinutes) { + this.detectionIntervalMinutes = detectionIntervalMinutes; + this.abandonmentThresholdMinutes = abandonmentThresholdMinutes; + } + + /** + * Gets the interval between abandoned job detection runs. + * + * @return The detection interval in minutes + */ + public long getDetectionIntervalMinutes() { + return detectionIntervalMinutes; + } + + /** + * Gets the time threshold after which an inactive job is considered abandoned. + * + * @return The abandonment threshold in minutes + */ + public long getAbandonmentThresholdMinutes() { + return abandonmentThresholdMinutes; + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java new file mode 100644 index 000000000000..cfb38bf07f3e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/detector/AbandonedJobDetectorConfigProducer.java @@ -0,0 +1,49 @@ +package com.dotcms.jobs.business.detector; + +import com.dotmarketing.util.Config; +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.inject.Produces; + +/** + * Produces configuration for the abandoned job detector system using application properties. This + * producer creates a singleton configuration object that determines the intervals for checking + * abandoned jobs and the threshold for considering a job abandoned. + *

+ * Configuration is read from the following properties: + *

    + *
  • JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES: How often to check for abandoned jobs (default: 5)
  • + *
  • JOB_ABANDONMENT_THRESHOLD_MINUTES: How long before a non-updating job is considered abandoned (default: 30)
  • + *
+ */ +@ApplicationScoped +public class AbandonedJobDetectorConfigProducer { + + /** + * The default interval in minutes between checks for abandoned jobs. + */ + static final int DEFAULT_JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES = Config.getIntProperty( + "JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", 5 + ); + + /** + * The default time in minutes after which an inactive job is considered abandoned. + */ + static final int DEFAULT_JOB_ABANDONMENT_THRESHOLD_MINUTES = Config.getIntProperty( + "JOB_ABANDONMENT_THRESHOLD_MINUTES", 30 + ); + + /** + * Produces a configuration object for the abandoned job detector. + * + * @return A new configuration object with the current property values + */ + @ApplicationScoped + @Produces + public AbandonedJobDetectorConfig produceAbandonedJobDetectorConfig() { + return new AbandonedJobDetectorConfig( + DEFAULT_JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES, + DEFAULT_JOB_ABANDONMENT_THRESHOLD_MINUTES + ); + } + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java new file mode 100644 index 000000000000..3fcd5a7ed84e --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobAbandonedException.java @@ -0,0 +1,37 @@ +package com.dotcms.jobs.business.error; + +/** + * Exception thrown when a job is detected to be abandoned in the job queue system. A job is + * considered abandoned when it remains in an active state without updates beyond the configured + * abandonment threshold. + */ +public class JobAbandonedException extends RuntimeException { + + /** + * Creates a new JobAbandonedException with the specified message. + * + * @param message Details about why the job was considered abandoned + */ + public JobAbandonedException(String message) { + super(message); + } + + /** + * Creates a new JobAbandonedException with a message and underlying cause. 
+ * + * @param message Details about why the job was considered abandoned + * @param cause The underlying exception that led to the job being abandoned + */ + public JobAbandonedException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates a new JobAbandonedException with an underlying cause. + * + * @param cause The underlying exception that led to the job being abandoned + */ + public JobAbandonedException(Throwable cause) { + super(cause); + } +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java index b656c8f5b79b..79a335e57433 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobProcessingException.java @@ -7,6 +7,16 @@ */ public class JobProcessingException extends RuntimeException { + /** + * Constructs a new JobProcessingException with the specified message and cause. + * + * @param message A description of the error + * @param cause The underlying cause of the error (can be null) + */ + public JobProcessingException(String message, Throwable cause) { + super(message, cause); + } + /** * Constructs a new JobProcessingException with the specified job ID, reason, and cause. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java index 00dca669a109..2f871e0be580 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/JobValidationException.java @@ -7,6 +7,25 @@ */ public class JobValidationException extends RuntimeException { + /** + * Constructs a new JobValidationException with the specified message and cause. 
+ * + * @param message A description of the validation failure + * @param cause The underlying cause of the validation failure (can be null) + */ + public JobValidationException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new JobValidationException with the specified message + * + * @param message A description of the validation failure + */ + public JobValidationException(String message) { + super(message); + } + /** * Constructs a new JobValidationException with the specified job ID, reason, and cause. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index 590458a07f2f..faf8d5745bec 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -87,6 +87,21 @@ default Job markAsFailed(final JobResult result) { .build(); } + /** + * Creates a new Job marked as abandoned with the result details. + * + * @param result The result details of the abandoned job. + * @return A new Job instance marked as abandoned. + */ + default Job markAsAbandoned(final JobResult result) { + return Job.builder().from(this) + .state(JobState.ABANDONED) + .result(result) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + /** * Creates a new Job marked as running. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java index c13b8a6d3587..f3e7ebdc0880 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java @@ -25,6 +25,11 @@ public enum JobState { */ FAILED, + /** + * The job was abandoned before it could complete. + */ + ABANDONED, + /** * The job is waiting to be canceled. 
*/ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java new file mode 100644 index 000000000000..29712d5ab328 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/Validator.java @@ -0,0 +1,20 @@ +package com.dotcms.jobs.business.processor; + +import com.dotcms.jobs.business.error.JobValidationException; +import java.util.Map; + +/** + * Interface for validating job parameters before creation. Processors can implement this interface + * to provide validation logic that will be executed before a job is created. + */ +public interface Validator { + + /** + * Validates the job parameters before job creation. + * + * @param parameters The parameters to validate + * @throws JobValidationException if the parameters are invalid + */ + void validate(Map parameters) throws JobValidationException; + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index 3c8b446f8b39..04420a9bf468 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -11,6 +11,7 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.NoRetryPolicy; import com.dotcms.jobs.business.processor.Queue; +import com.dotcms.jobs.business.processor.Validator; import com.dotcms.jobs.business.util.JobUtil; import com.dotcms.repackage.com.csvreader.CsvReader; import com.dotcms.rest.api.v1.temp.DotTempFile; @@ -51,9 +52,10 @@ * functionality to import content from CSV files, with support for both preview and publish * operations, as well as multilingual content handling. * - *

The processor implements both {@link JobProcessor} and {@link Cancellable} interfaces to - * provide job processing and cancellation capabilities. It's annotated with {@link Queue} to - * specify the queue name and {@link ExponentialBackoffRetryPolicy} to define retry behavior.

+ *

The processor implements the {@link JobProcessor}, {@link Cancellable} and {@link Validator} + * interfaces to provide job processing, cancellation and parameter validation. It's annotated + * with {@link Queue} to specify the queue name and {@link NoRetryPolicy} to define + * retry behavior.

* *

Key features:

*
    @@ -66,12 +68,13 @@ * * @see JobProcessor * @see Cancellable + * @see Validator * @see Queue * @see ExponentialBackoffRetryPolicy */ @Queue("importContentlets") @NoRetryPolicy -public class ImportContentletsProcessor implements JobProcessor, Cancellable { +public class ImportContentletsProcessor implements JobProcessor, Validator, Cancellable { private static final String PARAMETER_LANGUAGE = "language"; private static final String PARAMETER_FIELDS = "fields"; @@ -120,7 +123,7 @@ public void process(final Job job) throws JobProcessingException { final User user; try { - user = getUser(job); + user = getUser(job.parameters()); } catch (Exception e) { Logger.error(this, "Error retrieving user", e); throw new JobProcessingException(job.id(), "Error retrieving user", e); @@ -136,9 +139,6 @@ public void process(final Job job) throws JobProcessingException { throw new JobValidationException(job.id(), "Unable to retrieve the import file."); } - // Validate the job has the required data - validate(job); - final var fileToImport = tempFile.get().file; final long totalLines = totalLines(job, fileToImport); @@ -163,6 +163,55 @@ public void process(final Job job) throws JobProcessingException { } } + /** + * Validates the job parameters and content type. Performs security checks to prevent + * unauthorized host imports. 
+ * + * @param parameters The parameters to validate + * @throws JobValidationException if validation fails + */ + @Override + public void validate(final Map parameters) throws JobValidationException { + + // Validating the language (will throw an exception if it doesn't) + final Language language = findLanguage(parameters); + + if (getContentType(parameters) != null && getContentType(parameters).isEmpty()) { + final var errorMessage = "A Content Type id or variable is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(errorMessage); + } else if (getWorkflowActionId(parameters) != null + && getWorkflowActionId(parameters).isEmpty()) { + final var errorMessage = "A Workflow Action id is required"; + Logger.error(this.getClass(), errorMessage); + throw new JobValidationException(errorMessage); + } else if (language == null && getFields(parameters).length == 0) { + final var errorMessage = + "A key identifying the different Language versions of the same " + + "content must be defined when importing multilingual files."; + Logger.error(this, errorMessage); + throw new JobValidationException(errorMessage); + } + + try { + + // Make sure the content type exist (will throw an exception if it doesn't) + final var contentTypeFound = findContentType(parameters); + + // Security measure to prevent invalid attempts to import a host. + final ContentType hostContentType = APILocator.getContentTypeAPI( + APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); + final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); + if (isHost) { + final var errorMessage = "Invalid attempt to import a host."; + Logger.error(this, errorMessage); + throw new JobValidationException(errorMessage); + } + } catch (DotSecurityException | DotDataException e) { + throw new JobProcessingException("Error validating content type", e); + } + } + /** * Handles cancellation requests for the import operation. 
When called, it marks the operation * for cancellation. @@ -262,10 +311,10 @@ private Map> processImport(final boolean preview, final Job final var currentSiteId = getSiteIdentifier(job); final var currentSiteName = getSiteName(job); - final var contentType = findContentType(job); - final var fields = getFields(job); - final var language = findLanguage(job); - final var workflowActionId = getWorkflowActionId(job); + final var contentType = findContentType(job.parameters()); + final var fields = getFields(job.parameters()); + final var language = findLanguage(job.parameters()); + final var workflowActionId = getWorkflowActionId(job.parameters()); final var httpReq = JobUtil.generateMockRequest(user, currentSiteName); final var importId = jobIdToLong(job.id()); @@ -300,13 +349,14 @@ private String getCommand(final Job job) { /** * Retrieve the user from the job parameters * - * @param job input job + * @param parameters job parameters * @return the user from the job parameters * @throws DotDataException if an error occurs during the user retrieval * @throws DotSecurityException if we don't have the necessary permissions to retrieve the user */ - private User getUser(final Job job) throws DotDataException, DotSecurityException { - final var userId = (String) job.parameters().get(PARAMETER_USER_ID); + private User getUser(final Map parameters) + throws DotDataException, DotSecurityException { + final var userId = (String) parameters.get(PARAMETER_USER_ID); return APILocator.getUserAPI().loadUserById(userId); } @@ -333,53 +383,53 @@ private String getSiteName(final Job job) { /** * Retrieves the content type from the job parameters. 
* - * @param job The job containing the parameters + * @param parameters job parameters * @return The content type string, or null if not present in parameters */ - private String getContentType(final Job job) { - return (String) job.parameters().get(PARAMETER_CONTENT_TYPE); + private String getContentType(final Map parameters) { + return (String) parameters.get(PARAMETER_CONTENT_TYPE); } /** * Retrieves the workflow action ID from the job parameters. * - * @param job The job containing the parameters + * @param parameters job parameters * @return The workflow action ID string, or null if not present in parameters */ - private String getWorkflowActionId(final Job job) { - return (String) job.parameters().get(PARAMETER_WORKFLOW_ACTION_ID); + private String getWorkflowActionId(final Map parameters) { + return (String) parameters.get(PARAMETER_WORKFLOW_ACTION_ID); } /** * Retrieves the language from the job parameters. * - * @param job The job containing the parameters + * @param parameters job parameters * @return An optional containing the language string, or an empty optional if not present */ - private Optional getLanguage(final Job job) { + private Optional getLanguage(final Map parameters) { - if (!job.parameters().containsKey(PARAMETER_LANGUAGE) - || job.parameters().get(PARAMETER_LANGUAGE) == null) { + if (!parameters.containsKey(PARAMETER_LANGUAGE) + || parameters.get(PARAMETER_LANGUAGE) == null) { return Optional.empty(); } - return Optional.of((String) job.parameters().get(PARAMETER_LANGUAGE)); + return Optional.of((String) parameters.get(PARAMETER_LANGUAGE)); } /** * Retrieves the fields array from the job parameters. 
* - * @param job The job containing the parameters + * @param parameters job parameters * @return An array of field strings, or an empty array if no fields are specified */ - public String[] getFields(final Job job) { + public String[] getFields(final Map parameters) { - if (!job.parameters().containsKey(PARAMETER_FIELDS) - || job.parameters().get(PARAMETER_FIELDS) == null) { + if (!parameters.containsKey(PARAMETER_FIELDS) + || parameters.get(PARAMETER_FIELDS) == null) { return new String[0]; } - final var fields = job.parameters().get(PARAMETER_FIELDS); + final var fields = parameters.get(PARAMETER_FIELDS); if (fields instanceof List) { return ((List) fields).toArray(new String[0]); } @@ -387,54 +437,6 @@ public String[] getFields(final Job job) { return (String[]) fields; } - /** - * Validates the job parameters and content type. Performs security checks to prevent - * unauthorized host imports. - * - * @param job The job to validate - * @throws JobValidationException if validation fails - * @throws JobProcessingException if an error occurs during content type validation - */ - private void validate(final Job job) { - - // Validating the language (will throw an exception if it doesn't) - final Language language = findLanguage(job); - - if (getContentType(job) != null && getContentType(job).isEmpty()) { - final var errorMessage = "A Content Type id or variable is required"; - Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } else if (getWorkflowActionId(job) != null && getWorkflowActionId(job).isEmpty()) { - final var errorMessage = "A Workflow Action id is required"; - Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } else if (language == null && getFields(job).length == 0) { - final var errorMessage = - "A key identifying the different Language versions of the same " - + "content must be defined when importing multilingual files."; - 
Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - - try { - - // Make sure the content type exist (will throw an exception if it doesn't) - final var contentTypeFound = findContentType(job); - - // Security measure to prevent invalid attempts to import a host. - final ContentType hostContentType = APILocator.getContentTypeAPI( - APILocator.systemUser()).find(Host.HOST_VELOCITY_VAR_NAME); - final boolean isHost = (hostContentType.id().equals(contentTypeFound.id())); - if (isHost) { - final var errorMessage = "Invalid attempt to import a host."; - Logger.error(this, errorMessage); - throw new JobValidationException(job.id(), errorMessage); - } - } catch (DotSecurityException | DotDataException e) { - throw new JobProcessingException(job.id(), "Error validating content type", e); - } - } - /** * Utility method to convert a job ID to a long value for internal processing. Uses FarmHash for * efficient hash generation and distribution. @@ -579,23 +581,23 @@ private void validateLanguageColumns(Job job, int languageCodeColumn, int countr /** * Retrieves the existing content type based on an id or variable. * - * @param job The current import job. + * @param parameters job parameters * @return The existing content type if found, otherwise fails with an exception. * @throws DotSecurityException If there are security restrictions preventing the evaluation. 
*/ - private ContentType findContentType(final Job job) + private ContentType findContentType(final Map parameters) throws DotSecurityException { - final var contentTypeIdOrVar = getContentType(job); + final var contentTypeIdOrVar = getContentType(parameters); final User user; // Retrieving the user requesting the import try { - user = getUser(job); + user = getUser(parameters); } catch (DotDataException e) { final var errorMessage = "Error retrieving user."; Logger.error(this.getClass(), errorMessage); - throw new JobProcessingException(job.id(), errorMessage, e); + throw new JobProcessingException(errorMessage, e); } try { @@ -606,26 +608,26 @@ private ContentType findContentType(final Job job) "Content Type [%s] not found.", contentTypeIdOrVar ); Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); + throw new JobValidationException(errorMessage); } catch (DotDataException e) { final var errorMessage = String.format( "Error finding Content Type [%s].", contentTypeIdOrVar ); Logger.error(this.getClass(), errorMessage); - throw new JobProcessingException(job.id(), errorMessage, e); + throw new JobProcessingException(errorMessage, e); } } /** * Retrieves the existing language based on an id or ISO code. * - * @param job The current import job. + * @param parameters job parameters * @return The existing language if found, otherwise fails with an exception. 
*/ - private Language findLanguage(final Job job) { + private Language findLanguage(final Map parameters) { // Read the language from the job parameters - final var languageIsoOrIdOptional = getLanguage(job); + final var languageIsoOrIdOptional = getLanguage(parameters); if (languageIsoOrIdOptional.isEmpty()) { return null; } @@ -652,7 +654,7 @@ private Language findLanguage(final Job job) { "Language [%s] not found.", languageIsoOrId ); Logger.error(this.getClass(), errorMessage); - throw new JobValidationException(job.id(), errorMessage); + throw new JobValidationException(errorMessage); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index 0e792c1424fc..9b2a043c00a9 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -7,9 +7,11 @@ import com.dotcms.jobs.business.queue.error.JobNotFoundException; import com.dotcms.jobs.business.queue.error.JobQueueDataException; import com.dotcms.jobs.business.queue.error.JobQueueException; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** @@ -166,6 +168,18 @@ List getUpdatedJobsSince(Set jobIds, LocalDateTime since) */ Job nextJob() throws JobQueueDataException, JobLockingException; + /** + * Detects and marks jobs as abandoned if they haven't been updated within the specified + * threshold. + * + * @param threshold The time duration after which a job is considered abandoned + * @param inStates The states to check for abandoned jobs + * @return The abandoned job if one was found and marked, null otherwise + * @throws JobQueueDataException if there's a data storage error + */ + Optional detectAndMarkAbandoned(Duration threshold, JobState... inStates) + throws JobQueueDataException; + /** * Updates the progress of a job. 
* diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index 10298c44e578..72ca197b4e03 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -1,7 +1,11 @@ package com.dotcms.jobs.business.queue; +import com.dotcms.business.CloseDBIfOpened; +import com.dotcms.business.WrapInTransaction; +import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; +import com.dotcms.jobs.business.job.JobResult; import com.dotcms.jobs.business.job.JobState; import com.dotcms.jobs.business.queue.error.JobLockingException; import com.dotcms.jobs.business.queue.error.JobNotFoundException; @@ -22,11 +26,13 @@ import com.github.jonpeterson.jackson.module.versioning.VersioningModule; import io.vavr.Lazy; import java.sql.Timestamp; +import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -77,6 +83,24 @@ public class PostgresJobQueue implements JobQueue { + "ORDER BY priority DESC, created_at ASC LIMIT 1 FOR UPDATE SKIP LOCKED) " + "RETURNING *"; + private static final String DETECT_AND_MARK_ABANDONED_WITH_LOCK_QUERY = + "WITH active_states AS (" + + " SELECT unnest(ARRAY[$??$]) as state" + + + "), abandoned_jobs AS (" + + " SELECT j.*, j.result as existing_result" + + " FROM job j" + + " INNER JOIN active_states a ON j.state = a.state::text" + + " WHERE j.updated_at < ?" + + " ORDER BY j.updated_at ASC" + + " LIMIT 1" + + " FOR UPDATE SKIP LOCKED" + + ") " + + "UPDATE job " + + "SET state = ?, updated_at = ? 
" + + "WHERE id IN (SELECT id FROM abandoned_jobs) " + + "RETURNING *"; + private static final String GET_ACTIVE_JOBS_QUERY_FOR_QUEUE = "WITH total AS (SELECT COUNT(*) AS total_count " + " FROM job WHERE queue_name = ? AND state IN (?, ?) " + @@ -171,6 +195,7 @@ public class PostgresJobQueue implements JobQueue { return mapper; }); + @WrapInTransaction @Override public String createJob(final String queueName, final Map parameters) throws JobQueueException { @@ -223,6 +248,7 @@ public String createJob(final String queueName, final Map parame } } + @CloseDBIfOpened @Override public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataException { @@ -256,6 +282,7 @@ public Job getJob(final String jobId) throws JobNotFoundException, JobQueueDataE } } + @CloseDBIfOpened @Override public JobState getJobState(final String jobId) throws JobNotFoundException, JobQueueDataException { @@ -274,6 +301,7 @@ public JobState getJobState(final String jobId) return job.state(); } + @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws JobQueueDataException { @@ -300,6 +328,7 @@ public JobPaginatedResult getActiveJobs(final String queueName, final int page, } } + @CloseDBIfOpened @Override public JobPaginatedResult getCompletedJobs(final String queueName, final LocalDateTime startDate, @@ -329,6 +358,7 @@ public JobPaginatedResult getCompletedJobs(final String queueName, } } + @CloseDBIfOpened @Override public JobPaginatedResult getJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -346,6 +376,7 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getActiveJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -372,6 +403,7 @@ public JobPaginatedResult getActiveJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult 
getCompletedJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -396,6 +428,7 @@ public JobPaginatedResult getCompletedJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -420,6 +453,7 @@ public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public JobPaginatedResult getFailedJobs(final int page, final int pageSize) throws JobQueueDataException { @@ -444,6 +478,7 @@ public JobPaginatedResult getFailedJobs(final int page, final int pageSize) } } + @CloseDBIfOpened @Override public void updateJobStatus(final Job job) throws JobQueueDataException { @@ -489,9 +524,10 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { }).orElse(null)); historyDc.loadResult(); - // Remove from job_queue if completed, failed, or canceled + // Remove from job_queue if completed, failed, abandoned or canceled if (job.state() == JobState.COMPLETED || job.state() == JobState.FAILED + || job.state() == JobState.ABANDONED || job.state() == JobState.CANCELED) { removeJobFromQueue(job.id()); } @@ -506,6 +542,7 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { } } + @CloseDBIfOpened @Override public List getUpdatedJobsSince(final Set jobIds, final LocalDateTime since) throws JobQueueDataException { @@ -528,6 +565,7 @@ public List getUpdatedJobsSince(final Set jobIds, final LocalDateTi } } + @CloseDBIfOpened @Override public void putJobBackInQueue(final Job job) throws JobQueueDataException { @@ -550,6 +588,7 @@ public void putJobBackInQueue(final Job job) throws JobQueueDataException { } } + @CloseDBIfOpened @Override public Job nextJob() throws JobQueueDataException, JobLockingException { @@ -580,6 +619,60 @@ public Job nextJob() throws JobQueueDataException, JobLockingException { } } + @CloseDBIfOpened + @Override + public Optional 
detectAndMarkAbandoned(final Duration threshold, final JobState... inStates) + throws JobQueueDataException { + + try { + + String parameters = String.join(", ", Collections.nCopies(inStates.length, "?")); + + var query = DETECT_AND_MARK_ABANDONED_WITH_LOCK_QUERY + .replace(REPLACE_TOKEN_PARAMETERS, parameters); + + LocalDateTime thresholdTime = LocalDateTime.now().minus(threshold); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + for (JobState state : inStates) { + dc.addParam(state.name()); + } + dc.addParam(Timestamp.valueOf(thresholdTime)); + dc.addParam(JobState.ABANDONED.name()); + dc.addParam(Timestamp.valueOf(LocalDateTime.now())); + + List> results = dc.loadObjectResults(); + if (!results.isEmpty()) { + final var foundAbandonedJob = DBJobTransformer.toJob(results.get(0)); + + // Create error detail for abandoned job + final ErrorDetail errorDetail = ErrorDetail.builder() + .message("Job abandoned due to no updates within " + + threshold.toMinutes() + " minutes") + .exceptionClass("com.dotcms.jobs.business.error.JobAbandonedException") + .timestamp(LocalDateTime.now()) + .processingStage("Abandoned Job Detection") + .stackTrace("Job exceeded inactivity threshold of " + + threshold.toMinutes() + " minutes") + .build(); + final JobResult jobResult = JobResult.builder().errorDetail(errorDetail).build(); + + final Job abandonedJob = foundAbandonedJob.markAsAbandoned(jobResult); + updateJobStatus(abandonedJob); + + return Optional.of(abandonedJob); + } + + return Optional.empty(); + } catch (DotDataException e) { + final var errorMessage = "Database error while detecting abandoned jobs"; + Logger.error(this, errorMessage, e); + throw new JobQueueDataException(errorMessage, e); + } + } + + @CloseDBIfOpened @Override public void updateJobProgress(final String jobId, final float progress) throws JobQueueDataException { @@ -601,6 +694,7 @@ public void updateJobProgress(final String jobId, final float progress) } } + @CloseDBIfOpened @Override public void 
removeJobFromQueue(final String jobId) throws JobQueueDataException { @@ -615,6 +709,7 @@ public void removeJobFromQueue(final String jobId) throws JobQueueDataException } } + @CloseDBIfOpened @Override public boolean hasJobBeenInState(final String jobId, final JobState... states) throws JobQueueDataException { diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java index c19e19056153..126d493b8f60 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/util/JobUtil.java @@ -1,6 +1,9 @@ package com.dotcms.jobs.business.util; +import com.dotcms.api.system.event.Payload; +import com.dotcms.api.system.event.SystemEventType; import com.dotcms.api.web.HttpServletRequestThreadLocal; +import com.dotcms.jobs.business.api.events.JobEvent; import com.dotcms.jobs.business.job.Job; import com.dotcms.mock.request.FakeHttpRequest; import com.dotcms.mock.request.MockHeaderRequest; @@ -12,11 +15,14 @@ import com.dotmarketing.util.UtilMethods; import com.dotmarketing.util.WebKeys; import com.liferay.portal.model.User; +import io.vavr.control.Try; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.BiFunction; import javax.servlet.http.HttpServletRequest; /** @@ -120,4 +126,38 @@ public static float roundedProgress(final float progress) { return Math.round(roundedProgress * 1000f) / 1000f; } + /** + * Helper method to send both local and cluster-wide events for a job state change + * + * @param job The job that triggered the event + * @param eventFactory Factory function to create the specific event type + * @param The type of event being created (must extend JobEvent) + */ + public static void sendEvents( + final Job job, + final BiFunction eventFactory) { + + // Create the event + final 
T event = eventFactory.apply(job, LocalDateTime.now()); + + // Send the event notifications + sendEvents(event); + } + + /** + * Helper method to send both local and cluster-wide notifications for a job event. + * + * @param event The event to send (must implement the JobEvent interface) + */ + public static void sendEvents(final T event) { + + // LOCAL event + APILocator.getLocalSystemEventsAPI().notify(event); + + // CLUSTER WIDE event + Try.run(() -> APILocator.getSystemEventsAPI() + .push(SystemEventType.CLUSTER_WIDE_EVENT, new Payload(event))) + .onFailure(e -> Logger.error(JobUtil.class, e.getMessage())); + } + } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index d2b537e94f03..268ca26b38b4 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -1,9 +1,11 @@ package com.dotcms.rest.api.v1.job; +import com.dotcms.jobs.business.error.JobValidationException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; +import com.dotcms.rest.exception.mapper.ExceptionMapperUtil; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; @@ -25,6 +27,7 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.glassfish.jersey.media.sse.EventOutput; import org.glassfish.jersey.media.sse.OutboundEvent; import org.glassfish.jersey.media.sse.SseFeature; @@ -54,37 +57,50 @@ public JobQueueResource(WebResource webResource, JobQueueHelper helper, @Path("/{queueName}") @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.APPLICATION_JSON) - public 
ResponseEntityView createJob( + public Response createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { + final var initDataObject = new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final String jobId = helper.createJob(queueName, form, initDataObject.getUser(), request); - return new ResponseEntityView<>(jobId); + + try { + final String jobId = helper.createJob( + queueName, form, initDataObject.getUser(), request); + return Response.ok(new ResponseEntityView<>(jobId)).build(); + } catch (JobValidationException e) { + return ExceptionMapperUtil.createResponse(null, e.getMessage()); + } } @POST @Path("/{queueName}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView createJob( + public Response createJob( @Context HttpServletRequest request, @PathParam("queueName") String queueName, Map parameters) throws DotDataException { + final var initDataObject = new WebResource.InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) .rejectWhenNoUser(true) .init(); - final String jobId = helper.createJob( - queueName, parameters, initDataObject.getUser(), request); - return new ResponseEntityView<>(jobId); + + try { + final String jobId = helper.createJob( + queueName, parameters, initDataObject.getUser(), request); + return Response.ok(new ResponseEntityView<>(jobId)).build(); + } catch (JobValidationException e) { + return ExceptionMapperUtil.createResponse(null, e.getMessage()); + } } @GET diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 97c3387fde78..986b2de95b94 
100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -13,15 +13,21 @@ import com.dotcms.jobs.business.processor.JobProcessor; import com.dotcms.jobs.business.processor.ProgressTracker; import com.dotcms.util.IntegrationTestInitService; +import com.dotmarketing.business.APILocator; import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; +import com.dotmarketing.util.Config; import com.dotmarketing.util.Logger; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Timestamp; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -63,6 +69,9 @@ public class JobQueueManagerAPIIntegrationTest extends com.dotcms.Junit5WeldBase static void setUp() throws Exception { // Initialize the test environment IntegrationTestInitService.getInstance().init(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "1"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "2"); } /** @@ -73,10 +82,15 @@ static void setUp() throws Exception { */ @AfterAll void cleanUp() throws Exception { - if(null != jobQueueManagerAPI) { - jobQueueManagerAPI.close(); - } + clearJobs(); + + Config.setProperty("JOB_ABANDONMENT_DETECTION_INTERVAL_MINUTES", "5"); + Config.setProperty("JOB_ABANDONMENT_THRESHOLD_MINUTES", "30"); + + if (null != jobQueueManagerAPI) { + jobQueueManagerAPI.close(); + } } @BeforeEach @@ -450,6 +464,127 @@ void test_CombinedScenarios() throws Exception { }); } + /** + * Tests the abandoned job detection functionality. 
+ * Given Scenario: A job exists in RUNNING state with an old timestamp + * ExpectedResult: The job is detected as abandoned, marked accordingly and retried successfully + */ + @Test + @Order(7) + void test_AbandonedJobDetection() throws Exception { + + final String jobId = UUID.randomUUID().toString(); + final String queueName = "abandonedQueue"; + final Map parameters = Collections.singletonMap("test", "value"); + final String serverId = APILocator.getServerAPI().readServerId(); + final LocalDateTime oldTimestamp = LocalDateTime.now().minusMinutes(5); + + // Create a job directly in the database in RUNNING state to simulate an abandoned job + DotConnect dc = new DotConnect(); + + // Insert into job table + dc.setSQL("INSERT INTO job (id, queue_name, state, parameters, created_at, updated_at, started_at, execution_node) VALUES (?, ?, ?, ?::jsonb, ?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(new ObjectMapper().writeValueAsString(parameters)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(serverId) + .loadResult(); + + // Insert into job_queue table + dc.setSQL("INSERT INTO job_queue (id, queue_name, state, created_at) VALUES (?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Insert initial state into job_history + dc.setSQL("INSERT INTO job_history (id, job_id, state, execution_node, created_at) VALUES (?, ?, ?, ?, ?)") + .addParam(UUID.randomUUID().toString()) + .addParam(jobId) + .addParam(JobState.RUNNING.name()) + .addParam(serverId) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Verify the job was created in RUNNING state + Job initialJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.RUNNING, initialJob.state(), + "Job should be in RUNNING state initially"); 
+ + // Start job queue manager if not started + if (!jobQueueManagerAPI.isStarted()) { + jobQueueManagerAPI.start(); + jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS); + } + + // Register a processor for the abandoned job + jobQueueManagerAPI.registerProcessor(queueName, AbbandonedJobProcessor.class); + + // The job should be marked as abandoned + CountDownLatch latch = new CountDownLatch(1); + jobQueueManagerAPI.watchJob(jobId, job -> { + if (job.state() == JobState.ABANDONED) { + latch.countDown(); + } + }); + + boolean abandoned = latch.await(3, TimeUnit.MINUTES); + assertTrue(abandoned, "Job should be marked as abandoned within timeout period"); + + // Verify the abandoned job state and error details + Job abandonedJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.ABANDONED, abandonedJob.state(), + "Job should be in ABANDONED state"); + assertTrue(abandonedJob.result().isPresent(), + "Abandoned job should have a result"); + assertTrue(abandonedJob.result().get().errorDetail().isPresent(), + "Abandoned job should have error details"); + assertTrue(abandonedJob.result().get().errorDetail().get().message() + .contains("abandoned due to no updates"), + "Error message should indicate abandonment"); + + // Verify the job was put back in queue for retry and completed successfully + Awaitility.await().atMost(15, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Job job = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.COMPLETED, job.state(), + "Job should be in COMPLETED state"); + }); + + // Verify job history contains the state transitions + dc.setSQL("SELECT state FROM job_history WHERE job_id = ? 
ORDER BY created_at") + .addParam(jobId); + List> history = dc.loadObjectResults(); + + assertFalse(history.isEmpty(), "Job should have history records"); + assertEquals(JobState.RUNNING.name(), history.get(0).get("state"), + "First state should be RUNNING"); + assertEquals(JobState.ABANDONED.name(), history.get(1).get("state"), + "Second state should be ABANDONED"); + assertEquals(JobState.RUNNING.name(), history.get(2).get("state"), + "Third state should be RUNNING"); + assertEquals(JobState.COMPLETED.name(), history.get(3).get("state"), + "Latest state should be COMPLETED"); + } + + static class AbbandonedJobProcessor implements JobProcessor { + + @Override + public void process(Job job) { + } + + @Override + public Map getResultMetadata(Job job) { + return Collections.emptyMap(); + } + } + static class ProgressTrackingJobProcessor implements JobProcessor { @Override public void process(Job job) { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index ccfe76f90c60..d9f7d36bea68 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -1,11 +1,9 @@ package com.dotcms.jobs.business.api; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.anyFloat; -import static org.mockito.ArgumentMatchers.anySet; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; @@ -24,9 +22,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.dotcms.jobs.business.api.events.EventProducer; import 
com.dotcms.jobs.business.api.events.JobCancelRequestEvent; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; +import com.dotcms.jobs.business.detector.AbandonedJobDetector; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.ErrorDetail; import com.dotcms.jobs.business.error.JobCancellationException; @@ -50,9 +48,7 @@ import com.dotmarketing.business.APILocator; import com.dotmarketing.exception.DotDataException; import java.time.LocalDateTime; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -63,8 +59,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; -import javax.enterprise.event.Event; import org.awaitility.Awaitility; import org.junit.Before; import org.junit.Test; @@ -149,10 +143,10 @@ public boolean awaitProcessingCompleted(long timeout, TimeUnit unit) private JobQueueManagerAPI jobQueueManagerAPI; - private EventProducer eventProducer; - private RetryPolicyProcessor retryPolicyProcessor; + private AbandonedJobDetector abandonedJobDetector; + /** * Factory to create mock JobProcessor instances for testing. * This is how we instruct the JobQueueManagerAPI to use our mock processors. 
@@ -181,19 +175,16 @@ public void setUp() { mockCancellableProcessor = mock(SimpleCancellableJobProcessor.class); mockRetryStrategy = mock(RetryStrategy.class); mockCircuitBreaker = mock(CircuitBreaker.class); - eventProducer = mock(EventProducer.class); retryPolicyProcessor = mock(RetryPolicyProcessor.class); + abandonedJobDetector = mock(AbandonedJobDetector.class); jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, mockCircuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 10 + mockJobQueue, mockCircuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); jobQueueManagerAPI.setRetryStrategy("testQueue", mockRetryStrategy); - - var event = mock(Event.class); - when(eventProducer.getEvent(any())).thenReturn(event); } /** @@ -343,79 +334,6 @@ public void test_close() throws Exception { assertFalse(jobQueueManagerAPI.isStarted()); } - /** - * Method to test: watchJob in JobQueueManagerAPI - * Given Scenario: Valid job ID and watcher are provided - * ExpectedResult: Watcher receives all job state updates correctly - */ - @Test - public void test_watchJob() throws Exception { - - // Create a mock job - String jobId = "job123"; - Job mockJob = mock(Job.class); - when(mockJob.id()).thenReturn(jobId); - when(mockJob.queueName()).thenReturn("testQueue"); - - // Mock JobQueue behavior - when(mockJobQueue.getJob(jobId)).thenReturn(mockJob); - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJob.markAsRunning()).thenReturn(mockJob); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); - - // Make the circuit breaker always allow requests - when(mockCircuitBreaker.allowRequest()).thenReturn(true); - - // Mock JobProcessor behavior - ProgressTracker mockProgressTracker = mock(ProgressTracker.class); - 
when(mockJob.progressTracker()).thenReturn(Optional.ofNullable(mockProgressTracker)); - when(mockJob.progress()).thenReturn(0f); - when(mockJob.withProgress(anyFloat())).thenReturn(mockJob); - - AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - when(mockJob.state()).thenAnswer(inv -> jobState.get()); - - when(mockJob.withState(any())).thenAnswer(inv -> { - jobState.set(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.markAsFailed(any())).thenAnswer(inv -> { - jobState.set(JobState.FAILED); - return mockJob; - }); - - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - - // Create a list to capture job states - List capturedStates = Collections.synchronizedList(new ArrayList<>()); - // Create a test watcher - Consumer testWatcher = job -> { - assertNotNull(job); - assertEquals(jobId, job.id()); - capturedStates.add(job.state()); - }; - - // Start the JobQueueManagerAPI - jobQueueManagerAPI.start(); - - // Register the watcher - jobQueueManagerAPI.watchJob(jobId, testWatcher); - - // Wait for job processing to complete - Awaitility.await() - .atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> capturedStates.contains(JobState.COMPLETED)); - - // Stop the JobQueueManagerAPI - jobQueueManagerAPI.close(); - } - /** * Method to test: Job retry mechanism in JobQueueManagerAPI * Given Scenario: Job fails on first attempt but succeeds on retry @@ -842,128 +760,6 @@ public void test_Job_NotRetryable() throws Exception { jobQueueManagerAPI.close(); } - /** - * Method to test: Job progress tracking in JobQueueManagerAPI - * Given Scenario: Job with multiple progress updates - * ExpectedResult: Progress is tracked correctly and increases monotonically - */ - @Test - public void test_JobProgressTracking() 
throws Exception { - - // Create a mock job - Job mockJob = mock(Job.class); - when(mockJob.id()).thenReturn("progress-test-job"); - when(mockJob.queueName()).thenReturn("testQueue"); - - AtomicReference jobProgress = new AtomicReference<>(0f); - AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - - when(mockJob.state()).thenAnswer(inv -> jobState.get()); - when(mockJob.withState(any())).thenAnswer(inv -> { - jobState.set(inv.getArgument(0)); - return mockJob; - }); - when(mockJob.markAsRunning()).thenAnswer(inv -> { - jobState.set(JobState.RUNNING); - return mockJob; - }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); - return mockJob; - }); - when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); - - when(mockJob.progress()).thenAnswer(inv -> jobProgress.get()); - when(mockJob.withProgress(anyFloat())).thenAnswer(inv -> { - jobProgress.set(inv.getArgument(0)); - return mockJob; - }); - - // Set up the job queue to return our mock job - when(mockJobQueue.nextJob()).thenReturn(mockJob).thenReturn(null); - when(mockJobQueue.getJob(anyString())).thenReturn(mockJob); - - // Create a real ProgressTracker - ProgressTracker realProgressTracker = new DefaultProgressTracker(); - when(mockJob.progressTracker()).thenReturn(Optional.of(realProgressTracker)); - - // Make the circuit breaker always allow requests - when(mockCircuitBreaker.allowRequest()).thenReturn(true); - - // List to store progress updates - List progressUpdates = Collections.synchronizedList(new ArrayList<>()); - - // Configure the mockJobProcessor to update progress - doAnswer(inv -> { - - for (int i = 0; i <= 10; i++) { - float progress = i / 10f; - realProgressTracker.updateProgress(progress); - // Simulate the effect of updateJobProgress - jobProgress.set(progress); - - // Simulate work - long startTime = System.currentTimeMillis(); - Awaitility.await() - .atMost(3, TimeUnit.SECONDS) - 
.pollInterval(100, TimeUnit.MILLISECONDS) - .until(() -> System.currentTimeMillis() - startTime >= 50); - } - - Job job = inv.getArgument(0); - job.markAsCompleted(any()); - return null; - }).when(mockJobProcessor).process(any()); - - // Set up a job watcher to capture progress updates - jobQueueManagerAPI.watchJob("progress-test-job", job -> { - progressUpdates.add(job.progress()); - }); - - when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - - // Start the job queue - jobQueueManagerAPI.start(); - - // Wait for job processing to complete - Awaitility.await().atMost(10, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .untilAsserted(() -> { - assertEquals(JobState.COMPLETED, jobState.get()); - }); - - // Verify that progress was tracked correctly - assertTrue( - "Should have multiple progress updates", - progressUpdates.size() > 1 - ); - assertEquals( - 0.0f, progressUpdates.get(0), 0.01f, - "Initial progress should be 0" - ); - assertEquals( - 1.0f, progressUpdates.get(progressUpdates.size() - 1), 0.01f, - "Final progress should be 1" - ); - - // Verify that progress increased monotonically - for (int i = 1; i < progressUpdates.size(); i++) { - assertTrue("Progress should increase or stay the same", - progressUpdates.get(i) >= progressUpdates.get(i - 1) - ); - } - - // Verify that the job was processed - verify(mockJobProcessor, times(1)).process(any()); - verify(mockJobQueue, times(2)).updateJobStatus(any()); - verify(mockJobQueue, times(2)). 
- updateJobStatus(argThat(job -> job.state() == JobState.COMPLETED)); - - // Stop the job queue - jobQueueManagerAPI.close(); - } - /** * Method to test: Circuit breaker mechanism in JobQueueManagerAPI * Given Scenario: Multiple job failures occur @@ -1004,8 +800,8 @@ public void test_CircuitBreaker_Opens() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1089,8 +885,8 @@ public void test_CircuitBreaker_Closes() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1152,8 +948,8 @@ public void test_CircuitBreaker_Reset() throws Exception { // Create JobQueueManagerAPIImpl with the real CircuitBreaker JobQueueManagerAPI jobQueueManagerAPI = newJobQueueManagerAPI( - mockJobQueue, circuitBreaker, mockRetryStrategy, eventProducer, jobProcessorFactory, - retryPolicyProcessor, 1, 1000 + mockJobQueue, circuitBreaker, mockRetryStrategy, jobProcessorFactory, + retryPolicyProcessor, abandonedJobDetector, 1 ); jobQueueManagerAPI.registerProcessor("testQueue", JobProcessor.class); @@ -1296,9 +1092,6 @@ public void test_complex_cancelJob() throws Exception { when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn( mockJob); - 
when(mockJobQueue.getUpdatedJobsSince(anySet(), any(LocalDateTime.class))) - .thenAnswer(invocation -> Collections.singletonList(mockJob)); - // Create Mock Job Event JobCancelRequestEvent mockEvent = mock(JobCancelRequestEvent.class); when(mockEvent.getJob()).thenReturn(mockJob); @@ -1382,24 +1175,22 @@ public void test_calculateBackoffTime() { * failures. * @param retryStrategy The strategy to use for retrying failed jobs. * @param threadPoolSize The size of the thread pool for job processing. - * @param pollJobUpdatesIntervalMilliseconds The interval in milliseconds for polling job - * updates. * @return A newly created instance of JobQueueManagerAPI. */ private JobQueueManagerAPI newJobQueueManagerAPI(JobQueue jobQueue, CircuitBreaker circuitBreaker, RetryStrategy retryStrategy, - EventProducer eventProducer, JobProcessorFactory jobProcessorFactory, RetryPolicyProcessor retryPolicyProcessor, - int threadPoolSize, int pollJobUpdatesIntervalMilliseconds) { + AbandonedJobDetector abandonedJobDetector, + int threadPoolSize) { final var realTimeJobMonitor = new RealTimeJobMonitor(); return new JobQueueManagerAPIImpl( - jobQueue, new JobQueueConfig(threadPoolSize, pollJobUpdatesIntervalMilliseconds), - circuitBreaker, retryStrategy, realTimeJobMonitor, eventProducer, - jobProcessorFactory, retryPolicyProcessor + jobQueue, new JobQueueConfig(threadPoolSize), + circuitBreaker, retryStrategy, realTimeJobMonitor, + jobProcessorFactory, retryPolicyProcessor, abandonedJobDetector ); } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java index e25ee222c34d..9055a905d33c 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java +++ 
b/dotcms-integration/src/test/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessorIntegrationTest.java @@ -159,7 +159,7 @@ void test_process_preview_invalid_content_type_variable() throws Exception { try { // Process the job in preview mode - processor.process(testJob); + processor.validate(testJob.parameters()); Assertions.fail("A JobValidationException should have been thrown here."); } catch (Exception e) { Assertions.assertInstanceOf(JobValidationException.class, e); @@ -249,8 +249,7 @@ void test_process_preview_invalid_language() throws Exception { ); try { - // Process the job in preview mode - processor.process(testJob); + processor.validate(testJob.parameters()); Assertions.fail("A JobValidationException should have been thrown here."); } catch (Exception e) { Assertions.assertInstanceOf(JobValidationException.class, e); diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java index 62d180e3088f..f8fed037402e 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java @@ -19,6 +19,8 @@ import com.dotmarketing.common.db.DotConnect; import com.dotmarketing.exception.DotDataException; import com.dotmarketing.util.Logger; +import java.sql.Timestamp; +import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -310,6 +312,139 @@ void test_nextJob() throws Exception { } } + /** + * Method to test: detectAndMarkAbandoned in PostgresJobQueue + * Given Scenario: Multiple threads attempt to process abandoned jobs concurrently + * ExpectedResult: + * - Each job is processed exactly once + * - All jobs are marked as ABANDONED + * - No job is processed more than once (thread safety) 
+ * - All created jobs are processed + */ + @Test + void test_detectAndMarkAbandoned() throws Exception { + + final int NUM_JOBS = 10; + final int NUM_THREADS = 5; + String queueName = "testQueue"; + + // Create jobs + Set createdJobIds = new HashSet<>(); + for (int i = 0; i < NUM_JOBS; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + createdJobIds.add(jobId); + + // Changing the job state to running + final var createdJob = jobQueue.getJob(jobId); + jobQueue.updateJobStatus(createdJob.withState(JobState.RUNNING)); + } + + // Update the updated_at timestamp to be old enough to be considered abandoned + new DotConnect() + .setSQL("UPDATE job SET updated_at = ? WHERE id IN (SELECT id FROM job_queue)") + .addParam(Timestamp.valueOf(LocalDateTime.now().minusMinutes(5))) + .loadResult(); + + // Set to keep track of processed job IDs + Set processedJobIds = Collections.synchronizedSet(new HashSet<>()); + + // Create and start threads + List threads = new ArrayList<>(); + for (int i = 0; i < NUM_THREADS; i++) { + Thread thread = new Thread(() -> { + try { + while (true) { + Optional abandonedJobOptional = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING, JobState.CANCEL_REQUESTED, JobState.CANCELLING); + if (abandonedJobOptional.isEmpty()) { + break; // No more jobs to process + } + + Job abandonedJob = abandonedJobOptional.get(); + // Ensure this job hasn't been processed before + assertTrue(processedJobIds.add(abandonedJob.id()), + "Job " + abandonedJob.id() + " was processed more than once"); + assertEquals(JobState.ABANDONED, abandonedJob.state()); + } + } catch (Exception e) { + fail("Exception in thread: " + e.getMessage()); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads to complete + for (Thread thread : threads) { + thread.join(); + } + + // Verify all jobs were processed + assertEquals(NUM_JOBS, processedJobIds.size(), "Not all jobs were processed"); + 
assertEquals(createdJobIds, processedJobIds, "Processed jobs don't match created jobs"); + + // Verify no more jobs are detected + assertTrue(jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING, JobState.CANCEL_REQUESTED, JobState.CANCELLING).isEmpty(), + "There should be no more jobs abandoned"); + + // Verify all jobs are in ABANDONED state + for (String jobId : createdJobIds) { + Job job = jobQueue.getJob(jobId); + assertEquals(JobState.ABANDONED, job.state(), + "Job " + jobId + " is not in ABANDONED state"); + } + } + + /** + * Method to test: detectAndMarkAbandoned in PostgresJobQueue + * Given Scenario: Single job with different threshold values + * ExpectedResult: + * - Job is not marked as abandoned when threshold is higher than its idle time + * - Job is marked as abandoned when threshold is lower than its idle time + * - Job state transitions correctly to ABANDONED when threshold is met + */ + @Test + void test_detectAndMarkAbandoned_threshold() throws Exception { + + // Create a job + String queueName = "testQueue"; + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + + // Change state to running + Job job = jobQueue.getJob(jobId); + jobQueue.updateJobStatus(job.withState(JobState.RUNNING)); + + // Set the updated_at to 2 minutes ago + new DotConnect() + .setSQL("UPDATE job SET updated_at = ? 
WHERE id = ?") + .addParam(Timestamp.valueOf(LocalDateTime.now().minusMinutes(2))) + .addParam(jobId) + .loadResult(); + + // First check: Using 3 minutes threshold - Should NOT be considered abandoned + Optional abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(3), + JobState.RUNNING + ); + assertTrue(abandonedJob.isEmpty(), "Job should not be considered abandoned yet"); + + // Second check: Using 1 minute threshold - Should be considered abandoned + abandonedJob = jobQueue.detectAndMarkAbandoned( + Duration.ofMinutes(1), + JobState.RUNNING + ); + assertTrue(abandonedJob.isPresent(), "Job should be considered abandoned"); + assertEquals(jobId, abandonedJob.get().id(), "Wrong job was marked as abandoned"); + assertEquals(JobState.ABANDONED, abandonedJob.get().state(), "Job should be in ABANDONED state"); + + // Verify the job state in database + Job finalJob = jobQueue.getJob(jobId); + assertEquals(JobState.ABANDONED, finalJob.state(), "Job state in database should be ABANDONED"); + } + /** * Method to test: getJob in PostgresJobQueue with non-existent ID * Given Scenario: Attempt to retrieve a job with a non-existent ID