Skip to content

Commit

Permalink
kie-kogito-apps-2026: Improvements on the Job Service start-up and pe…
Browse files Browse the repository at this point in the history
…riodic jobs loading procedure (apache#2038)
  • Loading branch information
wmedvede authored and rgdoliveira committed Apr 23, 2024
1 parent cccb799 commit e1d6551
Show file tree
Hide file tree
Showing 46 changed files with 1,144 additions and 661 deletions.
5 changes: 0 additions & 5 deletions jobs-service/jobs-service-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,6 @@
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
</dependency>

<!-- Data Index Integration -->
<dependency>
<groupId>org.kie.kogito</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
*/
package org.kie.kogito.jobs.service.job;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.kie.kogito.jobs.service.exception.JobExecutionException;
import org.kie.kogito.jobs.service.executor.JobExecutor;
import org.kie.kogito.jobs.service.executor.JobExecutorResolver;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobDetailsContext;
import org.kie.kogito.jobs.service.model.JobExecutionResponse;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;
import org.kie.kogito.jobs.service.scheduler.ReactiveJobScheduler;
import org.kie.kogito.jobs.service.utils.ErrorHandling;
import org.kie.kogito.timer.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;

import static java.util.Objects.requireNonNull;
import static mutiny.zero.flow.adapters.AdaptersToFlow.publisher;

/**
* The job that delegates the execution to the {@link JobExecutorResolver} with the {@link JobDetailsContext}.
*/
Expand All @@ -40,35 +47,45 @@ public class DelegateJob implements Job<JobDetailsContext> {

// Resolves the concrete JobExecutor for a given JobDetails instance.
private final JobExecutorResolver jobExecutorResolver;

// Scheduler notified of execution success/error so it can apply the
// corresponding state transition (see handleJobExecutionSuccess/-Error).
private final ReactiveJobScheduler scheduler;

public DelegateJob(JobExecutorResolver executorResolver, ReactiveJobScheduler scheduler) {
    this.jobExecutorResolver = executorResolver;
    this.scheduler = scheduler;
}

@Override
public void execute(JobDetailsContext ctx) {
LOGGER.info("Executing for context {}", ctx.getJobDetails());
Optional.ofNullable(ctx)
.map(JobDetailsContext::getJobDetails)
.map(jobExecutorResolver::get)
.map(executor -> executor.execute(ctx.getJobDetails()))
.orElseThrow(() -> new IllegalStateException("JobDetails cannot be null from context " + ctx))
.onItem().invoke(jobEventPublisher::publishJobSuccess)
.onFailure(JobExecutionException.class).invoke(ex -> {
final AtomicReference<JobExecutionResponse> executionResponse = new AtomicReference<>();
final JobDetails jobDetails = requireNonNull(ctx.getJobDetails(), () -> String.format("JobDetails cannot be null for context: %s", ctx));
final JobExecutor executor = requireNonNull(jobExecutorResolver.get(jobDetails), () -> String.format("No JobExecutor was found for jobDetails: %s", jobDetails));
LOGGER.info("Executing job for context: {}", jobDetails);
executor.execute(jobDetails)
.flatMap(response -> {
executionResponse.set(response);
return handleJobExecutionSuccess(response);
})
.onFailure(JobExecutionException.class).recoverWithUni(ex -> {
String jobId = ((JobExecutionException) ex).getJobId();
LOGGER.error("Error executing job {}", jobId, ex);
jobEventPublisher.publishJobError(JobExecutionResponse.builder()
executionResponse.set(JobExecutionResponse.builder()
.message(ex.getMessage())
.now()
.jobId(jobId)
.build());
return handleJobExecutionError(executionResponse.get());
})
//to avoid blocking IO pool from eventloop since execute() is blocking currently
//might consider execute() method to be non-blocking/async
// avoid blocking IO pool from the event-loop since alternative EmbeddedJobExecutor is blocking.
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.subscribe().with(response -> LOGGER.info("Executed successfully with response {}", response));
.subscribe().with(ignore -> LOGGER.info("Job execution response processing has finished: {}", executionResponse.get()));
}

/**
 * Forwards a successful execution response to the scheduler, skipping errors
 * raised by the underlying publisher (see ErrorHandling.skipErrorPublisherBuilder).
 */
public Uni<JobDetails> handleJobExecutionSuccess(JobExecutionResponse response) {
    LOGGER.debug("Job execution success response received: {}", response);
    var successPublisher = ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionSuccess, response).buildRs();
    return Uni.createFrom().publisher(publisher(successPublisher));
}

/**
 * Forwards a failed execution response to the scheduler, skipping errors
 * raised by the underlying publisher (see ErrorHandling.skipErrorPublisherBuilder).
 */
public Uni<JobDetails> handleJobExecutionError(JobExecutionResponse response) {
    LOGGER.error("Job execution error response received: {}", response);
    var errorPublisher = ErrorHandling.skipErrorPublisherBuilder(scheduler::handleJobExecutionError, response).buildRs();
    return Uni.createFrom().publisher(publisher(errorPublisher));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
@ApplicationScoped
public class JobServiceLeaderHealthCheck implements HealthCheck {

private AtomicBoolean enabled = new AtomicBoolean(false);
private final AtomicBoolean enabled = new AtomicBoolean(false);

protected void onMessagingStatusChange(@Observes MessagingChangeEvent event) {
this.enabled.set(event.isEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@

public interface ReactiveJobRepository {

/**
 * Fields of a job that query results can be sorted by.
 */
enum SortTermField {
    FIRE_TIME, // the trigger's next fire time
    CREATED, // the job creation timestamp
    ID // the job identifier
}

/**
 * Immutable sort criterion: the field to sort by plus the sort direction.
 * Instances are created through the static factory methods only.
 */
class SortTerm {
    private final SortTermField sortField;
    private final boolean ascending;

    private SortTerm(SortTermField sortField, boolean ascending) {
        this.sortField = sortField;
        this.ascending = ascending;
    }

    public SortTermField getField() {
        return sortField;
    }

    public boolean isAsc() {
        return ascending;
    }

    /** Sort term over the trigger's next fire time. */
    public static SortTerm byFireTime(boolean asc) {
        return new SortTerm(SortTermField.FIRE_TIME, asc);
    }

    /** Sort term over the job creation timestamp. */
    public static SortTerm byCreated(boolean asc) {
        return new SortTerm(SortTermField.CREATED, asc);
    }

    /** Sort term over the job identifier. */
    public static SortTerm byId(boolean asc) {
        return new SortTerm(SortTermField.ID, asc);
    }
}

CompletionStage<JobDetails> save(JobDetails job);

CompletionStage<JobDetails> merge(String id, JobDetails job);
Expand All @@ -39,9 +75,8 @@ public interface ReactiveJobRepository {

CompletionStage<JobDetails> delete(JobDetails job);

PublisherBuilder<JobDetails> findByStatus(JobStatus... status);

PublisherBuilder<JobDetails> findAll();

PublisherBuilder<JobDetails> findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... status);
PublisherBuilder<JobDetails> findByStatusBetweenDates(ZonedDateTime fromFireTime,
ZonedDateTime toFireTime,
JobStatus[] status,
SortTerm[] orderBy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@
*/
package org.kie.kogito.jobs.service.repository.impl;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.kie.kogito.jobs.service.model.JobDetails;
import org.kie.kogito.jobs.service.model.JobStatus;
import org.kie.kogito.jobs.service.repository.ReactiveJobRepository;
import org.kie.kogito.jobs.service.stream.JobEventPublisher;

Expand All @@ -52,13 +48,6 @@ public <T> CompletionStage<T> runAsync(Supplier<T> function) {
return future;
}

@Override
public PublisherBuilder<JobDetails> findByStatus(JobStatus... status) {
    // A job matches when it has a non-null status equal to any of the requested ones.
    return findAll().filter(job -> {
        JobStatus current = job.getStatus();
        return Objects.nonNull(current) && Arrays.stream(status).anyMatch(current::equals);
    });
}

@Override
public CompletionStage<JobDetails> save(JobDetails job) {
return doSave(job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@

import java.time.ZonedDateTime;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
Expand All @@ -42,6 +41,8 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import static org.kie.kogito.jobs.service.utils.ModelUtil.jobWithCreatedAndLastUpdate;

@DefaultBean
@ApplicationScoped
public class InMemoryJobRepository extends BaseReactiveJobRepository implements ReactiveJobRepository {
Expand All @@ -60,8 +61,10 @@ public InMemoryJobRepository(Vertx vertx, JobEventPublisher jobEventPublisher) {
@Override
public CompletionStage<JobDetails> doSave(JobDetails job) {
    return runAsync(() -> {
        // A job is "new" when no entry with its id exists yet; this decides whether
        // the created timestamp is assigned (vs. only lastUpdate on updates).
        boolean isNew = !jobMap.containsKey(job.getId());
        JobDetails timeStampedJob = jobWithCreatedAndLastUpdate(isNew, job);
        jobMap.put(timeStampedJob.getId(), timeStampedJob);
        return timeStampedJob;
    });
}

Expand All @@ -81,20 +84,61 @@ public CompletionStage<JobDetails> delete(String key) {
}

@Override
public PublisherBuilder<JobDetails> findAll() {
return ReactiveStreams.fromIterable(jobMap.values());
@Override
public PublisherBuilder<JobDetails> findByStatusBetweenDates(ZonedDateTime fromFireTime,
        ZonedDateTime toFireTime,
        JobStatus[] status,
        SortTerm[] orderBy) {
    // Select the jobs that pass both the status and the fire-time window filters.
    Stream<JobDetails> matching = jobMap.values()
            .stream()
            .filter(job -> matchStatusFilter(job, status) && matchFireTimeFilter(job, fromFireTime, toFireTime));
    // Sorting is applied only when at least one sort term was requested.
    List<JobDetails> selected;
    if (orderBy == null || orderBy.length == 0) {
        selected = matching.toList();
    } else {
        selected = matching.sorted(orderByComparator(orderBy)).toList();
    }
    return ReactiveStreams.fromIterable(selected);
}

@Override
public PublisherBuilder<JobDetails> findByStatusBetweenDatesOrderByPriority(ZonedDateTime from, ZonedDateTime to, JobStatus... status) {
return ReactiveStreams.fromIterable(
jobMap.values()
.stream()
.filter(j -> Optional.ofNullable(j.getStatus())
.filter(s -> Objects.nonNull(status))
.map(s -> Stream.of(status).anyMatch(s::equals)).orElse(true))
.filter(j -> DateUtil.fromDate(j.getTrigger().hasNextFireTime()).isAfter(from) && DateUtil.fromDate(j.getTrigger().hasNextFireTime()).isBefore(to))
.sorted(Comparator.comparing(JobDetails::getPriority).reversed())
.collect(Collectors.toList()));
/**
 * Returns true when the job's status is one of the requested statuses.
 * An empty or absent filter matches every job.
 */
private static boolean matchStatusFilter(JobDetails job, JobStatus[] status) {
    if (status == null || status.length == 0) {
        return true;
    }
    JobStatus current = job.getStatus();
    for (JobStatus candidate : status) {
        if (current == candidate) {
            return true;
        }
    }
    return false;
}

/**
 * Returns true when the job trigger's next fire time falls within the
 * inclusive [fromFireTime, toFireTime] window.
 */
private static boolean matchFireTimeFilter(JobDetails job, ZonedDateTime fromFireTime, ZonedDateTime toFireTime) {
    ZonedDateTime fireTime = DateUtil.fromDate(job.getTrigger().hasNextFireTime());
    // !isBefore == "equal or after"; !isAfter == "equal or before" (inclusive bounds).
    boolean onOrAfterFrom = !fireTime.isBefore(fromFireTime);
    boolean onOrBeforeTo = !fireTime.isAfter(toFireTime);
    return onOrAfterFrom && onOrBeforeTo;
}

/**
 * Builds a composite comparator from the given sort terms; the first term is
 * the primary sort key. Callers must pass at least one term.
 */
private static Comparator<JobDetails> orderByComparator(SortTerm[] orderBy) {
    Comparator<JobDetails> composite = createOrderByFieldComparator(orderBy[0]);
    int index = 1;
    while (index < orderBy.length) {
        composite = composite.thenComparing(createOrderByFieldComparator(orderBy[index]));
        index++;
    }
    return composite;
}

/**
 * Creates the comparator for a single sort term; absent fire-time/created
 * values sort first in ascending order (they compare as Long.MIN_VALUE).
 */
private static Comparator<JobDetails> createOrderByFieldComparator(SortTerm field) {
    Comparator<JobDetails> base = switch (field.getField()) {
        case FIRE_TIME -> Comparator.comparingLong(jobDetails -> {
            Date nextFireTime = jobDetails.getTrigger().hasNextFireTime();
            return nextFireTime == null ? Long.MIN_VALUE : nextFireTime.getTime();
        });
        case CREATED -> Comparator.comparingLong(jobDetails -> {
            ZonedDateTime created = jobDetails.getCreated();
            return created == null ? Long.MIN_VALUE : created.toInstant().toEpochMilli();
        });
        case ID -> Comparator.comparing(JobDetails::getId);
    };
    return field.isAsc() ? base : base.reversed();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private static class JobDetailsAccessor {
private Map<String, Object> trigger;
private Long executionTimeout;
private String executionTimeoutUnit;
private Date created;

public JobDetailsAccessor() {
}
Expand All @@ -99,6 +100,7 @@ public JobDetailsAccessor(JobDetails jobDetails, RecipientMarshaller recipientMa
this.trigger = Optional.ofNullable(jobDetails.getTrigger()).map(t -> triggerMarshaller.marshall(t).getMap()).orElse(null);
this.executionTimeout = jobDetails.getExecutionTimeout();
this.executionTimeoutUnit = Optional.ofNullable(jobDetails.getExecutionTimeoutUnit()).map(Enum::name).orElse(null);
this.created = Optional.ofNullable(jobDetails.getCreated()).map(u -> Date.from(u.toInstant())).orElse(null);
}

public JobDetails to(RecipientMarshaller recipientMarshaller, TriggerMarshaller triggerMarshaller) {
Expand All @@ -115,6 +117,7 @@ public JobDetails to(RecipientMarshaller recipientMarshaller, TriggerMarshaller
.trigger(Optional.ofNullable(this.trigger).map(t -> triggerMarshaller.unmarshall(new JsonObject(t))).orElse(null))
.executionTimeout(this.executionTimeout)
.executionTimeoutUnit(Optional.ofNullable(this.executionTimeoutUnit).map(ChronoUnit::valueOf).orElse(null))
.created(Optional.ofNullable(this.created).map(t -> ZonedDateTime.ofInstant(t.toInstant(), DEFAULT_ZONE)).orElse(null))
.build();
}

Expand Down Expand Up @@ -213,5 +216,13 @@ public String getExecutionTimeoutUnit() {
public void setExecutionTimeoutUnit(String executionTimeoutUnit) {
this.executionTimeoutUnit = executionTimeoutUnit;
}

/**
 * @return the job creation timestamp, or {@code null} when it was never set.
 */
public Date getCreated() {
    return created;
}

public void setCreated(Date created) {
    this.created = created;
}
}
}
Loading

0 comments on commit e1d6551

Please sign in to comment.