Skip to content

Commit

Permalink
#29478 Improvements and adding more unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jgambarios committed Aug 29, 2024
1 parent 485d513 commit 6072717
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 88 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.dotcms.jobs.business.api;

import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.JobCancellationException;
import com.dotcms.jobs.business.error.ProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
Expand All @@ -14,7 +15,7 @@
* Defines the contract for interacting with the job queue system. This interface provides methods
* for managing jobs, processors, and the overall state of the job queue.
*/
public interface JobQueueAPI extends AutoCloseable {
public interface JobQueueManagerAPI extends AutoCloseable {

/**
* Starts the job queue manager, initializing the thread pool for job processing.
Expand Down Expand Up @@ -108,16 +109,10 @@ String createJob(String queueName, Map<String, Object> parameters)
void setRetryStrategy(String queueName, RetryStrategy retryStrategy);

/**
* Manually resets the CircuitBreaker. This should be called with caution, typically after
* addressing the underlying issues causing failures.
*/
void resetCircuitBreaker();

/**
* Provides information about the current state of the CircuitBreaker.
* Retrieves the CircuitBreaker instance.
*
* @return A string representation of the CircuitBreaker's current status
* @return The CircuitBreaker instance
*/
String getCircuitBreakerStatus();
CircuitBreaker getCircuitBreaker();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import com.dotmarketing.util.Logger;
import com.google.common.annotations.VisibleForTesting;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -38,36 +37,36 @@
* JobQueue jobQueue = new PostgresJobQueue();
*
* // Create and start the job queue manager
* JobQueueAPIImpl jobQueueAPI = new JobQueueAPIImpl(jobQueue, 5); // 5 threads
* JobQueueManagerAPIImpl jobQueueManagerAPI = new JobQueueManagerAPIImpl(jobQueue, 5); // 5 threads
*
* //(Optional) Set up a retry strategy for content import jobs
* RetryStrategy contentImportRetryStrategy = new ExponentialBackoffRetryStrategy(5000, 300000, 2.0, 3);
* contentImportRetryStrategy.addRetryableException(IOException.class);
* jobQueueAPI.setRetryStrategy("contentImport", contentImportRetryStrategy);
* jobQueueManagerAPI.setRetryStrategy("contentImport", contentImportRetryStrategy);
*
* // Register job processors
* jobQueueAPI.registerProcessor("contentImport", new ContentImportJobProcessor());
* jobQueueManagerAPI.registerProcessor("contentImport", new ContentImportJobProcessor());
*
* // Start the job queue manager
* jobQueueAPI.start();
* jobQueueManagerAPI.start();
*
* // Create a content import job (dummy example)
* Map<String, Object> jobParameters = new HashMap<>();
* jobParameters.put("filePath", "/path/to/import/file.csv");
* jobParameters.put("contentType", "Article");
* String jobId = jobQueueAPI.createJob("contentImport", jobParameters);
* String jobId = jobQueueManagerAPI.createJob("contentImport", jobParameters);
*
* // Optionally, watch the job progress
* jobQueueAPI.watchJob(jobId, job -> {
* jobQueueManagerAPI.watchJob(jobId, job -> {
* System.out.println("Job " + job.id() + " progress: " + job.progress() * 100 + "%");
* });
*
* // When shutting down the application
* jobQueueAPI.close();
* jobQueueManagerAPI.close();
* }
* }</pre>
*/
public class JobQueueAPIImpl implements JobQueueAPI {
public class JobQueueManagerAPIImpl implements JobQueueManagerAPI {

private final AtomicBoolean isStarted = new AtomicBoolean(false);
private CountDownLatch startLatch;
Expand All @@ -88,22 +87,22 @@ public class JobQueueAPIImpl implements JobQueueAPI {
);

/**
* Constructs a new JobQueueAPIImpl with the default job queue implementation and the default
* Constructs a new JobQueueManagerAPIImpl with the default job queue implementation and the default
* number of threads.
*/
public JobQueueAPIImpl() {
public JobQueueManagerAPIImpl() {
// TODO: Use a job queue implementation
this(null, DEFAULT_THREAD_POOL_SIZE);
}

/**
* Constructs a new JobQueueAPIImpl.
* Constructs a new JobQueueManagerAPIImpl.
*
* @param jobQueue The JobQueue implementation to use.
* @param threadPoolSize The number of threads to use for job processing.
*/
@VisibleForTesting
public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) {
public JobQueueManagerAPIImpl(JobQueue jobQueue, int threadPoolSize) {
this.jobQueue = jobQueue;
this.threadPoolSize = threadPoolSize;
this.processors = new ConcurrentHashMap<>();
Expand All @@ -117,6 +116,26 @@ public JobQueueAPIImpl(JobQueue jobQueue, int threadPoolSize) {
); // 5 failures within 1 minute
}

/**
* Constructs a new JobQueueManagerAPIImpl.
*
* @param jobQueue The JobQueue implementation to use.
* @param threadPoolSize The number of threads to use for job processing.
* @param circuitBreaker The CircuitBreaker implementation to use.
*/
@VisibleForTesting
public JobQueueManagerAPIImpl(JobQueue jobQueue, int threadPoolSize, CircuitBreaker circuitBreaker) {
this.jobQueue = jobQueue;
this.threadPoolSize = threadPoolSize;
this.processors = new ConcurrentHashMap<>();
this.jobWatchers = new ConcurrentHashMap<>();
this.retryStrategies = new ConcurrentHashMap<>();
this.defaultRetryStrategy = new ExponentialBackoffRetryStrategy(
1000, 60000, 2.0, 5
);
this.circuitBreaker = circuitBreaker;
}

@Override
public boolean isStarted() {
return isStarted.get() && !isClosed;
Expand Down Expand Up @@ -203,7 +222,7 @@ public String createJob(final String queueName, final Map<String, Object> parame

if (!processors.containsKey(queueName)) {
final var error = new ProcessorNotFoundException(queueName);
Logger.error(JobQueueAPIImpl.class, error);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}

Expand Down Expand Up @@ -237,17 +256,17 @@ public void cancelJob(final String jobId) {
notifyJobWatchers(cancelledJob);
} catch (Exception e) {
final var error = new JobCancellationException(jobId, e.getMessage());
Logger.error(JobQueueAPIImpl.class, error);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
} else {
final var error = new JobCancellationException(jobId, "Job cannot be cancelled");
Logger.error(JobQueueAPIImpl.class, error);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
} else {
final var error = new JobCancellationException(jobId, "Job not found");
Logger.error(JobQueueAPIImpl.class, error);
Logger.error(JobQueueManagerAPIImpl.class, error);
throw error;
}
}
Expand All @@ -265,19 +284,9 @@ public void setRetryStrategy(final String queueName, final RetryStrategy retrySt
}

@Override
public void resetCircuitBreaker() {
Logger.info(this, "Manually resetting CircuitBreaker");
circuitBreaker.reset();
}

@Override
public String getCircuitBreakerStatus() {
return String.format("CircuitBreaker - Open: %b, Failure Count: %d, Last Failure: %s",
circuitBreaker.isOpen(),
circuitBreaker.getFailureCount(),
circuitBreaker.getLastFailureTime() > 0
? new Date(circuitBreaker.getLastFailureTime()).toString()
: "N/A");
@VisibleForTesting
public CircuitBreaker getCircuitBreaker() {
return circuitBreaker;
}

/**
Expand Down Expand Up @@ -345,7 +354,7 @@ private void processJobs() {
Thread.currentThread().interrupt();
} catch (Exception e) {
Logger.error(this, "Unexpected error in job processing loop: " + e.getMessage(), e);
circuitBreaker.recordFailure();
getCircuitBreaker().recordFailure();
}
}
}
Expand All @@ -357,7 +366,7 @@ private void processJobs() {
*/
private boolean isCircuitBreakerOpen() {

if (!circuitBreaker.allowRequest()) {
if (!getCircuitBreaker().allowRequest()) {

Logger.warn(this, "Circuit breaker is open. Pausing job processing for a while.");

Expand Down Expand Up @@ -493,9 +502,13 @@ private void processJob(final Job job) {
* @param errorDetail The details of the error that caused the failure.
*/
private void handleJobFailure(final Job job, final ErrorDetail errorDetail) {

final Job failedJob = job.markAsFailed(errorDetail);
jobQueue.updateJobStatus(failedJob);
notifyJobWatchers(failedJob);

// Record the failure in the circuit breaker
getCircuitBreaker().recordFailure();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.dotcms.jobs.business.error;

import com.dotmarketing.util.Logger;

/**
* Implements the Circuit Breaker pattern to prevent repeated failures in a system. It helps to
* avoid cascading failures by temporarily disabling operations that are likely to fail.
Expand Down Expand Up @@ -63,6 +65,9 @@ public synchronized void recordFailure() {
* Manually resets the circuit breaker to a closed state.
*/
public synchronized void reset() {

Logger.info(this, "Manually resetting CircuitBreaker");

isOpen = false;
failureCount = 0;
lastFailureTime = 0;
Expand Down
4 changes: 2 additions & 2 deletions dotcms-integration/src/test/java/com/dotcms/MainSuite2b.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import com.dotcms.integritycheckers.ContentPageIntegrityCheckerTest;
import com.dotcms.integritycheckers.HostIntegrityCheckerTest;
import com.dotcms.integritycheckers.IntegrityUtilTest;
import com.dotcms.jobs.business.api.JobQueueAPITest;
import com.dotcms.jobs.business.api.JobQueueManagerAPITest;
import com.dotcms.junit.MainBaseSuite;
import com.dotcms.mail.MailAPIImplTest;
import com.dotcms.publisher.bundle.business.BundleAPITest;
Expand Down Expand Up @@ -313,7 +313,7 @@
EmbeddingContentListenerTest.class,
Task240606AddVariableColumnToWorkflowTest.class,
OpenAIContentPromptActionletTest.class,
JobQueueAPITest.class
JobQueueManagerAPITest.class
})

public class MainSuite2b {
Expand Down
Loading

0 comments on commit 6072717

Please sign in to comment.