From 2547bc211bb72185c27ce7c9057fbe0f1a6a7642 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Fri, 29 Nov 2024 13:59:26 -0600 Subject: [PATCH 1/9] #30367 Refactor job system and enhance SSE monitoring. Removed obsolete job events, streamlined job state management by introducing more precise states such as FAILED_PERMANENTLY and ABANDONED_PERMANENTLY. Replaced job completion terminology and refined method signatures and naming conventions to reinforce consistency. Enhanced Server-Sent Events (SSE) monitoring with a dedicated utility class for improved performance and error handling. --- .../jobs/business/api/JobQueueManagerAPI.java | 39 +- .../business/api/JobQueueManagerAPIImpl.java | 72 +++- .../business/api/events/JobCanceledEvent.java | 38 -- .../api/events/JobRemovedFromQueueEvent.java | 38 -- .../api/events/RealTimeJobMonitor.java | 185 ++++---- .../dotcms/jobs/business/job/AbstractJob.java | 38 +- .../dotcms/jobs/business/job/JobState.java | 14 +- .../impl/ImportContentletsProcessor.java | 16 +- .../AbstractJobStateQueryParameters.java | 76 ++++ .../dotcms/jobs/business/queue/JobQueue.java | 30 +- .../jobs/business/queue/PostgresJobQueue.java | 401 ++++++++++++------ .../rest/api/v1/job/JobQueueHelper.java | 104 +++-- .../rest/api/v1/job/JobQueueResource.java | 160 +++---- .../rest/api/v1/job/SSEConnectionManager.java | 45 +- .../rest/api/v1/job/SSEMonitorUtil.java | 171 ++++++++ .../java/com/dotmarketing/util/FileUtil.java | 23 +- .../JobQueueManagerAPIIntegrationTest.java | 169 ++++++-- .../business/api/JobQueueManagerAPITest.java | 48 ++- .../PostgresJobQueueIntegrationTest.java | 93 +++- 19 files changed, 1197 insertions(+), 563 deletions(-) delete mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java delete mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java create mode 100644 dotCMS/src/main/java/com/dotcms/jobs/business/queue/AbstractJobStateQueryParameters.java create mode 100644 dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java index 8ada82da14c0..0fcb72c595f3 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPI.java @@ -1,5 +1,6 @@ package com.dotcms.jobs.business.api; +import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.error.CircuitBreaker; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.error.RetryStrategy; @@ -128,6 +129,16 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) */ JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException; + /** + * Retrieves a list of successful jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of successful jobs and pagination information. + * @throws DotDataException if there's an error fetching the jobs + */ + JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws DotDataException; + /** * Retrieves a list of canceled jobs * @@ -148,6 +159,16 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) */ JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException; + /** + * Retrieves a list of abandoned jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of abandoned jobs and pagination information. + * @throws DotDataException if there's an error fetching the jobs + */ + JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws DotDataException; + /** * Cancels a job. * @@ -161,8 +182,24 @@ JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) * * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates + * @return A JobWatcher instance representing the registered watcher + */ + JobWatcher watchJob(String jobId, Consumer watcher); + + /** + * Removes a watcher for a specific job. + * + * @param jobId The ID of the job to unwatch + * @param watcher The watcher to remove + */ + void removeJobWatcher(String jobId, JobWatcher watcher); + + /** + * Removes all watchers for a specific job. + * + * @param jobId The ID of the job */ - void watchJob(String jobId, Consumer watcher); + void removeAllJobWatchers(String jobId); /** * Sets a retry strategy for a specific queue. diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index b63796ce3c7f..7f191b17d333 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -3,14 +3,13 @@ import com.dotcms.business.CloseDBIfOpened; import com.dotcms.business.WrapInTransaction; import com.dotcms.jobs.business.api.events.JobCancelRequestEvent; -import com.dotcms.jobs.business.api.events.JobCanceledEvent; import com.dotcms.jobs.business.api.events.JobCancellingEvent; import com.dotcms.jobs.business.api.events.JobCompletedEvent; import com.dotcms.jobs.business.api.events.JobCreatedEvent; import com.dotcms.jobs.business.api.events.JobFailedEvent; import com.dotcms.jobs.business.api.events.JobProgressUpdatedEvent; -import com.dotcms.jobs.business.api.events.JobRemovedFromQueueEvent; import com.dotcms.jobs.business.api.events.JobStartedEvent; +import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.api.events.RealTimeJobMonitor; import com.dotcms.jobs.business.detector.AbandonedJobDetector; import com.dotcms.jobs.business.error.CircuitBreaker; @@ -360,6 +359,16 @@ public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDat } } + @CloseDBIfOpened + @Override + public JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws DotDataException { + try { + return jobQueue.getSuccessfulJobs(page, pageSize); + } catch (JobQueueDataException e) { + throw new DotDataException("Error fetching successful jobs", e); + } + } + @CloseDBIfOpened @Override public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException { @@ -380,6 +389,16 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataEx } } + @CloseDBIfOpened + @Override + public JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws DotDataException { + try { + return jobQueue.getAbandonedJobs(page, pageSize); + } catch (JobQueueDataException e) { + throw new DotDataException("Error fetching abandoned jobs", e); + } + } + @Override public void cancelJob(final String jobId) throws DotDataException { @@ -439,8 +458,18 @@ void onCancelRequestJob(final JobCancelRequestEvent event) { } @Override - public void watchJob(final String jobId, final Consumer watcher) { - realTimeJobMonitor.registerWatcher(jobId, watcher); + public JobWatcher watchJob(final String jobId, final Consumer watcher) { + return realTimeJobMonitor.registerWatcher(jobId, watcher); + } + + @Override + public void removeJobWatcher(final String jobId, final JobWatcher watcher) { + realTimeJobMonitor.removeWatcher(jobId, watcher); + } + + @Override + public void removeAllJobWatchers(final String jobId) { + realTimeJobMonitor.removeAllWatchers(jobId); } @Override @@ -689,7 +718,7 @@ private boolean isReadyForRetry(Job job) throws DotDataException { /** * Handles a failed job that cannot be retried. This method logs a warning about the - * non-retryable job and removes it from the active queue. + * non-retryable job, removes it from the active queue, and marks it as failed permanently. * * @param job The failed job that cannot be retried. */ @@ -697,13 +726,16 @@ private void handleNonRetryableFailedJob(final Job job) throws DotDataException Logger.warn(this, "Job " + job.id() + " has failed and cannot be retried."); - try { - jobQueue.removeJobFromQueue(job.id()); - // Send the job removed from queue events - JobUtil.sendEvents(job, JobRemovedFromQueueEvent::new); - } catch (JobQueueDataException e) { - throw new DotDataException("Error removing failed job", e); + Job finishedJob = job.markAsFailedPermanently(); + if (job.state() == JobState.ABANDONED) { + finishedJob = job.markAsAbandonedPermanently(); } + + // Giving the job a final state + updateJobStatus(finishedJob); + + // Send the job completion events + JobUtil.sendEvents(finishedJob, JobCompletedEvent::new); } /** @@ -839,17 +871,17 @@ private void handleJobCompletion(final Job job, final JobProcessor processor) final float progress = getJobProgress(job); try { + + Job completedJob = job.markAsSuccessful(jobResult).withProgress(progress); + if (jobQueue.hasJobBeenInState(job.id(), JobState.CANCEL_REQUESTED, JobState.CANCELLING)) { - Job canceledJob = job.markAsCanceled(jobResult).withProgress(progress); - updateJobStatus(canceledJob); - // Send the job canceled events - JobUtil.sendEvents(canceledJob, JobCanceledEvent::new); - } else { - final Job completedJob = job.markAsCompleted(jobResult).withProgress(progress); - updateJobStatus(completedJob); - // Send the job completed events - JobUtil.sendEvents(completedJob, JobCompletedEvent::new); + completedJob = job.markAsCanceled(jobResult).withProgress(progress); } + + updateJobStatus(completedJob); + // Send the job completed events + JobUtil.sendEvents(completedJob, JobCompletedEvent::new); + } catch (JobQueueDataException e) { final var errorMessage = "Error updating job status"; Logger.error(this, errorMessage, e); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java deleted file mode 100644 index 55046c416556..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobCanceledEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.dotcms.jobs.business.api.events; - -import com.dotcms.jobs.business.job.Job; -import java.time.LocalDateTime; - -/** - * Event fired when a job is canceled. - */ -public class JobCanceledEvent implements JobEvent { - - private final Job job; - private final LocalDateTime canceledAt; - - /** - * Constructs a new JobCanceledEvent. - * - * @param job The canceled job. - * @param canceledAt The timestamp when the job was canceled. - */ - public JobCanceledEvent(Job job, LocalDateTime canceledAt) { - this.job = job; - this.canceledAt = canceledAt; - } - - /** - * @return The canceled job. - */ - public Job getJob() { - return job; - } - - /** - * @return The timestamp when the job was canceled. - */ - public LocalDateTime getCanceledAt() { - return canceledAt; - } -} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java deleted file mode 100644 index b93504e600ec..000000000000 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/JobRemovedFromQueueEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.dotcms.jobs.business.api.events; - -import com.dotcms.jobs.business.job.Job; -import java.time.LocalDateTime; - -/** - * Event fired when a job is removed from the queue because failed and is not retryable. - */ -public class JobRemovedFromQueueEvent implements JobEvent { - - private final Job job; - private final LocalDateTime removedAt; - - /** - * Constructs a new JobRemovedFromQueueEvent. - * - * @param job The non-retryable job. - * @param canceledAt The timestamp when the job was removed from the queue. - */ - public JobRemovedFromQueueEvent(Job job, LocalDateTime canceledAt) { - this.job = job; - this.removedAt = canceledAt; - } - - /** - * @return The non-retryable job. - */ - public Job getJob() { - return job; - } - - /** - * @return The timestamp when the job removed from the queue. - */ - public LocalDateTime getRemovedAt() { - return removedAt; - } -} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index ef33514f30d0..e07d1b4fbcb3 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -86,21 +86,11 @@ protected void init() { (EventSubscriber) this::onJobFailed ); - APILocator.getLocalSystemEventsAPI().subscribe( - JobRemovedFromQueueEvent.class, - (EventSubscriber) this::onJobRemovedFromQueueEvent - ); - APILocator.getLocalSystemEventsAPI().subscribe( JobCompletedEvent.class, (EventSubscriber) this::onJobCompleted ); - APILocator.getLocalSystemEventsAPI().subscribe( - JobCanceledEvent.class, - (EventSubscriber) this::onJobCanceled - ); - APILocator.getLocalSystemEventsAPI().subscribe( JobCancellingEvent.class, (EventSubscriber) this::onJobCancelling @@ -157,31 +147,36 @@ protected void init() { * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates * @param filter Optional predicate to filter job updates (null means receive all updates) + * @return A JobWatcher instance representing the registered watcher * @throws NullPointerException if jobId or watcher is null * @see Predicates for common filter predicates * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ - public void registerWatcher(String jobId, Consumer watcher, Predicate filter) { + public JobWatcher registerWatcher(String jobId, Consumer watcher, Predicate filter) { Objects.requireNonNull(jobId, "jobId cannot be null"); Objects.requireNonNull(watcher, "watcher cannot be null"); + final var jobWatcher = JobWatcher.builder() + .watcher(watcher) + .filter(filter != null ? filter : job -> true) + .build(); + jobWatchers.compute(jobId, (key, existingWatchers) -> { List watchers = Objects.requireNonNullElseGet( existingWatchers, CopyOnWriteArrayList::new ); - watchers.add(JobWatcher.builder() - .watcher(watcher) - .filter(filter != null ? filter : job -> true) - .build()); + watchers.add(jobWatcher); Logger.debug(this, String.format( "Added watcher for job %s. Total watchers: %d", jobId, watchers.size())); return watchers; }); + + return jobWatcher; } /** @@ -214,11 +209,12 @@ public void registerWatcher(String jobId, Consumer watcher, Predicate * * @param jobId The ID of the job to watch * @param watcher The consumer to be notified of job updates + * @return A JobWatcher instance representing the registered watcher * @throws NullPointerException if jobId or watcher is null * @see CopyOnWriteArrayList for more details about the thread-safety guarantees */ - public void registerWatcher(String jobId, Consumer watcher) { - registerWatcher(jobId, watcher, null); + public JobWatcher registerWatcher(String jobId, Consumer watcher) { + return registerWatcher(jobId, watcher, null); } /** @@ -232,44 +228,17 @@ public Set getWatchedJobIds() { } /** - * Updates watchers for a list of jobs. - * Each job's watchers are notified according to their filter predicates. + * Removes all the watchers associated with the specified job ID. * - * @param updatedJobs List of jobs that have been updated - * @throws IllegalArgumentException if updatedJobs is null + * @param jobId The ID of the job whose watchers are to be removed. */ - public void updateWatchers(List updatedJobs) { - for (Job job : updatedJobs) { - updateWatchers(job); - } - } + public void removeAllWatchers(final String jobId) { - /** - * Updates watchers for a single job. Removes watchers if the job has reached a final state. - * - * @param job The job that has been updated. - */ - private void updateWatchers(Job job) { - - List watchers = jobWatchers.get(job.id()); - if (watchers != null) { - watchers.forEach(jobWatcher -> { - try { - if (jobWatcher.filter().test(job)) { - jobWatcher.watcher().accept(job); - } - } catch (Exception e) { - Logger.error(this, "Error notifying job watcher for job " + job.id(), e); - - // Direct remove is thread-safe with CopyOnWriteArrayList - watchers.remove(jobWatcher); - - // If this was the last watcher, clean up the map entry - if (watchers.isEmpty()) { - jobWatchers.remove(job.id()); - } - } - }); + List removed = jobWatchers.remove(jobId); + if (removed != null) { + Logger.info(this, + String.format("Removed all watchers for job %s. Watchers removed: %d", + jobId, removed.size())); } } @@ -278,13 +247,16 @@ private void updateWatchers(Job job) { * * @param jobId The ID of the job whose watcher is to be removed. */ - private void removeWatcher(String jobId) { + public void removeWatcher(final String jobId, final JobWatcher watcher) { - List removed = jobWatchers.remove(jobId); - if (removed != null) { - Logger.debug(this, - String.format("Removed all watchers for job %s. Watchers removed: %d", - jobId, removed.size())); + if (jobId == null || watcher == null) { + return; + } + + // Get the list of watchers for the job + List watchers = jobWatchers.get(jobId); + if (watchers != null) { + removeWatcherFromList(jobId, watcher, watchers); } } @@ -324,16 +296,6 @@ public void onJobCancelling(JobCancellingEvent event) { updateWatchers(event.getJob()); } - /** - * Handles the job-canceled event. - * - * @param event The JobCanceledEvent. - */ - public void onJobCanceled(JobCanceledEvent event) { - updateWatchers(event.getJob()); - removeWatcher(event.getJob().id()); - } - /** * Handles the job completed event. * @@ -341,16 +303,7 @@ public void onJobCanceled(JobCanceledEvent event) { */ public void onJobCompleted(JobCompletedEvent event) { updateWatchers(event.getJob()); - removeWatcher(event.getJob().id()); - } - - /** - * Handles the job removed from queue event when failed and is not retryable. - * - * @param event The JobRemovedFromQueueEvent. - */ - public void onJobRemovedFromQueueEvent(JobRemovedFromQueueEvent event) { - removeWatcher(event.getJob().id()); + removeAllWatchers(event.getJob().id()); } /** @@ -371,6 +324,49 @@ public void onJobProgressUpdated(JobProgressUpdatedEvent event) { updateWatchers(event.getJob()); } + /** + * Updates watchers for a single job. Removes watchers if the job has reached a final state. + * + * @param job The job that has been updated. + */ + private void updateWatchers(Job job) { + + List watchers = jobWatchers.get(job.id()); + if (watchers != null) { + watchers.forEach(jobWatcher -> { + try { + if (jobWatcher.filter().test(job)) { + jobWatcher.watcher().accept(job); + } + } catch (Exception e) { + Logger.error(this, "Error notifying job watcher for job " + job.id(), e); + + // Direct remove is thread-safe with CopyOnWriteArrayList + removeWatcherFromList(job.id(), jobWatcher, watchers); + } + }); + } + } + + /** + * Removes the watcher from the list of watchers for the specified job ID. + * + * @param jobId The ID of the job whose watcher is to be removed. + * @param watcher The watcher to remove. + * @param watchers The list of watchers for the job. + */ + private void removeWatcherFromList(String jobId, JobWatcher watcher, + List watchers) { + + // Remove the watcher from the list + watchers.remove(watcher); + + // If this was the last watcher, clean up the map entry + if (watchers.isEmpty()) { + jobWatchers.remove(jobId); + } + } + /** * Common predicates for filtering job updates. These predicates can be used individually or * combined using {@link Predicate#and(Predicate)} and {@link Predicate#or(Predicate)} to create @@ -425,19 +421,44 @@ public boolean test(Job job) { * @return A predicate for matching failed jobs */ public static Predicate hasFailed() { - return job -> job.state() == JobState.FAILED + return job -> (job.state() == JobState.FAILED + || job.state() == JobState.FAILED_PERMANENTLY) && job.result().isPresent() && job.result().get().errorDetail().isPresent(); } + /** + * Creates a predicate that matches successful jobs. The predicate matches any job in the + * SUCCESS state. + * + * @return A predicate for matching successful jobs + */ + public static Predicate isCompleted() { + return job -> (job.state() == JobState.SUCCESS + || job.state() == JobState.CANCELED + || job.state() == JobState.ABANDONED_PERMANENTLY + || job.state() == JobState.FAILED_PERMANENTLY); + } + + /** + * Creates a predicate that matches successful jobs. The predicate matches any job in the + * SUCCESS state. + * + * @return A predicate for matching successful jobs + */ + public static Predicate isSuccessful() { + return job -> job.state() == JobState.SUCCESS; + } + /** * Creates a predicate that matches completed jobs. The predicate matches any job in the - * COMPLETED state. + * ABANDONED state. * * @return A predicate for matching completed jobs */ - public static Predicate isCompleted() { - return job -> job.state() == JobState.COMPLETED; + public static Predicate isAbandoned() { + return job -> (job.state() == JobState.ABANDONED + || job.state() == JobState.ABANDONED_PERMANENTLY); } /** diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java index faf8d5745bec..38969a963961 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/AbstractJob.java @@ -130,16 +130,16 @@ default Job withState(final JobState newState) { } /** - * Creates a new Job marked as completed. + * Creates a new Job marked as successful. * - * @param result The result details of the completed job. + * @param result The result details of the successful job. * - * @return A new Job instance marked as completed. + * @return A new Job instance marked as successful. */ - default Job markAsCompleted(final JobResult result) { + default Job markAsSuccessful(final JobResult result) { return Job.builder().from(this) - .state(JobState.COMPLETED) + .state(JobState.SUCCESS) .completedAt(Optional.of(LocalDateTime.now())) .updatedAt(LocalDateTime.now()) .result(result != null ? Optional.of(result) : Optional.empty()) @@ -163,4 +163,32 @@ default Job markAsCanceled(final JobResult result) { .build(); } + /** + * Creates a new Job marked as failed permanently. + * + * @return A new Job instance marked as failed permanently. + */ + default Job markAsFailedPermanently() { + + return Job.builder().from(this) + .state(JobState.FAILED_PERMANENTLY) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + + /** + * Creates a new Job marked as abandoned permanently. + * + * @return A new Job instance marked as abandoned permanently. + */ + default Job markAsAbandonedPermanently() { + + return Job.builder().from(this) + .state(JobState.ABANDONED_PERMANENTLY) + .completedAt(Optional.of(LocalDateTime.now())) + .updatedAt(LocalDateTime.now()) + .build(); + } + } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java index f3e7ebdc0880..abed293bf094 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/job/JobState.java @@ -18,18 +18,30 @@ public enum JobState { /** * The job has finished executing successfully. */ - COMPLETED, + SUCCESS, /** * The job encountered an error and could not complete successfully. */ FAILED, + /** + * The job encountered error and could not complete successfully. The error is permanent and the + * job will not be retried. + */ + FAILED_PERMANENTLY, + /** * The job was abandoned before it could complete. */ ABANDONED, + /** + * The job was abandoned before it could complete. The error is permanent and the job will not + * be retried. + */ + ABANDONED_PERMANENTLY, + /** * The job is waiting to be canceled. */ diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java index 04420a9bf468..26e18237aa52 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/processor/impl/ImportContentletsProcessor.java @@ -24,20 +24,17 @@ import com.dotmarketing.portlets.contentlet.action.ImportAuditUtil; import com.dotmarketing.portlets.languagesmanager.model.Language; import com.dotmarketing.util.AdminLogger; +import com.dotmarketing.util.FileUtil; import com.dotmarketing.util.ImportUtil; import com.dotmarketing.util.Logger; import com.google.common.hash.Hashing; import com.liferay.portal.model.User; import com.liferay.portal.util.Constants; -import java.io.BufferedReader; import java.io.File; -import java.io.FileInputStream; -import java.io.FileReader; import java.io.IOException; -import java.io.InputStreamReader; import java.io.Reader; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.Calendar; import java.util.Collections; import java.util.HashMap; @@ -265,9 +262,8 @@ private void handleImport(final boolean preview, final Job job, final File fileT ); } - try (Reader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(fileToImport), - Charset.defaultCharset()))) { + try (Reader reader = Files.newBufferedReader( + fileToImport.toPath(), StandardCharsets.UTF_8)) { CsvReader csvReader = createCsvReader(reader); @@ -463,8 +459,8 @@ public static long jobIdToLong(final String jobId) { private Long totalLines(final Job job, final File dotTempFile) { long totalCount; - try (BufferedReader reader = new BufferedReader(new FileReader(dotTempFile))) { - totalCount = reader.lines().count(); + try { + totalCount = FileUtil.countFileLines(dotTempFile); if (totalCount == 0) { Logger.info(this.getClass(), "No lines in CSV import file: " + dotTempFile.getName()); diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/AbstractJobStateQueryParameters.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/AbstractJobStateQueryParameters.java new file mode 100644 index 000000000000..b9d7636b9576 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/AbstractJobStateQueryParameters.java @@ -0,0 +1,76 @@ +package com.dotcms.jobs.business.queue; + +import com.dotcms.jobs.business.job.JobState; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import java.time.LocalDateTime; +import java.util.Optional; +import org.immutables.value.Value; + +/** + * Interface representing the parameters for querying job states. + */ +@Value.Style(typeImmutable = "*", typeAbstract = "Abstract*") +@Value.Immutable +@JsonSerialize(as = JobStateQueryParameters.class) +@JsonDeserialize(as = JobStateQueryParameters.class) +public interface AbstractJobStateQueryParameters { + + /** + * Gets the name of the queue. + * + * @return an Optional containing the queue name, or an empty Optional if not specified. + */ + Optional queueName(); + + /** + * Gets the start date for the query. + * + * @return an Optional containing the start date, or an empty Optional if not specified. + */ + Optional startDate(); + + /** + * Gets the end date for the query. + * + * @return an Optional containing the end date, or an empty Optional if not specified. + */ + Optional endDate(); + + /** + * Gets the page number for pagination. + * + * @return the page number. + */ + int page(); + + /** + * Gets the page size for pagination. + * + * @return the page size. + */ + int pageSize(); + + /** + * Gets the column name to filter dates. + * + * @return an Optional containing the filter date column name, or an empty Optional if not + * specified. + */ + Optional filterDateColumn(); + + /** + * Gets the column name to order the results by. + * + * @return the order by column name. + */ + String orderByColumn(); + + /** + * Gets the states to filter the jobs by. + * + * @return an array of JobState values. + */ + JobState[] states(); + +} diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java index 9b2a043c00a9..1b4846c52705 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/JobQueue.java @@ -112,6 +112,16 @@ JobPaginatedResult getCompletedJobs(String queueName, LocalDateTime startDate, */ JobPaginatedResult getCompletedJobs(int page, int pageSize) throws JobQueueDataException; + /** + * Retrieves a list of successful jobs. + * + * @param page The page number (for pagination). + * @param pageSize The number of items per page. + * @return A result object containing the list of successful jobs and pagination information. + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws JobQueueDataException; + /** * Retrieves a list of canceled jobs. * @@ -132,6 +142,16 @@ JobPaginatedResult getCompletedJobs(String queueName, LocalDateTime startDate, */ JobPaginatedResult getFailedJobs(int page, int pageSize) throws JobQueueDataException; + /** + * Retrieves a list of abandoned + * + * @param page The page number (for pagination). + * @param pageSize The number of items per page. + * @return A result object containing the list of abandoned jobs and pagination information. + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws JobQueueDataException; + /** * Updates the status of a job. * @@ -190,16 +210,6 @@ Optional detectAndMarkAbandoned(Duration threshold, JobState... inStates) */ void updateJobProgress(String jobId, float progress) throws JobQueueDataException; - /** - * Removes a job from the queue. This method should be used for jobs that have permanently - * failed and cannot be retried. Implementing classes should ensure that the job is completely - * removed from the queue and any associated resources are cleaned up. - * - * @param jobId The ID of the job to remove. - * @throws JobQueueDataException if there's a data storage error while removing the job - */ - void removeJobFromQueue(String jobId) throws JobQueueDataException; - /** * Checks if a job has ever been in a specific state. * diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java index 72ca197b4e03..a2139cd99c39 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/queue/PostgresJobQueue.java @@ -101,23 +101,25 @@ public class PostgresJobQueue implements JobQueue { "WHERE id IN (SELECT id FROM abandoned_jobs) " + "RETURNING *"; - private static final String GET_ACTIVE_JOBS_QUERY_FOR_QUEUE = + private static final String GET_JOBS_QUERY_BY_QUEUE_AND_STATE = "WITH total AS (SELECT COUNT(*) AS total_count " + - " FROM job WHERE queue_name = ? AND state IN (?, ?) " + + " FROM job WHERE queue_name = ? AND state IN $??$ " + "), " + "paginated_data AS (SELECT * " + - " FROM job WHERE queue_name = ? AND state IN (?, ?) " + - " ORDER BY created_at LIMIT ? OFFSET ? " + + " FROM job WHERE queue_name = ? AND state IN $??$ " + + " ORDER BY $ORDER_BY$ LIMIT ? OFFSET ? " + ") " + "SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true"; - private static final String GET_COMPLETED_JOBS_QUERY_FOR_QUEUE = + private static final String GET_JOBS_QUERY_BY_QUEUE_AND_STATE_IN_DATE_RANGE = "WITH total AS (SELECT COUNT(*) AS total_count " + - " FROM job WHERE queue_name = ? AND state = ? AND completed_at BETWEEN ? AND ? " + + " FROM job WHERE queue_name = ? AND state IN $??$ AND $DATE_COLUMN$ BETWEEN ? AND ? " + + "), " + "paginated_data AS (SELECT * FROM job " + - " WHERE queue_name = ? AND state = ? AND completed_at BETWEEN ? AND ? " + - " ORDER BY completed_at DESC LIMIT ? OFFSET ? " + + " WHERE queue_name = ? AND state IN $??$ AND $DATE_COLUMN$ BETWEEN ? AND ? " + + + " ORDER BY $ORDER_BY$ DESC LIMIT ? OFFSET ? " + ") " + "SELECT p.*, t.total_count FROM total t LEFT JOIN paginated_data p ON true"; @@ -177,9 +179,13 @@ public class PostgresJobQueue implements JobQueue { + "EXISTS (SELECT 1 FROM job_history WHERE job_id = ? AND state IN $??$)"; private static final String COLUMN_TOTAL_COUNT = "total_count"; + private static final String COLUMN_COMPLETED_AT = "completed_at"; + private static final String COLUMN_UPDATED_AT = "updated_at"; + private static final String COLUMN_CREATED_AT = "created_at"; private static final String REPLACE_TOKEN_PARAMETERS = "$??$"; private static final String REPLACE_TOKEN_ORDER_BY = "$ORDER_BY$"; + private static final String REPLACE_TOKEN_DATE_COLUMN = "$DATE_COLUMN$"; /** * Jackson mapper configuration and lazy initialized instance. @@ -306,26 +312,15 @@ public JobState getJobState(final String jobId) public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws JobQueueDataException { - try { - - DotConnect dc = new DotConnect(); - dc.setSQL(GET_ACTIVE_JOBS_QUERY_FOR_QUEUE); - dc.addParam(queueName); - dc.addParam(JobState.PENDING.name()); - dc.addParam(JobState.RUNNING.name()); - dc.addParam(queueName); // Repeated for paginated_data CTE - dc.addParam(JobState.PENDING.name()); - dc.addParam(JobState.RUNNING.name()); - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); - - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, - "Database error while fetching active jobs by queue", e); - throw new JobQueueDataException( - "Database error while fetching active jobs by queue", e); - } + return getJobsByState(JobStateQueryParameters.builder() + .queueName(queueName) + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_CREATED_AT) + .states(JobState.PENDING, JobState.RUNNING, + JobState.FAILED, JobState.ABANDONED, JobState.CANCEL_REQUESTED, + JobState.CANCELLING).build() + ); } @CloseDBIfOpened @@ -335,27 +330,17 @@ public JobPaginatedResult getCompletedJobs(final String queueName, final LocalDateTime endDate, final int page, final int pageSize) throws JobQueueDataException { - try { - DotConnect dc = new DotConnect(); - dc.setSQL(GET_COMPLETED_JOBS_QUERY_FOR_QUEUE); - dc.addParam(queueName); - dc.addParam(JobState.COMPLETED.name()); - dc.addParam(Timestamp.valueOf(startDate)); - dc.addParam(Timestamp.valueOf(endDate)); - dc.addParam(queueName); // Repeated for paginated_data CTE - dc.addParam(JobState.COMPLETED.name()); - dc.addParam(Timestamp.valueOf(startDate)); - dc.addParam(Timestamp.valueOf(endDate)); - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); - - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, - "Database error while fetching completed jobs by queue", e); - throw new JobQueueDataException( - "Database error while fetching completed jobs by queue", e); - } + return getJobsByState(JobStateQueryParameters.builder() + .queueName(queueName) + .startDate(startDate) + .endDate(endDate) + .filterDateColumn(COLUMN_COMPLETED_AT) + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_COMPLETED_AT) + .states(JobState.SUCCESS, JobState.CANCELED, + JobState.ABANDONED_PERMANENTLY, JobState.FAILED_PERMANENTLY).build() + ); } @CloseDBIfOpened @@ -381,26 +366,14 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) public JobPaginatedResult getActiveJobs(final int page, final int pageSize) throws JobQueueDataException { - try { - - var query = GET_JOBS_QUERY_BY_STATE - .replace(REPLACE_TOKEN_PARAMETERS, "(?, ?)") - .replace(REPLACE_TOKEN_ORDER_BY, "created_at"); - - DotConnect dc = new DotConnect(); - dc.setSQL(query); - dc.addParam(JobState.PENDING.name()); - dc.addParam(JobState.RUNNING.name()); - dc.addParam(JobState.PENDING.name()); // Repeated for paginated_data CTE - dc.addParam(JobState.RUNNING.name()); - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); - - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, "Database error while fetching active jobs", e); - throw new JobQueueDataException("Database error while fetching active jobs", e); - } + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_CREATED_AT) + .states(JobState.PENDING, JobState.RUNNING, + JobState.FAILED, JobState.ABANDONED, JobState.CANCEL_REQUESTED, + JobState.CANCELLING).build() + ); } @CloseDBIfOpened @@ -408,74 +381,61 @@ public JobPaginatedResult getActiveJobs(final int page, final int pageSize) public JobPaginatedResult getCompletedJobs(final int page, final int pageSize) throws JobQueueDataException { - try { - - var query = GET_JOBS_QUERY_BY_STATE - .replace(REPLACE_TOKEN_PARAMETERS, "(?)") - .replace(REPLACE_TOKEN_ORDER_BY, "completed_at"); - - DotConnect dc = new DotConnect(); - dc.setSQL(query); - dc.addParam(JobState.COMPLETED.name()); - dc.addParam(JobState.COMPLETED.name()); // Repeated for paginated_data CTE - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_COMPLETED_AT) + .states(JobState.SUCCESS, JobState.CANCELED, + JobState.ABANDONED_PERMANENTLY, JobState.FAILED_PERMANENTLY).build() + ); + } - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, "Database error while fetching completed jobs", e); - throw new JobQueueDataException("Database error while fetching completed jobs", e); - } + @CloseDBIfOpened + @Override + public JobPaginatedResult getSuccessfulJobs(final int page, final int pageSize) + throws JobQueueDataException { + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_COMPLETED_AT) + .states(JobState.SUCCESS).build() + ); } @CloseDBIfOpened @Override public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) throws JobQueueDataException { - - try { - - var query = GET_JOBS_QUERY_BY_STATE - .replace(REPLACE_TOKEN_PARAMETERS, "(?)") - .replace(REPLACE_TOKEN_ORDER_BY, "completed_at"); - - DotConnect dc = new DotConnect(); - dc.setSQL(query); - dc.addParam(JobState.CANCELED.name()); - dc.addParam(JobState.CANCELED.name()); // Repeated for paginated_data CTE - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); - - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, "Database error while fetching cancelled jobs", e); - throw new JobQueueDataException("Database error while fetching cancelled jobs", e); - } + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_UPDATED_AT) + .states(JobState.CANCEL_REQUESTED, JobState.CANCELLING, JobState.CANCELED).build() + ); } @CloseDBIfOpened @Override public JobPaginatedResult getFailedJobs(final int page, final int pageSize) throws JobQueueDataException { + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_UPDATED_AT) + .states(JobState.FAILED, JobState.FAILED_PERMANENTLY).build() + ); + } - try { - - var query = GET_JOBS_QUERY_BY_STATE - .replace(REPLACE_TOKEN_PARAMETERS, "(?)") - .replace(REPLACE_TOKEN_ORDER_BY, "updated_at"); - - DotConnect dc = new DotConnect(); - dc.setSQL(query); - dc.addParam(JobState.FAILED.name()); - dc.addParam(JobState.FAILED.name()); // Repeated for paginated_data CTE - dc.addParam(pageSize); - dc.addParam((page - 1) * pageSize); - - return jobPaginatedResult(page, pageSize, dc); - } catch (DotDataException e) { - Logger.error(this, "Database error while fetching failed jobs", e); - throw new JobQueueDataException("Database error while fetching failed jobs", e); - } + @CloseDBIfOpened + @Override + public JobPaginatedResult getAbandonedJobs(final int page, final int pageSize) + throws JobQueueDataException { + return getJobsByState(JobStateQueryParameters.builder() + .page(page) + .pageSize(pageSize) + .orderByColumn(COLUMN_UPDATED_AT) + .states(JobState.ABANDONED, JobState.ABANDONED_PERMANENTLY).build() + ); } @CloseDBIfOpened @@ -524,11 +484,11 @@ public void updateJobStatus(final Job job) throws JobQueueDataException { }).orElse(null)); historyDc.loadResult(); - // Remove from job_queue if completed, failed, abandoned or canceled - if (job.state() == JobState.COMPLETED - || job.state() == JobState.FAILED - || job.state() == JobState.ABANDONED - || job.state() == JobState.CANCELED) { + // Remove from job_queue if the job is considered done + if (job.state() != JobState.PENDING + && job.state() != JobState.RUNNING + && job.state() != JobState.CANCEL_REQUESTED + && job.state() != JobState.CANCELLING) { removeJobFromQueue(job.id()); } @@ -694,9 +654,15 @@ public void updateJobProgress(final String jobId, final float progress) } } + /** + * Removes a job from the queue. This method should be used for jobs that have permanently + * failed and cannot be retried. + * + * @param jobId The ID of the job to remove. + * @throws JobQueueDataException if there's a data storage error while removing the job + */ @CloseDBIfOpened - @Override - public void removeJobFromQueue(final String jobId) throws JobQueueDataException { + private void removeJobFromQueue(final String jobId) throws JobQueueDataException { try { DotConnect dc = new DotConnect(); @@ -743,6 +709,181 @@ public boolean hasJobBeenInState(final String jobId, final JobState... states) } } + /** + * Retrieves a paginated result of jobs filtered by state, queue name, and date range. + * + * @param parameters An instance of JobStateQueryParameters containing filter and pagination + * information. + * @return A JobPaginatedResult containing the jobs that match the specified filters and + * pagination criteria. + * @throws JobQueueDataException if there is a data storage error while fetching the jobs. + */ + @CloseDBIfOpened + private JobPaginatedResult getJobsByState(final JobStateQueryParameters parameters) + throws JobQueueDataException { + + if (parameters.queueName().isPresent() && parameters.startDate().isPresent() + && parameters.endDate().isPresent()) { + return getJobsFilterByNameDateAndState(parameters); + } else if (parameters.queueName().isPresent()) { + return getJobsFilterByNameAndState(parameters); + } + + return getJobsFilterByState(parameters); + } + + /** + * Helper method to fetch jobs by state and return a paginated result. + * + * @param parameters An instance of JobStateQueryParameters containing filter and pagination. + * This includes page number, page size, job states, and order by column. + * @return A JobPaginatedResult instance + * @throws JobQueueDataException if there's a data storage error while fetching the jobs + */ + @CloseDBIfOpened + private JobPaginatedResult getJobsFilterByState( + final JobStateQueryParameters parameters) throws JobQueueDataException { + + final var states = parameters.states(); + final var page = parameters.page(); + final var pageSize = parameters.pageSize(); + final var orderByColumn = parameters.orderByColumn(); + + try { + + String statesParam = String.join(", ", Collections.nCopies(states.length, "?")); + + var query = GET_JOBS_QUERY_BY_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(" + statesParam + ")") + .replace(REPLACE_TOKEN_ORDER_BY, orderByColumn); + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + for (JobState state : states) { + dc.addParam(state.name()); + } + for (JobState state : states) {// Repeated for paginated_data CTE + dc.addParam(state.name()); + } + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + final var message = "Database error while fetching jobs by state"; + Logger.error(this, message, e); + throw new JobQueueDataException(message, e); + } + } + + /** + * Retrieves a paginated result of jobs filtered by state and queue name. + * + * @param parameters An instance of JobStateQueryParameters containing filter and pagination + * information. This includes queue name, job states, page number, page size + * and order by column. + * @return A JobPaginatedResult containing the jobs that match the specified filters and + * pagination criteria. + * @throws JobQueueDataException if there is a data storage error while fetching the jobs. + */ + @CloseDBIfOpened + public JobPaginatedResult getJobsFilterByNameAndState( + final JobStateQueryParameters parameters) throws JobQueueDataException { + + final var queueName = parameters.queueName().orElseThrow(); + final var states = parameters.states(); + final var page = parameters.page(); + final var pageSize = parameters.pageSize(); + final var orderByColumn = parameters.orderByColumn(); + + String statesParam = String.join(", ", + Collections.nCopies(parameters.states().length, "?")); + + var query = GET_JOBS_QUERY_BY_QUEUE_AND_STATE + .replace(REPLACE_TOKEN_PARAMETERS, "(" + statesParam + ")") + .replace(REPLACE_TOKEN_ORDER_BY, orderByColumn); + + try { + + DotConnect dc = new DotConnect(); + dc.setSQL(query); + dc.addParam(queueName); + for (JobState state : states) { + dc.addParam(state.name()); + } + dc.addParam(queueName); // Repeated for paginated_data CTE + for (JobState state : states) { + dc.addParam(state.name()); + } + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + Logger.error(this, + "Database error while fetching active jobs by queue", e); + throw new JobQueueDataException( + "Database error while fetching active jobs by queue", e); + } + } + + /** + * Retrieves a paginated result of jobs filtered by state, queue name, and date range. + * + * @param parameters An instance of JobStateQueryParameters containing filter and pagination + * information. This includes queue name, start and end dates, job states, + * page number, page size, order by column and filter date column. + * @return A JobPaginatedResult containing the jobs that match the specified filters and + * pagination criteria. + * @throws JobQueueDataException if there is a data storage error while fetching the jobs. + */ + @CloseDBIfOpened + private JobPaginatedResult getJobsFilterByNameDateAndState( + final JobStateQueryParameters parameters) throws JobQueueDataException { + + final var queueName = parameters.queueName().orElseThrow(); + final var startDate = parameters.startDate().orElseThrow(); + final var endDate = parameters.endDate().orElseThrow(); + final var states = parameters.states(); + final var page = parameters.page(); + final var pageSize = parameters.pageSize(); + final var orderByColumn = parameters.orderByColumn(); + final var filterDateColumn = parameters.filterDateColumn().orElseThrow(); + + String statesParam = String.join(", ", + Collections.nCopies(parameters.states().length, "?")); + + var query = GET_JOBS_QUERY_BY_QUEUE_AND_STATE_IN_DATE_RANGE + .replace(REPLACE_TOKEN_PARAMETERS, "(" + statesParam + ")") + .replace(REPLACE_TOKEN_ORDER_BY, orderByColumn) + .replace(REPLACE_TOKEN_DATE_COLUMN, filterDateColumn); + + try { + DotConnect dc = new DotConnect(); + dc.setSQL(query); + dc.addParam(queueName); + for (JobState state : states) { + dc.addParam(state.name()); + } + dc.addParam(Timestamp.valueOf(startDate)); + dc.addParam(Timestamp.valueOf(endDate)); + dc.addParam(queueName); // Repeated for paginated_data CTE + for (JobState state : states) { + dc.addParam(state.name()); + } + dc.addParam(Timestamp.valueOf(startDate)); + dc.addParam(Timestamp.valueOf(endDate)); + dc.addParam(pageSize); + dc.addParam((page - 1) * pageSize); + + return jobPaginatedResult(page, pageSize, dc); + } catch (DotDataException e) { + final var message = "Database error while fetching jobs by queue and state"; + Logger.error(this, message, e); + throw new JobQueueDataException(message, e); + } + } + /** * Helper method to create a JobPaginatedResult from a DotConnect query result. * diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java index d57d5d3edfaf..c3e86f797c71 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueHelper.java @@ -3,6 +3,7 @@ import static com.dotcms.jobs.business.util.JobUtil.roundedProgress; import com.dotcms.jobs.business.api.JobQueueManagerAPI; +import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.error.JobProcessorNotFoundException; import com.dotcms.jobs.business.job.Job; import com.dotcms.jobs.business.job.JobPaginatedResult; @@ -19,7 +20,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.liferay.portal.model.User; -import java.io.IOException; import java.io.InputStream; import java.time.format.DateTimeFormatter; import java.util.HashMap; @@ -31,10 +31,7 @@ import javax.enterprise.context.ApplicationScoped; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.core.MediaType; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; -import org.glassfish.jersey.media.sse.EventOutput; -import org.glassfish.jersey.media.sse.OutboundEvent; /** * Helper class for interacting with the job queue system. This class provides methods for creating, cancelling, and listing jobs. @@ -182,9 +179,28 @@ void cancelJob(String jobId) throws DotDataException { * @param jobId The ID of the job * @param watcher The watcher */ - void watchJob(String jobId, Consumer watcher) { + JobWatcher watchJob(String jobId, Consumer watcher) { // if it does then watch it - jobQueueManagerAPI.watchJob(jobId, watcher); + return jobQueueManagerAPI.watchJob(jobId, watcher); + } + + /** + * Removes a watcher from a job + * + * @param jobId The ID of the job + * @param watcher The watcher to remove + */ + void removeWatcher(final String jobId, final JobWatcher watcher) { + jobQueueManagerAPI.removeJobWatcher(jobId, watcher); + } + + /** + * Removes all watchers from a job + * + * @param jobId The ID of the job + */ + void removeAllWatchers(final String jobId) { + jobQueueManagerAPI.removeAllJobWatchers(jobId); } /** @@ -251,6 +267,22 @@ JobPaginatedResult getCompletedJobs(int page, int pageSize) { return JobPaginatedResult.builder().build(); } + /** + * Retrieves a list of successful jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of successful jobs and pagination information. + */ + JobPaginatedResult getSuccessfulJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getSuccessfulJobs(page, pageSize); + } catch (DotDataException e) { + Logger.error(this.getClass(), "Error fetching successful jobs", e); + } + return JobPaginatedResult.builder().build(); + } + /** * Retrieves a list of canceled jobs * @@ -272,7 +304,7 @@ JobPaginatedResult getCanceledJobs(int page, int pageSize) { * * @param page The page number * @param pageSize The number of jobs per page - * @return A result object containing the list of completed jobs and pagination information. + * @return A result object containing the list of failed jobs and pagination information. */ JobPaginatedResult getFailedJobs(int page, int pageSize) { try { @@ -283,6 +315,22 @@ JobPaginatedResult getFailedJobs(int page, int pageSize) { return JobPaginatedResult.builder().build(); } + /** + * Retrieves a list of abandoned jobs + * + * @param page The page number + * @param pageSize The number of jobs per page + * @return A result object containing the list of abandoned jobs and pagination information. + */ + JobPaginatedResult getAbandonedJobs(int page, int pageSize) { + try { + return jobQueueManagerAPI.getAbandonedJobs(page, pageSize); + } catch (DotDataException e) { + Logger.error(this.getClass(), "Error fetching abandoned jobs", e); + } + return JobPaginatedResult.builder().build(); + } + /** * Retrieves a list of active jobs for a specific queue. * @return JobPaginatedResult @@ -358,43 +406,6 @@ Job getJobForSSE(final String jobId) throws DotDataException { return job; } - /** - * Send an error event and close the connection - * - * @param errorName The name of the error event - * @param errorCode The error code - * @param eventOutput The event output - */ - void sendErrorAndClose(final String errorName, final String errorCode, - final EventOutput eventOutput) { - - try { - OutboundEvent event = new OutboundEvent.Builder() - .mediaType(MediaType.TEXT_HTML_TYPE) - .name(errorName) - .data(String.class, errorCode) - .build(); - eventOutput.write(event); - closeSSEConnection(eventOutput); - } catch (IOException e) { - Logger.error(this, "Error sending error event", e); - closeSSEConnection(eventOutput); - } - } - - /** - * Close the SSE connection - * - * @param eventOutput The event output - */ - void closeSSEConnection(final EventOutput eventOutput) { - try { - eventOutput.close(); - } catch (IOException e) { - Logger.error(this, "Error closing SSE connection", e); - } - } - /** * Check if a job is in a terminal state * @@ -402,8 +413,9 @@ void closeSSEConnection(final EventOutput eventOutput) { * @return true if the job is in a terminal state, false otherwise */ boolean isTerminalState(final JobState state) { - return state == JobState.COMPLETED || - state == JobState.FAILED || + return state == JobState.SUCCESS || + state == JobState.FAILED_PERMANENTLY || + state == JobState.ABANDONED_PERMANENTLY || state == JobState.CANCELED; } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index 268ca26b38b4..d650845ca9b5 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -5,15 +5,13 @@ import com.dotcms.jobs.business.job.JobPaginatedResult; import com.dotcms.rest.ResponseEntityView; import com.dotcms.rest.WebResource; +import com.dotcms.rest.WebResource.InitBuilder; import com.dotcms.rest.exception.mapper.ExceptionMapperUtil; import com.dotmarketing.exception.DotDataException; -import com.dotmarketing.util.Logger; import com.fasterxml.jackson.core.JsonProcessingException; import graphql.VisibleForTesting; -import java.io.IOException; import java.util.Map; import java.util.Set; -import java.util.function.Consumer; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.BeanParam; @@ -29,7 +27,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.glassfish.jersey.media.sse.EventOutput; -import org.glassfish.jersey.media.sse.OutboundEvent; import org.glassfish.jersey.media.sse.SseFeature; @Path("/v1/jobs") @@ -37,20 +34,19 @@ public class JobQueueResource { private final WebResource webResource; private final JobQueueHelper helper; - private final SSEConnectionManager sseConnectionManager; + private final SSEMonitorUtil sseMonitorUtil; @Inject - public JobQueueResource(final JobQueueHelper helper, - final SSEConnectionManager sseConnectionManager) { - this(new WebResource(), helper, sseConnectionManager); + public JobQueueResource(final JobQueueHelper helper, final SSEMonitorUtil sseMonitorUtil) { + this(new WebResource(), helper, sseMonitorUtil); } @VisibleForTesting public JobQueueResource(WebResource webResource, JobQueueHelper helper, - SSEConnectionManager sseConnectionManager) { + final SSEMonitorUtil sseMonitorUtil) { this.webResource = webResource; this.helper = helper; - this.sseConnectionManager = sseConnectionManager; + this.sseMonitorUtil = sseMonitorUtil; } @POST @@ -62,7 +58,7 @@ public Response createJob( @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { - final var initDataObject = new WebResource.InitBuilder(webResource) + final var initDataObject = new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -87,7 +83,7 @@ public Response createJob( @PathParam("queueName") String queueName, Map parameters) throws DotDataException { - final var initDataObject = new WebResource.InitBuilder(webResource) + final var initDataObject = new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -107,7 +103,7 @@ public Response createJob( @Path("/queues") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView> getQueues(@Context HttpServletRequest request) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -123,7 +119,7 @@ public ResponseEntityView getJobStatus(@Context HttpServletRequest request, @PathParam("jobId") String jobId) throws DotDataException { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -140,7 +136,7 @@ public ResponseEntityView getJobStatus(@Context HttpServletRequest request, @Consumes(MediaType.WILDCARD) public ResponseEntityView cancelJob(@Context HttpServletRequest request, @PathParam("jobId") String jobId) throws DotDataException { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -157,7 +153,7 @@ public ResponseEntityView activeJobs(@Context HttpServletReq @PathParam("queueName") String queueName, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -167,83 +163,12 @@ public ResponseEntityView activeJobs(@Context HttpServletReq return new ResponseEntityView<>(result); } - @GET - @Path("/{jobId}/monitor") - @Produces(SseFeature.SERVER_SENT_EVENTS) - public EventOutput monitorJob(@Context HttpServletRequest request, - @PathParam("jobId") String jobId) { - - new WebResource.InitBuilder(webResource) - .requiredBackendUser(true) - .requiredFrontendUser(false) - .requestAndResponse(request, null) - .rejectWhenNoUser(true) - .init(); - - final EventOutput eventOutput = new EventOutput(); - - try { - Job job = helper.getJobForSSE(jobId); - - if (job == null) { - helper.sendErrorAndClose("job-not-found", "404", eventOutput); - return eventOutput; - } - - if (helper.isNotWatchable(job)) { - helper.sendErrorAndClose(String.format("job-not-watchable [%s]", - job.state()), "400", eventOutput); - return eventOutput; - } - - if (!sseConnectionManager.canAcceptNewConnection(jobId)) { - helper.sendErrorAndClose("too-many-connections", "429", eventOutput); - return eventOutput; - } - - // Callback for watching job updates and sending them to the client - Consumer jobWatcher = watched -> { - if (!eventOutput.isClosed()) { - try { - OutboundEvent event = new OutboundEvent.Builder() - .mediaType(MediaType.APPLICATION_JSON_TYPE) - .name("job-update") - .data(Map.class, helper.getJobStatusInfo(watched)) - .build(); - eventOutput.write(event); - - // If job is complete/failed/cancelled, close the connection - if (helper.isTerminalState(watched.state())) { - sseConnectionManager.closeJobConnections(jobId); - } - - } catch (IOException e) { - Logger.error(this, "Error writing SSE event", e); - sseConnectionManager.closeJobConnections(jobId); - } - } - }; - - // Register the connection and watcher - sseConnectionManager.addConnection(jobId, eventOutput); - - // Start watching the job - helper.watchJob(job.id(), jobWatcher); - - } catch (DotDataException e) { - Logger.error(this, "Error setting up job monitor", e); - helper.closeSSEConnection(eventOutput); - } - - return eventOutput; - } - @GET @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView listJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -259,7 +184,7 @@ public ResponseEntityView listJobs(@Context HttpServletReque public ResponseEntityView activeJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -275,7 +200,7 @@ public ResponseEntityView activeJobs(@Context HttpServletReq public ResponseEntityView completedJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -285,13 +210,30 @@ public ResponseEntityView completedJobs(@Context HttpServlet return new ResponseEntityView<>(result); } + @GET + @Path("/successful") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView successfulJobs( + @Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getSuccessfulJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + @GET @Path("/canceled") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView canceledJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -307,7 +249,7 @@ public ResponseEntityView canceledJobs(@Context HttpServletR public ResponseEntityView failedJobs(@Context HttpServletRequest request, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { - new WebResource.InitBuilder(webResource) + new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) .requestAndResponse(request, null) @@ -317,4 +259,38 @@ public ResponseEntityView failedJobs(@Context HttpServletReq return new ResponseEntityView<>(result); } + @GET + @Path("/abandoned") + @Produces(MediaType.APPLICATION_JSON) + public ResponseEntityView abandonedJobs(@Context HttpServletRequest request, + @QueryParam("page") @DefaultValue("1") int page, + @QueryParam("pageSize") @DefaultValue("20") int pageSize) { + new InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + final JobPaginatedResult result = helper.getAbandonedJobs(page, pageSize); + return new ResponseEntityView<>(result); + } + + @GET + @Path("/{jobId}/monitor") + @Produces(SseFeature.SERVER_SENT_EVENTS) + @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks + public EventOutput monitorJob(@Context HttpServletRequest request, + @PathParam("jobId") String jobId) { + + new InitBuilder(webResource) + .requiredBackendUser(true) + .requiredFrontendUser(false) + .requestAndResponse(request, null) + .rejectWhenNoUser(true) + .init(); + + // Set up job monitoring + return sseMonitorUtil.monitorJob(jobId); + } + } \ No newline at end of file diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java index 104b824f6429..22724ff2186a 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java @@ -101,7 +101,7 @@ public void shutdown() { * @param jobId The ID of the job for which to check connection availability * @return true if a new connection can be accepted, false otherwise */ - public boolean canAcceptNewConnection(String jobId) { + public boolean canAcceptNewConnection(final String jobId) { if (getTotalConnections() >= MAX_SSE_TOTAL_CONNECTIONS.get()) { return false; } @@ -116,9 +116,10 @@ public boolean canAcceptNewConnection(String jobId) { * * @param jobId The ID of the job to monitor * @param eventOutput The EventOutput instance representing the SSE connection + * @return The created SSEConnection instance * @throws IllegalStateException if the manager is shut down */ - public void addConnection(String jobId, EventOutput eventOutput) { + public SSEConnection addConnection(final String jobId, final EventOutput eventOutput) { if (isShutdown) { throw new IllegalStateException("SSEConnectionManager is shut down"); @@ -130,28 +131,33 @@ public void addConnection(String jobId, EventOutput eventOutput) { // Schedule connection timeout timeoutExecutor.schedule(() -> { try { - removeConnection(jobId, connection); + closeConnection(connection); } catch (Exception e) { - Logger.error(this, "Error removing expired connection", e); + Logger.error(this, "Error closing expired connection", e); } }, SSE_CONNECTION_TIMEOUT_MINUTES.get(), TimeUnit.MINUTES); + + return connection; } /** - * Removes a specific SSE connection for a job. If this was the last connection for the job, the + * Closes a specific SSE connection for a job. If this was the last connection for the job, the * job entry is removed from tracking. * - * @param jobId The ID of the job * @param connection The connection to remove */ - public void removeConnection(String jobId, SSEConnection connection) { - Set connections = jobConnections.get(jobId); - if (connections != null) { - connections.remove(connection); - connection.close(); + public void closeConnection(final SSEConnection connection) { - if (connections.isEmpty()) { - jobConnections.remove(jobId); + if (connection != null) { + Set connections = jobConnections.get(connection.jobId); + if (connections != null) { + connections.remove(connection); + connection.close(); + + // If this was the last connection for the job, clean up the job entry + if (connections.isEmpty()) { + jobConnections.remove(connection.jobId); + } } } } @@ -182,7 +188,7 @@ private void closeAllConnections() { * * @param jobId The ID of the job whose connections should be closed */ - public void closeJobConnections(String jobId) { + public void closeAllJobConnections(final String jobId) { Set connections = jobConnections.remove(jobId); if (connections != null) { connections.forEach(SSEConnection::close); @@ -195,7 +201,7 @@ public void closeJobConnections(String jobId) { * @param jobId The ID of the job * @return The number of active connections for the job */ - public int getConnectionCount(String jobId) { + public int getConnectionCount(final String jobId) { Set connections = jobConnections.get(jobId); return connections != null ? connections.size() : 0; } @@ -265,6 +271,15 @@ public boolean isExpired() { public String getJobId() { return jobId; } + + /** + * Gets the EventOutput instance representing the SSE connection. + * + * @return The EventOutput instance + */ + public EventOutput getEventOutput() { + return eventOutput; + } } } diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java new file mode 100644 index 000000000000..ffa6a2b204c8 --- /dev/null +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java @@ -0,0 +1,171 @@ +package com.dotcms.rest.api.v1.job; + +import com.dotcms.jobs.business.api.events.JobWatcher; +import com.dotcms.jobs.business.job.Job; +import com.dotcms.rest.api.v1.job.SSEConnectionManager.SSEConnection; +import com.dotmarketing.exception.DotRuntimeException; +import com.dotmarketing.util.Logger; +import java.io.IOException; +import java.util.Map; +import java.util.function.Consumer; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.core.MediaType; +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.OutboundEvent; +import org.glassfish.jersey.media.sse.OutboundEvent.Builder; + +/** + * Utility class for managing Server-Sent Events (SSE) job monitoring. This class handles the setup, + * maintenance, and cleanup of SSE connections for monitoring job progress and status updates. + * + *

