Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
fabrizzio-dotCMS committed Oct 8, 2024
1 parent 4550407 commit dc178fb
Show file tree
Hide file tree
Showing 9 changed files with 10,614 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
import org.jboss.jandex.Index;
import org.jboss.jandex.IndexReader;

/**
* Scans the classpath for classes that implement the JobProcessor interface.
* This class uses Jandex to scan the classpath for classes that implement the JobProcessor interface.
*/
@ApplicationScoped
public class JobProcessorScanner {


/**
* Discovers all classes that implement the JobProcessor interface.
* @return A list of classes that implement the JobProcessor interface.
*/
public List<Class<? extends JobProcessor>> discoverJobProcessors() {
List<Class<? extends JobProcessor>> jobProcessors = new ArrayList<>();
try {
Expand All @@ -42,6 +49,11 @@ public List<Class<? extends JobProcessor>> discoverJobProcessors() {
return jobProcessors;
}

/**
* Reads the Jandex index file.
* @return The Jandex index.
* @throws IOException If the Jandex index file cannot be read.
*/
private Index getJandexIndex() throws IOException {
InputStream input = getClass().getClassLoader().getResourceAsStream("META-INF/jandex.idx");
if (input == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.dotcms.jobs.business.api.events.RealTimeJobMonitor;
import com.dotcms.jobs.business.error.CircuitBreaker;
import com.dotcms.jobs.business.error.ErrorDetail;
import com.dotcms.jobs.business.error.JobProcessorInstantiationException;
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.error.RetryStrategy;
import com.dotcms.jobs.business.job.Job;
Expand Down Expand Up @@ -399,7 +398,6 @@ public RetryStrategy getDefaultRetryStrategy() {
*/
@CloseDBIfOpened
private void pollJobUpdates() {

try {
final var watchedJobIds = realTimeJobMonitor.getWatchedJobIds();
if (watchedJobIds.isEmpty()) {
Expand All @@ -412,9 +410,8 @@ private void pollJobUpdates() {
);
realTimeJobMonitor.updateWatchers(updatedJobs);
lastPollJobUpdateTime = currentPollTime;
} catch (JobQueueDataException e) {
Logger.error(this, "Error polling job updates: " + e.getMessage(), e);
throw new DotRuntimeException("Error polling job updates", e);
} catch (Exception e) {
Logger.error(this, "Error polling job updates: " + e.getMessage(), e);//
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.dotcms.jobs.business.processor.impl;

import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotmarketing.exception.DotRuntimeException;
import java.util.Map;

@Queue("fail")
public class FailJob implements JobProcessor {

@Override
public void process(Job job) {

throw new DotRuntimeException( "Failed job !");
}

@Override
public Map<String, Object> getResultMetadata(Job job) {
return Map.of();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.processor.Cancellable;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.ProgressTracker;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.rest.api.v1.temp.DotTempFile;
import com.dotcms.rest.api.v1.temp.TempFileAPI;
import com.dotmarketing.business.APILocator;
import com.dotmarketing.exception.DotRuntimeException;
import com.dotmarketing.util.Logger;
import io.vavr.control.Try;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.jetbrains.annotations.Nullable;

/**
* This class reads a large file and prints the content to the log.
* It is here for the sole purpose of demonstrating the job queue system.
*/
@Queue("FileReader")
@Queue("demo")
public class LargeFileReader implements JobProcessor, Cancellable {

private boolean working = true;
Expand All @@ -28,58 +32,90 @@ public class LargeFileReader implements JobProcessor, Cancellable {
public void process(Job job) {
// Retrieve job parameters
working = true;

Logger.info(this.getClass(), "Processing job: " + job.id());
Map<String, Object> params = job.parameters();

final Optional<Integer> linesParam = linesParam(params);
if (linesParam.isEmpty()) {
Logger.error(this.getClass(),
"Unable to retrieve the number of lines to read. Quitting the job.");
return;
throw new DotRuntimeException("Unable to retrieve the temporary file.");
}

Optional<DotTempFile> tempFile = tempFile(params);
if (tempFile.isEmpty()) {
Logger.error(this.getClass(), "Unable to retrieve the temporary. Quitting the job.");
return;
throw new DotRuntimeException("Unable to retrieve the temporary file.");
}

final int nLines = linesParam.get();
final DotTempFile dotTempFile = tempFile.get();

doReadLargeFile(dotTempFile, nLines);
doReadLargeFile(dotTempFile, nLines, job);
}

/**
* Process the job
* @param dotTempFile temporary file
* @param nLines number of lines to read and print
*/
private void doReadLargeFile(DotTempFile dotTempFile, int nLines) {
private void doReadLargeFile(DotTempFile dotTempFile, int nLines, final Job job) {
final Long totalCount = countLines(dotTempFile);
if (totalCount == null) {
return;
}
Logger.info(this.getClass(), "Total lines in the file: " + totalCount);
final Optional<ProgressTracker> progressTracker = job.progressTracker();
try (BufferedReader reader = new BufferedReader(new FileReader(dotTempFile.file))) {
String line;
int lineCount = 0;
int totalLines = 0;

Logger.info(this.getClass(),
"Starting to read the file: " + dotTempFile.file.getName());

while (working && (line = reader.readLine()) != null) {
lineCount++;
totalLines++;

// Print the line when the counter reaches nLines
if (lineCount == nLines) {
Logger.info(this.getClass(), "Line " + totalLines + ": " + line);
lineCount = 0; // Reset the counter
String line;
int lineCount = 0;
int readCount = 0;

Logger.info(this.getClass(),
"Starting to read the file: " + dotTempFile.file.getName());

while (working && (line = reader.readLine()) != null) {
lineCount++;
readCount++;

// Print the line when the counter reaches nLines
if (lineCount == nLines) {
lineCount = 0; // Reset the counter
Logger.info(this.getClass(), line);
delay();
}
final float progressPercentage = ((float) readCount / totalCount) * 100;
progressTracker.ifPresent(tracker -> tracker.updateProgress(progressPercentage));
}

Logger.info(this.getClass(), "Reading completed. Total lines read: " + readCount);
} catch (Exception e) {
Logger.error(this.getClass(),
"Unexpected error during processing: " + e.getMessage());
}
}

Logger.info(this.getClass(), "Reading completed. Total lines read: " + totalLines);
private @Nullable Long countLines(DotTempFile dotTempFile) {
long totalCount = 0;
try (BufferedReader reader = new BufferedReader(new FileReader(dotTempFile.file))) {
totalCount = reader.lines().count();
if (totalCount == 0) {
Logger.info(this.getClass(), "No lines in the file: " + dotTempFile.file.getName());
return null;
}
} catch (Exception e) {
Logger.error(this.getClass(), "Unexpected error during processing: " + e.getMessage());
return null;
}
return totalCount;
}

private void delay() {
Try.of(()->{
Thread.sleep(1000);
return null;
}).onFailure(e->Logger.error(this.getClass(), "Error during delay", e));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;

/**
* This class represents the parameters for a job.
* This bean encapsulates the expected parameters for a job.
* that would be a file and a json object.
* The json object is expected to be a simple key value pair.
*/
public class JobParams {

@FormDataParam("file")
Expand Down Expand Up @@ -44,7 +50,7 @@ public void setJsonParams(String jsonParams) {
public Map getParams() throws JsonProcessingException {
if (null == params) {
if (null == jsonParams){
throw new IllegalArgumentException("Job Params must be passed in json format.");
throw new IllegalArgumentException("Job Params must be passed as a json object in the params field.");
}
params = new ObjectMapper().readValue(jsonParams, Map.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.dotcms.jobs.business.error.JobProcessorNotFoundException;
import com.dotcms.jobs.business.job.Job;
import com.dotcms.jobs.business.job.JobPaginatedResult;
import com.dotcms.jobs.business.job.JobState;
import com.dotcms.jobs.business.processor.JobProcessor;
import com.dotcms.jobs.business.processor.Queue;
import com.dotcms.jobs.business.queue.error.JobQueueDataException;
Expand All @@ -17,6 +18,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -154,9 +156,7 @@ void cancelJob(String jobId) throws DotDataException {
* @param jobId The ID of the job
* @param watcher The watcher
*/
void watchJob(String jobId, Consumer<Job> watcher) throws DotDataException {
//Validate the job exists
jobQueueManagerAPI.getJob(jobId);
void watchJob(String jobId, Consumer<Job> watcher) {
// if it does then watch it
jobQueueManagerAPI.watchJob(jobId, watcher);
}
Expand Down Expand Up @@ -237,4 +237,30 @@ private void handleUploadIfPresent(final JobParams form, Map<String, Object> par
}
}
}

/**
* Check if a job is NOT watchable
* @param job The job
* @return true if the job is watchable, false otherwise
*/
public boolean isNotWatchable(Job job){
return JobState.PENDING != job.state() && JobState.RUNNING != job.state()
&& JobState.CANCELLING != job.state();
}

/**
* Get the status info for a job
* @param job The job
* @return The status info
*/
public Map<String, Object> getJobStatusInfo(Job job) {
final DateTimeFormatter isoFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
return Map.of(
"startedAt", job.startedAt().map(isoFormatter::format).orElse("N/A"),
"finishedAt", job.completedAt().map(isoFormatter::format).orElse("N/A"),
"state", job.state(),
"progress", job.progress()
);
}

}
Loading

0 comments on commit dc178fb

Please sign in to comment.