Skip to content

Commit

Permalink
#29478 Improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 26, 2024
1 parent 452bf8b commit 6ef7fb6
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 10 deletions.
102 changes: 96 additions & 6 deletions dotCMS/src/main/java/com/dotcms/jobs/business/JobQueueManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* }
* }</pre>
*/
public class JobQueueManager {
public class JobQueueManager implements AutoCloseable {

private final CircuitBreaker circuitBreaker;
private final JobQueue jobQueue;
Expand All @@ -86,33 +86,60 @@ public JobQueueManager(JobQueue jobQueue, int threadPoolSize) {
this.processors = new ConcurrentHashMap<>();
this.jobWatchers = new ConcurrentHashMap<>();
this.retryStrategies = new ConcurrentHashMap<>();
this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy(1000, 60000, 2.0, 5);
this.circuitBreaker = new CircuitBreaker(5, 60000); // 5 failures within 1 minute
this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy(
1000, 60000, 2.0, 5
);
this.circuitBreaker = new CircuitBreaker(
5, 60000
); // 5 failures within 1 minute
}

/**
* Starts the job queue manager, initializing the thread pool for job processing.
*/
public void start() {

Logger.info(
this, "Starting JobQueueManager with " + threadPoolSize + " threads."
);

executorService = Executors.newFixedThreadPool(threadPoolSize);
for (int i = 0; i < threadPoolSize; i++) {
executorService.submit(this::processJobs);
}

Logger.info(this, "JobQueueManager has been successfully started.");
}

/**
* Stops the job queue manager, shutting down the thread pool.
* Closes the JobQueueManager, stopping all job processing and releasing resources.
* <p>
* This method should be called when the JobQueueManager is no longer needed.
* <p>
* Once closed, the manager cannot be restarted.
*
* @throws Exception if an error occurs while closing the manager
*/
public void stop() {
@Override
public void close() throws Exception {

Logger.info(this, "Closing JobQueueManager and stopping all job processing.");
executorService.shutdown();

try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
Logger.error(this, "ExecutorService did not terminate");
}
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
Logger.error(this, "Interrupted while waiting for jobs to complete", e);
}

Logger.info(this, "JobQueueManager has been successfully closed.");
}

/**
Expand Down Expand Up @@ -172,11 +199,15 @@ public List<Job> jobs(final int page, final int pageSize) {
* @throws JobCancellationException if the job cannot be cancelled.
*/
public void cancelJob(final String jobId) {

Job job = jobQueue.job(jobId);
if (job != null) {

final var processor = processors.get(job.queueName());
if (processor != null && processor.canCancel(job)) {

try {

processor.cancel(job);
Job cancelledJob = job.withState(JobState.CANCELLED);
jobQueue.updateJobStatus(cancelledJob);
Expand Down Expand Up @@ -364,8 +395,9 @@ private void processJob(final Job job) {
jobQueue.updateJobStatus(runningJob);
notifyJobWatchers(runningJob);

try (final ScheduledExecutorService progressUpdater = Executors.newSingleThreadScheduledExecutor()) {
try (final CloseableScheduledExecutor closeableExecutor = new CloseableScheduledExecutor()) {

ScheduledExecutorService progressUpdater = closeableExecutor.getExecutorService();
final ProgressTracker progressTracker = processor.progressTracker(runningJob);

// Start a separate thread to periodically update and persist progress
Expand Down Expand Up @@ -495,4 +527,62 @@ public String getCircuitBreakerStatus() {
: "N/A");
}

/**
* A wrapper class that makes ScheduledExecutorService auto-closeable. This class is designed to
* be used with try-with-resources to ensure that the ScheduledExecutorService is properly shut
* down when it's no longer needed.
*
* <p>Usage example:</p>
* <pre>
* try (CloseableScheduledExecutor executor = new CloseableScheduledExecutor()) {
* ScheduledExecutorService service = executor.getExecutorService();
* // Use the service...
* } // The executor service is automatically shut down here
* </pre>
*/
private static class CloseableScheduledExecutor implements AutoCloseable {

private final ScheduledExecutorService executorService;

/**
* Constructs a new CloseableScheduledExecutor. This creates a new single-threaded
* ScheduledExecutorService.
*/
public CloseableScheduledExecutor() {
this.executorService = Executors.newSingleThreadScheduledExecutor();
}

/**
* Gets the wrapped ScheduledExecutorService.
*
* @return the ScheduledExecutorService
*/
public ScheduledExecutorService getExecutorService() {
return executorService;
}

/**
* Closes the ScheduledExecutorService. This method attempts to perform an orderly shutdown,
* waiting up to 60 seconds for submitted tasks to complete. If the shutdown doesn't
* complete within this time, it forces an immediate shutdown.
*
* <p>This method is automatically called when used with try-with-resources.</p>
*
* @throws RuntimeException if the current thread is interrupted while waiting for the
* executor service to terminate
*/
@Override
public void close() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException ie) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* Implements an exponential backoff retry strategy. This strategy increases the delay between retry
* attempts exponentially, and adds a small random jitter to prevent synchronized retries in
* attempts exponentially and adds a small random jitter to prevent synchronized retries in
* distributed systems.
*/
public class ExponentialBackoffRetryStrategy implements RetryStrategy {
Expand Down Expand Up @@ -78,11 +78,14 @@ public boolean shouldRetry(final Job job, final Throwable exception) {
*/
@Override
public long nextRetryDelay(final Job job) {

long delay = (long) (initialDelay * Math.pow(backoffFactor, job.retryCount()));
delay = Math.min(delay, maxDelay);
// Add jitter using bounded nextLong()
delay += random.nextLong((long) (delay * 0.1));
return delay;

// Add jitter (0-10% of delay)
long jitter = (long) (delay * 0.1 * random.nextDouble());

return delay + jitter;
}

/**
Expand Down

0 comments on commit 6ef7fb6

Please sign in to comment.