Key responsibilities include: + *

    + *
  • Setting up SSE connections for job monitoring
  • + *
  • Managing job watchers and event streams
  • + *
  • Handling error conditions and connection cleanup
  • + *
  • Coordinating between job updates and SSE event publishing
  • + *
+ * + *

Usage example: + *

{@code
+ * @Inject
+ * private SSEMonitorUtil sseMonitorUtil;
+ *
+ * // Set up job monitoring
+ * EventOutput eventOutput = sseMonitorUtil.monitorJob(jobId);
+ * }
+ * + *

This class is thread-safe and can handle multiple concurrent monitoring sessions. + * It automatically manages resource cleanup through the {@link SSEConnectionManager} and + * ensures proper handling of connection lifecycles. + * + * @see SSEConnectionManager + * @see JobQueueHelper + */ +@ApplicationScoped +public class SSEMonitorUtil { + + private final JobQueueHelper helper; + private final SSEConnectionManager sseConnectionManager; + + public SSEMonitorUtil() { + // Default constructor required for CDI + this.helper = null; + this.sseConnectionManager = null; + } + + @Inject + public SSEMonitorUtil(JobQueueHelper helper, SSEConnectionManager sseConnectionManager) { + this.helper = helper; + this.sseConnectionManager = sseConnectionManager; + } + + /** + * Sets up job monitoring via SSE + * + * @param jobId The job ID to monitor + * @return EventOutput for streaming updates + */ + @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks + public EventOutput monitorJob(final String jobId) { + + JobWatcher jobWatcher = null; + final EventOutput eventOutput = new EventOutput(); + final var connection = sseConnectionManager.addConnection(jobId, eventOutput); + + try { + + Job job = helper.getJobForSSE(jobId); + if (job == null) { + sendErrorAndClose("job-not-found", "404", connection); + return eventOutput; + } + + if (helper.isNotWatchable(job)) { + sendErrorAndClose(String.format("job-not-watchable [%s]", + job.state()), "400", connection); + return eventOutput; + } + + if (!sseConnectionManager.canAcceptNewConnection(jobId)) { + sendErrorAndClose("too-many-connections", "429", connection); + return eventOutput; + } + + // Callback for watching job updates and sending them to the client + Consumer jobWatcherConsumer = watched -> { + if (!eventOutput.isClosed()) { + OutboundEvent event = new Builder() + .mediaType(MediaType.APPLICATION_JSON_TYPE) + .name("job-update") + .data(Map.class, helper.getJobStatusInfo(watched)) + .build(); + + try { + eventOutput.write(event); + } catch (IOException e) { + final var errorMessage = "Error writing SSE event"; + Logger.error(this, errorMessage, e); + // Re-throw the IOException to be caught by the outer catch block + throw new DotRuntimeException(errorMessage, e); + } + + // If job is in a completed state, close all connections as no further + // updates will be available + if (helper.isTerminalState(watched.state())) { + sseConnectionManager.closeAllJobConnections(jobId); + } + } + }; + + // Start watching the job + jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); + + return eventOutput; + } catch (IOException e) { + final var errorMessage = "Error writing SSE event"; + Logger.error(this, errorMessage, e); + cleanupOnError(jobId, connection, jobWatcher); + throw new DotRuntimeException(errorMessage, e); + } catch (Exception e) { + final var errorMessage = "Error setting up job monitor"; + Logger.error(this, errorMessage, e); + cleanupOnError(jobId, connection, jobWatcher); + throw new DotRuntimeException(errorMessage, e); + } + } + + /** + * Send an error event and close the connection + * + * @param errorName The name of the error event + * @param errorCode The error code + * @param connection The SSE connection to close + * @throws IOException If there is an error writing the event + */ + private void sendErrorAndClose(final String errorName, final String errorCode, + final SSEConnection connection) throws IOException { + OutboundEvent event = new OutboundEvent.Builder() + .mediaType(MediaType.TEXT_HTML_TYPE) + .name(errorName) + .data(String.class, errorCode) + .build(); + connection.getEventOutput().write(event); + sseConnectionManager.closeConnection(connection); + } + + /** + * Clean up resources after an error + */ + private void cleanupOnError(final String jobId, final SSEConnection connection, + final JobWatcher jobWatcher) { + if (connection != null) { + sseConnectionManager.closeConnection(connection); + } + if (jobWatcher != null) { + helper.removeWatcher(jobId, jobWatcher); + } + } + +} diff --git a/dotCMS/src/main/java/com/dotmarketing/util/FileUtil.java b/dotCMS/src/main/java/com/dotmarketing/util/FileUtil.java index 7eb7f040cec9..1d0c30c3c396 100644 --- a/dotCMS/src/main/java/com/dotmarketing/util/FileUtil.java +++ b/dotCMS/src/main/java/com/dotmarketing/util/FileUtil.java @@ -9,20 +9,19 @@ import com.liferay.util.StringPool; import io.vavr.Lazy; import io.vavr.control.Try; -import java.nio.charset.Charset; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.StringUtils; - import java.io.BufferedInputStream; +import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileFilter; +import java.io.FileReader; import java.io.FileWriter; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URL; +import java.nio.charset.Charset; import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.Path; @@ -36,6 +35,8 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Stream; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; import org.mozilla.universalchardet.UniversalDetector; /** @@ -521,7 +522,21 @@ public static Charset detectEncodeType(final File file) { } + /** + * Count the number of lines in the file + * + * @param file the file to count the lines + * @return the number of lines in the file + */ + public static Long countFileLines(final File file) throws IOException { + long totalCount; + try (BufferedReader reader = new BufferedReader(new FileReader(file))) { + totalCount = reader.lines().count(); + } + + return totalCount; + } } diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java index 5ea4d2974f29..29c9b8372ef0 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPIIntegrationTest.java @@ -132,7 +132,7 @@ void test_CreateAndProcessJob() throws Exception { // Wait for the job to be processed CountDownLatch latch = new CountDownLatch(1); jobQueueManagerAPI.watchJob(jobId, job -> { - if (job.state() == JobState.COMPLETED) { + if (job.state() == JobState.SUCCESS) { latch.countDown(); } }); @@ -145,8 +145,8 @@ void test_CreateAndProcessJob() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { Job job = jobQueueManagerAPI.getJob(jobId); - assertEquals(JobState.COMPLETED, job.state(), - "Job should be in COMPLETED state"); + assertEquals(JobState.SUCCESS, job.state(), + "Job should be in SUCCESS state"); }); } @@ -183,7 +183,7 @@ void test_JobRetry() throws Exception { CountDownLatch latch = new CountDownLatch(1); jobQueueManagerAPI.watchJob(jobId, job -> { - if (job.state() == JobState.COMPLETED) { + if (job.state() == JobState.SUCCESS) { latch.countDown(); } }); @@ -196,8 +196,8 @@ void test_JobRetry() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { Job job = jobQueueManagerAPI.getJob(jobId); - assertEquals(JobState.COMPLETED, job.state(), - "Job should be in COMPLETED state"); + assertEquals(JobState.SUCCESS, job.state(), + "Job should be in SUCCESS state"); assertEquals(maxRetries + 1, processor.getAttempts(), "Job should have been attempted " + maxRetries + " times"); }); @@ -206,16 +206,17 @@ void test_JobRetry() throws Exception { /** * Method to test: Job failure handling in JobQueueManagerAPI * Given Scenario: A job is created that is designed to fail - * ExpectedResult: The job fails, is marked as FAILED, and contains the expected error details + * ExpectedResult: The job fails, is marked as FAILED_PERMANENTLY, and contains the expected + * error details */ @Test @Order(3) - void test_FailingJob() throws Exception { + void test_Failing_Permanently_Job() throws Exception { jobQueueManagerAPI.registerProcessor("failingQueue", FailingJobProcessor.class); - RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy( + RetryStrategy noRetriesStrategy = new ExponentialBackoffRetryStrategy( 5000, 300000, 2.0, 0 ); - jobQueueManagerAPI.setRetryStrategy("failingQueue", contentImportRetryStrategy); + jobQueueManagerAPI.setRetryStrategy("failingQueue", noRetriesStrategy); if (!jobQueueManagerAPI.isStarted()) { jobQueueManagerAPI.start(); @@ -240,8 +241,8 @@ void test_FailingJob() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { Job job = jobQueueManagerAPI.getJob(jobId); - assertEquals(JobState.FAILED, job.state(), - "Job should be in FAILED state"); + assertEquals(JobState.FAILED_PERMANENTLY, job.state(), + "Job should be in FAILED_PERMANENTLY state"); assertNotNull(job.result().get().errorDetail().get(), "Job should have an error detail"); assertEquals("Simulated failure", @@ -306,8 +307,6 @@ void test_CancelJob() throws Exception { }); } - - /** * Method to test: Progress tracking functionality in JobQueueManagerAPI * Given Scenario: A job is created that reports progress during its execution @@ -342,12 +341,12 @@ void test_JobWithProgressTracker() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .until(() -> { Job job = jobQueueManagerAPI.getJob(jobId); - return job.state() == JobState.COMPLETED; + return job.state() == JobState.SUCCESS; }); // Verify final job state Job completedJob = jobQueueManagerAPI.getJob(jobId); - assertEquals(JobState.COMPLETED, completedJob.state(), "Job should be in COMPLETED state"); + assertEquals(JobState.SUCCESS, completedJob.state(), "Job should be in SUCCESS state"); assertEquals(1.0f, completedJob.progress(), 0.01f, "Final progress should be 1.0"); // Verify progress updates @@ -377,7 +376,6 @@ void test_JobWithProgressTracker() throws Exception { */ @Test @Order(6) - @Ignore void test_CombinedScenarios() throws Exception { // Register processors for different scenarios jobQueueManagerAPI.registerProcessor("successQueue", TestJobProcessor.class); @@ -409,12 +407,12 @@ void test_CombinedScenarios() throws Exception { // Watch jobs jobQueueManagerAPI.watchJob(successJob1Id, job -> { - if (job.state() == JobState.COMPLETED) { + if (job.state() == JobState.SUCCESS) { successLatch.countDown(); } }); jobQueueManagerAPI.watchJob(successJob2Id, job -> { - if (job.state() == JobState.COMPLETED) { + if (job.state() == JobState.SUCCESS) { successLatch.countDown(); } }); @@ -441,11 +439,11 @@ void test_CombinedScenarios() throws Exception { assertTrue(allCompleted, "All jobs should complete within the timeout period"); // Verify final states - assertEquals(JobState.COMPLETED, jobQueueManagerAPI.getJob(successJob1Id).state(), - "First success job should be completed"); - assertEquals(JobState.COMPLETED, jobQueueManagerAPI.getJob(successJob2Id).state(), - "Second success job should be completed"); - assertEquals(JobState.FAILED, jobQueueManagerAPI.getJob(failJobId).state(), + assertEquals(JobState.SUCCESS, jobQueueManagerAPI.getJob(successJob1Id).state(), + "First success job should be successful"); + assertEquals(JobState.SUCCESS, jobQueueManagerAPI.getJob(successJob2Id).state(), + "Second success job should be successful"); + assertEquals(JobState.FAILED_PERMANENTLY, jobQueueManagerAPI.getJob(failJobId).state(), "Fail job should be in failed state"); assertEquals(JobState.CANCELED, jobQueueManagerAPI.getJob(cancelJobId).state(), "Cancel job should be canceled"); @@ -457,8 +455,8 @@ void test_CombinedScenarios() throws Exception { Job failedJob = jobQueueManagerAPI.getJob(failJobId); assertEquals(2, failedJob.retryCount(), "Job should have been retried " + 2 + " times"); - assertEquals(JobState.FAILED, failedJob.state(), - "Job should be in FAILED state"); + assertEquals(JobState.FAILED_PERMANENTLY, failedJob.state(), + "Job should be in FAILED_PERMANENTLY state"); assertTrue(failedJob.result().isPresent(), "Failed job should have a result"); assertTrue(failedJob.result().get().errorDetail().isPresent(), @@ -555,8 +553,8 @@ void test_AbandonedJobDetection() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { Job job = jobQueueManagerAPI.getJob(jobId); - assertEquals(JobState.COMPLETED, job.state(), - "Job should be in COMPLETED state"); + assertEquals(JobState.SUCCESS, job.state(), + "Job should be in SUCCESS state"); }); // Verify job history contains the state transitions @@ -571,8 +569,119 @@ void test_AbandonedJobDetection() throws Exception { "Second state should be ABANDONED"); assertEquals(JobState.RUNNING.name(), history.get(2).get("state"), "Third state should be RUNNING"); - assertEquals(JobState.COMPLETED.name(), history.get(3).get("state"), - "Latest state should be COMPLETED"); + assertEquals(JobState.SUCCESS.name(), history.get(3).get("state"), + "Latest state should be SUCCESS"); + } + + /** + * Tests the abandoned job detection functionality. + * Given Scenario: A job exists in RUNNING state with an old timestamp + * ExpectedResult: The job is detected as abandoned, eventually marked as ABANDONED_PERMANENTLY + */ + @Test + @Order(8) + void test_Abandoned_Permanetly_Job() throws Exception { + + final String jobId = UUID.randomUUID().toString(); + final String queueName = "abandonedQueue"; + final Map parameters = Collections.singletonMap("test", "value"); + final String serverId = APILocator.getServerAPI().readServerId(); + final LocalDateTime oldTimestamp = LocalDateTime.now().minusMinutes(5); + + // Create a job directly in the database in RUNNING state to simulate an abandoned job + DotConnect dc = new DotConnect(); + + // Insert into job table + dc.setSQL("INSERT INTO job (id, queue_name, state, parameters, created_at, updated_at, started_at, execution_node) VALUES (?, ?, ?, ?::jsonb, ?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(new ObjectMapper().writeValueAsString(parameters)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(Timestamp.valueOf(oldTimestamp)) + .addParam(serverId) + .loadResult(); + + // Insert into job_queue table + dc.setSQL("INSERT INTO job_queue (id, queue_name, state, created_at) VALUES (?, ?, ?, ?)") + .addParam(jobId) + .addParam(queueName) + .addParam(JobState.RUNNING.name()) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Insert initial state into job_history + dc.setSQL("INSERT INTO job_history (id, job_id, state, execution_node, created_at) VALUES (?, ?, ?, ?, ?)") + .addParam(UUID.randomUUID().toString()) + .addParam(jobId) + .addParam(JobState.RUNNING.name()) + .addParam(serverId) + .addParam(Timestamp.valueOf(oldTimestamp)) + .loadResult(); + + // Verify the job was created in RUNNING state + Job initialJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.RUNNING, initialJob.state(), + "Job should be in RUNNING state initially"); + + // Start job queue manager if not started + if (!jobQueueManagerAPI.isStarted()) { + jobQueueManagerAPI.start(); + jobQueueManagerAPI.awaitStart(5, TimeUnit.SECONDS); + } + + // Register a processor for the abandoned job + jobQueueManagerAPI.registerProcessor(queueName, AbbandonedJobProcessor.class); + RetryStrategy noRetriesStrategy = new ExponentialBackoffRetryStrategy( + 5000, 300000, 2.0, 0 + ); + jobQueueManagerAPI.setRetryStrategy(queueName, noRetriesStrategy); + + // The job should be marked as abandoned + CountDownLatch latch = new CountDownLatch(1); + jobQueueManagerAPI.watchJob(jobId, job -> { + if (job.state() == JobState.ABANDONED) { + latch.countDown(); + } + }); + + boolean abandoned = latch.await(3, TimeUnit.MINUTES); + assertTrue(abandoned, "Job should be marked as abandoned within timeout period"); + + // Verify the abandoned job state and error details + Job abandonedJob = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.ABANDONED, abandonedJob.state(), + "Job should be in ABANDONED state"); + assertTrue(abandonedJob.result().isPresent(), + "Abandoned job should have a result"); + assertTrue(abandonedJob.result().get().errorDetail().isPresent(), + "Abandoned job should have error details"); + assertTrue(abandonedJob.result().get().errorDetail().get().message() + .contains("abandoned due to no updates"), + "Error message should indicate abandonment"); + + // Verify the job was put back in queue for retry and completed successfully + Awaitility.await().atMost(15, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + Job job = jobQueueManagerAPI.getJob(jobId); + assertEquals(JobState.ABANDONED_PERMANENTLY, job.state(), + "Job should be in ABANDONED_PERMANENTLY state"); + }); + + // Verify job history contains the state transitions + dc.setSQL("SELECT state FROM job_history WHERE job_id = ? ORDER BY created_at") + .addParam(jobId); + List> history = dc.loadObjectResults(); + + assertFalse(history.isEmpty(), "Job should have history records"); + assertEquals(JobState.RUNNING.name(), history.get(0).get("state"), + "First state should be RUNNING"); + assertEquals(JobState.ABANDONED.name(), history.get(1).get("state"), + "Second state should be ABANDONED"); + assertEquals(JobState.ABANDONED_PERMANENTLY.name(), history.get(2).get("state"), + "Latest state should be ABANDONED_PERMANENTLY"); } static class AbbandonedJobProcessor implements JobProcessor { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java index d9f7d36bea68..09d25aabb0ad 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/api/JobQueueManagerAPITest.java @@ -367,8 +367,8 @@ public void test_JobRetry_single_retry() throws Exception { retryCount.incrementAndGet(); return mockJob; }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); + when(mockJob.markAsSuccessful(any())).thenAnswer(inv -> { + jobState.set(JobState.SUCCESS); return mockJob; }); when(mockJob.markAsFailed(any())).thenAnswer(inv -> { @@ -401,7 +401,7 @@ public void test_JobRetry_single_retry() throws Exception { throw new RuntimeException("Simulated failure"); } Job job = invocation.getArgument(0); - job.markAsCompleted(any()); + job.markAsSuccessful(any()); return null; }).when(mockJobProcessor).process(any()); @@ -413,7 +413,7 @@ public void test_JobRetry_single_retry() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { verify(mockJobProcessor, times(2)).process(any()); - assertEquals(JobState.COMPLETED, jobState.get()); + assertEquals(JobState.SUCCESS, jobState.get()); }); // Additional verifications @@ -462,8 +462,8 @@ public void test_JobRetry_retry_twice() throws Exception { lastRetry.set(LocalDateTime.now()); return mockJob; }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); + when(mockJob.markAsSuccessful(any())).thenAnswer(inv -> { + jobState.set(JobState.SUCCESS); return mockJob; }); when(mockJob.markAsFailed(any())).thenAnswer(inv -> { @@ -475,7 +475,7 @@ public void test_JobRetry_retry_twice() throws Exception { // Configure job queue to always return the mockJob until it's completed when(mockJobQueue.nextJob()).thenAnswer(inv -> - jobState.get() != JobState.COMPLETED ? mockJob : null + jobState.get() != JobState.SUCCESS ? mockJob : null ); // Configure retry strategy @@ -509,7 +509,7 @@ public void test_JobRetry_retry_twice() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { verify(mockJobProcessor, times(3)).process(any()); - assertEquals(JobState.COMPLETED, jobState.get()); + assertEquals(JobState.SUCCESS, jobState.get()); assertEquals(2, retryCount.get()); }); @@ -520,7 +520,7 @@ public void test_JobRetry_retry_twice() throws Exception { inOrder.verify(mockJob).markAsRunning(); inOrder.verify(mockJob).markAsFailed(any()); inOrder.verify(mockJob).markAsRunning(); - inOrder.verify(mockJob).markAsCompleted(any()); + inOrder.verify(mockJob).markAsSuccessful(any()); // Verify retry behavior verify(mockRetryStrategy, atLeast(2)).shouldRetry(any(), any()); @@ -573,6 +573,10 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { jobState.set(JobState.FAILED); return mockJob; }); + when(mockJob.markAsFailedPermanently()).thenAnswer(inv -> { + jobState.set(JobState.FAILED_PERMANENTLY); + return mockJob; + }); when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); @@ -603,14 +607,13 @@ public void test_JobRetry_MaxRetryLimit() throws Exception { .untilAsserted(() -> { verify(mockJobProcessor, times(maxRetries + 1)). process(any()); // Initial attempt + retries - assertEquals(JobState.FAILED, jobState.get()); + assertEquals(JobState.FAILED_PERMANENTLY, jobState.get()); assertEquals(maxRetries, retryCount.get()); }); // Verify the job was not retried after reaching the max retry limit verify(mockRetryStrategy, times(maxRetries + 1)). shouldRetry(any(), any()); // Retries + final attempt - verify(mockJobQueue, times(1)).removeJobFromQueue(mockJob.id()); // Stop the job queue jobQueueManagerAPI.close(); @@ -643,8 +646,8 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { jobState.set(JobState.RUNNING); return mockJob; }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); + when(mockJob.markAsSuccessful(any())).thenAnswer(inv -> { + jobState.set(JobState.SUCCESS); return mockJob; }); when(mockJob.withProgressTracker(any(DefaultProgressTracker.class))).thenReturn(mockJob); @@ -661,7 +664,7 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { // Configure job processor to succeed doAnswer(inv -> { Job job = inv.getArgument(0); - job.markAsCompleted(any()); + job.markAsSuccessful(any()); return null; }).when(mockJobProcessor).process(any()); @@ -673,14 +676,14 @@ public void test_Job_SucceedsFirstAttempt() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { verify(mockJobProcessor, times(1)).process(any()); - assertEquals(JobState.COMPLETED, jobState.get()); + assertEquals(JobState.SUCCESS, jobState.get()); }); // Verify the job was processed only once and completed successfully verify(mockRetryStrategy, never()).shouldRetry(any(), any()); verify(mockJobQueue, times(2)).updateJobStatus(any()); verify(mockJobQueue, times(2)).updateJobStatus( - argThat(job -> job.state() == JobState.COMPLETED)); + argThat(job -> job.state() == JobState.SUCCESS)); // Stop the job queue jobQueueManagerAPI.close(); @@ -748,7 +751,6 @@ public void test_Job_NotRetryable() throws Exception { // Verify the job was not retried verify(mockRetryStrategy, times(1)).shouldRetry(any(), any()); verify(mockJobQueue, times(1)).putJobBackInQueue(any()); - verify(mockJobQueue, times(1)).removeJobFromQueue(mockJob.id()); // Capture and verify the error details ArgumentCaptor jobResultCaptor = ArgumentCaptor.forClass(JobResult.class); @@ -869,13 +871,13 @@ public void test_CircuitBreaker_Closes() throws Exception { } Job processingJob = inv.getArgument(0); - processingJob.markAsCompleted(any()); + processingJob.markAsSuccessful(any()); return null; }).when(mockJobProcessor).process(any()); AtomicReference jobState = new AtomicReference<>(JobState.PENDING); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - jobState.set(JobState.COMPLETED); + when(mockJob.markAsSuccessful(any())).thenAnswer(inv -> { + jobState.set(JobState.SUCCESS); return mockJob; }); @@ -904,7 +906,7 @@ public void test_CircuitBreaker_Closes() throws Exception { .pollInterval(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { assertTrue(circuitBreaker.allowRequest()); - assertEquals(JobState.COMPLETED, jobState.get()); + assertEquals(JobState.SUCCESS, jobState.get()); }); verify(mockJobProcessor, atLeast(6)).process(any()); @@ -1079,8 +1081,8 @@ public void test_complex_cancelJob() throws Exception { stateUpdates.add(JobState.CANCELED); return mockJob; }); - when(mockJob.markAsCompleted(any())).thenAnswer(inv -> { - stateUpdates.add(JobState.COMPLETED); + when(mockJob.markAsSuccessful(any())).thenAnswer(inv -> { + stateUpdates.add(JobState.SUCCESS); return mockJob; }); when(mockJob.markAsFailed(any())).thenAnswer(inv -> { diff --git a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java index f8fed037402e..a180cb5ea32d 100644 --- a/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java +++ b/dotcms-integration/src/test/java/com/dotcms/jobs/business/queue/PostgresJobQueueIntegrationTest.java @@ -122,7 +122,7 @@ void test_getActiveJobs() throws JobQueueException { * ExpectedResult: All completed jobs within the given time range are retrieved */ @Test - void testGetCompletedJobsForQueue() throws JobQueueException { + void test_getCompletedJobsForQueue() throws JobQueueException { String queueName = "testQueue"; LocalDateTime startDate = LocalDateTime.now().minusDays(1); @@ -132,14 +132,14 @@ void testGetCompletedJobsForQueue() throws JobQueueException { for (int i = 0; i < 3; i++) { String jobId = jobQueue.createJob(queueName, new HashMap<>()); Job job = jobQueue.getJob(jobId); - Job completedJob = job.markAsCompleted(null); + Job completedJob = job.markAsSuccessful(null); jobQueue.updateJobStatus(completedJob); } JobPaginatedResult result = jobQueue.getCompletedJobs(queueName, startDate, endDate, 1, 10); assertEquals(3, result.jobs().size()); assertEquals(3, result.total()); - result.jobs().forEach(job -> assertEquals(JobState.COMPLETED, job.state())); + result.jobs().forEach(job -> assertEquals(JobState.SUCCESS, job.state())); } /** @@ -148,7 +148,7 @@ void testGetCompletedJobsForQueue() throws JobQueueException { * ExpectedResult: All completed jobs are retrieved */ @Test - void testGetCompletedJobs() throws JobQueueException { + void test_getCompletedJobs() throws JobQueueException { String queueName = "testQueue"; @@ -156,14 +156,14 @@ void testGetCompletedJobs() throws JobQueueException { for (int i = 0; i < 3; i++) { String jobId = jobQueue.createJob(queueName, new HashMap<>()); Job job = jobQueue.getJob(jobId); - Job completedJob = job.markAsCompleted(null); + Job completedJob = job.markAsSuccessful(null); jobQueue.updateJobStatus(completedJob); } JobPaginatedResult result = jobQueue.getCompletedJobs(1, 10); assertEquals(3, result.jobs().size()); assertEquals(3, result.total()); - result.jobs().forEach(job -> assertEquals(JobState.COMPLETED, job.state())); + result.jobs().forEach(job -> assertEquals(JobState.SUCCESS, job.state())); } /** @@ -172,7 +172,7 @@ void testGetCompletedJobs() throws JobQueueException { * ExpectedResult: All canceled jobs are retrieved */ @Test - void testGetCanceledJobs() throws JobQueueException { + void test_getCanceledJobs() throws JobQueueException { String queueName = "testQueue"; @@ -214,6 +214,53 @@ void test_getFailedJobs() throws JobQueueException { result.jobs().forEach(job -> assertEquals(JobState.FAILED, job.state())); } + /** + * Method to test: getSuccessfulJobs in PostgresJobQueue Given Scenario: Multiple jobs are + * created and successfully completed ExpectedResult: All successful jobs are retrieved + * correctly + */ + @Test + void test_getSuccessfulJobs() throws JobQueueException { + + String queueName = "testQueue"; + + // Create and complete some jobs + for (int i = 0; i < 3; i++) { + String jobId = jobQueue.createJob(queueName, new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job completedJob = job.markAsSuccessful(null); + jobQueue.updateJobStatus(completedJob); + } + + JobPaginatedResult result = jobQueue.getSuccessfulJobs(1, 10); + assertEquals(3, result.jobs().size()); + assertEquals(3, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.SUCCESS, job.state())); + } + + /** + * Method to test: getFailedJobs in PostgresJobQueue Given Scenario: Multiple jobs are created + * and set to failed state ExpectedResult: All failed jobs are retrieved correctly + */ + @Test + void test_getAbandonedJobs() throws JobQueueException { + + // Create and fail some jobs + for (int i = 0; i < 2; i++) { + String jobId = jobQueue.createJob("testQueue", new HashMap<>()); + Job job = jobQueue.getJob(jobId); + Job failedJob = Job.builder().from(job) + .state(JobState.ABANDONED) + .build(); + jobQueue.updateJobStatus(failedJob); + } + + JobPaginatedResult result = jobQueue.getAbandonedJobs(1, 10); + assertEquals(2, result.jobs().size()); + assertEquals(2, result.total()); + result.jobs().forEach(job -> assertEquals(JobState.ABANDONED, job.state())); + } + /** * Method to test: updateJobStatus in PostgresJobQueue * Given Scenario: A job's status is updated @@ -281,7 +328,7 @@ void test_nextJob() throws Exception { }); // Mark job as completed - Job completedJob = nextJob.markAsCompleted(null); + Job completedJob = nextJob.markAsSuccessful(null); jobQueue.updateJobStatus(completedJob); } } catch (Exception e) { @@ -307,8 +354,8 @@ void test_nextJob() throws Exception { // Verify all jobs are in COMPLETED state for (String jobId : createdJobIds) { Job job = jobQueue.getJob(jobId); - assertEquals(JobState.COMPLETED, job.state(), - "Job " + jobId + " is not in COMPLETED state"); + assertEquals(JobState.SUCCESS, job.state(), + "Job " + jobId + " is not in SUCCESS state"); } } @@ -495,7 +542,7 @@ void test_getJobs() throws JobQueueException { String completedJobId = jobQueue.createJob(queueName, new HashMap<>()); Job completedJob = jobQueue.getJob(completedJobId); - jobQueue.updateJobStatus(completedJob.markAsCompleted(null)); + jobQueue.updateJobStatus(completedJob.markAsSuccessful(null)); // Get all jobs JobPaginatedResult result = jobQueue.getJobs(1, 10); @@ -512,7 +559,7 @@ void test_getJobs() throws JobQueueException { } assertEquals(3, stateCounts.getOrDefault(JobState.PENDING, 0)); assertEquals(1, stateCounts.getOrDefault(JobState.RUNNING, 0)); - assertEquals(1, stateCounts.getOrDefault(JobState.COMPLETED, 0)); + assertEquals(1, stateCounts.getOrDefault(JobState.SUCCESS, 0)); } /** @@ -608,8 +655,18 @@ void test_removeJobFromQueue() throws JobQueueException { Job job = jobQueue.getJob(jobId); assertNotNull(job); - // Remove the job - jobQueue.removeJobFromQueue(jobId); + // Putting the job in a final state + Job updatedJob = Job.builder() + .from(job) + .state(JobState.FAILED_PERMANENTLY) + .progress(0.75f) + .startedAt(Optional.of(LocalDateTime.now().minusHours(1))) + .completedAt(Optional.of(LocalDateTime.now())) + .retryCount(2) + .build(); + + // Update the job + jobQueue.updateJobStatus(updatedJob); // Verify job is not returned by nextJob assertNull(jobQueue.nextJob()); @@ -648,7 +705,7 @@ void test_createUpdateAndRetrieveJob() throws JobQueueException { Job updatedJob = Job.builder() .from(initialJob) - .state(JobState.COMPLETED) + .state(JobState.SUCCESS) .progress(0.75f) .startedAt(Optional.of(LocalDateTime.now().minusHours(1))) .completedAt(Optional.of(LocalDateTime.now())) @@ -665,7 +722,7 @@ void test_createUpdateAndRetrieveJob() throws JobQueueException { // Verify all fields assertEquals(jobId, retrievedJob.id()); assertEquals(queueName, retrievedJob.queueName()); - assertEquals(JobState.COMPLETED, retrievedJob.state()); + assertEquals(JobState.SUCCESS, retrievedJob.state()); assertEquals(initialParameters, retrievedJob.parameters()); assertEquals(0.75f, retrievedJob.progress(), 0.001); assertTrue(retrievedJob.startedAt().isPresent()); @@ -775,8 +832,8 @@ void test_hasJobBeenInState() throws JobQueueException { assertFalse(jobQueue.hasJobBeenInState(jobId, JobState.CANCELED)); - jobQueue.updateJobStatus(job.withState(JobState.COMPLETED)); - assertTrue(jobQueue.hasJobBeenInState(jobId, JobState.COMPLETED)); + jobQueue.updateJobStatus(job.withState(JobState.SUCCESS)); + assertTrue(jobQueue.hasJobBeenInState(jobId, JobState.SUCCESS)); assertFalse(jobQueue.hasJobBeenInState(jobId, JobState.CANCELLING)); From c01fce810d9323e582f6cda7c9aa5a9a7c40c5ee Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Tue, 3 Dec 2024 12:59:03 -0600 Subject: [PATCH 2/9] #30367 Enhance SSE monitoring --- .../rest/api/v1/job/SSEConnectionManager.java | 67 +++----- .../rest/api/v1/job/SSEMonitorUtil.java | 149 +++++++++++++----- 2 files changed, 130 insertions(+), 86 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java index 22724ff2186a..457d498fa518 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java @@ -9,9 +9,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; import org.glassfish.jersey.media.sse.EventOutput; @@ -55,64 +52,57 @@ public class SSEConnectionManager { // Add status tracking private volatile boolean isShutdown = false; + // Add per-job connection limit, default to 5 and -1 to disable private static final Lazy MAX_SSE_CONNECTIONS_PER_JOB = Lazy.of(() -> Config.getIntProperty("MAX_SSE_CONNECTIONS_PER_JOB", 5)); + // Add total connection limit, default to 50 and -1 to disable private static final Lazy MAX_SSE_TOTAL_CONNECTIONS = Lazy.of(() -> Config.getIntProperty("MAX_SSE_TOTAL_CONNECTIONS", 50)); - private static final Lazy SSE_CONNECTION_TIMEOUT_MINUTES = - Lazy.of(() -> Config.getIntProperty("SSE_CONNECTION_TIMEOUT_MINUTES", 30)); - private final ConcurrentMap> jobConnections = new ConcurrentHashMap<>(); - private final ScheduledExecutorService timeoutExecutor = - Executors.newSingleThreadScheduledExecutor(); /** * Shuts down the SSE connection manager and cleans up all resources. This method closes all - * active connections and shuts down the timeout executor. After shutdown, no new connections - * can be added. + * active connections. After shutdown, no new connections can be added. */ @PreDestroy public void shutdown() { - isShutdown = true; - - try { - closeAllConnections(); - } finally { - timeoutExecutor.shutdown(); - try { - if (!timeoutExecutor.awaitTermination(30, TimeUnit.SECONDS)) { - timeoutExecutor.shutdownNow(); - } - } catch (InterruptedException e) { - timeoutExecutor.shutdownNow(); - Thread.currentThread().interrupt(); - } - } + closeAllConnections(); } /** * Checks if a new SSE connection can be accepted for the given job. This method verifies both - * per-job and system-wide connection limits. + * per-job and system-wide connection limits if enabled (not -1). * * @param jobId The ID of the job for which to check connection availability * @return true if a new connection can be accepted, false otherwise */ public boolean canAcceptNewConnection(final String jobId) { - if (getTotalConnections() >= MAX_SSE_TOTAL_CONNECTIONS.get()) { + + final var maxSseTotalConnections = MAX_SSE_TOTAL_CONNECTIONS.get(); + final var maxSseConnectionsPerJob = MAX_SSE_CONNECTIONS_PER_JOB.get(); + + // Check total connections limit if enabled (not -1) + if (maxSseTotalConnections != -1 && getTotalConnections() >= maxSseTotalConnections) { return false; } + // If per-job limit is disabled (-1), allow connection + if (maxSseConnectionsPerJob == -1) { + return true; + } + + // Check per-job limit Set connections = jobConnections.get(jobId); - return connections == null || connections.size() < MAX_SSE_CONNECTIONS_PER_JOB.get(); + return connections == null || connections.size() < maxSseConnectionsPerJob; } /** * Adds a new SSE connection for a job. The connection will be automatically closed after the - * configured timeout period. + * configured timeout period if timeout is enabled (not -1). * * @param jobId The ID of the job to monitor * @param eventOutput The EventOutput instance representing the SSE connection @@ -128,15 +118,6 @@ public SSEConnection addConnection(final String jobId, final EventOutput eventOu SSEConnection connection = new SSEConnection(jobId, eventOutput); jobConnections.computeIfAbsent(jobId, k -> ConcurrentHashMap.newKeySet()).add(connection); - // Schedule connection timeout - timeoutExecutor.schedule(() -> { - try { - closeConnection(connection); - } catch (Exception e) { - Logger.error(this, "Error closing expired connection", e); - } - }, SSE_CONNECTION_TIMEOUT_MINUTES.get(), TimeUnit.MINUTES); - return connection; } @@ -253,16 +234,6 @@ public void close() { } } - /** - * Checks if this connection has exceeded its timeout period. - * - * @return true if the connection has expired, false otherwise - */ - public boolean isExpired() { - return LocalDateTime.now().isAfter( - createdAt.plusMinutes(SSE_CONNECTION_TIMEOUT_MINUTES.get())); - } - /** * Gets the ID of the job this connection is monitoring. * diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java index ffa6a2b204c8..c9c3d6ec3c26 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java @@ -1,5 +1,9 @@ package com.dotcms.rest.api.v1.job; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; +import static javax.ws.rs.core.Response.Status.TOO_MANY_REQUESTS; + import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.job.Job; import com.dotcms.rest.api.v1.job.SSEConnectionManager.SSEConnection; @@ -74,64 +78,60 @@ public EventOutput monitorJob(final String jobId) { final EventOutput eventOutput = new EventOutput(); final var connection = sseConnectionManager.addConnection(jobId, eventOutput); - try { + try (final var resources = + new MonitorResources(jobId, connection, helper, sseConnectionManager)) { Job job = helper.getJobForSSE(jobId); if (job == null) { - sendErrorAndClose("job-not-found", "404", connection); + sendError(SSEError.JOB_NOT_FOUND, connection); return eventOutput; } if (helper.isNotWatchable(job)) { - sendErrorAndClose(String.format("job-not-watchable [%s]", - job.state()), "400", connection); + sendError(SSEError.JOB_NOT_WATCHABLE, connection); return eventOutput; } if (!sseConnectionManager.canAcceptNewConnection(jobId)) { - sendErrorAndClose("too-many-connections", "429", connection); + sendError(SSEError.TOO_MANY_CONNECTIONS, connection); return eventOutput; } // Callback for watching job updates and sending them to the client Consumer jobWatcherConsumer = watched -> { if (!eventOutput.isClosed()) { - OutboundEvent event = new Builder() - .mediaType(MediaType.APPLICATION_JSON_TYPE) - .name("job-update") - .data(Map.class, helper.getJobStatusInfo(watched)) - .build(); - try { + OutboundEvent event = new Builder() + .mediaType(MediaType.APPLICATION_JSON_TYPE) + .name("job-update") + .data(Map.class, helper.getJobStatusInfo(watched)) + .build(); eventOutput.write(event); + + // If job is in a completed state, close all connections as no further + // updates will be available + if (helper.isTerminalState(watched.state())) { + sseConnectionManager.closeAllJobConnections(jobId); + } + } catch (IOException e) { final var errorMessage = "Error writing SSE event"; Logger.error(this, errorMessage, e); - // Re-throw the IOException to be caught by the outer catch block + // Re-throw the IOException to be caught by the outer catch block in the + // RealTimeJobMonitor that will clean up the job watcher throw new DotRuntimeException(errorMessage, e); } - - // If job is in a completed state, close all connections as no further - // updates will be available - if (helper.isTerminalState(watched.state())) { - sseConnectionManager.closeAllJobConnections(jobId); - } } }; // Start watching the job jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); + resources.jobWatcher(jobWatcher); return eventOutput; - } catch (IOException e) { - final var errorMessage = "Error writing SSE event"; - Logger.error(this, errorMessage, e); - cleanupOnError(jobId, connection, jobWatcher); - throw new DotRuntimeException(errorMessage, e); } catch (Exception e) { final var errorMessage = "Error setting up job monitor"; Logger.error(this, errorMessage, e); - cleanupOnError(jobId, connection, jobWatcher); throw new DotRuntimeException(errorMessage, e); } } @@ -139,32 +139,105 @@ public EventOutput monitorJob(final String jobId) { /** * Send an error event and close the connection * - * @param errorName The name of the error event - * @param errorCode The error code + * @param error The error to send * @param connection The SSE connection to close * @throws IOException If there is an error writing the event */ - private void sendErrorAndClose(final String errorName, final String errorCode, - final SSEConnection connection) throws IOException { + private void sendError(final SSEError error, final SSEConnection connection) + throws IOException { OutboundEvent event = new OutboundEvent.Builder() .mediaType(MediaType.TEXT_HTML_TYPE) - .name(errorName) - .data(String.class, errorCode) + .name(error.getName()) + .data(String.class, String.valueOf(error.getCode())) .build(); connection.getEventOutput().write(event); - sseConnectionManager.closeConnection(connection); } /** - * Clean up resources after an error + * Enumeration representing various SSE (Server-Sent Events) error states with associated error + * names and HTTP status codes. It is used to identify specific error conditions related to job + * monitoring. + */ + private enum SSEError { + + JOB_NOT_FOUND("job-not-found", NOT_FOUND.getStatusCode()), + JOB_NOT_WATCHABLE("job-not-watchable", BAD_REQUEST.getStatusCode()), + TOO_MANY_CONNECTIONS("too-many-connections", TOO_MANY_REQUESTS.getStatusCode()); + + private final String name; + private final int code; + + SSEError(String name, int code) { + this.name = name; + this.code = code; + } + + public String getName() { + return name; + } + + public int getCode() { + return code; + } + } + + /** + * A resource management class that handles cleanup of SSE monitoring resources. This class + * implements AutoCloseable to ensure proper cleanup of both SSE connections and job watchers + * through try-with-resources blocks. + * + *

This class manages: + *

    + *
  • SSE connection lifecycle
  • + *
  • Job watcher registration and cleanup
  • + *
  • Automatic resource cleanup when monitoring ends or errors occur
  • + *
*/ - private void cleanupOnError(final String jobId, final SSEConnection connection, - final JobWatcher jobWatcher) { - if (connection != null) { - sseConnectionManager.closeConnection(connection); + private static class MonitorResources implements AutoCloseable { + + private final SSEConnection connection; + private JobWatcher jobWatcher; + private final String jobId; + private final JobQueueHelper helper; + private final SSEConnectionManager sseConnectionManager; + + /** + * Creates a new MonitorResources instance to manage SSE monitoring resources. + * + * @param jobId The ID of the job being monitored + * @param connection The SSE connection to manage + * @param helper Helper for job queue operations + * @param sseConnectionManager Manager for SSE connections + */ + MonitorResources(String jobId, SSEConnection connection, JobQueueHelper helper, + SSEConnectionManager sseConnectionManager) { + this.jobId = jobId; + this.connection = connection; + this.helper = helper; + this.sseConnectionManager = sseConnectionManager; + } + + /** + * Sets the job watcher for this monitoring session. + * + * @param watcher The job watcher to associate with this monitoring session + */ + void jobWatcher(JobWatcher watcher) { + this.jobWatcher = watcher; } - if (jobWatcher != null) { - helper.removeWatcher(jobId, jobWatcher); + + /** + * Closes and cleans up all monitoring resources. This includes closing the SSE connection + * and removing the job watcher if one exists. + */ + @Override + public void close() { + if (connection != null) { + sseConnectionManager.closeConnection(connection); + } + if (jobWatcher != null) { + helper.removeWatcher(jobId, jobWatcher); + } } } From 169318c9128850de76e3b4222738d96ca425d635 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Tue, 3 Dec 2024 13:26:17 -0600 Subject: [PATCH 3/9] #30367 Updating postman tests --- ...ueResourceAPITests.postman_collection.json | 104 +++++++++++++++--- 1 file changed, 87 insertions(+), 17 deletions(-) diff --git a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json index 14ecba3220a9..f6215595d3c5 100644 --- a/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json +++ b/dotcms-postman/src/main/resources/postman/JobQueueResourceAPITests.postman_collection.json @@ -1,6 +1,6 @@ { "info": { - "_postman_id": "be9c354a-6c94-4b10-be0e-bc0c1b324c24", + "_postman_id": "8f0a1603-03b2-4f37-b2a3-d6c6a8f5c910", "name": "JobQueueResource API Tests", "description": "Postman collection for testing the JobQueueResource API endpoints.", "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json", @@ -965,8 +965,8 @@ "var response = pm.response.json();", "console.log(\"Current job state:\", response.entity.state);", " ", - "// Check if job status is \"COMPLETED\"", - "if (response.entity.state === \"COMPLETED\") {", + "// Check if job status is \"SUCCESS\"", + "if (response.entity.state === \"SUCCESS\") {", " // Clear environment variables once done", " pm.environment.unset(\"startTime\");", " pm.environment.unset(\"retryCount\");", @@ -1060,8 +1060,8 @@ "var response = pm.response.json();", "console.log(\"Current job state:\", response.entity.state);", " ", - "// Check if job status is \"FAILED\"", - "if (response.entity.state === \"FAILED\") {", + "// Check if job status is \"FAILED_PERMANENTLY\"", + "if (response.entity.state === \"FAILED_PERMANENTLY\") {", " // Clear environment variables once done", " pm.environment.unset(\"startTime\");", " pm.environment.unset(\"retryCount\");", @@ -1360,6 +1360,76 @@ }, { "name": "Get all completed Jobs", + "event": [ + { + "listen": "test", + "script": { + "exec": [ + "// Parse the response JSON", + "const response = pm.response.json();", + "", + "// Validate that the response status is 200 OK", + "pm.test(\"Response status is 200\", function () {", + " pm.response.to.have.status(200);", + "});", + "", + "// Validate that the response contains an \"entity.jobs\" array", + "pm.test(\"Response should contain jobs array\", function () {", + " pm.expect(response.entity).to.have.property(\"jobs\");", + " pm.expect(response.entity.jobs).to.be.an(\"array\");", + "});", + "", + "// Validate that the jobs array contains 3 jobs", + "pm.test(\"Jobs array should contain 3 jobs\", function () {", + " pm.expect(response.entity.jobs.length).to.eql(3);", + "});" + ], + "type": "text/javascript", + "packages": {} + } + }, + { + "listen": "prerequest", + "script": { + "exec": [ + "" + ], + "type": "text/javascript", + "packages": {} + } + } + ], + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{baseUrl}}/api/v1/jobs/completed?page={{page}}&pageSize={{pageSize}}", + "host": [ + "{{baseUrl}}" + ], + "path": [ + "api", + "v1", + "jobs", + "completed" + ], + "query": [ + { + "key": "page", + "value": "{{page}}" + }, + { + "key": "pageSize", + "value": "{{pageSize}}" + } + ] + }, + "description": "Lists completed jobs with pagination." + }, + "response": [] + }, + { + "name": "Get all successful Jobs", "event": [ { "listen": "test", @@ -1411,7 +1481,7 @@ "method": "GET", "header": [], "url": { - "raw": "{{baseUrl}}/api/v1/jobs/completed?page={{page}}&pageSize={{pageSize}}", + "raw": "{{baseUrl}}/api/v1/jobs/successful?page={{page}}&pageSize={{pageSize}}", "host": [ "{{baseUrl}}" ], @@ -1419,7 +1489,7 @@ "api", "v1", "jobs", - "completed" + "successful" ], "query": [ { @@ -1432,7 +1502,7 @@ } ] }, - "description": "Lists completed jobs with pagination." + "description": "Lists successful jobs with pagination." }, "response": [] }, @@ -1507,7 +1577,7 @@ "response": [] }, { - "name": "List Jobs Expect Fail, Completed and Cancelled", + "name": "List Jobs Expect Fail, Successful and Cancelled", "event": [ { "listen": "test", @@ -1533,12 +1603,12 @@ "});", "", "// Check if there are jobs with \"FAILED\" and \"CANCELED\" status", - "const hasFailed = response.entity.jobs.some(job => job.state === \"FAILED\");", + "const hasFailed = response.entity.jobs.some(job => job.state === \"FAILED_PERMANENTLY\");", "const hasCanceled = response.entity.jobs.some(job => job.state === \"CANCELED\");", - "const hasCompleted = response.entity.jobs.some(job => job.state === \"COMPLETED\");", + "const hasSuccess = response.entity.jobs.some(job => job.state === \"SUCCESS\");", "", - "// Postman test to validate that there are jobs with \"FAILED\" statuses", - "pm.test(\"There are jobs in 'FAILED' state\", function () {", + "// Postman test to validate that there are jobs with \"FAILED_PERMANENTLY\" statuses", + "pm.test(\"There are jobs in 'FAILED_PERMANENTLY' state\", function () {", " pm.expect(hasFailed).to.be.true; ", "});", "", @@ -1547,9 +1617,9 @@ " pm.expect(hasCanceled).to.be.true;", "});", "", - "// Postman test to validate that there are jobs with \"COMPLETED\" statuses", - "pm.test(\"There are jobs in 'COMPLETED' state\", function () { ", - " pm.expect(hasCompleted).to.be.true;", + "// Postman test to validate that there are jobs with \"SUCCESS\" statuses", + "pm.test(\"There are jobs in 'SUCCESS' state\", function () { ", + " pm.expect(hasSuccess).to.be.true;", "});" ], "type": "text/javascript", @@ -1583,7 +1653,7 @@ } ] }, - "description": "List Jobs Expect Fail and Cancelled." + "description": "List Jobs Expect Fail, Successful and Cancelled." }, "response": [] } From cff25f24a6e95170545995dbdb51295c91371d4d Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Tue, 3 Dec 2024 18:05:10 -0600 Subject: [PATCH 4/9] #30367 Simplified SSE handling --- .../rest/api/v1/job/SSEConnectionManager.java | 256 ------------------ .../rest/api/v1/job/SSEMonitorUtil.java | 98 +++---- 2 files changed, 44 insertions(+), 310 deletions(-) delete mode 100644 dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java deleted file mode 100644 index 457d498fa518..000000000000 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEConnectionManager.java +++ /dev/null @@ -1,256 +0,0 @@ -package com.dotcms.rest.api.v1.job; - -import com.dotmarketing.util.Config; -import com.dotmarketing.util.Logger; -import io.vavr.Lazy; -import java.io.IOException; -import java.time.LocalDateTime; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; -import org.glassfish.jersey.media.sse.EventOutput; - -/** - * Manages Server-Sent Events (SSE) connections for job monitoring. This class provides - * functionality for tracking, limiting, and cleaning up SSE connections across multiple jobs. - * - *

Key features include: - *

    - *
  • Connection limits per job and system-wide
  • - *
  • Automatic connection timeout and cleanup
  • - *
  • Thread-safe connection management
  • - *
  • Proper resource cleanup on shutdown
  • - *
- * - *

Configuration properties: - *

    - *
  • {@code MAX_SSE_CONNECTIONS_PER_JOB} - Maximum number of concurrent connections per job (default: 5)
  • - *
  • {@code MAX_SSE_TOTAL_CONNECTIONS} - Maximum total concurrent connections across all jobs (default: 50)
  • - *
  • {@code SSE_CONNECTION_TIMEOUT_MINUTES} - Connection timeout in minutes (default: 30)
  • - *
- * - *

Usage example: - *

{@code
- * SSEConnectionManager manager = new SSEConnectionManager();
- *
- * // Check if new connection can be accepted
- * if (manager.canAcceptNewConnection(jobId)) {
- *     // Add new connection
- *     manager.addConnection(jobId, eventOutput);
- * }
- *
- * // Close connections when job completes
- * manager.closeJobConnections(jobId);
- * }
- */ -@ApplicationScoped -public class SSEConnectionManager { - - // Add status tracking - private volatile boolean isShutdown = false; - - // Add per-job connection limit, default to 5 and -1 to disable - private static final Lazy MAX_SSE_CONNECTIONS_PER_JOB = - Lazy.of(() -> Config.getIntProperty("MAX_SSE_CONNECTIONS_PER_JOB", 5)); - - // Add total connection limit, default to 50 and -1 to disable - private static final Lazy MAX_SSE_TOTAL_CONNECTIONS = - Lazy.of(() -> Config.getIntProperty("MAX_SSE_TOTAL_CONNECTIONS", 50)); - - private final ConcurrentMap> jobConnections = - new ConcurrentHashMap<>(); - - /** - * Shuts down the SSE connection manager and cleans up all resources. This method closes all - * active connections. After shutdown, no new connections can be added. - */ - @PreDestroy - public void shutdown() { - isShutdown = true; - closeAllConnections(); - } - - /** - * Checks if a new SSE connection can be accepted for the given job. This method verifies both - * per-job and system-wide connection limits if enabled (not -1). - * - * @param jobId The ID of the job for which to check connection availability - * @return true if a new connection can be accepted, false otherwise - */ - public boolean canAcceptNewConnection(final String jobId) { - - final var maxSseTotalConnections = MAX_SSE_TOTAL_CONNECTIONS.get(); - final var maxSseConnectionsPerJob = MAX_SSE_CONNECTIONS_PER_JOB.get(); - - // Check total connections limit if enabled (not -1) - if (maxSseTotalConnections != -1 && getTotalConnections() >= maxSseTotalConnections) { - return false; - } - - // If per-job limit is disabled (-1), allow connection - if (maxSseConnectionsPerJob == -1) { - return true; - } - - // Check per-job limit - Set connections = jobConnections.get(jobId); - return connections == null || connections.size() < maxSseConnectionsPerJob; - } - - /** - * Adds a new SSE connection for a job. The connection will be automatically closed after the - * configured timeout period if timeout is enabled (not -1). - * - * @param jobId The ID of the job to monitor - * @param eventOutput The EventOutput instance representing the SSE connection - * @return The created SSEConnection instance - * @throws IllegalStateException if the manager is shut down - */ - public SSEConnection addConnection(final String jobId, final EventOutput eventOutput) { - - if (isShutdown) { - throw new IllegalStateException("SSEConnectionManager is shut down"); - } - - SSEConnection connection = new SSEConnection(jobId, eventOutput); - jobConnections.computeIfAbsent(jobId, k -> ConcurrentHashMap.newKeySet()).add(connection); - - return connection; - } - - /** - * Closes a specific SSE connection for a job. If this was the last connection for the job, the - * job entry is removed from tracking. - * - * @param connection The connection to remove - */ - public void closeConnection(final SSEConnection connection) { - - if (connection != null) { - Set connections = jobConnections.get(connection.jobId); - if (connections != null) { - connections.remove(connection); - connection.close(); - - // If this was the last connection for the job, clean up the job entry - if (connections.isEmpty()) { - jobConnections.remove(connection.jobId); - } - } - } - } - - /** - * Gets the total number of active SSE connections across all jobs. - * - * @return The total number of active connections - */ - private int getTotalConnections() { - return jobConnections.values().stream() - .mapToInt(Set::size) - .sum(); - } - - /** - * Closes all active SSE connections and clears connection tracking. - */ - private void closeAllConnections() { - jobConnections.values().forEach(connections -> - connections.forEach(SSEConnection::close) - ); - jobConnections.clear(); - } - - /** - * Closes all SSE connections for a specific job. - * - * @param jobId The ID of the job whose connections should be closed - */ - public void closeAllJobConnections(final String jobId) { - Set connections = jobConnections.remove(jobId); - if (connections != null) { - connections.forEach(SSEConnection::close); - } - } - - /** - * Gets the number of active connections for a specific job. - * - * @param jobId The ID of the job - * @return The number of active connections for the job - */ - public int getConnectionCount(final String jobId) { - Set connections = jobConnections.get(jobId); - return connections != null ? connections.size() : 0; - } - - /** - * Gets information about the current state of SSE connections. - * - * @return A map containing connection statistics: - * - totalConnections: Total number of active connections - * - activeJobs: Number of jobs with active connections - */ - public Map getConnectionInfo() { - return Map.of( - "totalConnections", getTotalConnections(), - "activeJobs", jobConnections.size() - ); - } - - /** - * Represents a single SSE connection for a job. Each connection tracks its creation time and - * handles its own cleanup. - */ - public static class SSEConnection { - - private final String jobId; - private final EventOutput eventOutput; - private final LocalDateTime createdAt; - - /** - * Creates a new SSE connection. - * - * @param jobId The ID of the job this connection is monitoring - * @param eventOutput The EventOutput instance representing the SSE connection - */ - public SSEConnection(String jobId, EventOutput eventOutput) { - this.jobId = jobId; - this.eventOutput = eventOutput; - this.createdAt = LocalDateTime.now(); - } - - /** - * Closes this SSE connection. - */ - public void close() { - try { - eventOutput.close(); - } catch (IOException e) { - Logger.error(SSEConnection.class, "Error closing SSE connection", e); - } - } - - /** - * Gets the ID of the job this connection is monitoring. - * - * @return The job ID - */ - public String getJobId() { - return jobId; - } - - /** - * Gets the EventOutput instance representing the SSE connection. - * - * @return The EventOutput instance - */ - public EventOutput getEventOutput() { - return eventOutput; - } - } - -} diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java index c9c3d6ec3c26..a213355d0e3f 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/SSEMonitorUtil.java @@ -2,11 +2,9 @@ import static javax.ws.rs.core.Response.Status.BAD_REQUEST; import static javax.ws.rs.core.Response.Status.NOT_FOUND; -import static javax.ws.rs.core.Response.Status.TOO_MANY_REQUESTS; import com.dotcms.jobs.business.api.events.JobWatcher; import com.dotcms.jobs.business.job.Job; -import com.dotcms.rest.api.v1.job.SSEConnectionManager.SSEConnection; import com.dotmarketing.exception.DotRuntimeException; import com.dotmarketing.util.Logger; import java.io.IOException; @@ -40,29 +38,21 @@ * EventOutput eventOutput = sseMonitorUtil.monitorJob(jobId); * } * - *

This class is thread-safe and can handle multiple concurrent monitoring sessions. - * It automatically manages resource cleanup through the {@link SSEConnectionManager} and - * ensures proper handling of connection lifecycles. - * - * @see SSEConnectionManager * @see JobQueueHelper */ @ApplicationScoped public class SSEMonitorUtil { private final JobQueueHelper helper; - private final SSEConnectionManager sseConnectionManager; public SSEMonitorUtil() { // Default constructor required for CDI this.helper = null; - this.sseConnectionManager = null; } @Inject - public SSEMonitorUtil(JobQueueHelper helper, SSEConnectionManager sseConnectionManager) { + public SSEMonitorUtil(JobQueueHelper helper) { this.helper = helper; - this.sseConnectionManager = sseConnectionManager; } /** @@ -74,26 +64,19 @@ public SSEMonitorUtil(JobQueueHelper helper, SSEConnectionManager sseConnectionM @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks public EventOutput monitorJob(final String jobId) { - JobWatcher jobWatcher = null; - final EventOutput eventOutput = new EventOutput(); - final var connection = sseConnectionManager.addConnection(jobId, eventOutput); + final var eventOutput = new EventOutput(); + final var resources = new MonitorResources(jobId, eventOutput, helper); - try (final var resources = - new MonitorResources(jobId, connection, helper, sseConnectionManager)) { + try { Job job = helper.getJobForSSE(jobId); if (job == null) { - sendError(SSEError.JOB_NOT_FOUND, connection); + sendErrorAndClose(SSEError.JOB_NOT_FOUND, resources); return eventOutput; } if (helper.isNotWatchable(job)) { - sendError(SSEError.JOB_NOT_WATCHABLE, connection); - return eventOutput; - } - - if (!sseConnectionManager.canAcceptNewConnection(jobId)) { - sendError(SSEError.TOO_MANY_CONNECTIONS, connection); + sendErrorAndClose(SSEError.JOB_NOT_WATCHABLE, resources); return eventOutput; } @@ -108,15 +91,19 @@ public EventOutput monitorJob(final String jobId) { .build(); eventOutput.write(event); - // If job is in a completed state, close all connections as no further + // If job is in a completed state, close the connection as no further // updates will be available if (helper.isTerminalState(watched.state())) { - sseConnectionManager.closeAllJobConnections(jobId); + resources.close(); } } catch (IOException e) { final var errorMessage = "Error writing SSE event"; Logger.error(this, errorMessage, e); + + // Make sure to close the connection + resources.close(); + // Re-throw the IOException to be caught by the outer catch block in the // RealTimeJobMonitor that will clean up the job watcher throw new DotRuntimeException(errorMessage, e); @@ -125,13 +112,17 @@ public EventOutput monitorJob(final String jobId) { }; // Start watching the job - jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); + final var jobWatcher = helper.watchJob(job.id(), jobWatcherConsumer); resources.jobWatcher(jobWatcher); return eventOutput; } catch (Exception e) { final var errorMessage = "Error setting up job monitor"; Logger.error(this, errorMessage, e); + + // Make sure to close the connection and remove the job watcher + resources.close(); + throw new DotRuntimeException(errorMessage, e); } } @@ -140,17 +131,18 @@ public EventOutput monitorJob(final String jobId) { * Send an error event and close the connection * * @param error The error to send - * @param connection The SSE connection to close + * @param resources The current monitoring resources * @throws IOException If there is an error writing the event */ - private void sendError(final SSEError error, final SSEConnection connection) + private void sendErrorAndClose(final SSEError error, MonitorResources resources) throws IOException { OutboundEvent event = new OutboundEvent.Builder() .mediaType(MediaType.TEXT_HTML_TYPE) .name(error.getName()) .data(String.class, String.valueOf(error.getCode())) .build(); - connection.getEventOutput().write(event); + resources.eventOutput().write(event); + resources.close(); } /** @@ -161,8 +153,7 @@ private void sendError(final SSEError error, final SSEConnection connection) private enum SSEError { JOB_NOT_FOUND("job-not-found", NOT_FOUND.getStatusCode()), - JOB_NOT_WATCHABLE("job-not-watchable", BAD_REQUEST.getStatusCode()), - TOO_MANY_CONNECTIONS("too-many-connections", TOO_MANY_REQUESTS.getStatusCode()); + JOB_NOT_WATCHABLE("job-not-watchable", BAD_REQUEST.getStatusCode()); private final String name; private final int code; @@ -182,39 +173,26 @@ public int getCode() { } /** - * A resource management class that handles cleanup of SSE monitoring resources. This class - * implements AutoCloseable to ensure proper cleanup of both SSE connections and job watchers - * through try-with-resources blocks. - * - *

This class manages: - *

    - *
  • SSE connection lifecycle
  • - *
  • Job watcher registration and cleanup
  • - *
  • Automatic resource cleanup when monitoring ends or errors occur
  • - *
+ * A resource management class that handles cleanup of SSE monitoring resources. */ - private static class MonitorResources implements AutoCloseable { + private static class MonitorResources { - private final SSEConnection connection; + private final EventOutput eventOutput; private JobWatcher jobWatcher; private final String jobId; private final JobQueueHelper helper; - private final SSEConnectionManager sseConnectionManager; /** * Creates a new MonitorResources instance to manage SSE monitoring resources. * * @param jobId The ID of the job being monitored - * @param connection The SSE connection to manage + * @param eventOutput The SSE connection for job updates * @param helper Helper for job queue operations - * @param sseConnectionManager Manager for SSE connections */ - MonitorResources(String jobId, SSEConnection connection, JobQueueHelper helper, - SSEConnectionManager sseConnectionManager) { + MonitorResources(String jobId, EventOutput eventOutput, JobQueueHelper helper) { this.jobId = jobId; - this.connection = connection; + this.eventOutput = eventOutput; this.helper = helper; - this.sseConnectionManager = sseConnectionManager; } /** @@ -226,14 +204,26 @@ void jobWatcher(JobWatcher watcher) { this.jobWatcher = watcher; } + /** + * Gets the SSE connection for this monitoring session. + * + * @return The SSE connection + */ + EventOutput eventOutput() { + return eventOutput; + } + /** * Closes and cleans up all monitoring resources. This includes closing the SSE connection * and removing the job watcher if one exists. */ - @Override - public void close() { - if (connection != null) { - sseConnectionManager.closeConnection(connection); + void close() { + if (eventOutput != null) { + try { + eventOutput.close(); + } catch (IOException e) { + Logger.error(MonitorResources.class, "Error closing event output", e); + } } if (jobWatcher != null) { helper.removeWatcher(jobId, jobWatcher); From fdb6134bdd03bbe68aba1402f6db066477533d33 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Tue, 3 Dec 2024 20:59:01 -0600 Subject: [PATCH 5/9] #30367 Improvements on the cancel process --- .../business/api/JobQueueManagerAPIImpl.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 7f191b17d333..4518d25ac972 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -404,7 +404,7 @@ public void cancelJob(final String jobId) throws DotDataException { final Job job = getJob(jobId); - if (job.state() == JobState.PENDING || job.state() == JobState.RUNNING) { + if (isInCancelableState(job)) { handleJobCancelRequest(job); } else { Logger.warn(this, "Job " + job.id() + " is not in a cancellable state. " @@ -427,9 +427,7 @@ void onCancelRequestJob(final JobCancelRequestEvent event) { try { final var job = getJob(event.getJob().id()); - if (job.state() == JobState.PENDING - || job.state() == JobState.RUNNING - || job.state() == JobState.CANCEL_REQUESTED) { + if (isInCancelableState(job) || job.state() == JobState.CANCEL_REQUESTED) { final Optional instance = getInstance(job.id()); if (instance.isPresent()) { @@ -1136,6 +1134,18 @@ private int incrementAndResetEmptyQueueCount( return emptyQueueCount; } + /** + * Verifies if a job state is in a cancellable state. + * + * @param job The job to check. + * @return {@code true} if the job is in a cancellable state, {@code false} otherwise. + */ + private boolean isInCancelableState(final Job job) { + return job.state() == JobState.PENDING || job.state() == JobState.RUNNING + || job.state() == JobState.FAILED || job.state() == JobState.ABANDONED + || job.state() == JobState.ABANDONED_PERMANENTLY; + } + /** * A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to * be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut From 81e1c04207271397164602ae063c140a5d217cc8 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 4 Dec 2024 09:22:01 -0600 Subject: [PATCH 6/9] #30367 Applying code review feedback. --- .../com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java | 3 ++- .../dotcms/jobs/business/api/events/RealTimeJobMonitor.java | 5 ++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 4518d25ac972..0d88f4053127 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -361,7 +361,8 @@ public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDat @CloseDBIfOpened @Override - public JobPaginatedResult getSuccessfulJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getSuccessfulJobs(final int page, final int pageSize) + throws DotDataException { try { return jobQueue.getSuccessfulJobs(page, pageSize); } catch (JobQueueDataException e) { diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java index e07d1b4fbcb3..065b6e361060 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/events/RealTimeJobMonitor.java @@ -428,10 +428,9 @@ public static Predicate hasFailed() { } /** - * Creates a predicate that matches successful jobs. The predicate matches any job in the - * SUCCESS state. + * Creates a predicate that matches any completed job. * - * @return A predicate for matching successful jobs + * @return A predicate for matching completed jobs */ public static Predicate isCompleted() { return job -> (job.state() == JobState.SUCCESS From 432e64d3f6d7155318c390792fdf9d636b484146 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 4 Dec 2024 09:50:20 -0600 Subject: [PATCH 7/9] #30367 Applying code review feedback. --- .../business/api/JobQueueManagerAPIImpl.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java index 0d88f4053127..e789c70bd919 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/api/JobQueueManagerAPIImpl.java @@ -320,7 +320,7 @@ public Job getJob(final String jobId) throws DotDataException { @CloseDBIfOpened @Override - public JobPaginatedResult getActiveJobs(String queueName, int page, int pageSize) + public JobPaginatedResult getActiveJobs(final String queueName, final int page, final int pageSize) throws DotDataException { try { return jobQueue.getActiveJobs(queueName, page, pageSize); @@ -341,7 +341,8 @@ public JobPaginatedResult getJobs(final int page, final int pageSize) throws Dot @CloseDBIfOpened @Override - public JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getActiveJobs(final int page, final int pageSize) + throws DotDataException { try { return jobQueue.getActiveJobs(page, pageSize); } catch (JobQueueDataException e) { @@ -351,7 +352,8 @@ public JobPaginatedResult getActiveJobs(int page, int pageSize) throws DotDataEx @CloseDBIfOpened @Override - public JobPaginatedResult getCompletedJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getCompletedJobs(final int page, final int pageSize) + throws DotDataException { try { return jobQueue.getCompletedJobs(page, pageSize); } catch (JobQueueDataException e) { @@ -372,7 +374,8 @@ public JobPaginatedResult getSuccessfulJobs(final int page, final int pageSize) @CloseDBIfOpened @Override - public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getCanceledJobs(final int page, final int pageSize) + throws DotDataException { try { return jobQueue.getCanceledJobs(page, pageSize); } catch (JobQueueDataException e) { @@ -382,7 +385,8 @@ public JobPaginatedResult getCanceledJobs(int page, int pageSize) throws DotData @CloseDBIfOpened @Override - public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getFailedJobs(final int page, final int pageSize) + throws DotDataException { try { return jobQueue.getFailedJobs(page, pageSize); } catch (JobQueueDataException e) { @@ -392,7 +396,7 @@ public JobPaginatedResult getFailedJobs(int page, int pageSize) throws DotDataEx @CloseDBIfOpened @Override - public JobPaginatedResult getAbandonedJobs(int page, int pageSize) throws DotDataException { + public JobPaginatedResult getAbandonedJobs(final int page, final int pageSize) throws DotDataException { try { return jobQueue.getAbandonedJobs(page, pageSize); } catch (JobQueueDataException e) { From a18547819b0a60319c2bf810c15b4c6ae72e427d Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 4 Dec 2024 10:01:49 -0600 Subject: [PATCH 8/9] #30367 Applying code review feedback. --- .../rest/api/v1/job/JobQueueResource.java | 71 +++++++++++-------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index d650845ca9b5..a4421cd53400 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -14,6 +14,7 @@ import java.util.Set; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import javax.ws.rs.BeanParam; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -54,14 +55,14 @@ public JobQueueResource(WebResource webResource, JobQueueHelper helper, @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.APPLICATION_JSON) public Response createJob( - @Context HttpServletRequest request, + @Context HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { final var initDataObject = new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); @@ -79,14 +80,14 @@ public Response createJob( @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response createJob( - @Context HttpServletRequest request, + @Context HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, Map parameters) throws DotDataException { final var initDataObject = new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); @@ -102,11 +103,12 @@ public Response createJob( @GET @Path("/queues") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView> getQueues(@Context HttpServletRequest request) { + public ResponseEntityView> getQueues( + @Context HttpServletRequest request, @Context final HttpServletResponse response) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); return new ResponseEntityView<>(helper.getQueueNames()); @@ -115,14 +117,14 @@ public ResponseEntityView> getQueues(@Context HttpServletRequest req @GET @Path("/{jobId}/status") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView getJobStatus(@Context HttpServletRequest request, - @PathParam("jobId") String jobId) - throws DotDataException { + public ResponseEntityView getJobStatus( + @Context HttpServletRequest request, @Context final HttpServletResponse response, + @PathParam("jobId") String jobId) throws DotDataException { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); @@ -134,12 +136,13 @@ public ResponseEntityView getJobStatus(@Context HttpServletRequest request, @Path("/{jobId}/cancel") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.WILDCARD) - public ResponseEntityView cancelJob(@Context HttpServletRequest request, + public ResponseEntityView cancelJob( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("jobId") String jobId) throws DotDataException { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); helper.cancelJob(jobId); @@ -149,14 +152,15 @@ public ResponseEntityView cancelJob(@Context HttpServletRequest request, @GET @Path("/{queueName}/active") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView activeJobs(@Context HttpServletRequest request, + public ResponseEntityView activeJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getActiveJobs(queueName, page, pageSize); @@ -165,13 +169,14 @@ public ResponseEntityView activeJobs(@Context HttpServletReq @GET @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView listJobs(@Context HttpServletRequest request, + public ResponseEntityView listJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getJobs(page, pageSize); @@ -181,13 +186,14 @@ public ResponseEntityView listJobs(@Context HttpServletReque @GET @Path("/active") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView activeJobs(@Context HttpServletRequest request, + public ResponseEntityView activeJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getActiveJobs(page, pageSize); @@ -197,13 +203,14 @@ public ResponseEntityView activeJobs(@Context HttpServletReq @GET @Path("/completed") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView completedJobs(@Context HttpServletRequest request, + public ResponseEntityView completedJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getCompletedJobs(page, pageSize); @@ -214,13 +221,13 @@ public ResponseEntityView completedJobs(@Context HttpServlet @Path("/successful") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView successfulJobs( - @Context HttpServletRequest request, + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getSuccessfulJobs(page, pageSize); @@ -230,13 +237,14 @@ public ResponseEntityView successfulJobs( @GET @Path("/canceled") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView canceledJobs(@Context HttpServletRequest request, + public ResponseEntityView canceledJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getCanceledJobs(page, pageSize); @@ -246,13 +254,14 @@ public ResponseEntityView canceledJobs(@Context HttpServletR @GET @Path("/failed") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView failedJobs(@Context HttpServletRequest request, + public ResponseEntityView failedJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getFailedJobs(page, pageSize); @@ -262,13 +271,14 @@ public ResponseEntityView failedJobs(@Context HttpServletReq @GET @Path("/abandoned") @Produces(MediaType.APPLICATION_JSON) - public ResponseEntityView abandonedJobs(@Context HttpServletRequest request, + public ResponseEntityView abandonedJobs( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); final JobPaginatedResult result = helper.getAbandonedJobs(page, pageSize); @@ -279,13 +289,14 @@ public ResponseEntityView abandonedJobs(@Context HttpServlet @Path("/{jobId}/monitor") @Produces(SseFeature.SERVER_SENT_EVENTS) @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks - public EventOutput monitorJob(@Context HttpServletRequest request, + public EventOutput monitorJob( + @Context HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("jobId") String jobId) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) - .requestAndResponse(request, null) + .requestAndResponse(request, response) .rejectWhenNoUser(true) .init(); From 6aa22b31382d76ba991d71521936a13b969dd5d1 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Wed, 4 Dec 2024 10:08:52 -0600 Subject: [PATCH 9/9] #30367 Applying code review feedback. --- .../rest/api/v1/job/JobQueueResource.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java index a4421cd53400..884e64900325 100644 --- a/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java +++ b/dotCMS/src/main/java/com/dotcms/rest/api/v1/job/JobQueueResource.java @@ -55,7 +55,7 @@ public JobQueueResource(WebResource webResource, JobQueueHelper helper, @Consumes(MediaType.MULTIPART_FORM_DATA) @Produces(MediaType.APPLICATION_JSON) public Response createJob( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, @BeanParam JobParams form) throws JsonProcessingException, DotDataException { @@ -80,7 +80,7 @@ public Response createJob( @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response createJob( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, Map parameters) throws DotDataException { @@ -104,7 +104,7 @@ public Response createJob( @Path("/queues") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView> getQueues( - @Context HttpServletRequest request, @Context final HttpServletResponse response) { + @Context final HttpServletRequest request, @Context final HttpServletResponse response) { new InitBuilder(webResource) .requiredBackendUser(true) .requiredFrontendUser(false) @@ -118,7 +118,7 @@ public ResponseEntityView> getQueues( @Path("/{jobId}/status") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView getJobStatus( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("jobId") String jobId) throws DotDataException { new InitBuilder(webResource) @@ -137,7 +137,7 @@ public ResponseEntityView getJobStatus( @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.WILDCARD) public ResponseEntityView cancelJob( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("jobId") String jobId) throws DotDataException { new InitBuilder(webResource) .requiredBackendUser(true) @@ -153,7 +153,7 @@ public ResponseEntityView cancelJob( @Path("/{queueName}/active") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView activeJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("queueName") String queueName, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { @@ -170,7 +170,7 @@ public ResponseEntityView activeJobs( @GET @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView listJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -187,7 +187,7 @@ public ResponseEntityView listJobs( @Path("/active") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView activeJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -204,7 +204,7 @@ public ResponseEntityView activeJobs( @Path("/completed") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView completedJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -221,7 +221,7 @@ public ResponseEntityView completedJobs( @Path("/successful") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView successfulJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -238,7 +238,7 @@ public ResponseEntityView successfulJobs( @Path("/canceled") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView canceledJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -255,7 +255,7 @@ public ResponseEntityView canceledJobs( @Path("/failed") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView failedJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -272,7 +272,7 @@ public ResponseEntityView failedJobs( @Path("/abandoned") @Produces(MediaType.APPLICATION_JSON) public ResponseEntityView abandonedJobs( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @QueryParam("page") @DefaultValue("1") int page, @QueryParam("pageSize") @DefaultValue("20") int pageSize) { new InitBuilder(webResource) @@ -290,7 +290,7 @@ public ResponseEntityView abandonedJobs( @Produces(SseFeature.SERVER_SENT_EVENTS) @SuppressWarnings("java:S1854") // jobWatcher assignment is needed for cleanup in catch blocks public EventOutput monitorJob( - @Context HttpServletRequest request, @Context final HttpServletResponse response, + @Context final HttpServletRequest request, @Context final HttpServletResponse response, @PathParam("jobId") String jobId) { new InitBuilder(webResource)