From e1d6551c384753bdc9f79c03eea8635164fa8767 Mon Sep 17 00:00:00 2001 From: Walter Medvedeo Date: Sat, 20 Apr 2024 19:05:40 +0200 Subject: [PATCH] kie-kogito-apps-2026: Improvements on the Job Service start-up and periodic jobs loading procedure (#2038) --- jobs-service/jobs-service-common/pom.xml | 5 - .../kogito/jobs/service/job/DelegateJob.java | 53 ++-- .../JobServiceLeaderHealthCheck.java | 2 +- .../repository/ReactiveJobRepository.java | 45 ++- .../impl/BaseReactiveJobRepository.java | 11 - .../impl/InMemoryJobRepository.java | 80 ++++-- .../marshaller/JobDetailsMarshaller.java | 11 + .../scheduler/BaseTimerJobScheduler.java | 209 +++++++++----- .../scheduler/JobSchedulerManager.java | 121 ++++++-- .../JobSchedulerManagerErrorEvent.java | 39 +++ .../JobSchedulerManagerHealthCheck.java | 49 ++++ .../scheduler/ReactiveJobScheduler.java | 2 + .../impl/TimerDelegateJobScheduler.java | 33 ++- .../impl/VertxTimerServiceScheduler.java | 14 +- .../service/stream/AbstractJobStreams.java | 21 +- .../service/stream/JobEventPublisher.java | 5 - .../jobs/service/stream/JobStreams.java | 11 + .../stream/JobStreamsEventPublisher.java | 125 ++------- .../jobs/service/utils/ErrorHandling.java | 4 +- .../META-INF/microprofile-config.properties | 3 +- .../impl/BaseJobRepositoryTest.java | 81 ++++-- .../marshaller/JobDetailsMarshallerTest.java | 12 +- .../scheduler/BaseTimerJobSchedulerTest.java | 12 +- .../scheduler/JobSchedulerManagerTest.java | 17 +- .../impl/TimerDelegateJobSchedulerTest.java | 3 +- .../impl/VertxTimerServiceSchedulerTest.java | 21 +- .../stream/AbstractJobStreamsTest.java | 8 +- .../infinispan/InfinispanJobRepository.java | 79 ++++-- .../marshaller/JobDetailsMarshaller.java | 4 + .../src/main/resources/META-INF/library.proto | 5 + .../kogito/jobs/service/model/JobDetails.java | 14 +- .../jobs/service/model/JobDetailsBuilder.java | 11 +- .../kogito/jobs/service/utils/ModelUtil.java | 25 ++ .../messaging/http/stream/HttpJobStreams.java | 12 +- 
.../jobs-service-messaging-kafka/pom.xml | 4 + .../kafka/stream/KafkaJobStreams.java | 10 +- .../mongodb/MongoDBJobRepository.java | 93 ++++-- .../MongoDBJobRepositoryExecutionTest.java | 265 +++++++++++------- .../postgresql/PostgreSqlJobRepository.java | 74 +++-- .../jobs-service/V3.0.3__Add_Created_Col.sql | 9 + .../PostgreSqlJobRepositoryExecutionTest.java | 47 ++-- ...arkusJobsServiceEmbeddedRuntimeConfig.java | 19 ++ .../stream/EventPublisherJobStreams.java | 75 ++--- .../src/main/resources/application.properties | 1 + .../stream/EventPublisherJobStreamsTest.java | 13 +- .../jobs/embedded/JobInVMEventPublisher.java | 48 ---- 46 files changed, 1144 insertions(+), 661 deletions(-) create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerErrorEvent.java create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerHealthCheck.java create mode 100644 jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java create mode 100644 jobs-service/jobs-service-postgresql-common/src/main/resources/db/jobs-service/V3.0.3__Add_Created_Col.sql diff --git a/jobs-service/jobs-service-common/pom.xml b/jobs-service/jobs-service-common/pom.xml index 7823ce6e52..cdfc82a41a 100644 --- a/jobs-service/jobs-service-common/pom.xml +++ b/jobs-service/jobs-service-common/pom.xml @@ -120,11 +120,6 @@ quarkus-smallrye-reactive-messaging - - io.smallrye.reactive - smallrye-reactive-messaging-in-memory - - org.kie.kogito diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java index 20796efa24..d76f163c6d 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java +++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/job/DelegateJob.java @@ -18,19 +18,26 @@ */ package org.kie.kogito.jobs.service.job; -import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import org.kie.kogito.jobs.service.exception.JobExecutionException; +import org.kie.kogito.jobs.service.executor.JobExecutor; import org.kie.kogito.jobs.service.executor.JobExecutorResolver; +import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobDetailsContext; import org.kie.kogito.jobs.service.model.JobExecutionResponse; -import org.kie.kogito.jobs.service.stream.JobEventPublisher; +import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler; +import org.kie.kogito.jobs.service.utils.ErrorHandling; import org.kie.kogito.timer.Job; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; +import static java.util.Objects.requireNonNull; +import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher; + /** * The job that delegates the execution to the {@link JobExecutorResolver} with the {@link JobDetailsContext}. 
*/ @@ -40,35 +47,45 @@ public class DelegateJob implements Job { private final JobExecutorResolver jobExecutorResolver; - private final JobEventPublisher jobEventPublisher; + ReactiveJobScheduler scheduler; - public DelegateJob(JobExecutorResolver executorResolver, JobEventPublisher jobEventPublisher) { + public DelegateJob(JobExecutorResolver executorResolver, ReactiveJobScheduler scheduler) { this.jobExecutorResolver = executorResolver; - this.jobEventPublisher = jobEventPublisher; + this.scheduler = scheduler; } @Override public void execute(JobDetailsContext ctx) { - LOGGER.info("Executing for context {}", ctx.getJobDetails()); - Optional.ofNullable(ctx) - .map(JobDetailsContext::getJobDetails) - .map(jobExecutorResolver::get) - .map(executor -> executor.execute(ctx.getJobDetails())) - .orElseThrow(() -> new IllegalStateException("JobDetails cannot be null from context " + ctx)) - .onItem().invoke(jobEventPublisher::publishJobSuccess) - .onFailure(JobExecutionException.class).invoke(ex -> { + final AtomicReference executionResponse = new AtomicReference<>(); + final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> String.format("JobDetails cannot be null for context: %s", ctx)); + final JobExecutor executor = requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No JobExecutor was found for jobDetails: %s", jobDetails)); + LOGGER.info("Executing job for context: {}", jobDetails); + executor.execute(jobDetails) + .flatMap(response -> { + executionResponse.set(response); + return handleJobExecutionSuccess(response); + }) + .onFailure(JobExecutionException.class).recoverWithUni(ex -> { String jobId = ((JobExecutionException) ex).getJobId(); - LOGGER.error("Error executing job {}", jobId, ex); - jobEventPublisher.publishJobError(JobExecutionResponse.builder() + executionResponse.set(JobExecutionResponse.builder() .message(ex.getMessage()) .now() .jobId(jobId) .build()); + return handleJobExecutionError(executionResponse.get()); 
}) - //to avoid blocking IO pool from eventloop since execute() is blocking currently - //might consider execute() method to be non-blocking/async + // avoid blocking IO pool from the event-loop since alternative EmbeddedJobExecutor is blocking. .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) - .subscribe().with(response -> LOGGER.info("Executed successfully with response {}", response)); + .subscribe().with(ignore -> LOGGER.info("Job execution response processing has finished: {}", executionResponse.get())); + } + + public Uni handleJobExecutionSuccess(JobExecutionResponse response) { + LOGGER.debug("Job execution success response received: {}", response); + return Uni.createFrom().publisher(publisher(ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response).buildRs())); + } + public Uni handleJobExecutionError(JobExecutionResponse response) { + LOGGER.error("Job execution error response received: {}", response); + return Uni.createFrom().publisher(publisher(ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response).buildRs())); } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderHealthCheck.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderHealthCheck.java index 13af8fdcd8..57b7856675 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderHealthCheck.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceLeaderHealthCheck.java @@ -32,7 +32,7 @@ @ApplicationScoped public class JobServiceLeaderHealthCheck implements HealthCheck { - private AtomicBoolean enabled = new AtomicBoolean(false); + private final AtomicBoolean enabled = new AtomicBoolean(false); protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) { 
this.enabled.set(event.isEnabled()); diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java index b96c021240..0e85226114 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/ReactiveJobRepository.java @@ -27,6 +27,42 @@ public interface ReactiveJobRepository { + enum SortTermField { + FIRE_TIME, + CREATED, + ID + } + + class SortTerm { + private final SortTermField field; + private final boolean asc; + + private SortTerm(SortTermField field, boolean asc) { + this.field = field; + this.asc = asc; + } + + public SortTermField getField() { + return field; + } + + public boolean isAsc() { + return asc; + } + + public static SortTerm byFireTime(boolean asc) { + return new SortTerm(SortTermField.FIRE_TIME, asc); + } + + public static SortTerm byCreated(boolean asc) { + return new SortTerm(SortTermField.CREATED, asc); + } + + public static SortTerm byId(boolean asc) { + return new SortTerm(SortTermField.ID, asc); + } + } + CompletionStage save(JobDetails job); CompletionStage merge(String id, JobDetails job); @@ -39,9 +75,8 @@ public interface ReactiveJobRepository { CompletionStage delete(JobDetails job); - PublisherBuilder findByStatus(JobStatus... status); - - PublisherBuilder findAll(); - - PublisherBuilder findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... 
status); + PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy); } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java index 2271f46dc5..2827d1538f 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/BaseReactiveJobRepository.java @@ -18,17 +18,13 @@ */ package org.kie.kogito.jobs.service.repository.impl; -import java.util.Arrays; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; -import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.kie.kogito.jobs.service.model.JobDetails; -import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; import org.kie.kogito.jobs.service.stream.JobEventPublisher; @@ -52,13 +48,6 @@ public CompletionStage runAsync(Supplier function) { return future; } - @Override - public PublisherBuilder findByStatus(JobStatus... 
status) { - return findAll() - .filter(job -> Objects.nonNull(job.getStatus())) - .filter(job -> Arrays.stream(status).anyMatch(job.getStatus()::equals)); - } - @Override public CompletionStage save(JobDetails job) { return doSave(job) diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java index 5b61e1f403..fa11409c64 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/impl/InMemoryJobRepository.java @@ -20,12 +20,11 @@ import java.time.ZonedDateTime; import java.util.Comparator; +import java.util.Date; +import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; @@ -42,6 +41,8 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithCreatedAndLastUpdate; + @DefaultBean @ApplicationScoped public class InMemoryJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository { @@ -60,8 +61,10 @@ public InMemoryJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher) { @Override public CompletionStage doSave(JobDetails job) { return runAsync(() -> { - jobMap.put(job.getId(), job); - return job; + boolean isNew = !jobMap.containsKey(job.getId()); + JobDetails timeStampedJob = jobWithCreatedAndLastUpdate(isNew, job); + jobMap.put(timeStampedJob.getId(), timeStampedJob); + return timeStampedJob; }); } @@ -81,20 +84,61 @@ 
public CompletionStage delete(String key) { } @Override - public PublisherBuilder findAll() { - return ReactiveStreams.fromIterable(jobMap.values()); + public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy) { + Stream unsortedResult = jobMap.values() + .stream() + .filter(j -> matchStatusFilter(j, status)) + .filter(j -> matchFireTimeFilter(j, fromFireTime, toFireTime)); + List result = orderBy == null || orderBy.length == 0 ? unsortedResult.toList() : unsortedResult.sorted(orderByComparator(orderBy)).toList(); + return ReactiveStreams.fromIterable(result); } - @Override - public PublisherBuilder findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... status) { - return ReactiveStreams.fromIterable( - jobMap.values() - .stream() - .filter(j -> Optional.ofNullable(j.getStatus()) - .filter(s -> Objects.nonNull(status)) - .map(s -> Stream.of(status).anyMatch(s::equals)).orElse(true)) - .filter(j -> DateUtil.fromDate(j.getTrigger().hasNextFireTime()).isAfter(from) && DateUtil.fromDate(j.getTrigger().hasNextFireTime()).isBefore(to)) - .sorted(Comparator.comparing(JobDetails::getPriority).reversed()) - .collect(Collectors.toList())); + private static boolean matchStatusFilter(JobDetails job, JobStatus[] status) { + if (status == null || status.length == 0) { + return true; + } + return Stream.of(status).anyMatch(s -> job.getStatus() == s); } + + private static boolean matchFireTimeFilter(JobDetails job, ZonedDateTime fromFireTime, ZonedDateTime toFireTime) { + ZonedDateTime fireTime = DateUtil.fromDate(job.getTrigger().hasNextFireTime()); + return (fireTime.isEqual(fromFireTime) || fireTime.isAfter(fromFireTime)) && + (fireTime.isEqual(toFireTime) || fireTime.isBefore(toFireTime)); + } + + private static Comparator orderByComparator(SortTerm[] orderBy) { + Comparator comparator = createOrderByFieldComparator(orderBy[0]); + for (int i = 1; i < 
orderBy.length; i++) { + comparator = comparator.thenComparing(createOrderByFieldComparator(orderBy[i])); + } + return comparator; + } + + private static Comparator createOrderByFieldComparator(SortTerm field) { + Comparator comparator; + switch (field.getField()) { + case FIRE_TIME: + comparator = Comparator.comparingLong(jobDetails -> { + Date nextFireTime = jobDetails.getTrigger().hasNextFireTime(); + return nextFireTime != null ? nextFireTime.getTime() : Long.MIN_VALUE; + }); + break; + case CREATED: + comparator = Comparator.comparingLong(jobDetails -> { + ZonedDateTime created = jobDetails.getCreated(); + return created != null ? created.toInstant().toEpochMilli() : Long.MIN_VALUE; + }); + break; + case ID: + comparator = Comparator.comparing(JobDetails::getId); + break; + default: + throw new IllegalArgumentException("No comparator is defined for field: " + field.getField()); + } + return field.isAsc() ? comparator : comparator.reversed(); + } + } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshaller.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshaller.java index 6772a50918..ea4a82a418 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshaller.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshaller.java @@ -82,6 +82,7 @@ private static class JobDetailsAccessor { private Map trigger; private Long executionTimeout; private String executionTimeoutUnit; + private Date created; public JobDetailsAccessor() { } @@ -99,6 +100,7 @@ public JobDetailsAccessor(JobDetails jobDetails, RecipientMarshaller recipientMa this.trigger = Optional.ofNullable(jobDetails.getTrigger()).map(t -> triggerMarshaller.marshall(t).getMap()).orElse(null); this.executionTimeout = 
jobDetails.getExecutionTimeout(); this.executionTimeoutUnit = Optional.ofNullable(jobDetails.getExecutionTimeoutUnit()).map(Enum::name).orElse(null); + this.created = Optional.ofNullable(jobDetails.getCreated()).map(u -> Date.from(u.toInstant())).orElse(null); } public JobDetails to(RecipientMarshaller recipientMarshaller, TriggerMarshaller triggerMarshaller) { @@ -115,6 +117,7 @@ public JobDetails to(RecipientMarshaller recipientMarshaller, TriggerMarshaller .trigger(Optional.ofNullable(this.trigger).map(t -> triggerMarshaller.unmarshall(new JsonObject(t))).orElse(null)) .executionTimeout(this.executionTimeout) .executionTimeoutUnit(Optional.ofNullable(this.executionTimeoutUnit).map(ChronoUnit::valueOf).orElse(null)) + .created(Optional.ofNullable(this.created).map(t -> ZonedDateTime.ofInstant(t.toInstant(), DEFAULT_ZONE)).orElse(null)) .build(); } @@ -213,5 +216,13 @@ public String getExecutionTimeoutUnit() { public void setExecutionTimeoutUnit(String executionTimeoutUnit) { this.executionTimeoutUnit = executionTimeoutUnit; } + + public Date getCreated() { + return created; + } + + public void setCreated(Date created) { + this.created = created; + } } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java index aa2b39a824..ac080cdb82 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobScheduler.java @@ -21,6 +21,8 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -29,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap; 
import java.util.function.Consumer; -import org.apache.commons.lang3.tuple.Pair; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.kie.kogito.jobs.service.exception.InvalidScheduleTimeException; @@ -50,6 +51,8 @@ import io.smallrye.mutiny.Uni; import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithStatus; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithStatusAndHandle; /** * Base reactive Job Scheduler that performs the fundamental operations and let to the concrete classes to @@ -67,7 +70,13 @@ public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler { * Flag to allow and force a job with expirationTime in the past to be executed immediately. If false an * exception will be thrown. */ - Optional forceExecuteExpiredJobs; + boolean forceExecuteExpiredJobs; + + /** + * Flag to allow that jobs that might have overdue during an eventual service shutdown should be fired at the + * next service start. 
+ */ + boolean forceExecuteExpiredJobsOnServiceStart; /** * The current chunk size in minutes the scheduler handles, it is used to keep a limit number of jobs scheduled @@ -77,42 +86,84 @@ public abstract class BaseTimerJobScheduler implements ReactiveJobScheduler { private ReactiveJobRepository jobRepository; - private final Map schedulerControl; + private final Map schedulerControl; + + protected static class SchedulerControlRecord { + private final String jobId; + private final long handleId; + private final ZonedDateTime scheduledTime; + + public SchedulerControlRecord(String jobId, long handleId, ZonedDateTime scheduledTime) { + this.jobId = jobId; + this.handleId = handleId; + this.scheduledTime = scheduledTime; + } + + public String getJobId() { + return jobId; + } + + public long getHandleId() { + return handleId; + } + + public ZonedDateTime getScheduledTime() { + return scheduledTime; + } + } protected BaseTimerJobScheduler() { - this(null, 0, 0, 0, null); + this(null, 0, 0, 0, true, true); } protected BaseTimerJobScheduler(ReactiveJobRepository jobRepository, long backoffRetryMillis, long maxIntervalLimitToRetryMillis, long schedulerChunkInMinutes, - Boolean forceExecuteExpiredJobs) { + boolean forceExecuteExpiredJobs, + boolean forceExecuteExpiredJobsOnServiceStart) { this.jobRepository = jobRepository; this.backoffRetryMillis = backoffRetryMillis; this.maxIntervalLimitToRetryMillis = maxIntervalLimitToRetryMillis; this.schedulerControl = new ConcurrentHashMap<>(); this.schedulerChunkInMinutes = schedulerChunkInMinutes; - this.forceExecuteExpiredJobs = Optional.ofNullable(forceExecuteExpiredJobs); + this.forceExecuteExpiredJobs = forceExecuteExpiredJobs; + this.forceExecuteExpiredJobsOnServiceStart = forceExecuteExpiredJobsOnServiceStart; } + /** + * Executed from the API to reflect client invocations. 
+ */ @Override public Publisher schedule(JobDetails job) { - LOGGER.debug("Scheduling {}", job); + LOGGER.debug("Scheduling job: {}", job); return ReactiveStreams - //check if the job is already scheduled and persisted .fromCompletionStage(jobRepository.exists(job.getId())) .flatMap(exists -> Boolean.TRUE.equals(exists) - ? handleExistingJob(job).map(existingJob -> Pair.of(exists, existingJob)) - : ReactiveStreams.of(Pair.of(exists, job))) - .flatMap(pair -> isOnCurrentSchedulerChunk(job) - //in case the job is on the current bulk, proceed with scheduling process - ? doJobScheduling(job, pair.getLeft()) - //in case the job is not on the current bulk, just save it to be scheduled later + ? handleExistingJob(job) + : ReactiveStreams.of(job)) + .flatMap(handled -> isOnCurrentSchedulerChunk(job) + // in case the job is on the current bulk, proceed with scheduling process. + ? doJobScheduling(job) + // in case the job is not on the current bulk, just save it to be scheduled later. : ReactiveStreams.fromCompletionStage(jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED)))) .buildRs(); } + /** + * Internal use, executed by the periodic loader only. Jobs processed by this method belongs to the current chunk. + */ + @Override + public Publisher internalSchedule(JobDetails job, boolean onServiceStart) { + LOGGER.debug("Internal Scheduling, onServiceStart: {}, job: {}", onServiceStart, job); + return ReactiveStreams + .fromCompletionStage(jobRepository.exists(job.getId())) + .flatMap(exists -> Boolean.TRUE.equals(exists) + ? 
handleInternalSchedule(job, onServiceStart) + : handleInternalScheduleDeletedJob(job)) + .buildRs(); + } + @Override public PublisherBuilder reschedule(String id, Trigger trigger) { return ReactiveStreams.fromCompletionStageNullable(jobRepository.merge(id, JobDetails.builder().trigger(trigger).build())) @@ -121,21 +172,10 @@ public PublisherBuilder reschedule(String id, Trigger trigger) { .flatMapRsPublisher(j -> j); } - private JobDetails jobWithStatus(JobDetails job, JobStatus status) { - return JobDetails.builder().of(job).status(status).build(); - } - - private JobDetails jobWithStatusAndHandle(JobDetails job, JobStatus status, ManageableJobHandle handle) { - return JobDetails.builder().of(job).status(status).scheduledId(String.valueOf(handle.getId())).build(); - } - /** * Performs the given job scheduling process on the scheduler, after all the validations already made. - * - * @param job to be scheduled - * @return */ - private PublisherBuilder doJobScheduling(JobDetails job, boolean exists) { + private PublisherBuilder doJobScheduling(JobDetails job) { return ReactiveStreams.of(job) //calculate the delay (when the job should be executed) .map(current -> job.getTrigger().hasNextFireTime()) @@ -144,62 +184,88 @@ private PublisherBuilder doJobScheduling(JobDetails job, boolean exi .peek(delay -> Optional .of(delay.isNegative()) .filter(Boolean.FALSE::equals) - .orElseThrow(() -> new InvalidScheduleTimeException("The expirationTime should be greater than current " + - "time"))) - // new jobs in current bulk must be stored in the repository before we proceed to schedule, the same as - // way as we do with new jobs that aren't. In this way we provide the same pattern for both cases. - // https://issues.redhat.com/browse/KOGITO-8513 - .flatMap(delay -> !exists - ? 
ReactiveStreams.fromCompletionStage(jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED))) - : ReactiveStreams.fromCompletionStage(CompletableFuture.completedFuture(job))) - //schedule the job on the scheduler - .flatMap(j -> scheduleRegistering(job, Optional.empty())) + .orElseThrow(() -> new InvalidScheduleTimeException( + String.format("The expirationTime: %s, for job: %s should be greater than current time: %s.", + job.getTrigger().hasNextFireTime(), job.getId(), ZonedDateTime.now())))) + .flatMap(delay -> ReactiveStreams.fromCompletionStage(jobRepository.save(jobWithStatus(job, JobStatus.SCHEDULED)))) + //schedule the job in the scheduler + .flatMap(j -> scheduleRegistering(job, job.getTrigger())) .map(handle -> jobWithStatusAndHandle(job, JobStatus.SCHEDULED, handle)) .map(scheduledJob -> jobRepository.save(scheduledJob)) .flatMapCompletionStage(p -> p); } /** - * Check if it should be scheduled (on the current chunk) or saved to be scheduled later. - * - * @return + * Check if the job should be scheduled on the current chunk or saved to be scheduled later. */ private boolean isOnCurrentSchedulerChunk(JobDetails job) { return DateUtil.fromDate(job.getTrigger().hasNextFireTime()).isBefore(DateUtil.now().plusMinutes(schedulerChunkInMinutes)); } private PublisherBuilder handleExistingJob(JobDetails job) { - //always returns true, canceling in case the job is already schedule return ReactiveStreams.fromCompletionStage(jobRepository.get(job.getId())) - //handle scheduled and retry cases .flatMap( - j -> { - switch (j.getStatus()) { + currentJob -> { + switch (currentJob.getStatus()) { case SCHEDULED: - return handleExpirationTime(j) - .map(scheduled -> jobWithStatus(scheduled, JobStatus.CANCELED)) - .map(CompletableFuture::completedFuture) - .flatMapCompletionStage(this::cancel) - .map(deleted -> j); case RETRY: - return handleRetry(CompletableFuture.completedFuture(j)) - .flatMap(retryJob -> ReactiveStreams.empty()); + // cancel the job. 
+ return ReactiveStreams.fromCompletionStage( + cancel(CompletableFuture.completedFuture(jobWithStatus(currentJob, JobStatus.CANCELED)))); default: - //empty to break the stream processing + // uncommon, break the stream processing return ReactiveStreams.empty(); } }) .onErrorResumeWith(t -> ReactiveStreams.empty()); } + private PublisherBuilder handleInternalSchedule(JobDetails job, boolean onStart) { + unregisterScheduledJob(job); + switch (job.getStatus()) { + case SCHEDULED: + Duration delay = calculateRawDelay(DateUtil.fromDate(job.getTrigger().hasNextFireTime())); + if (delay.isNegative() && onStart && !forceExecuteExpiredJobsOnServiceStart) { + return ReactiveStreams.fromCompletionStage(handleExpiredJob(job)); + } else { + // other cases of potential overdue are because of slow processing of the jobs service, or the user + // configured to fire overdue triggers at service startup. Always schedule. + PublisherBuilder preSchedule; + if (job.getScheduledId() != null) { + // cancel the existing timer if any. + preSchedule = ReactiveStreams.fromPublisher(doCancel(job)).flatMap(jobHandle -> ReactiveStreams.of(job)); + } else { + preSchedule = ReactiveStreams.of(job); + } + return preSchedule.flatMap(j -> scheduleRegistering(job, job.getTrigger())) + .map(handle -> jobWithStatusAndHandle(job, JobStatus.SCHEDULED, handle)) + .map(scheduledJob -> jobRepository.save(scheduledJob)) + .flatMapCompletionStage(p -> p); + } + case RETRY: + return handleRetry(CompletableFuture.completedFuture(job)); + default: + // by definition there are no more cases, only SCHEDULED and RETRY cases are picked by the loader. 
+ return ReactiveStreams.of(job); + } + } + + private PublisherBuilder handleInternalScheduleDeletedJob(JobDetails job) { + LOGGER.warn("Job was removed from database: {}.", job); + return ReactiveStreams.of(job); + } + private Duration calculateDelay(ZonedDateTime expirationTime) { - //in case forceExecuteExpiredJobs is true, execute the job immediately (1ms) - return Optional.of(Duration.between(DateUtil.now(), expirationTime)) - .filter(d -> !d.isNegative()) - .orElse(forceExecuteExpiredJobs - .filter(Boolean.TRUE::equals) - .map(f -> Duration.ofSeconds(1)) - .orElse(Duration.ofSeconds(-1))); + Duration delay = Duration.between(DateUtil.now(), expirationTime); + if (!delay.isNegative()) { + return delay; + } + //in case forceExecuteExpiredJobs is true, execute the job immediately. + return forceExecuteExpiredJobs ? Duration.ofSeconds(1) : Duration.ofSeconds(-1); + } + + private Duration calculateRawDelay(ZonedDateTime expirationTime) { + return Duration.between(DateUtil.now(), expirationTime); } public PublisherBuilder handleJobExecutionSuccess(JobDetails futureJob) { @@ -212,7 +278,7 @@ public PublisherBuilder handleJobExecutionSuccess(JobDetails futureJ .flatMap(job -> Optional .ofNullable(job.getTrigger()) .filter(trigger -> Objects.nonNull(trigger.hasNextFireTime())) - .map(time -> doJobScheduling(job, true)) + .map(time -> doJobScheduling(job)) //in case the job should not be executed anymore (there is no nextFireTime) .orElseGet(() -> ReactiveStreams.of(jobWithStatus(job, JobStatus.EXECUTED)))) //final state EXECUTED, removing the job, it is not kept on the repository @@ -269,10 +335,10 @@ private PublisherBuilder handleRetry(CompletionStage fut .flatMap(scheduledJob -> handleExpirationTime(scheduledJob) .map(JobDetails::getStatus) .filter(s -> !JobStatus.ERROR.equals(s)) - .map(s -> scheduleRegistering(scheduledJob, Optional.of(getRetryTrigger()))) + .map(s -> scheduleRegistering(scheduledJob, getRetryTrigger())) .flatMap(p -> p) - .map(scheduleId -> 
JobDetails.builder() - .of(jobWithStatusAndHandle(scheduledJob, JobStatus.RETRY, scheduleId)) + .map(registeredJobHandle -> JobDetails.builder() + .of(jobWithStatusAndHandle(scheduledJob, JobStatus.RETRY, registeredJobHandle)) .incrementRetries() .build()) .map(jobRepository::save) @@ -298,21 +364,25 @@ private CompletionStage handleExpiredJob(JobDetails scheduledJob) { .orElse(null); } - private PublisherBuilder scheduleRegistering(JobDetails job, Optional trigger) { + private PublisherBuilder scheduleRegistering(JobDetails job, Trigger trigger) { return doSchedule(job, trigger) .peek(registerScheduledJob(job)); } - private Consumer registerScheduledJob(JobDetails job) { - return s -> schedulerControl.put(job.getId(), DateUtil.now()); + protected Consumer registerScheduledJob(JobDetails job) { + return handle -> schedulerControl.put(job.getId(), new SchedulerControlRecord(job.getId(), handle.getId(), DateUtil.now())); } - public abstract PublisherBuilder doSchedule(JobDetails job, Optional trigger); + public abstract PublisherBuilder doSchedule(JobDetails job, Trigger trigger); - private ZonedDateTime unregisterScheduledJob(JobDetails job) { + protected SchedulerControlRecord unregisterScheduledJob(JobDetails job) { return schedulerControl.remove(job.getId()); } + protected Collection getScheduledJobs() { + return new ArrayList<>(schedulerControl.values()); + } + public CompletionStage cancel(CompletionStage futureJob) { return Uni.createFrom().completionStage(futureJob) .onItem().invoke(job -> LOGGER.debug("Cancel Job Scheduling {}", job)) @@ -340,10 +410,11 @@ public CompletionStage cancel(String jobId) { @Override public Optional scheduled(String jobId) { - return Optional.ofNullable(schedulerControl.get(jobId)); + SchedulerControlRecord record = schedulerControl.get(jobId); + return Optional.ofNullable(record != null ? 
record.getScheduledTime() : null); } public void setForceExecuteExpiredJobs(boolean forceExecuteExpiredJobs) { - this.forceExecuteExpiredJobs = Optional.of(forceExecuteExpiredJobs); + this.forceExecuteExpiredJobs = forceExecuteExpiredJobs; } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java index c924efaeec..b0bd05feac 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManager.java @@ -18,7 +18,9 @@ */ package org.kie.kogito.jobs.service.scheduler; -import java.util.Optional; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -26,6 +28,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.kie.kogito.jobs.service.management.MessagingChangeEvent; +import org.kie.kogito.jobs.service.management.ReleaseLeaderEvent; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; @@ -38,9 +41,14 @@ import io.vertx.mutiny.core.Vertx; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Event; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import static org.kie.kogito.jobs.service.repository.ReactiveJobRepository.SortTerm.byCreated; +import static org.kie.kogito.jobs.service.repository.ReactiveJobRepository.SortTerm.byFireTime; +import static 
org.kie.kogito.jobs.service.repository.ReactiveJobRepository.SortTerm.byId; + @ApplicationScoped public class JobSchedulerManager { @@ -66,20 +74,52 @@ public class JobSchedulerManager { @ConfigProperty(name = "kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes", defaultValue = "0") long loadJobFromCurrentTimeIntervalInMinutes; + /** + * Number of retries configured for the periodic jobs loading procedure. Every time the procedure is started this + * value is considered. + */ + @ConfigProperty(name = "kogito.jobs-service.loadJobRetries", defaultValue = "3") + int loadJobRetries; + + /** + * Error strategy to apply when the periodic jobs loading procedure has exceeded the jobLoadReties. + */ + @ConfigProperty(name = "kogito.jobs-service.loadJobErrorStrategy", defaultValue = "NONE") + String loadJobErrorStrategy; + @Inject TimerDelegateJobScheduler scheduler; @Inject ReactiveJobRepository repository; + @Inject + Event releaseLeaderEvent; + + @Inject + Event jobSchedulerManagerErrorEvent; + @Inject Vertx vertx; final AtomicBoolean enabled = new AtomicBoolean(false); - final AtomicLong periodicTimerIdForLoadJobs = new AtomicLong(-1l); + final AtomicLong periodicTimerIdForLoadJobs = new AtomicLong(-1L); + + final AtomicBoolean initialLoading = new AtomicBoolean(true); + + static final ZonedDateTime INITIAL_DATE = ZonedDateTime.of(LocalDateTime.parse("2000-01-01T00:00:00"), DateUtil.DEFAULT_ZONE); + + enum LoadJobErrorStrategy { + NONE, + /** + * The service liveness check goes to DOWN, indicating that the service must be restarted. 
+ */ + FAIL_SERVICE + } private void startJobsLoadingFromRepositoryTask() { //guarantee it starts the task just in case it is not already active + initialLoading.set(true); if (periodicTimerIdForLoadJobs.get() < 0) { if (loadJobIntervalInMinutes > schedulerChunkInMinutes) { LOGGER.warn("The loadJobIntervalInMinutes ({}) cannot be greater than schedulerChunkInMinutes ({}), " + @@ -115,31 +155,72 @@ protected synchronized void onMessagingStatusChange(@Observes MessagingChangeEve } } - //Runs periodically loading the jobs from the repository in chunks + /** + * Runs periodically loading the jobs from the repository in chunks. + */ void loadJobDetails() { if (!enabled.get()) { LOGGER.info("Skip loading scheduled jobs"); return; } - loadJobsInCurrentChunk() - .filter(j -> !scheduler.scheduled(j.getId()).isPresent())//not consider already scheduled jobs - .flatMapRsPublisher(t -> ErrorHandling.skipErrorPublisher(scheduler::schedule, t)) - .forEach(a -> LOGGER.debug("Loaded and scheduled job {}", a)) + ZonedDateTime fromFireTime = DateUtil.now().minusMinutes(loadJobFromCurrentTimeIntervalInMinutes); + ZonedDateTime toFireTime = DateUtil.now().plusMinutes(schedulerChunkInMinutes); + if (initialLoading.get()) { + fromFireTime = INITIAL_DATE; + } + doLoadJobDetails(fromFireTime, toFireTime, loadJobRetries); + } + + public void doLoadJobDetails(ZonedDateTime fromFireTime, ZonedDateTime toFireTime, final int retries) { + LOGGER.info("Loading jobs to schedule from the repository, fromFireTime: {} toFireTime: {}.", fromFireTime, toFireTime); + loadJobsBetweenDates(fromFireTime, toFireTime) + .filter(this::isNotScheduled) + .flatMapRsPublisher(jobDetails -> ErrorHandling.skipErrorPublisher((jd) -> scheduler.internalSchedule(jd, initialLoading.get()), jobDetails)) + .forEach(jobDetails -> LOGGER.debug("Loaded and scheduled job {}.", jobDetails)) .run() - .whenComplete((v, t) -> Optional.ofNullable(t) - .map(ex -> { - LOGGER.error("Error Loading scheduled jobs!", ex); - return 
null; - }) - .orElseGet(() -> { - LOGGER.info("Loading scheduled jobs completed !"); - return null; - })); + .whenComplete((unused, throwable) -> { + if (throwable != null) { + LOGGER.error(String.format("Error during jobs loading, retries left: %d.", retries), throwable); + if (retries > 0) { + LOGGER.info("Jobs loading retry: #{} will be executed.", retries - 1); + doLoadJobDetails(fromFireTime, toFireTime, retries - 1); + } else { + LOGGER.error("Jobs loading has failed and no more retires are left, loadJobErrorStrategy: {} will be applied.", loadJobErrorStrategy); + applyLoadJobsErrorStrategy(throwable); + } + } + initialLoading.set(false); + LOGGER.info("Loading scheduled jobs completed !"); + }); } - private PublisherBuilder loadJobsInCurrentChunk() { - return repository.findByStatusBetweenDatesOrderByPriority(DateUtil.now().minusMinutes(loadJobFromCurrentTimeIntervalInMinutes), - DateUtil.now().plusMinutes(schedulerChunkInMinutes), - JobStatus.SCHEDULED, JobStatus.RETRY); + private boolean isNotScheduled(JobDetails jobDetails) { + Date triggerFireTime = jobDetails.getTrigger().hasNextFireTime(); + ZonedDateTime nextFireTime = triggerFireTime != null ? 
DateUtil.instantToZonedDateTime(triggerFireTime.toInstant()) : null; + boolean scheduled = scheduler.scheduled(jobDetails.getId()).isPresent(); + LOGGER.debug("Job found, id: {}, nextFireTime: {}, created: {}, status: {}, already scheduled: {}", jobDetails.getId(), + nextFireTime, + jobDetails.getCreated(), + jobDetails.getStatus(), + scheduled); + return !scheduled; + } + + private PublisherBuilder loadJobsBetweenDates(ZonedDateTime fromFireTime, ZonedDateTime toFireTime) { + return repository.findByStatusBetweenDates(fromFireTime, toFireTime, + new JobStatus[] { JobStatus.SCHEDULED, JobStatus.RETRY }, + new ReactiveJobRepository.SortTerm[] { byCreated(true), byFireTime(true), byId(true) }); + } + + private void applyLoadJobsErrorStrategy(Throwable throwable) { + if (LoadJobErrorStrategy.FAIL_SERVICE.name().equalsIgnoreCase(loadJobErrorStrategy)) { + scheduler.unscheduleTimers(); + cancelJobsLoadingFromRepositoryTask(); + releaseLeaderEvent.fire(new ReleaseLeaderEvent()); + String message = "An unrecoverable error occurred during the jobs loading procedure from database." + + " Please check the database status and configuration, or contact the administrator for a detailed review of the error: " + throwable.getMessage(); + LOGGER.error(message, throwable); + jobSchedulerManagerErrorEvent.fire(new JobSchedulerManagerErrorEvent(message, throwable)); + } } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerErrorEvent.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerErrorEvent.java new file mode 100644 index 0000000000..8813854dea --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerErrorEvent.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.scheduler; + +public class JobSchedulerManagerErrorEvent { + + private final String message; + + private final Throwable error; + + public JobSchedulerManagerErrorEvent(String message, Throwable error) { + this.message = message; + this.error = error; + } + + public String getMessage() { + return message; + } + + public Throwable getError() { + return error; + } +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerHealthCheck.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerHealthCheck.java new file mode 100644 index 0000000000..bcca8d3c84 --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerHealthCheck.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.jobs.service.scheduler; + +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.microprofile.health.HealthCheck; +import org.eclipse.microprofile.health.HealthCheckResponse; +import org.eclipse.microprofile.health.HealthCheckResponseBuilder; +import org.eclipse.microprofile.health.Liveness; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.event.Observes; + +@Liveness +@ApplicationScoped +public class JobSchedulerManagerHealthCheck implements HealthCheck { + + private final AtomicReference errorEvent = new AtomicReference<>(); + + @Override + public HealthCheckResponse call() { + final HealthCheckResponseBuilder responseBuilder = HealthCheckResponse.named("Job Scheduler Manager"); + if (errorEvent.get() != null) { + return responseBuilder.withData("error", errorEvent.get().getMessage()).down().build(); + } + return responseBuilder.up().build(); + } + + protected void onJobSchedulerManagerStatusChange(@Observes JobSchedulerManagerErrorEvent event) { + errorEvent.set(event); + } +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java index 07948f5c4b..6cd025abfc 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java +++ 
b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/ReactiveJobScheduler.java @@ -30,6 +30,8 @@ public interface ReactiveJobScheduler extends JobScheduler Publisher schedule(JobDetails job); + Publisher internalSchedule(JobDetails job, boolean onServiceStart); + CompletionStage cancel(String jobId); PublisherBuilder reschedule(String id, Trigger trigger); diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java index 97a8f0a906..6b30c30b0b 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobScheduler.java @@ -19,7 +19,6 @@ package org.kie.kogito.jobs.service.scheduler.impl; import java.util.Objects; -import java.util.Optional; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; @@ -31,7 +30,6 @@ import org.kie.kogito.jobs.service.model.ManageableJobHandle; import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; import org.kie.kogito.jobs.service.scheduler.BaseTimerJobScheduler; -import org.kie.kogito.jobs.service.stream.JobEventPublisher; import org.kie.kogito.timer.Trigger; import org.reactivestreams.Publisher; import org.slf4j.Logger; @@ -52,8 +50,6 @@ public class TimerDelegateJobScheduler extends BaseTimerJobScheduler { private VertxTimerServiceScheduler delegate; - private JobEventPublisher jobEventPublisher; - protected TimerDelegateJobScheduler() { } @@ -63,21 +59,19 @@ public TimerDelegateJobScheduler(ReactiveJobRepository jobRepository, @ConfigProperty(name = "kogito.jobs-service.maxIntervalLimitToRetryMillis", defaultValue = "60000") 
long maxIntervalLimitToRetryMillis, @ConfigProperty(name = "kogito.jobs-service.schedulerChunkInMinutes", defaultValue = "10") long schedulerChunkInMinutes, @ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobs", defaultValue = "true") boolean forceExecuteExpiredJobs, - JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate, - JobEventPublisher jobEventPublisher) { - super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs); + @ConfigProperty(name = "kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart", defaultValue = "true") boolean forceExecuteExpiredJobsOnServiceStart, + JobExecutorResolver jobExecutorResolver, VertxTimerServiceScheduler delegate) { + super(jobRepository, backoffRetryMillis, maxIntervalLimitToRetryMillis, schedulerChunkInMinutes, forceExecuteExpiredJobs, forceExecuteExpiredJobsOnServiceStart); this.jobExecutorResolver = jobExecutorResolver; this.delegate = delegate; - this.jobEventPublisher = jobEventPublisher; } @Override - public PublisherBuilder doSchedule(JobDetails job, Optional trigger) { - LOGGER.debug("Job Scheduling {}", job); - return ReactiveStreams - .of(job) - .map(j -> delegate.scheduleJob(new DelegateJob(jobExecutorResolver, jobEventPublisher), new JobDetailsContext(j), - trigger.orElse(j.getTrigger()))); + public PublisherBuilder doSchedule(JobDetails job, Trigger trigger) { + LOGGER.debug("Job Scheduling job: {}, trigger: {}", job, trigger); + ManageableJobHandle jobHandle = delegate.scheduleJob(new DelegateJob(jobExecutorResolver, this), + new JobDetailsContext(job), trigger); + return ReactiveStreams.of(jobHandle); } @Override @@ -94,4 +88,15 @@ public Publisher doCancel(JobDetails scheduledJob) { .buildRs(); } + /** + * Removes only the programed in-memory timers. 
+ */ + public void unscheduleTimers() { + LOGGER.debug("Removing in-memory scheduled timers"); + super.getScheduledJobs().forEach(record -> { + boolean removed = delegate.removeJob(new ManageableJobHandle(record.getHandleId())); + LOGGER.debug("Vertex timer: {} for jobId: {}, was removed: {}", record.getHandleId(), record.getJobId(), removed); + super.unregisterScheduledJob(JobDetails.builder().id(record.getJobId()).build()); + }); + } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceScheduler.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceScheduler.java index 431727ac1d..bc538cea2c 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceScheduler.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceScheduler.java @@ -18,9 +18,7 @@ */ package org.kie.kogito.jobs.service.scheduler.impl; -import java.time.Instant; import java.time.ZonedDateTime; -import java.time.chrono.ChronoZonedDateTime; import java.util.Collection; import java.util.Optional; @@ -43,6 +41,8 @@ public class VertxTimerServiceScheduler implements TimerService, InternalSchedulerService { + private static final long MIN_TIMER_DELAY = 1000; + protected TimerJobFactoryManager jobFactoryManager = DefaultTimerJobFactoryManager.instance; protected final Vertx vertx; @@ -118,13 +118,9 @@ public void internalSchedule(TimerJobInstance timerJobInstance) { handle.setScheduledTime(now); } - private Long calculateDelay(long then, ZonedDateTime now) { - return Optional.of(now) - .map(ChronoZonedDateTime::toInstant) - .map(Instant::toEpochMilli) - .filter(n -> then > n) - .map(n -> then - n) - .orElse(1l); + private long calculateDelay(long then, ZonedDateTime now) { + long delay = then - now.toInstant().toEpochMilli(); + return 
Math.max(MIN_TIMER_DELAY, delay); } public Vertx getVertx() { diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java index 5899942821..64189e2ab1 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/AbstractJobStreams.java @@ -34,7 +34,7 @@ import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage; -public abstract class AbstractJobStreams { +public abstract class AbstractJobStreams implements JobStreams { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJobStreams.class); @@ -56,17 +56,24 @@ protected AbstractJobStreams(ObjectMapper objectMapper, boolean enabled, Emitter this.url = url; } - protected void jobStatusChange(JobDetails job) { - if (enabled) { + @Override + public boolean isEnabled() { + return enabled; + } + + @Override + public void jobStatusChange(JobDetails job) { + if (isEnabled()) { try { JobDataEvent event = JobDataEvent .builder() .source(url + RestApiConstants.JOBS_PATH) .data(ScheduledJobAdapter.of(job))//this should support jobs crated with V1 and V2 .build(); + LOGGER.debug("emit jobStatusChange, hasRequests: {}, eventId: {}, jobDetails: {}", emitter.hasRequests(), event.getId(), job); String json = objectMapper.writeValueAsString(event); emitter.send(decorate(ContextAwareMessage.of(json) - .withAck(() -> onAck(job)) + .withAck(() -> onAck(event.getId(), job)) .withNack(reason -> onNack(reason, job)))); } catch (Exception e) { String msg = String.format("An unexpected error was produced while processing a Job status change for the job: %s", job); @@ -75,13 +82,13 @@ protected void jobStatusChange(JobDetails job) { } } - protected CompletionStage onAck(JobDetails job) { - 
LOGGER.debug("Job Status change published: {}", job); + protected CompletionStage onAck(String eventId, JobDetails job) { + LOGGER.debug("Job Status change emitted successfully, eventId: {}, jobDetails: {}", eventId, job); return CompletableFuture.completedFuture(null); } protected CompletionStage onNack(Throwable reason, JobDetails job) { - String msg = String.format("An error was produced while publishing a Job status change for the job: %s", job); + String msg = String.format("An error was produced while emitting a Job status change for the job: %s", job); LOGGER.error(msg, reason); return CompletableFuture.completedFuture(null); } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java index 4342cf00c9..57f8911734 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobEventPublisher.java @@ -19,13 +19,8 @@ package org.kie.kogito.jobs.service.stream; import org.kie.kogito.jobs.service.model.JobDetails; -import org.kie.kogito.jobs.service.model.JobExecutionResponse; public interface JobEventPublisher { - JobExecutionResponse publishJobError(JobExecutionResponse response); - - JobExecutionResponse publishJobSuccess(JobExecutionResponse response); - JobDetails publishJobStatusChange(JobDetails scheduledJob); } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java new file mode 100644 index 0000000000..4f84e71cd6 --- /dev/null +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreams.java @@ -0,0 +1,11 @@ +package org.kie.kogito.jobs.service.stream; + 
+import org.kie.kogito.jobs.service.model.JobDetails; + +public interface JobStreams { + + boolean isEnabled(); + + void jobStatusChange(JobDetails job); + +} diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java index f8ef123190..30c9f01112 100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/stream/JobStreamsEventPublisher.java @@ -18,131 +18,42 @@ */ package org.kie.kogito.jobs.service.stream; -import java.util.Optional; -import java.util.concurrent.CompletionStage; +import java.util.List; +import java.util.stream.Collectors; -import org.eclipse.microprofile.reactive.messaging.Acknowledgment; -import org.eclipse.microprofile.reactive.messaging.Channel; -import org.eclipse.microprofile.reactive.messaging.Emitter; -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.eclipse.microprofile.reactive.messaging.OnOverflow; -import org.eclipse.microprofile.reactive.messaging.Outgoing; import org.kie.kogito.jobs.service.model.JobDetails; -import org.kie.kogito.jobs.service.model.JobExecutionResponse; -import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler; -import org.kie.kogito.jobs.service.utils.ErrorHandling; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.smallrye.reactive.messaging.annotations.Broadcast; - +import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; -/** - * Class that configure the Consumers for Job Streams,like Job Executed, Job Error... and execute the actions for each - * received item. 
- */ @ApplicationScoped public class JobStreamsEventPublisher implements JobEventPublisher { private static final Logger LOGGER = LoggerFactory.getLogger(JobStreamsEventPublisher.class); @Inject - ReactiveJobScheduler scheduler; - - /** - * Publish on Stream of Job Error events - */ - @Inject - @Channel(AvailableStreams.JOB_ERROR) - @OnOverflow(value = OnOverflow.Strategy.NONE) - Emitter jobErrorEmitter; - - /** - * Publish on Stream of Job Success events - */ - @Inject - @Channel(AvailableStreams.JOB_SUCCESS) - @OnOverflow(value = OnOverflow.Strategy.NONE) - Emitter jobSuccessEmitter; - - /** - * Publish on Stream of Job Success events - */ - @Inject - @Channel(AvailableStreams.JOB_STATUS_CHANGE) - @OnOverflow(value = OnOverflow.Strategy.NONE) - Emitter jobStatusChangeEmitter; - - public JobExecutionResponse publishJobError(JobExecutionResponse response) { - jobErrorEmitter.send(response); - return response; - } + Instance jobStreamsInstance; - public JobExecutionResponse publishJobSuccess(JobExecutionResponse response) { - jobSuccessEmitter.send(response); - return response; - } - - public JobDetails publishJobStatusChange(JobDetails scheduledJob) { - jobStatusChangeEmitter.send(scheduledJob); - return scheduledJob; - } - - //Stream Processors - @Incoming(AvailableStreams.JOB_ERROR_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - public CompletionStage jobErrorProcessor(JobExecutionResponse response) { - LOGGER.warn("Error received {}", response); - return ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response) - .findFirst() - .run() - .thenApply(Optional::isPresent) - .exceptionally(e -> { - LOGGER.error("Error handling error {}", response, e); - return false; - }); - } - - @Incoming(AvailableStreams.JOB_SUCCESS_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - public CompletionStage jobSuccessProcessor(JobExecutionResponse response) { - LOGGER.debug("Success received to be processed {}", 
response); - return ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response) - .findFirst() - .run() - .thenApply(Optional::isPresent) - .exceptionally(e -> { - LOGGER.error("Error handling error {}", response, e); - return false; - }); - } - - // Broadcast Events from Emitter to Streams - @Incoming(AvailableStreams.JOB_ERROR) - @Outgoing(AvailableStreams.JOB_ERROR_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - public JobExecutionResponse jobErrorBroadcast(JobExecutionResponse response) { - LOGGER.debug("Error broadcast published {}", response); - return response; - } + private List enabledStreams; - @Incoming(AvailableStreams.JOB_SUCCESS) - @Outgoing(AvailableStreams.JOB_SUCCESS_EVENTS) - @Broadcast - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - public JobExecutionResponse jobSuccessBroadcast(JobExecutionResponse response) { - LOGGER.debug("Success broadcast published {}", response); - return response; + @PostConstruct + void init() { + this.enabledStreams = jobStreamsInstance.stream() + .filter(stream -> { + LOGGER.info("Job stream: {}, enabled: {}", stream, stream.isEnabled()); + return stream.isEnabled(); + }) + .collect(Collectors.toList()); } - @Incoming(AvailableStreams.JOB_STATUS_CHANGE) - @Outgoing(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) - @Broadcast - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - public JobDetails jobStatusChangeBroadcast(JobDetails job) { - LOGGER.debug("Status change broadcast for Job {}", job); + @Override + public JobDetails publishJobStatusChange(JobDetails job) { + LOGGER.debug("publishJobStatusChange to streams, job: {}", job); + enabledStreams.forEach(jobStream -> jobStream.jobStatusChange(job)); return job; } } diff --git a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java index a6bed7431c..e1051f2210 
100644 --- a/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java +++ b/jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/utils/ErrorHandling.java @@ -49,7 +49,7 @@ private ErrorHandling() { public static Publisher skipErrorPublisher(Function> function, T input) { return ReactiveStreams .fromPublisher(function.apply(input)) - .onError(t -> LOGGER.warn("Error skipped when processing {}.", input, t)) + .onError(t -> LOGGER.warn(String.format("Error skipped when processing input: %s.", input), t)) .onErrorResumeWithRsPublisher(t -> ReactiveStreams. empty().buildRs()) .buildRs(); } @@ -68,7 +68,7 @@ public static Publisher skipErrorPublisher(Function PublisherBuilder skipErrorPublisherBuilder(Function> function, T input) { return function.apply(input) - .onError(t -> LOGGER.warn("Error skipped when processing {}.", input, t)) + .onError(t -> LOGGER.warn(String.format("Error skipped when processing input: %s.", input), t)) .onErrorResumeWithRsPublisher(t -> ReactiveStreams. 
empty().buildRs()); } } diff --git a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties index 205ccc9251..a24e7828c7 100644 --- a/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties +++ b/jobs-service/jobs-service-common/src/main/resources/META-INF/microprofile-config.properties @@ -18,7 +18,6 @@ # # Log Config -quarkus.log.level=INFO %dev.quarkus.log.category."org.kie.kogito.jobs".level=DEBUG # Console @@ -46,6 +45,8 @@ kogito.jobs-service.schedulerChunkInMinutes=10 kogito.jobs-service.loadJobIntervalInMinutes=10 kogito.jobs-service.loadJobFromCurrentTimeIntervalInMinutes=60 kogito.jobs-service.forceExecuteExpiredJobs=true +kogito.jobs-service.forceExecuteExpiredJobsOnServiceStart=true + quarkus.oidc.enabled=true quarkus.oidc.tenant-enabled=false diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java index cfbadc4f8d..5f766a1dde 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/impl/BaseJobRepositoryTest.java @@ -18,10 +18,10 @@ */ package org.kie.kogito.jobs.service.repository.impl; +import java.time.ZonedDateTime; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.BeforeEach; @@ -29,7 +29,6 @@ import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient; import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipientStringPayloadData; import 
org.kie.kogito.jobs.service.model.JobDetails; -import org.kie.kogito.jobs.service.model.JobExecutionResponse; import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.jobs.service.model.Recipient; import org.kie.kogito.jobs.service.model.RecipientInstance; @@ -40,6 +39,7 @@ import org.kie.kogito.timer.impl.PointInTimeTrigger; import static org.assertj.core.api.Assertions.assertThat; +import static org.kie.kogito.jobs.service.repository.ReactiveJobRepository.SortTerm.byFireTime; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; @@ -59,8 +59,6 @@ public void setUp() throws Exception { public JobEventPublisher mockJobEventPublisher() { final JobEventPublisher mock = mock(JobEventPublisher.class); lenient().when(mock.publishJobStatusChange(any(JobDetails.class))).thenAnswer(a -> a.getArgument(0)); - lenient().when(mock.publishJobSuccess(any(JobExecutionResponse.class))).thenAnswer(a -> a.getArgument(0)); - lenient().when(mock.publishJobError(any(JobExecutionResponse.class))).thenAnswer(a -> a.getArgument(0)); return mock; } @@ -69,7 +67,7 @@ public JobEventPublisher mockJobEventPublisher() { @Test void testSaveAndGet() throws ExecutionException, InterruptedException { JobDetails scheduledJob = tested().get(ID).toCompletableFuture().get(); - assertThat(scheduledJob).isEqualTo(job); + assertEqualsToReturnedJob(job, scheduledJob); JobDetails notFound = tested().get(UUID.randomUUID().toString()).toCompletableFuture().get(); assertThat(notFound).isNull(); } @@ -97,35 +95,30 @@ void testExists() throws ExecutionException, InterruptedException { @Test void testDelete() throws ExecutionException, InterruptedException { - JobDetails scheduledJob = tested().delete(ID).toCompletableFuture().get(); - assertThat(scheduledJob).isEqualTo(job); + JobDetails deletedJob = tested().delete(ID).toCompletableFuture().get(); + assertEqualsToReturnedJob(job, deletedJob); JobDetails notFound = 
tested().get(ID).toCompletableFuture().get(); assertThat(notFound).isNull(); } - @Test - void testFindAll() throws ExecutionException, InterruptedException { - List jobs = tested().findAll().toList().run().toCompletableFuture().get(); - assertThat(jobs.size()).isEqualTo(1); - assertThat(jobs.get(0)).isEqualTo(job); - } - @Test void testFindByStatusBetweenDates() throws ExecutionException, InterruptedException { + ZonedDateTime now = DateUtil.now(); List jobs = IntStream.rangeClosed(1, 10).boxed() .map(id -> JobDetails.builder() .status(JobStatus.SCHEDULED) .id(String.valueOf(id)) .priority(id) - .trigger(new PointInTimeTrigger(DateUtil.now().plusMinutes(id).toInstant().toEpochMilli(), null, null)) + .trigger(new PointInTimeTrigger(now.plusMinutes(id).toInstant().toEpochMilli(), null, null)) .priority(id) .build()) - .peek(j -> FunctionsUtil.unchecked((t) -> tested().save(j).toCompletableFuture().get()).apply(null)) - .collect(Collectors.toList()); + .map(j -> FunctionsUtil.unchecked((t) -> tested().save(j).toCompletableFuture().get()).apply(null)) + .toList(); - final List fetched = tested().findByStatusBetweenDatesOrderByPriority(DateUtil.now(), - DateUtil.now().plusMinutes(5).plusSeconds(1), - JobStatus.SCHEDULED) + final List fetched = tested().findByStatusBetweenDates(now, + now.plusMinutes(5), + new JobStatus[] { JobStatus.SCHEDULED }, + new ReactiveJobRepository.SortTerm[] { byFireTime(true) }) .toList() .run() .toCompletableFuture() @@ -133,13 +126,29 @@ void testFindByStatusBetweenDates() throws ExecutionException, InterruptedExcept assertThat(fetched.size()).isEqualTo(5); - IntStream.rangeClosed(0, 4).forEach( - i -> assertThat(fetched.get(i)).isEqualTo(jobs.get(fetched.size() - 1 - i))); + for (int i = 0; i < fetched.size(); i++) { + assertThat(fetched.get(i)).isEqualTo(jobs.get(i)); + } + + final List fetchedDesc = tested().findByStatusBetweenDates(now, + now.plusMinutes(5), + new JobStatus[] { JobStatus.SCHEDULED }, + new 
ReactiveJobRepository.SortTerm[] { byFireTime(false) }) + .toList() + .run() + .toCompletableFuture() + .get(); + + assertThat(fetchedDesc.size()).isEqualTo(5); + for (int i = 0; i < fetchedDesc.size(); i++) { + assertThat(fetchedDesc.get(i)).isEqualTo(jobs.get(4 - i)); + } //not found test - List fetchedNotFound = tested().findByStatusBetweenDatesOrderByPriority(DateUtil.now(), - DateUtil.now().plusMinutes(5).plusSeconds(1), - JobStatus.CANCELED) + List fetchedNotFound = tested().findByStatusBetweenDates(now, + now.plusMinutes(5), + new JobStatus[] { JobStatus.CANCELED }, + new ReactiveJobRepository.SortTerm[] { byFireTime(true) }) .toList() .run() .toCompletableFuture() @@ -147,9 +156,10 @@ void testFindByStatusBetweenDates() throws ExecutionException, InterruptedExcept assertThat(fetchedNotFound.size()).isZero(); - fetchedNotFound = tested().findByStatusBetweenDatesOrderByPriority(DateUtil.now().plusDays(1), - DateUtil.now().plusDays(2), - JobStatus.SCHEDULED) + fetchedNotFound = tested().findByStatusBetweenDates(now.plusMinutes(10).plusSeconds(1), + now.plusMinutes(20), + new JobStatus[] { JobStatus.SCHEDULED }, + new ReactiveJobRepository.SortTerm[] { byFireTime(true) }) .toList() .run() .toCompletableFuture() @@ -174,4 +184,19 @@ void testMergeCallbackEndpoint() throws Exception { assertThat(merged.getId()).isEqualTo(job.getId()); assertThat(merged.getTrigger().hasNextFireTime()).isEqualTo(job.getTrigger().hasNextFireTime()); } + + private static void assertEqualsToReturnedJob(JobDetails job, JobDetails returnedJob) { + assertThat(job.getId()).isEqualTo(returnedJob.getId()); + assertThat(job.getCorrelationId()).isEqualTo(returnedJob.getCorrelationId()); + assertThat(job.getStatus()).isEqualTo(returnedJob.getStatus()); + assertThat(job.getScheduledId()).isEqualTo(returnedJob.getScheduledId()); + assertThat(job.getRetries()).isEqualTo(returnedJob.getRetries()); + assertThat(job.getExecutionCounter()).isEqualTo(returnedJob.getExecutionCounter()); + 
assertThat(job.getExecutionTimeout()).isEqualTo(returnedJob.getExecutionTimeout()); + assertThat(job.getExecutionTimeoutUnit()).isEqualTo(returnedJob.getExecutionTimeoutUnit()); + assertThat(job.getTrigger().hasNextFireTime()).isEqualTo(returnedJob.getTrigger().hasNextFireTime()); + assertThat(job.getRecipient()).isEqualTo(returnedJob.getRecipient()); + assertThat(returnedJob.getCreated()).isNotNull(); + assertThat(returnedJob.getLastUpdate()).isNotNull(); + } } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshallerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshallerTest.java index a55eb1e8b9..b0c30403c3 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshallerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/repository/marshaller/JobDetailsMarshallerTest.java @@ -54,8 +54,8 @@ void setUp() { String id = "testId"; String correlationId = "testCorrelationId"; JobStatus status = JobStatus.SCHEDULED; - Date date = new Date(); - ZonedDateTime lastUpdate = ZonedDateTime.ofInstant(date.toInstant(), DEFAULT_ZONE); + Date lastUpdateDate = new Date(); + ZonedDateTime lastUpdate = ZonedDateTime.ofInstant(lastUpdateDate.toInstant(), DEFAULT_ZONE); Integer retries = 2; Integer priority = 3; Integer executionCounter = 4; @@ -65,6 +65,8 @@ void setUp() { Trigger trigger = new PointInTimeTrigger(new Date().toInstant().toEpochMilli(), null, null); Long executionTimeout = 10L; ChronoUnit executionTimeoutUnit = ChronoUnit.SECONDS; + Date createdDate = new Date(); + ZonedDateTime created = ZonedDateTime.ofInstant(createdDate.toInstant(), DEFAULT_ZONE); jobDetails = JobDetails.builder() .id(id) @@ -79,13 +81,14 @@ void setUp() { .trigger(trigger) .executionTimeout(executionTimeout) 
.executionTimeoutUnit(executionTimeoutUnit) + .created(created) .build(); jsonObject = new JsonObject() .put("id", id) .put("correlationId", correlationId) .put("status", status.name()) - .put("lastUpdate", date.getTime()) + .put("lastUpdate", lastUpdateDate.getTime()) .put("retries", retries) .put("executionCounter", executionCounter) .put("scheduledId", scheduledId) @@ -97,7 +100,8 @@ void setUp() { .put("nextFireTime", trigger.hasNextFireTime().getTime()) .put("classType", PointInTimeTrigger.class.getName())) .put("executionTimeout", executionTimeout) - .put("executionTimeoutUnit", executionTimeoutUnit.name()); + .put("executionTimeoutUnit", executionTimeoutUnit.name()) + .put("created", createdDate.getTime()); } @Test diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java index 62ebd9b2c8..aab159123a 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/BaseTimerJobSchedulerTest.java @@ -81,7 +81,7 @@ public abstract class BaseTimerJobSchedulerTest { public CompletionStage scheduled; @Captor - private ArgumentCaptor> delayCaptor; + private ArgumentCaptor delayCaptor; @Captor private ArgumentCaptor scheduleCaptor; @@ -102,7 +102,7 @@ public abstract class BaseTimerJobSchedulerTest { @BeforeEach public void setUp() { tested().schedulerChunkInMinutes = 5; - tested().forceExecuteExpiredJobs = Optional.of(Boolean.FALSE); + tested().forceExecuteExpiredJobs = false; //expiration on the current scheduler chunk expirationTime = DateUtil.now().plusMinutes(tested().schedulerChunkInMinutes - 1); errorResponse = JobExecutionResponse.builder() @@ -177,7 +177,7 @@ private void testExistingJob(boolean expired, JobStatus 
jobStatus) { verify(jobRepository, expired || SCHEDULED.equals(jobStatus) ? atLeastOnce() : never()).delete(any(JobDetails.class)); verify(tested(), expired ? never() : times(1)).doSchedule(eq(scheduledJob), delayCaptor.capture()); - verify(jobRepository, expired ? never() : times(1)).save(scheduleCaptor.capture()); + verify(jobRepository, expired ? never() : times(2)).save(scheduleCaptor.capture()); //assert always a scheduled job is canceled (periodic or not) Optional.ofNullable(jobStatus) @@ -208,7 +208,7 @@ void testScheduleExistingJobRetryExpired() { @Test void testScheduleExistingJobRetry() { - testExistingJob(false, JobStatus.RETRY); + testExistingJob(false, SCHEDULED); } @Test @@ -253,7 +253,7 @@ void testHandleJobExecutionSuccessPeriodic() { verify(tested(), never()).cancel(scheduleCaptorFuture.capture()); subscribeOn(executionSuccess.buildRs()); - verify(jobRepository, times(2)).save(scheduleCaptor.capture()); + verify(jobRepository, times(3)).save(scheduleCaptor.capture()); JobDetails scheduleCaptorValue = scheduleCaptor.getValue(); assertThat(scheduleCaptorValue.getStatus()).isEqualTo(SCHEDULED); assertThat(scheduleCaptorValue.getExecutionCounter()).isEqualTo(1); @@ -378,7 +378,7 @@ void testForceExpiredJobToBeExecuted() { verify(tested(), never()).doSchedule(eq(scheduledJob), delayCaptor.capture()); //testing with forcing enabled - tested().forceExecuteExpiredJobs = Optional.of(Boolean.TRUE); + tested().forceExecuteExpiredJobs = true; subscribeOn(tested().schedule(scheduledJob)); verify(tested(), times(1)).doSchedule(eq(scheduledJob), delayCaptor.capture()); } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java index 17d7b85282..dd7c46cf1a 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java 
+++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/JobSchedulerManagerTest.java @@ -46,7 +46,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -87,17 +89,18 @@ void setUp() { this.scheduledJob = JobDetails .builder() .id(JOB_ID) + .status(JobStatus.SCHEDULED) .trigger(new PointInTimeTrigger(System.currentTimeMillis(), null, null)) .build(); - lenient().when(repository.findByStatusBetweenDatesOrderByPriority(any(ZonedDateTime.class), + lenient().when(repository.findByStatusBetweenDates(any(ZonedDateTime.class), any(ZonedDateTime.class), - any(JobStatus.class), - any(JobStatus.class))) + any(JobStatus[].class), + any(ReactiveJobRepository.SortTerm[].class))) .thenReturn(ReactiveStreams.of(scheduledJob)); lenient().when(scheduler.scheduled(JOB_ID)) .thenReturn(Optional.empty()); - lenient().when(scheduler.schedule(scheduledJob)) + lenient().when(scheduler.internalSchedule(eq(scheduledJob), anyBoolean())) .thenReturn(ReactiveStreams.of(scheduledJob).buildRs()); ArgumentCaptor action = ArgumentCaptor.forClass(Runnable.class); lenient().doAnswer(a -> { @@ -110,13 +113,13 @@ void setUp() { } @Test - void testLoadJobDetailss() { + void testLoadJobDetails() { tested.loadJobDetails(); - verify(scheduler).schedule(scheduledJob); + verify(scheduler).internalSchedule(scheduledJob, true); } @Test - void testLoadAlreadyJobDetailss() { + void testLoadAlreadyJobDetails() { when(scheduler.scheduled(JOB_ID)).thenReturn(Optional.of(DateUtil.now())); tested.loadJobDetails(); diff --git 
a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java index f272a790ab..28b21c6189 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/TimerDelegateJobSchedulerTest.java @@ -18,7 +18,6 @@ */ package org.kie.kogito.jobs.service.scheduler.impl; -import java.util.Optional; import java.util.UUID; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; @@ -76,7 +75,7 @@ public BaseTimerJobScheduler tested() { @Test void testDoSchedule() { - PublisherBuilder schedule = tested.doSchedule(scheduledJob, Optional.empty()); + PublisherBuilder schedule = tested.doSchedule(scheduledJob, scheduledJob.getTrigger()); Multi.createFrom().publisher(publisher(schedule.buildRs())).subscribe().with(dummyCallback(), dummyCallback()); verify(timer).scheduleJob(any(DelegateJob.class), any(JobDetailsContext.class), eq(scheduledJob.getTrigger())); } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java index 3978fccb1c..b6c1421977 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/scheduler/impl/VertxTimerServiceSchedulerTest.java @@ -21,15 +21,19 @@ import java.time.ZonedDateTime; import java.util.concurrent.TimeUnit; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import 
org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.jobs.service.executor.JobExecutor; import org.kie.kogito.jobs.service.executor.JobExecutorResolver; import org.kie.kogito.jobs.service.job.DelegateJob; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobDetailsContext; +import org.kie.kogito.jobs.service.model.JobExecutionResponse; import org.kie.kogito.jobs.service.model.ManageableJobHandle; -import org.kie.kogito.jobs.service.stream.JobEventPublisher; +import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler; import org.kie.kogito.jobs.service.utils.DateUtil; import org.kie.kogito.timer.Job; import org.kie.kogito.timer.JobContext; @@ -41,11 +45,13 @@ import org.mockito.Spy; import org.mockito.junit.jupiter.MockitoExtension; +import io.smallrye.mutiny.Uni; import io.vertx.mutiny.core.Vertx; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.given; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -62,7 +68,10 @@ class VertxTimerServiceSchedulerTest { private JobExecutorResolver jobExecutorResolver; @Mock - private JobEventPublisher jobEventPublisher; + private JobExecutor jobExecutor; + + @Mock + private ReactiveJobScheduler reactiveJobScheduler; @Captor private ArgumentCaptor jobCaptor; @@ -82,6 +91,12 @@ public void setUp() { void testScheduleJob() { ZonedDateTime time = DateUtil.now().plusSeconds(1); final ManageableJobHandle handle = schedule(time); + doReturn(jobExecutor).when(jobExecutorResolver).get(any()); + JobExecutionResponse response = new JobExecutionResponse(); + Uni result = Uni.createFrom().item(response); + PublisherBuilder executionSuccessPublisherBuilder 
= ReactiveStreams.of(jobDetails); + doReturn(executionSuccessPublisherBuilder).when(reactiveJobScheduler).handleJobExecutionSuccess(response); + doReturn(result).when(jobExecutor).execute(jobDetails); verify(vertx).setTimer(timeCaptor.capture(), any()); assertThat(timeCaptor.getValue()).isGreaterThanOrEqualTo(time.toInstant().minusMillis(System.currentTimeMillis()).toEpochMilli()); given().await() @@ -110,7 +125,7 @@ private ManageableJobHandle schedule(ZonedDateTime time) { trigger = new PointInTimeTrigger(timestamp, null, null); jobDetails = JobDetails.builder().build(); context = new JobDetailsContext(jobDetails); - job = new DelegateJob(jobExecutorResolver, jobEventPublisher); + job = new DelegateJob(jobExecutorResolver, reactiveJobScheduler); return tested.scheduleJob(job, context, trigger); } diff --git a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java index 4295530a29..f7ac6ed1ce 100644 --- a/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java +++ b/jobs-service/jobs-service-common/src/test/java/org/kie/kogito/jobs/service/stream/AbstractJobStreamsTest.java @@ -44,6 +44,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.never; @@ -94,7 +96,7 @@ void jobStatusChangeWithAck() throws Exception { doReturn(SERIALIZED_MESSAGE).when(objectMapper).writeValueAsString(any()); Message message = executeStatusChange(job); message.ack(); - verify(jobStreams).onAck(job); + verify(jobStreams).onAck(anyString(), eq(job)); } @Test @@ -141,14 +143,14 @@ void 
jobStatusChangeWithUnexpectedErrorAndContinue() throws Exception { assertThat(message.getPayload()).isEqualTo(SERIALIZED_MESSAGE); assertExpectedMetadata(message); message.ack(); - verify(jobStreams).onAck(job); + verify(jobStreams).onAck(anyString(), eq(job)); } private void executeStatusChangeWithUnexpectedError(JobDetails job) throws Exception { doThrow(new RuntimeException("Unexpected error")).when(objectMapper).writeValueAsString(any()); jobStreams.jobStatusChange(job); - verify(jobStreams, never()).onAck(any()); + verify(jobStreams, never()).onAck(any(), any()); verify(jobStreams, never()).onNack(any(), any()); } diff --git a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java index 8b9a25fa71..a2ee936edc 100644 --- a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java +++ b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/InfinispanJobRepository.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; @@ -44,6 +45,7 @@ import jakarta.inject.Inject; import static org.kie.kogito.jobs.service.repository.infinispan.InfinispanConfiguration.Caches.JOB_DETAILS; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithCreatedAndLastUpdate; @ApplicationScoped public class InfinispanJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository { @@ -71,8 +73,12 @@ void init(@Observes InfinispanInitialized event) { @Override public CompletionStage 
doSave(JobDetails job) { - return runAsync(() -> cache.put(job.getId(), job)) - .thenApply(j -> job); + return runAsync(() -> { + boolean isNew = !cache.containsKey(job.getId()); + JobDetails timeStampedJob = jobWithCreatedAndLastUpdate(isNew, job); + cache.put(timeStampedJob.getId(), timeStampedJob); + return timeStampedJob; + }); } @Override @@ -93,37 +99,56 @@ public CompletionStage delete(String id) { } @Override - public PublisherBuilder findAll() { - Query query = queryFactory. create("from job.service.JobDetails"); + public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy) { + + String statusFilter = (status != null && status.length > 0) ? createStatusFilter(status) : null; + String fireTimeFilter = createFireTimeFilter("from", "to"); + String orderByCriteria = (orderBy != null && orderBy.length > 0) ? createOrderBy("j", orderBy) : ""; + + StringBuilder queryFilter = new StringBuilder(); + if (statusFilter != null) { + queryFilter.append(statusFilter); + queryFilter.append(" and "); + } + queryFilter.append(fireTimeFilter); + + String findQuery = "from job.service.JobDetails j" + + " where " + queryFilter + + " " + orderByCriteria; + + Query query = queryFactory.create(findQuery); + query.setParameter("from", fromFireTime.toInstant().toEpochMilli()); + query.setParameter("to", toFireTime.toInstant().toEpochMilli()); return ReactiveStreams.fromIterable(query.execute().list()); } - @Override - public PublisherBuilder findByStatus(JobStatus... 
status) { - Query query = queryFactory.create("from job.service.JobDetails j " + - "where " + - "j.status in (" + createStatusQuery(status) + ")"); - return ReactiveStreams.fromIterable(query.execute().list()); + private static String createFireTimeFilter(String fromParam, String toParam) { + return String.format("j.nextFireTime >= :%s and j.nextFireTime <= :%s ", fromParam, toParam); } - @Override - public PublisherBuilder findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, - JobStatus... status) { - Query query = queryFactory.create("from job.service.JobDetails j " + - "where " + - "j.trigger.nextFireTime > :from " + - "and j.trigger.nextFireTime < :to " + - "and j.status in (" + createStatusQuery(status) + ") " + - "order by j.priority desc"); - query.setParameter("to", to.toInstant().toEpochMilli()); - query.setParameter("from", from.toInstant().toEpochMilli()); - return ReactiveStreams.fromIterable(query.execute().list()); + private static String createStatusFilter(JobStatus... status) { + return Arrays.stream(status).map(JobStatus::name) + .collect(Collectors.joining("', '", "j.status IN ('", "')")); + } + + private static String createOrderBy(String objName, SortTerm[] sortTerms) { + return Stream.of(sortTerms).map(term -> createOrderByTerm(objName, term)) + .collect(Collectors.joining(", ", "order by ", "")); + } + + private static String createOrderByTerm(String objName, SortTerm sortTerm) { + return objName + "." + toColumName(sortTerm.getField()) + (sortTerm.isAsc() ? 
" asc" : " desc"); } - //building the query sentence for the status IN (not supported to use array in setParameter on the query) - private String createStatusQuery(JobStatus[] status) { - return Arrays.stream(status) - .map(JobStatus::name) - .collect(Collectors.joining("\', \'", "\'", "\'")); + private static String toColumName(SortTermField field) { + return switch (field) { + case FIRE_TIME -> "nextFireTime"; + case CREATED -> "created"; + case ID -> "id"; + default -> throw new IllegalArgumentException("No colum name is defined for field: " + field); + }; } } diff --git a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobDetailsMarshaller.java b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobDetailsMarshaller.java index 64d8e768a4..10ca6045c8 100644 --- a/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobDetailsMarshaller.java +++ b/jobs-service/jobs-service-infinispan/src/main/java/org/kie/kogito/jobs/service/repository/infinispan/marshaller/JobDetailsMarshaller.java @@ -66,6 +66,8 @@ public void writeTo(ProtoStreamWriter writer, JobDetails job) throws IOException writer.writeObject("trigger", job.getTrigger(), getInterface(job.getTrigger())); writer.writeLong("executionTimeout", job.getExecutionTimeout()); writer.writeString("executionTimeoutUnit", job.getExecutionTimeoutUnit() != null ? 
job.getExecutionTimeoutUnit().name() : null); + writer.writeInstant("nextFireTime", toInstant(job.getTrigger().hasNextFireTime())); + writer.writeInstant("created", zonedDateTimeToInstant(job.getCreated())); } public Class getInterface(Object object) { @@ -90,6 +92,7 @@ public JobDetails readFrom(ProtoStreamReader reader) throws IOException { Trigger trigger = reader.readObject("trigger", Trigger.class); Long executionTimeout = reader.readLong("executionTimeout"); String executionTimeoutUnit = reader.readString("executionTimeoutUnit"); + ZonedDateTime created = instantToZonedDateTime(reader.readInstant("created")); return JobDetails.builder() .id(id) @@ -104,6 +107,7 @@ public JobDetails readFrom(ProtoStreamReader reader) throws IOException { .trigger(trigger) .executionTimeout(executionTimeout) .executionTimeoutUnit(executionTimeoutUnit != null ? ChronoUnit.valueOf(executionTimeoutUnit) : null) + .created(created) .build(); } } diff --git a/jobs-service/jobs-service-infinispan/src/main/resources/META-INF/library.proto b/jobs-service/jobs-service-infinispan/src/main/resources/META-INF/library.proto index fa0c8ae86c..7f08eddad5 100644 --- a/jobs-service/jobs-service-infinispan/src/main/resources/META-INF/library.proto +++ b/jobs-service/jobs-service-infinispan/src/main/resources/META-INF/library.proto @@ -2,6 +2,7 @@ package job.service; /* @Indexed */ message JobDetails { + /* @Field(store = Store.YES) @SortableField */ optional string id = 1; optional string correlationId = 2; /* @Field(store = Store.YES) */ @@ -18,6 +19,10 @@ message JobDetails { optional string type = 11;//enum optional int64 executionTimeout = 12; optional string executionTimeoutUnit = 13; + /* @Field(store = Store.YES) @SortableField */ + optional int64 nextFireTime = 14; + /* @Field(store = Store.YES) @SortableField */ + optional int64 created = 15; } /* @Indexed */ diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetails.java 
b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetails.java index afa397a07d..bc8016a492 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetails.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetails.java @@ -42,11 +42,12 @@ public class JobDetails { private Trigger trigger;//when/how it should be executed private Long executionTimeout; private ChronoUnit executionTimeoutUnit; + private ZonedDateTime created; @SuppressWarnings("java:S107") protected JobDetails(String id, String correlationId, JobStatus status, ZonedDateTime lastUpdate, Integer retries, Integer executionCounter, String scheduledId, Recipient recipient, Trigger trigger, Integer priority, - Long executionTimeout, ChronoUnit executionTimeoutUnit) { + Long executionTimeout, ChronoUnit executionTimeoutUnit, ZonedDateTime created) { this.id = id; this.correlationId = correlationId; this.status = status; @@ -59,6 +60,7 @@ protected JobDetails(String id, String correlationId, JobStatus status, ZonedDat this.priority = priority; this.executionTimeout = executionTimeout; this.executionTimeoutUnit = executionTimeoutUnit; + this.created = created; } public String getId() { @@ -109,6 +111,10 @@ public ChronoUnit getExecutionTimeoutUnit() { return executionTimeoutUnit; } + public ZonedDateTime getCreated() { + return created; + } + public static JobDetailsBuilder builder() { return new JobDetailsBuilder(); } @@ -132,13 +138,14 @@ public boolean equals(Object o) { Objects.equals(getRecipient(), that.getRecipient()) && Objects.equals(getTrigger().hasNextFireTime(), that.getTrigger().hasNextFireTime()) && Objects.equals(getExecutionTimeout(), that.getExecutionTimeout()) && - Objects.equals(getExecutionTimeoutUnit(), that.getExecutionTimeoutUnit()); + Objects.equals(getExecutionTimeoutUnit(), that.getExecutionTimeoutUnit()) && + Objects.equals(getCreated(), 
that.getCreated()); } @Override public int hashCode() { return Objects.hash(getId(), getCorrelationId(), getStatus(), getRetries(), getExecutionCounter(), - getScheduledId(), getRecipient(), getTrigger(), getExecutionTimeout(), getExecutionTimeoutUnit()); + getScheduledId(), getRecipient(), getTrigger(), getExecutionTimeout(), getExecutionTimeoutUnit(), getCreated()); } @Override @@ -155,6 +162,7 @@ public String toString() { .add("trigger=" + trigger) .add("executionTimeout=" + executionTimeout) .add("executionTimeoutUnit=" + executionTimeoutUnit) + .add("created=" + created) .toString(); } } diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetailsBuilder.java b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetailsBuilder.java index 832ed1ee12..12061d4417 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetailsBuilder.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/model/JobDetailsBuilder.java @@ -38,6 +38,7 @@ public class JobDetailsBuilder { private Integer priority; private Long executionTimeout; private ChronoUnit executionTimeoutUnit; + private ZonedDateTime created; public JobDetailsBuilder id(String id) { this.id = id; @@ -99,9 +100,14 @@ public JobDetailsBuilder executionTimeoutUnit(ChronoUnit executionTimeoutUnit) { return this; } + public JobDetailsBuilder created(ZonedDateTime created) { + this.created = created; + return this; + } + public JobDetails build() { return new JobDetails(id, correlationId, status, lastUpdate, retries, executionCounter, scheduledId, - recipient, trigger, priority, executionTimeout, executionTimeoutUnit); + recipient, trigger, priority, executionTimeout, executionTimeoutUnit, created); } public JobDetailsBuilder of(JobDetails jobDetails) { @@ -116,7 +122,8 @@ public JobDetailsBuilder of(JobDetails jobDetails) { 
.trigger(jobDetails.getTrigger()) .priority(jobDetails.getPriority()) .executionTimeout(jobDetails.getExecutionTimeout()) - .executionTimeoutUnit(jobDetails.getExecutionTimeoutUnit()); + .executionTimeoutUnit(jobDetails.getExecutionTimeoutUnit()) + .created(jobDetails.getCreated()); } public JobDetailsBuilder incrementRetries() { diff --git a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/utils/ModelUtil.java b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/utils/ModelUtil.java index 23353e68d1..ab5322c135 100644 --- a/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/utils/ModelUtil.java +++ b/jobs-service/jobs-service-internal-api/src/main/java/org/kie/kogito/jobs/service/utils/ModelUtil.java @@ -18,11 +18,15 @@ */ package org.kie.kogito.jobs.service.utils; +import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; import java.util.Objects; import org.kie.kogito.jobs.service.adapter.JobDetailsAdapter; import org.kie.kogito.jobs.service.api.Job; +import org.kie.kogito.jobs.service.model.JobDetails; +import org.kie.kogito.jobs.service.model.JobStatus; +import org.kie.kogito.jobs.service.model.ManageableJobHandle; public class ModelUtil { @@ -37,4 +41,25 @@ public static Long getExecutionTimeoutInMillis(Job job) { ChronoUnit chronoUnit = job.getExecutionTimeoutUnit() != null ? 
JobDetailsAdapter.TemporalUnitAdapter.toChronoUnit(job.getExecutionTimeoutUnit()) : ChronoUnit.MILLIS; return chronoUnit.getDuration().multipliedBy(job.getExecutionTimeout()).toMillis(); } + + public static JobDetails jobWithStatus(JobDetails job, JobStatus status) { + return JobDetails.builder().of(job).status(status).build(); + } + + public static JobDetails jobWithStatusAndHandle(JobDetails job, JobStatus status, ManageableJobHandle handle) { + return JobDetails.builder().of(job).status(status).scheduledId(String.valueOf(handle.getId())).build(); + } + + public static JobDetails jobWithCreatedAndLastUpdate(boolean isNew, JobDetails job) { + ZonedDateTime now = DateUtil.now(); + return isNew ? jobWithCreated(job, now, now) : jobWithLastUpdate(job, now); + } + + public static JobDetails jobWithCreated(JobDetails job, ZonedDateTime created, ZonedDateTime lastUpdate) { + return JobDetails.builder().of(job).created(created).lastUpdate(lastUpdate).build(); + } + + public static JobDetails jobWithLastUpdate(JobDetails job, ZonedDateTime lastUpdate) { + return JobDetails.builder().of(job).lastUpdate(lastUpdate).build(); + } } diff --git a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java index 61c959509c..52bb50d35f 100644 --- a/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java +++ b/jobs-service/jobs-service-messaging-http/src/main/java/org/kie/kogito/jobs/service/messaging/http/stream/HttpJobStreams.java @@ -22,15 +22,14 @@ import java.util.function.Supplier; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Channel; import 
org.eclipse.microprofile.reactive.messaging.Emitter; -import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import org.eclipse.microprofile.reactive.messaging.OnOverflow; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.stream.AbstractJobStreams; -import org.kie.kogito.jobs.service.stream.AvailableStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -47,6 +46,8 @@ public class HttpJobStreams extends AbstractJobStreams { public static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.http.job-status-change-events"; public static final String JOB_STATUS_CHANGE_EVENTS_HTTP = "kogito-job-service-job-status-events-http"; + private static final Logger LOGGER = LoggerFactory.getLogger(HttpJobStreams.class); + /** * Metadata to include the content-type for structured CloudEvents messages */ @@ -57,15 +58,14 @@ public class HttpJobStreams extends AbstractJobStreams { @Inject public HttpJobStreams(ObjectMapper objectMapper, @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config, - @Channel(JOB_STATUS_CHANGE_EVENTS_HTTP) @OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter, + @Channel(JOB_STATUS_CHANGE_EVENTS_HTTP) @OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER) Emitter emitter, @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) { super(objectMapper, config.orElse(false), emitter, url); } - @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) @Override public void jobStatusChange(JobDetails job) { + LOGGER.debug("jobStatusChange call received, enabled: {}, job: {}", enabled, job); super.jobStatusChange(job); } diff --git a/jobs-service/jobs-service-messaging-kafka/pom.xml b/jobs-service/jobs-service-messaging-kafka/pom.xml index 6a81e82973..beb82658ed 100644 
--- a/jobs-service/jobs-service-messaging-kafka/pom.xml +++ b/jobs-service/jobs-service-messaging-kafka/pom.xml @@ -65,6 +65,10 @@ io.quarkus quarkus-smallrye-reactive-messaging-kafka + + io.smallrye.reactive + smallrye-reactive-messaging-in-memory + diff --git a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java index c3ee2b5d9e..c9b149ede2 100644 --- a/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java +++ b/jobs-service/jobs-service-messaging-kafka/src/main/java/org/kie/kogito/jobs/service/messaging/kafka/stream/KafkaJobStreams.java @@ -21,14 +21,14 @@ import java.util.Optional; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.eclipse.microprofile.reactive.messaging.Acknowledgment; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; -import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.OnOverflow; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.stream.AbstractJobStreams; import org.kie.kogito.jobs.service.stream.AvailableStreams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; @@ -39,19 +39,19 @@ public class KafkaJobStreams extends AbstractJobStreams { public static final String PUBLISH_EVENTS_CONFIG_KEY = "kogito.jobs-service.kafka.job-status-change-events"; + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJobStreams.class); @Inject public KafkaJobStreams(ObjectMapper objectMapper, @ConfigProperty(name = PUBLISH_EVENTS_CONFIG_KEY) Optional config, - @Channel(AvailableStreams.JOB_STATUS_CHANGE_EVENTS_TOPIC) 
@OnOverflow(value = OnOverflow.Strategy.LATEST) Emitter emitter, + @Channel(AvailableStreams.JOB_STATUS_CHANGE_EVENTS_TOPIC) @OnOverflow(value = OnOverflow.Strategy.UNBOUNDED_BUFFER) Emitter emitter, @ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url) { super(objectMapper, config.orElse(false), emitter, url); } - @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) @Override public void jobStatusChange(JobDetails job) { + LOGGER.debug("jobStatusChange call received, enabled: {}, job: {}", enabled, job); super.jobStatusChange(job); } } diff --git a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java index 6bc29ec4b4..6ae337214b 100644 --- a/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java +++ b/jobs-service/jobs-service-mongodb/src/main/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepository.java @@ -19,9 +19,13 @@ package org.kie.kogito.jobs.service.repository.mongodb; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; import org.bson.Document; +import org.bson.conversions.Bson; import org.bson.json.JsonWriterSettings; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; @@ -38,6 +42,7 @@ import io.quarkus.mongodb.reactive.ReactiveMongoClient; import io.quarkus.mongodb.reactive.ReactiveMongoCollection; import io.quarkus.runtime.StartupEvent; +import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.vertx.core.Vertx; import 
io.vertx.core.json.JsonObject; @@ -48,12 +53,13 @@ import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; -import static com.mongodb.client.model.Filters.gt; +import static com.mongodb.client.model.Filters.gte; import static com.mongodb.client.model.Filters.in; -import static com.mongodb.client.model.Filters.lt; +import static com.mongodb.client.model.Filters.lte; import static com.mongodb.client.model.Indexes.ascending; import static com.mongodb.client.model.ReturnDocument.AFTER; import static com.mongodb.client.model.Sorts.descending; +import static com.mongodb.client.model.Sorts.orderBy; import static java.util.Arrays.stream; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.counting; @@ -61,6 +67,7 @@ import static mutiny.zero.flow.adapters.AdaptersToReactiveStreams.publisher; import static org.bson.Document.parse; import static org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams.fromPublisher; +import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithCreatedAndLastUpdate; @ApplicationScoped public class MongoDBJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository { @@ -75,6 +82,8 @@ public class MongoDBJobRepository extends BaseReactiveJobRepository implements R static final String FIRE_TIME_COLUMN = "trigger.nextFireTime"; + static final String CREATED_COLUMN = "created"; + private static final JsonWriterSettings jsonWriterSettings = JsonWriterSettings.builder() .int64Converter((value, writer) -> writer.writeNumber(value.toString())).build(); @@ -97,26 +106,34 @@ public MongoDBJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher, Re void onStart(@Observes StartupEvent ev) { this.collection.createIndex(ascending(STATUS_COLUMN, FIRE_TIME_COLUMN)).await().indefinitely(); + this.collection.createIndex(ascending(CREATED_COLUMN, FIRE_TIME_COLUMN, ID)).await().indefinitely(); } @Override public CompletionStage 
doSave(JobDetails job) { + return collection.find(eq(ID, job.getId())) + .collect().with(counting()) + .map(count -> jobWithCreatedAndLastUpdate(count == 0, job)) + .chain(this::findAndUpdate) + .emitOn(Infrastructure.getDefaultExecutor()) + .convert() + .toCompletionStage(); + } + + private Uni findAndUpdate(JobDetails job) { return collection.findOneAndReplace( eq(ID, job.getId()), jsonToDocument(jobDetailsMarshaller.marshall(job)), new FindOneAndReplaceOptions().upsert(true).returnDocument(AFTER)) - .map(document -> documentToJson(document)) - .map(jobDetailsMarshaller::unmarshall) - .emitOn(Infrastructure.getDefaultExecutor()) - .convert() - .toCompletionStage(); + .map(MongoDBJobRepository::documentToJson) + .map(jobDetailsMarshaller::unmarshall); } @Override public CompletionStage get(String id) { return collection.find(eq(ID, id)) .collect().first() - .map(document -> documentToJson(document)) + .map(MongoDBJobRepository::documentToJson) .map(jobDetailsMarshaller::unmarshall) .emitOn(Infrastructure.getDefaultExecutor()) .convert() @@ -136,7 +153,7 @@ public CompletionStage exists(String id) { @Override public CompletionStage delete(String id) { return collection.findOneAndDelete(eq(ID, id)) - .map(document -> documentToJson(document)) + .map(MongoDBJobRepository::documentToJson) .map(jobDetailsMarshaller::unmarshall) .emitOn(Infrastructure.getDefaultExecutor()) .convert() @@ -144,25 +161,27 @@ public CompletionStage delete(String id) { } @Override - public PublisherBuilder findAll() { - return fromPublisher(publisher(collection.find() - .map(document -> documentToJson(document)) - .map(jobDetailsMarshaller::unmarshall) - .emitOn(Infrastructure.getDefaultExecutor()) - .convert() - .toPublisher())); - } + public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy) { + + FindOptions findOptions = new FindOptions(); + List filters = new ArrayList<>(); + if (status != null && 
status.length > 0) { + filters.add(createStatusFilter(status)); + } + filters.add(gte(FIRE_TIME_COLUMN, fromFireTime.toInstant().toEpochMilli())); + filters.add(lte(FIRE_TIME_COLUMN, toFireTime.toInstant().toEpochMilli())); + findOptions.filter(and(filters)); + + if (orderBy != null && orderBy.length > 0) { + findOptions.sort(createOrderBy(orderBy)); + } - @Override - public PublisherBuilder findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... status) { return fromPublisher(publisher( - collection.find( - and( - in(STATUS_COLUMN, stream(status).map(Enum::name).collect(toList())), - gt(FIRE_TIME_COLUMN, from.toInstant().toEpochMilli()), - lt(FIRE_TIME_COLUMN, to.toInstant().toEpochMilli())), - new FindOptions().sort(descending("priority"))) - .map(document -> documentToJson(document)) + collection.find(findOptions) + .map(MongoDBJobRepository::documentToJson) .map(jobDetailsMarshaller::unmarshall) .emitOn(Infrastructure.getDefaultExecutor()) .convert() @@ -176,4 +195,26 @@ static JsonObject documentToJson(Document document) { static Document jsonToDocument(JsonObject jsonNode) { return ofNullable(jsonNode).map(json -> parse(json.toString())).orElse(null); } + + static Bson createStatusFilter(JobStatus... status) { + return in(STATUS_COLUMN, stream(status).map(Enum::name).collect(toList())); + } + + static Bson createOrderBy(SortTerm[] sortTerms) { + return orderBy(stream(sortTerms).map(MongoDBJobRepository::createOrderByTerm).collect(Collectors.toList())); + } + + static Bson createOrderByTerm(SortTerm sortTerm) { + String columnName = toColumName(sortTerm.getField()); + return sortTerm.isAsc() ? 
ascending(columnName) : descending(columnName); + } + + static String toColumName(SortTermField field) { + return switch (field) { + case FIRE_TIME -> FIRE_TIME_COLUMN; + case CREATED -> CREATED_COLUMN; + case ID -> ID; + default -> throw new IllegalArgumentException("No colum name is defined for field: " + field); + }; + } } diff --git a/jobs-service/jobs-service-mongodb/src/test/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepositoryExecutionTest.java b/jobs-service/jobs-service-mongodb/src/test/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepositoryExecutionTest.java index 216300e7ae..8e834b232d 100644 --- a/jobs-service/jobs-service-mongodb/src/test/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepositoryExecutionTest.java +++ b/jobs-service/jobs-service-mongodb/src/test/java/org/kie/kogito/jobs/service/repository/mongodb/MongoDBJobRepositoryExecutionTest.java @@ -22,9 +22,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.Flow; import org.bson.Document; import org.bson.conversions.Bson; @@ -32,17 +30,25 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipient; +import org.kie.kogito.jobs.service.api.recipient.http.HttpRecipientStringPayloadData; import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.JobDetailsBuilder; import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.jobs.service.model.Recipient; import org.kie.kogito.jobs.service.model.RecipientInstance; +import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; import org.kie.kogito.jobs.service.repository.marshaller.JobDetailsMarshaller; +import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller; +import 
org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller; +import org.kie.kogito.jobs.service.repository.mongodb.marshaller.MongoDBJobDetailsMarshaller; +import org.kie.kogito.jobs.service.utils.DateUtil; +import org.kie.kogito.timer.Trigger; import org.kie.kogito.timer.impl.PointInTimeTrigger; import org.mockito.ArgumentCaptor; import com.mongodb.client.model.FindOneAndReplaceOptions; import com.mongodb.client.model.ReturnDocument; +import com.mongodb.reactivestreams.client.FindPublisher; import io.quarkus.mongodb.FindOptions; import io.quarkus.mongodb.reactive.ReactiveMongoClient; @@ -50,28 +56,28 @@ import io.quarkus.mongodb.reactive.ReactiveMongoDatabase; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; -import io.smallrye.mutiny.groups.MultiCollect; -import io.smallrye.mutiny.groups.MultiConvert; import io.smallrye.mutiny.groups.UniAwait; -import io.smallrye.mutiny.groups.UniConvert; import io.vertx.core.json.JsonObject; import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; -import static com.mongodb.client.model.Filters.gt; +import static com.mongodb.client.model.Filters.gte; import static com.mongodb.client.model.Filters.in; -import static com.mongodb.client.model.Filters.lt; +import static com.mongodb.client.model.Filters.lte; import static com.mongodb.client.model.Indexes.ascending; import static java.util.stream.Collectors.toList; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.kie.kogito.jobs.service.repository.mongodb.MongoDBJobRepository.CREATED_COLUMN; +import static org.kie.kogito.jobs.service.repository.mongodb.MongoDBJobRepository.FIRE_TIME_COLUMN; import static org.kie.kogito.jobs.service.repository.mongodb.MongoDBJobRepository.ID; -import static 
org.kie.kogito.jobs.service.utils.DateUtil.DEFAULT_ZONE; +import static org.kie.kogito.jobs.service.repository.mongodb.MongoDBJobRepository.STATUS_COLUMN; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -79,167 +85,186 @@ @SuppressWarnings({ "unchecked", "rawtypes" }) class MongoDBJobRepositoryExecutionTest { - MongoDBJobRepository mongoDBJobRepository; + private static final String JOB_ID = "JOB_ID"; - ReactiveMongoClient mongoClient; + private MongoDBJobRepository mongoDBJobRepository; - ReactiveMongoCollection collection; + private ReactiveMongoCollection collection; - JobDetailsMarshaller jobDetailsMarshaller; - - JsonObject marshalled; - - JobDetails unmarshalled; - - CompletableFuture completableFuture; + private JobDetailsMarshaller jobDetailsMarshaller; @BeforeEach void setUp() { - mongoClient = mock(ReactiveMongoClient.class); + ReactiveMongoClient mongoClient = mock(ReactiveMongoClient.class); collection = mock(ReactiveMongoCollection.class); ReactiveMongoDatabase mongoDatabase = mock(ReactiveMongoDatabase.class); when(mongoClient.getDatabase(anyString())).thenReturn(mongoDatabase); when(mongoDatabase.getCollection(anyString())).thenReturn(collection); + jobDetailsMarshaller = spy(new MongoDBJobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller())); + mongoDBJobRepository = new MongoDBJobRepository(null, null, mongoClient, "test", jobDetailsMarshaller); + } - Uni uni = mock(Uni.class); - Multi multi = mock(Multi.class); - when(collection.findOneAndReplace(any(Bson.class), any(), any(FindOneAndReplaceOptions.class))).thenReturn(uni); - when(collection.find(any(Bson.class))).thenReturn(multi); - 
when(collection.findOneAndDelete(any(Bson.class))).thenReturn(uni); - when(collection.find()).thenReturn(multi); - when(collection.find(any(Bson.class), any(FindOptions.class))).thenReturn(multi); - when(collection.createIndex(any())).thenReturn(uni); + @Test + void saveExisting() throws Exception { + doSave(createExistingJob(), true); + } - when(uni.map(any())).thenAnswer(invocationOnMock -> { - jobDetailsMarshaller.unmarshall(marshalled); - return uni; - }); - when(uni.await()).thenReturn(mock(UniAwait.class)); + @Test + void saveNotExisting() throws Exception { + JobDetails notExisting = new JobDetailsBuilder() + .id(JOB_ID) + .trigger(createTrigger()) + .recipient(createRecipient()) + .build(); + doSave(notExisting, false); + } - MultiCollect multiCollect = mock(MultiCollect.class); - when(multi.collect()).thenReturn(multiCollect); - when(multiCollect.first()).thenReturn(uni); - when(multiCollect.with(any())).thenReturn(uni); - when(multi.map(any())).thenAnswer(invocationOnMock -> { - jobDetailsMarshaller.unmarshall(marshalled); - return multi; - }); - MultiConvert convertMulti = mock(MultiConvert.class); - when(multi.emitOn(any())).thenReturn(multi); - when(multi.convert()).thenReturn(convertMulti); - Flow.Publisher publisher = mock(Flow.Publisher.class); - when(convertMulti.toPublisher()).thenReturn(publisher); - - completableFuture = mock(CompletableFuture.class); - UniConvert convert = mock(UniConvert.class); - when(uni.emitOn(any())).thenReturn(uni); - when(uni.convert()).thenReturn(convert); - when(convert.toCompletionStage()).thenReturn(completableFuture); - - ZonedDateTime time = ZonedDateTime.now(DEFAULT_ZONE); - PointInTimeTrigger trigger = new PointInTimeTrigger(time.toInstant().getEpochSecond(), null, null); - Recipient recipient = new RecipientInstance(HttpRecipient.builder().forStringPayload().url("test").build()); - unmarshalled = new JobDetailsBuilder().id("test").trigger(trigger).recipient(recipient).build(); - marshalled = new 
JsonObject().put("id", "test"); - - jobDetailsMarshaller = mock(JobDetailsMarshaller.class); - when(jobDetailsMarshaller.marshall(any(JobDetails.class))).thenReturn(marshalled); - when(jobDetailsMarshaller.unmarshall(any(JsonObject.class))).thenReturn(unmarshalled); + private void doSave(JobDetails job, boolean exists) throws Exception { + ZonedDateTime now = ZonedDateTime.now(); + Multi multi; + if (exists) { + Document document = Document.parse(new JobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller()).marshall(job).toString()); + multi = Multi.createFrom().item(document); + } else { + multi = Multi.createFrom().empty(); + } + when(collection.find(any(Bson.class))).thenReturn(multi); - mongoDBJobRepository = new MongoDBJobRepository(null, null, mongoClient, "test", jobDetailsMarshaller); - } + Document replaced = new Document().append("id", "replaced"); + Uni replacedDocument = Uni.createFrom().item(replaced); + when(collection.findOneAndReplace(any(Bson.class), any(), any(FindOneAndReplaceOptions.class))).thenReturn(replacedDocument); - @Test - void doSave() { - CompletionStage result = mongoDBJobRepository.doSave(unmarshalled); - assertEquals(completableFuture, result); + CompletionStage result = mongoDBJobRepository.doSave(job); + JobDetails saved = result.toCompletableFuture().get(); ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(Bson.class); ArgumentCaptor documentCaptor = ArgumentCaptor.forClass(Document.class); ArgumentCaptor optionCaptor = ArgumentCaptor.forClass(FindOneAndReplaceOptions.class); - verify(collection, times(1)).findOneAndReplace(filterCaptor.capture(), documentCaptor.capture(), optionCaptor.capture()); - verify(jobDetailsMarshaller, times(1)).marshall(unmarshalled); - verify(jobDetailsMarshaller, atLeastOnce()).unmarshall(marshalled); + ArgumentCaptor marshallCaptor = ArgumentCaptor.forClass(JobDetails.class); + ArgumentCaptor unmarshallCaptor = ArgumentCaptor.forClass(JsonObject.class); - assertEquals(eq(ID, 
unmarshalled.getId()), filterCaptor.getValue()); - assertEquals(Document.parse(marshalled.toString()), documentCaptor.getValue()); - assertTrue(optionCaptor.getValue().isUpsert()); - assertEquals(ReturnDocument.AFTER, optionCaptor.getValue().getReturnDocument()); + verify(collection, times(1)).findOneAndReplace(filterCaptor.capture(), documentCaptor.capture(), optionCaptor.capture()); + verify(jobDetailsMarshaller, times(1)).marshall(marshallCaptor.capture()); + verify(jobDetailsMarshaller).unmarshall(unmarshallCaptor.capture()); + + assertThat(filterCaptor.getValue()).isEqualTo(eq(ID, job.getId())); + assertThat(optionCaptor.getValue().isUpsert()).isTrue(); + assertThat(optionCaptor.getValue().getReturnDocument()).isEqualTo(ReturnDocument.AFTER); + + JobDetails timestampedJob = marshallCaptor.getAllValues().get(0); + assertThat(timestampedJob.getId()).isEqualTo(job.getId()); + if (exists) { + assertThat(timestampedJob.getCreated()).isEqualTo(job.getCreated()); + assertThat(timestampedJob.getLastUpdate()).isAfter(job.getLastUpdate()); + } else { + assertThat(timestampedJob.getCreated()).isAfter(now); + assertThat(timestampedJob.getLastUpdate()).isAfter(now); + } + JsonObject replacedAsJson = new JsonObject(replaced.toJson()); + assertThat(unmarshallCaptor.getValue()).isEqualTo(replacedAsJson); + assertThat(saved.getId()).isEqualTo(replacedAsJson.getString("id")); } @Test - void get() { - CompletionStage result = mongoDBJobRepository.get(unmarshalled.getId()); - assertEquals(completableFuture, result); + void get() throws Exception { + JobDetails job = createExistingJob(); + Document document = Document.parse(new JobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller()).marshall(job).toString()); + Multi multi = Multi.createFrom().item(document); + when(collection.find(any(Bson.class))).thenReturn(multi); + + JobDetails result = mongoDBJobRepository.get(job.getId()).toCompletableFuture().get(); + assertThat(result).isEqualTo(job); ArgumentCaptor 
filterCaptor = ArgumentCaptor.forClass(Bson.class); + ArgumentCaptor unmarshallCaptor = ArgumentCaptor.forClass(JsonObject.class); + verify(collection, times(1)).find(filterCaptor.capture()); - verify(jobDetailsMarshaller, atLeastOnce()).unmarshall(marshalled); + assertEquals(eq(ID, job.getId()), filterCaptor.getValue()); - assertEquals(eq(ID, unmarshalled.getId()), filterCaptor.getValue()); + verify(jobDetailsMarshaller).unmarshall(unmarshallCaptor.capture()); + assertThat(unmarshallCaptor.getValue()).isEqualTo(new JsonObject(document.toJson())); } @Test - void exists() { - CompletionStage result = mongoDBJobRepository.exists(unmarshalled.getId()); - assertEquals(completableFuture, result); + void exists() throws Exception { + JobDetails job = createExistingJob(); + Document document = Document.parse(new JobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller()).marshall(job).toString()); + Multi multi = Multi.createFrom().item(document); + when(collection.find(any(Bson.class))).thenReturn(multi); + + Boolean result = mongoDBJobRepository.exists(job.getId()).toCompletableFuture().get(); + assertThat(result).isTrue(); ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(Bson.class); verify(collection, times(1)).find(filterCaptor.capture()); - verify(jobDetailsMarshaller, times(1)).unmarshall(marshalled); - - assertEquals(eq(ID, unmarshalled.getId()), filterCaptor.getValue()); + assertEquals(eq(ID, job.getId()), filterCaptor.getValue()); } @Test - void delete() { - CompletionStage result = mongoDBJobRepository.delete(unmarshalled.getId()); - assertEquals(completableFuture, result); + void delete() throws Exception { + JobDetails job = createExistingJob(); + Document document = Document.parse(new JobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller()).marshall(job).toString()); + Uni uni = Uni.createFrom().item(document); + when(collection.findOneAndDelete(any(Bson.class))).thenReturn(uni); + + JobDetails result = 
mongoDBJobRepository.delete(job.getId()).toCompletableFuture().get(); + assertThat(result).isEqualTo(job); ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(Bson.class); + ArgumentCaptor unmarshallCaptor = ArgumentCaptor.forClass(JsonObject.class); + verify(collection, times(1)).findOneAndDelete(filterCaptor.capture()); - verify(jobDetailsMarshaller, atLeastOnce()).unmarshall(marshalled); + assertEquals(eq(ID, job.getId()), filterCaptor.getValue()); - assertEquals(eq(ID, unmarshalled.getId()), filterCaptor.getValue()); + verify(jobDetailsMarshaller).unmarshall(unmarshallCaptor.capture()); + assertThat(unmarshallCaptor.getValue()).isEqualTo(new JsonObject(document.toJson())); } @Test - void findAll() { - PublisherBuilder result = mongoDBJobRepository.findAll(); - assertNotNull(result); - - verify(collection, times(1)).find(); - verify(jobDetailsMarshaller, atLeastOnce()).unmarshall(marshalled); - } + void findByStatusBetweenDates() { + JobDetails job = createExistingJob(); + Document document = Document.parse(new JobDetailsMarshaller(new TriggerMarshaller(), new RecipientMarshaller()).marshall(job).toString()); + Multi multi = Multi.createFrom().item(document); + doReturn(multi).when(collection).find(any(FindOptions.class)); - @Test - void findByStatusBetweenDatesOrderByPriority() { ZonedDateTime from = ZonedDateTime.now(); ZonedDateTime to = ZonedDateTime.now(); - PublisherBuilder result = mongoDBJobRepository.findByStatusBetweenDatesOrderByPriority(from, to, JobStatus.SCHEDULED, JobStatus.RETRY); + PublisherBuilder result = mongoDBJobRepository.findByStatusBetweenDates(from, to, + new JobStatus[] { JobStatus.SCHEDULED, JobStatus.RETRY }, + new ReactiveJobRepository.SortTerm[] { ReactiveJobRepository.SortTerm.byFireTime(true) }); assertNotNull(result); ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(Bson.class); ArgumentCaptor optionCaptor = ArgumentCaptor.forClass(FindOptions.class); - verify(collection, times(1)).find(filterCaptor.capture(), 
optionCaptor.capture()); - verify(jobDetailsMarshaller, atLeastOnce()).unmarshall(marshalled); + verify(collection, times(1)).find(optionCaptor.capture()); + + FindPublisher findPublisher = mock(FindPublisher.class); + doReturn(findPublisher).when(findPublisher).filter(any()); + + optionCaptor.getValue().apply(findPublisher); + verify(findPublisher).filter(filterCaptor.capture()); assertEquals(and( in("status", Arrays.stream(new JobStatus[] { JobStatus.SCHEDULED, JobStatus.RETRY }).map(Enum::name).collect(toList())), - gt("trigger.nextFireTime", from.toInstant().toEpochMilli()), - lt("trigger.nextFireTime", to.toInstant().toEpochMilli())), + gte("trigger.nextFireTime", from.toInstant().toEpochMilli()), + lte("trigger.nextFireTime", to.toInstant().toEpochMilli())), filterCaptor.getValue()); } @Test void onStart() { + Uni uni = mock(Uni.class); + when(collection.createIndex(any())).thenReturn(uni); + when(uni.await()).thenReturn(mock(UniAwait.class)); + mongoDBJobRepository.onStart(null); ArgumentCaptor indexCaptor = ArgumentCaptor.forClass(Bson.class); - verify(collection, times(1)).createIndex(indexCaptor.capture()); + verify(collection, times(2)).createIndex(indexCaptor.capture()); - assertEquals(ascending("status", "trigger.nextFireTime"), indexCaptor.getValue()); + assertEquals(ascending(STATUS_COLUMN, FIRE_TIME_COLUMN), indexCaptor.getAllValues().get(0)); + assertEquals(ascending(CREATED_COLUMN, FIRE_TIME_COLUMN, ID), indexCaptor.getAllValues().get(1)); } @Test @@ -269,4 +294,28 @@ void jsonToDocument() { assertEquals(document, MongoDBJobRepository.jsonToDocument(object)); } + + private static JobDetails createExistingJob() { + ZonedDateTime createdTime = ZonedDateTime.parse("2024-01-30T12:00:00.000Z[UTC]"); + ZonedDateTime lastUpdateTime = ZonedDateTime.parse("2024-01-30T15:00:00.000Z[UTC]"); + return new JobDetailsBuilder() + .id(JOB_ID) + .trigger(createTrigger()) + .recipient(createRecipient()) + .created(createdTime) + .lastUpdate(lastUpdateTime) + 
.build(); + } + + private static Trigger createTrigger() { + return new PointInTimeTrigger(DateUtil.now().toInstant().toEpochMilli(), null, null); + } + + private static Recipient createRecipient() { + return new RecipientInstance(HttpRecipient.builder() + .forStringPayload() + .url("http://my-service") + .payload(HttpRecipientStringPayloadData.from("payload data")) + .build()); + } } diff --git a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java index fa95138faa..20db74cbc2 100644 --- a/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java +++ b/jobs-service/jobs-service-postgresql-common/src/main/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepository.java @@ -56,12 +56,10 @@ @ApplicationScoped public class PostgreSqlJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository { - public static final Integer MAX_ITEMS_QUERY = 10000; - private static final String JOB_DETAILS_TABLE = "job_details"; private static final String JOB_DETAILS_COLUMNS = "id, correlation_id, status, last_update, retries, " + - "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit"; + "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created"; private PgPool client; @@ -85,7 +83,7 @@ public PostgreSqlJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher, @Override public CompletionStage doSave(JobDetails job) { return client.preparedQuery("INSERT INTO " + JOB_DETAILS_TABLE + " (" + JOB_DETAILS_COLUMNS + - ") VALUES ($1, $2, $3, now(), $4, $5, $6, $7, $8, $9, $10, $11, $12) " + + ") VALUES ($1, $2, $3, 
now(), $4, $5, $6, $7, $8, $9, $10, $11, $12, now()) " + "ON CONFLICT (id) DO " + "UPDATE SET correlation_id = $2, status = $3, last_update = now(), retries = $4, " + "execution_counter = $5, scheduled_id = $6, priority = $7, " + @@ -138,43 +136,60 @@ public CompletionStage delete(String id) { } @Override - public PublisherBuilder findByStatus(JobStatus... status) { - String statusQuery = createStatusQuery(status); - String query = " WHERE " + statusQuery; + public PublisherBuilder findByStatusBetweenDates(ZonedDateTime fromFireTime, + ZonedDateTime toFireTime, + JobStatus[] status, + SortTerm[] orderBy) { + + String statusFilter = (status != null && status.length > 0) ? createStatusFilter(status) : null; + String fireTimeFilter = createFireTimeFilter("$1", "$2"); + String orderByCriteria = (orderBy != null && orderBy.length > 0) ? createOrderBy(orderBy) : ""; + + StringBuilder queryFilter = new StringBuilder(); + if (statusFilter != null) { + queryFilter.append(statusFilter); + queryFilter.append(" AND "); + } + queryFilter.append(fireTimeFilter); + + String findQuery = "SELECT " + JOB_DETAILS_COLUMNS + + " FROM " + JOB_DETAILS_TABLE + + " WHERE " + queryFilter + + " " + orderByCriteria; + + Tuple params = Tuple.of(fromFireTime.toOffsetDateTime(), toFireTime.toOffsetDateTime()); return ReactiveStreams.fromPublisher(publisher( - client.preparedQuery("SELECT " + JOB_DETAILS_COLUMNS + " FROM " + JOB_DETAILS_TABLE + query + " ORDER BY priority DESC LIMIT $1").execute(Tuple.of(MAX_ITEMS_QUERY)) + client.preparedQuery(findQuery) + .execute(params) .onItem().transformToMulti(rowSet -> Multi.createFrom().iterable(rowSet)) .onItem().transform(this::from))); } - @Override - public PublisherBuilder findAll() { - return ReactiveStreams.fromPublisher(publisher( - client.preparedQuery("SELECT " + JOB_DETAILS_COLUMNS + " FROM " + JOB_DETAILS_TABLE + " LIMIT $1").execute(Tuple.of(MAX_ITEMS_QUERY)) - .onItem().transformToMulti(rowSet -> Multi.createFrom().iterable(rowSet)) - 
.onItem().transform(this::from))); + static String createStatusFilter(JobStatus... status) { + return Arrays.stream(status).map(JobStatus::name) + .collect(Collectors.joining("', '", "status IN ('", "')")); } - @Override - public PublisherBuilder findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... status) { - String statusQuery = createStatusQuery(status); - String timeQuery = createTimeQuery("$2", "$3"); - String query = " WHERE " + statusQuery + " AND " + timeQuery; + static String createFireTimeFilter(String indexFrom, String indexTo) { + return String.format("fire_time BETWEEN %s AND %s", indexFrom, indexTo); + } - return ReactiveStreams.fromPublisher(publisher( - client.preparedQuery("SELECT " + JOB_DETAILS_COLUMNS + " FROM " + JOB_DETAILS_TABLE + query + " ORDER BY priority DESC LIMIT $1") - .execute(Tuple.of(MAX_ITEMS_QUERY, from.toOffsetDateTime(), to.toOffsetDateTime())) - .onItem().transformToMulti(rowSet -> Multi.createFrom().iterable(rowSet)) - .onItem().transform(this::from))); + static String createOrderBy(SortTerm[] sortTerms) { + return Stream.of(sortTerms).map(PostgreSqlJobRepository::createOrderByTerm) + .collect(Collectors.joining(", ", "ORDER BY ", "")); } - static String createStatusQuery(JobStatus... status) { - return Arrays.stream(status).map(JobStatus::name) - .collect(Collectors.joining("', '", "status IN ('", "')")); + static String createOrderByTerm(SortTerm sortTerm) { + return toColumName(sortTerm.getField()) + (sortTerm.isAsc() ? 
" ASC" : " DESC"); } - static String createTimeQuery(String indexFrom, String indexTo) { - return String.format("fire_time BETWEEN %s AND %s", indexFrom, indexTo); + static String toColumName(SortTermField field) { + return switch (field) { + case FIRE_TIME -> "fire_time"; + case CREATED -> "created"; + case ID -> "id"; + default -> throw new IllegalArgumentException("No colum name is defined for field: " + field); + }; } JobDetails from(Row row) { @@ -191,6 +206,7 @@ JobDetails from(Row row) { .trigger(triggerMarshaller.unmarshall(row.get(JsonObject.class, "trigger"))) .executionTimeout(row.getLong("execution_timeout")) .executionTimeoutUnit(Optional.ofNullable(row.getString("execution_timeout_unit")).map(ChronoUnit::valueOf).orElse(null)) + .created(Optional.ofNullable(row.getOffsetDateTime("created")).map(t -> t.atZoneSameInstant(DEFAULT_ZONE)).orElse(null)) .build(); } } diff --git a/jobs-service/jobs-service-postgresql-common/src/main/resources/db/jobs-service/V3.0.3__Add_Created_Col.sql b/jobs-service/jobs-service-postgresql-common/src/main/resources/db/jobs-service/V3.0.3__Add_Created_Col.sql new file mode 100644 index 0000000000..e76162d170 --- /dev/null +++ b/jobs-service/jobs-service-postgresql-common/src/main/resources/db/jobs-service/V3.0.3__Add_Created_Col.sql @@ -0,0 +1,9 @@ +ALTER TABLE job_details + ADD COLUMN created TIMESTAMPTZ; + +UPDATE job_details +SET created = last_update +WHERE created is null; + +CREATE INDEX job_details_created_idx + ON job_details (created); diff --git a/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepositoryExecutionTest.java b/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepositoryExecutionTest.java index 8d40032982..ff2bd2fa2e 100644 --- 
a/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepositoryExecutionTest.java +++ b/jobs-service/jobs-service-postgresql-common/src/test/java/org/kie/kogito/jobs/service/repository/postgresql/PostgreSqlJobRepositoryExecutionTest.java @@ -35,6 +35,7 @@ import org.kie.kogito.jobs.service.model.JobStatus; import org.kie.kogito.jobs.service.model.Recipient; import org.kie.kogito.jobs.service.model.RecipientInstance; +import org.kie.kogito.jobs.service.repository.ReactiveJobRepository; import org.kie.kogito.jobs.service.repository.marshaller.RecipientMarshaller; import org.kie.kogito.jobs.service.repository.marshaller.TriggerMarshaller; import org.kie.kogito.jobs.service.utils.DateUtil; @@ -152,11 +153,11 @@ void doSave() { verify(query, times(1)).execute(parameterCaptor.capture()); String query = "INSERT INTO " + JOB_DETAILS + " (id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, " + - "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit) VALUES ($1, $2, $3, now(), $4, $5, $6, $7, $8, $9, $10, $11, $12) " + + "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created) VALUES ($1, $2, $3, now(), $4, $5, $6, $7, $8, $9, $10, $11, $12, now()) " + "ON CONFLICT (id) DO UPDATE SET correlation_id = $2, status = $3, last_update = now(), retries = $4, " + "execution_counter = $5, scheduled_id = $6, priority = $7, " + "recipient = $8, trigger = $9, fire_time = $10, execution_timeout = $11, execution_timeout_unit = $12 RETURNING id, correlation_id, status, last_update, retries, " + - "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit"; + "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created"; Tuple parameter = Tuple.tuple(Stream.of( job.getId(), @@ -199,7 +200,7 @@ void get() 
{ verify(query, times(1)).execute(parameterCaptor.capture()); String query = "SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, " + - "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit FROM " + JOB_DETAILS + " WHERE id = $1"; + "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created FROM " + JOB_DETAILS + " WHERE id = $1"; String parameter = "test"; assertEquals(query, queryCaptor.getValue()); @@ -235,7 +236,7 @@ void delete() { String query = "DELETE FROM " + JOB_DETAILS + " WHERE id = $1 " + "RETURNING id, correlation_id, status, last_update, retries, " + - "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit"; + "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created"; String parameter = "test"; assertEquals(query, queryCaptor.getValue()); @@ -243,64 +244,54 @@ void delete() { } @Test - void findAll() { - PublisherBuilder result = repository.findAll(); - assertNotNull(result); - - ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(String.class); - verify(client, times(1)).preparedQuery(queryCaptor.capture()); - - String query = "SELECT id, correlation_id, status, last_update, retries, " + - "execution_counter, scheduled_id, priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit FROM " + JOB_DETAILS + " LIMIT $1"; - - assertEquals(query, queryCaptor.getValue()); - } - - @Test - void findByStatusBetweenDatesOrderByPriority() { + void findByStatusBetweenDates() { ZonedDateTime from = ZonedDateTime.now(); ZonedDateTime to = ZonedDateTime.now(); - PublisherBuilder result = repository.findByStatusBetweenDatesOrderByPriority(from, to, JobStatus.SCHEDULED, JobStatus.RETRY); + PublisherBuilder result = repository.findByStatusBetweenDates(from, to, + new JobStatus[] { JobStatus.SCHEDULED, 
JobStatus.RETRY }, + new ReactiveJobRepository.SortTerm[] { ReactiveJobRepository.SortTerm.byFireTime(true) }); assertNotNull(result); ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(String.class); verify(client, times(1)).preparedQuery(queryCaptor.capture()); String query = "SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, " + - "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit FROM " + JOB_DETAILS + " " + - "WHERE status IN ('SCHEDULED', 'RETRY') AND fire_time BETWEEN $2 AND $3 ORDER BY priority DESC LIMIT $1"; + "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created FROM " + JOB_DETAILS + " " + + "WHERE status IN ('SCHEDULED', 'RETRY') AND fire_time BETWEEN $1 AND $2 ORDER BY fire_time ASC"; assertEquals(query, queryCaptor.getValue()); } @Test - void findByStatusBetweenDatesOrderByPriorityNoCondition() { + void findByStatusBetweenDatesNoStatusCondition() { ZonedDateTime from = ZonedDateTime.now(); ZonedDateTime to = ZonedDateTime.now(); - PublisherBuilder result = repository.findByStatusBetweenDatesOrderByPriority(from, to, JobStatus.SCHEDULED); + PublisherBuilder result = repository.findByStatusBetweenDates(from, to, + new JobStatus[] {}, + new ReactiveJobRepository.SortTerm[] { ReactiveJobRepository.SortTerm.byFireTime(false) }); assertNotNull(result); ArgumentCaptor queryCaptor = ArgumentCaptor.forClass(String.class); verify(client, times(1)).preparedQuery(queryCaptor.capture()); String query = "SELECT id, correlation_id, status, last_update, retries, execution_counter, scheduled_id, " + - "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit FROM " + JOB_DETAILS + " " + - "WHERE status IN ('SCHEDULED') AND fire_time BETWEEN $2 AND $3 ORDER BY priority DESC LIMIT $1"; + "priority, recipient, trigger, fire_time, execution_timeout, execution_timeout_unit, created FROM " + JOB_DETAILS + " " + + "WHERE fire_time 
BETWEEN $1 AND $2 ORDER BY fire_time DESC"; assertEquals(query, queryCaptor.getValue()); } @Test void createStatusQuery() { - String statusQuery = PostgreSqlJobRepository.createStatusQuery(JobStatus.SCHEDULED, JobStatus.RETRY); + String statusQuery = PostgreSqlJobRepository.createStatusFilter(JobStatus.SCHEDULED, JobStatus.RETRY); assertEquals("status IN ('SCHEDULED', 'RETRY')", statusQuery); } @Test void createTimeQuery() { - String timeQuery = PostgreSqlJobRepository.createTimeQuery("$1", "$2"); + String timeQuery = PostgreSqlJobRepository.createFireTimeFilter("$1", "$2"); assertEquals("fire_time BETWEEN $1 AND $2", timeQuery); } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java index 0f077459a5..94828cf4c5 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig.java @@ -64,4 +64,23 @@ public class KogitoAddonsQuarkusJobsServiceEmbeddedRuntimeConfig { @ConfigItem(name = "forceExecuteExpiredJobs", defaultValue = "true") public boolean forceExecuteExpiredJobs; + /** + * Flag to allow that jobs that were timed-out when the jobs service was down must be fired immediately at the + jobs service next startup. 
+ */ + @ConfigItem(name = "forceExecuteExpiredJobsOnServiceStart", defaultValue = "true") + boolean forceExecuteExpiredJobsOnServiceStart; + + /** + * Number of retries configured for the periodic jobs loading procedure. Every time the procedure is started this + value is considered. + */ + @ConfigItem(name = "loadJobRetries", defaultValue = "3") + int loadJobRetries; + + /** + * Error strategy to apply when the periodic jobs loading procedure has exceeded the loadJobRetries. + */ + @ConfigItem(name = "loadJobErrorStrategy", defaultValue = "NONE") + String loadJobErrorStrategy; } diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java index 7121c9b78b..7032eebd39 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java @@ -19,11 +19,11 @@ package org.kie.kogito.addons.quarkus.jobs.service.embedded.stream; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.eclipse.microprofile.reactive.messaging.Acknowledgment; -import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.context.ManagedExecutor; import org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.job.JobInstanceDataEvent; import
org.kie.kogito.jobs.JobsServiceException; @@ -31,15 +31,14 @@ import org.kie.kogito.jobs.service.model.JobDetails; import org.kie.kogito.jobs.service.model.ScheduledJob; import org.kie.kogito.jobs.service.resource.RestApiConstants; -import org.kie.kogito.jobs.service.stream.AvailableStreams; +import org.kie.kogito.jobs.service.stream.JobEventPublisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; -import io.smallrye.reactive.messaging.annotations.Blocking; - import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; @@ -50,7 +49,8 @@ * EventPublisher API. Events propagation is enabled only when the embedded data index is present in current application. */ @ApplicationScoped -public class EventPublisherJobStreams { +@Alternative +public class EventPublisherJobStreams implements JobEventPublisher { public static final String DATA_INDEX_EVENT_PUBLISHER = "org.kie.kogito.index.addon.DataIndexEventPublisher"; @@ -62,41 +62,52 @@ public class EventPublisherJobStreams { private final ObjectMapper objectMapper; + private final ManagedExecutor managedExecutor; + @Inject public EventPublisherJobStreams(@ConfigProperty(name = "kogito.service.url", defaultValue = "http://localhost:8080") String url, Instance eventPublishers, - ObjectMapper objectMapper) { + ObjectMapper objectMapper, + ManagedExecutor managedExecutor) { this.url = url; eventPublisher = eventPublishers.stream().collect(Collectors.toList()); this.objectMapper = objectMapper; + this.managedExecutor = managedExecutor; } - @Incoming(AvailableStreams.JOB_STATUS_CHANGE_EVENTS) - @Acknowledgment(Acknowledgment.Strategy.PRE_PROCESSING) - @Blocking - public void onJobStatusChange(JobDetails jobDetails) { - if (eventPublisher != null) { - ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails); - byte[] jsonContent; - try { - jsonContent = 
objectMapper.writeValueAsBytes(scheduledJob); - } catch (Exception e) { - throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + scheduledJob, e); - } - JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, - url + RestApiConstants.JOBS_PATH, - jsonContent, - scheduledJob.getProcessInstanceId(), - scheduledJob.getRootProcessInstanceId(), - scheduledJob.getProcessId(), - scheduledJob.getRootProcessId(), - null); - try { - eventPublisher.forEach(e -> e.publish(event)); - } catch (Exception e) { - LOGGER.error("Job status change propagation has failed at eventPublisher: " + eventPublisher.getClass() + " execution.", e); + @Override + public JobDetails publishJobStatusChange(JobDetails jobDetails) { + try { + managedExecutor.runAsync(() -> { + if (eventPublisher != null) { + ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails); + byte[] jsonContent; + try { + jsonContent = objectMapper.writeValueAsBytes(scheduledJob); + } catch (Exception e) { + throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + scheduledJob, e); + } + JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE, + url + RestApiConstants.JOBS_PATH, + jsonContent, + scheduledJob.getProcessInstanceId(), + scheduledJob.getRootProcessInstanceId(), + scheduledJob.getProcessId(), + scheduledJob.getRootProcessId(), + null); + try { + eventPublisher.forEach(e -> e.publish(event)); + } catch (Exception e) { + LOGGER.error("Job status change propagation has failed at eventPublisher: " + eventPublisher.getClass() + " execution.", e); + } + } + }).get(); + } catch (InterruptedException | ExecutionException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } + throw new RuntimeException("Job status change propagation has failed.", e); } + return jobDetails; } - } diff --git 
a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties index c1852d00aa..58cf6e8a22 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/resources/application.properties @@ -16,6 +16,7 @@ # specific language governing permissions and limitations # under the License. # +quarkus.arc.selected-alternatives=org.kie.kogito.addons.quarkus.jobs.service.embedded.stream.* %dev.quarkus.log.category."org.kie.kogito.jobs".level=INFO diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java index 3fec2fee68..c6081945c6 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/test/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreamsTest.java @@ -22,8 +22,10 @@ import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.Date; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; +import org.eclipse.microprofile.context.ManagedExecutor; import org.junit.jupiter.api.Test; import 
org.kie.kogito.event.EventPublisher; import org.kie.kogito.event.job.JobInstanceDataEvent; @@ -44,6 +46,7 @@ import jakarta.enterprise.inject.Instance; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.anyCollection; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -93,14 +96,20 @@ void onJobStatusChange() throws Exception { Instance eventPublisherInstance = mock(Instance.class); Stream eventPublishers = Arrays.stream(new EventPublisher[] { eventPublisher }); doReturn(eventPublishers).when(eventPublisherInstance).stream(); + ManagedExecutor managedExecutor = mock(ManagedExecutor.class); + ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + CompletableFuture completableFuture = CompletableFuture.completedFuture(null); + doReturn(completableFuture).when(managedExecutor).runAsync(any()); ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper); + EventPublisherJobStreams eventPublisherJobStreams = new EventPublisherJobStreams(URL, eventPublisherInstance, objectMapper, managedExecutor); JobDetails jobDetails = buildJobDetails(); - eventPublisherJobStreams.onJobStatusChange(jobDetails); + eventPublisherJobStreams.publishJobStatusChange(jobDetails); + verify(managedExecutor).runAsync(runnableArgumentCaptor.capture()); + runnableArgumentCaptor.getValue().run(); verify(eventPublisher).publish(eventCaptor.capture()); verify(eventPublisher, never()).publish(anyCollection()); diff --git a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java 
b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java index aa8f331a8e..dae4ac4ecf 100644 --- a/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java +++ b/jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java @@ -20,7 +20,6 @@ package org.kie.kogito.jobs.embedded; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutionException; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -30,12 +29,9 @@ import org.kie.kogito.jobs.service.adapter.ScheduledJobAdapter; import org.kie.kogito.jobs.service.api.Recipient; import org.kie.kogito.jobs.service.model.JobDetails; -import org.kie.kogito.jobs.service.model.JobExecutionResponse; import org.kie.kogito.jobs.service.model.ScheduledJob; import org.kie.kogito.jobs.service.resource.RestApiConstants; -import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler; import org.kie.kogito.jobs.service.stream.JobEventPublisher; -import org.kie.kogito.jobs.service.utils.ErrorHandling; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,9 +59,6 @@ public class JobInVMEventPublisher implements JobEventPublisher { private final ObjectMapper objectMapper; - @Inject - ReactiveJobScheduler scheduler; - @Inject Event bus; @@ -79,47 +72,6 @@ public JobInVMEventPublisher( LOGGER.info("JobInVMEventPublisher Started with url {}", url); } - @Override - public JobExecutionResponse publishJobError(JobExecutionResponse response) { - try { - LOGGER.debug("publishJobError {}", response); - - ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response) - .findFirst() - .run() - .thenApply(Optional::isPresent) - .exceptionally(e -> { - LOGGER.error("Error handling error {}", response, e); - return false; - 
}).toCompletableFuture().get(); - - return response; - } catch (Exception e) { - LOGGER.error("error in publishJobError", e); - return response; - } - } - - @Override - public JobExecutionResponse publishJobSuccess(JobExecutionResponse response) { - try { - LOGGER.debug("publishJobSuccess {}", response); - ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response) - .findFirst() - .run() - .thenApply(Optional::isPresent) - .exceptionally(e -> { - LOGGER.error("Error handling error {}", response, e); - return false; - }).toCompletableFuture().get(); - - return response; - } catch (Exception e) { - LOGGER.error("error in publishJobSuccess", e); - return response; - } - } - @Override public JobDetails publishJobStatusChange(JobDetails jobDetails) { try {