From 6ef7fb6cf121b8da65131c44f7aba6996e35fad5 Mon Sep 17 00:00:00 2001 From: Jonathan Gamba Date: Mon, 26 Aug 2024 17:28:41 -0600 Subject: [PATCH] #29478 Improvements --- .../dotcms/jobs/business/JobQueueManager.java | 102 ++++++++++++++++-- .../ExponentialBackoffRetryStrategy.java | 11 +- 2 files changed, 103 insertions(+), 10 deletions(-) diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/JobQueueManager.java b/dotCMS/src/main/java/com/dotcms/jobs/business/JobQueueManager.java index 0b345100a91d..1de52bfcbecf 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/JobQueueManager.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/JobQueueManager.java @@ -63,7 +63,7 @@ * } * } */ -public class JobQueueManager { +public class JobQueueManager implements AutoCloseable { private final CircuitBreaker circuitBreaker; private final JobQueue jobQueue; @@ -86,33 +86,60 @@ public JobQueueManager(JobQueue jobQueue, int threadPoolSize) { this.processors = new ConcurrentHashMap<>(); this.jobWatchers = new ConcurrentHashMap<>(); this.retryStrategies = new ConcurrentHashMap<>(); - this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy(1000, 60000, 2.0, 5); - this.circuitBreaker = new CircuitBreaker(5, 60000); // 5 failures within 1 minute + this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy( + 1000, 60000, 2.0, 5 + ); + this.circuitBreaker = new CircuitBreaker( + 5, 60000 + ); // 5 failures within 1 minute } /** * Starts the job queue manager, initializing the thread pool for job processing. */ public void start() { + + Logger.info( + this, "Starting JobQueueManager with " + threadPoolSize + " threads." + ); + executorService = Executors.newFixedThreadPool(threadPoolSize); for (int i = 0; i < threadPoolSize; i++) { executorService.submit(this::processJobs); } + + Logger.info(this, "JobQueueManager has been successfully started."); } /** - * Stops the job queue manager, shutting down the thread pool. + * Closes the JobQueueManager, stopping all job processing and releasing resources. + *

+ * This method should be called when the JobQueueManager is no longer needed. + *

+ * Once closed, the manager cannot be restarted. + * + * @throws Exception if an error occurs while closing the manager */ - public void stop() { + @Override + public void close() throws Exception { + + Logger.info(this, "Closing JobQueueManager and stopping all job processing."); executorService.shutdown(); + try { if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { executorService.shutdownNow(); + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + Logger.error(this, "ExecutorService did not terminate"); + } } } catch (InterruptedException e) { executorService.shutdownNow(); Thread.currentThread().interrupt(); + Logger.error(this, "Interrupted while waiting for jobs to complete", e); } + + Logger.info(this, "JobQueueManager has been successfully closed."); } /** @@ -172,11 +199,15 @@ public List jobs(final int page, final int pageSize) { * @throws JobCancellationException if the job cannot be cancelled. */ public void cancelJob(final String jobId) { + Job job = jobQueue.job(jobId); if (job != null) { + final var processor = processors.get(job.queueName()); if (processor != null && processor.canCancel(job)) { + try { + processor.cancel(job); Job cancelledJob = job.withState(JobState.CANCELLED); jobQueue.updateJobStatus(cancelledJob); @@ -364,8 +395,9 @@ private void processJob(final Job job) { jobQueue.updateJobStatus(runningJob); notifyJobWatchers(runningJob); - try (final ScheduledExecutorService progressUpdater = Executors.newSingleThreadScheduledExecutor()) { + try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) { + ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService(); final ProgressTracker progressTracker = processor.progressTracker(runningJob); // Start a separate thread to periodically update and persist progress @@ -495,4 +527,62 @@ public String getCircuitBreakerStatus() { : "N/A"); } + /** + * A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to + * be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut + * down when it's no longer needed. + * + *

Usage example:

+ *
+     * try (CloseableScheduledExecutor executor = new CloseableScheduledExecutor()) {
+     *     ScheduledExecutorService service = executor.getExecutorService();
+     *     // Use the service...
+     * } // The executor service is automatically shut down here
+     * 
+ */ + private static class CloseableScheduledExecutor implements AutoCloseable { + + private final ScheduledExecutorService executorService; + + /** + * Constructs a new CloseableScheduledExecutor. This creates a new single-threaded + * ScheduledExecutorService. + */ + public CloseableScheduledExecutor() { + this.executorService = Executors.newSingleThreadScheduledExecutor(); + } + + /** + * Gets the wrapped ScheduledExecutorService. + * + * @return the ScheduledExecutorService + */ + public ScheduledExecutorService getExecutorService() { + return executorService; + } + + /** + * Closes the ScheduledExecutorService. This method attempts to perform an orderly shutdown, + * waiting up to 60 seconds for submitted tasks to complete. If the shutdown doesn't + * complete within this time, it forces an immediate shutdown. + * + *

This method is automatically called when used with try-with-resources.

+ * + * @throws RuntimeException if the current thread is interrupted while waiting for the + * executor service to terminate + */ + @Override + public void close() { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException ie) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + } diff --git a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java index 8603ecdcf26d..58bbacfbe07b 100644 --- a/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java +++ b/dotCMS/src/main/java/com/dotcms/jobs/business/error/ExponentialBackoffRetryStrategy.java @@ -8,7 +8,7 @@ /** * Implements an exponential backoff retry strategy. This strategy increases the delay between retry - * attempts exponentially, and adds a small random jitter to prevent synchronized retries in + * attempts exponentially and adds a small random jitter to prevent synchronized retries in * distributed systems. */ public class ExponentialBackoffRetryStrategy implements RetryStrategy { @@ -78,11 +78,14 @@ public boolean shouldRetry(final Job job, final Throwable exception) { */ @Override public long nextRetryDelay(final Job job) { + long delay = (long) (initialDelay * Math.pow(backoffFactor, job.retryCount())); delay = Math.min(delay, maxDelay); - // Add jitter using bounded nextLong() - delay += random.nextLong((long) (delay * 0.1)); - return delay; + + // Add jitter (0-10% of delay) + long jitter = (long) (delay * 0.1 * random.nextDouble()); + + return delay + jitter; } /**