Skip to content

Commit

Permalink
Scheduler: run multiple scheduler implementations
Browse files Browse the repository at this point in the history
- related to quarkusio#41954
  • Loading branch information
mkouba committed Jul 25, 2024
1 parent fc6adf5 commit ed94790
Show file tree
Hide file tree
Showing 18 changed files with 577 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import jakarta.inject.Singleton;

import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTransformation;
import org.jboss.jandex.DotName;
import org.quartz.Job;
import org.quartz.JobDataMap;
Expand All @@ -36,6 +38,7 @@
import io.quarkus.agroal.spi.JdbcDataSourceBuildItem;
import io.quarkus.agroal.spi.JdbcDataSourceSchemaReadyBuildItem;
import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.AnnotationsTransformerBuildItem;
import io.quarkus.arc.deployment.AutoAddScopeBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
Expand Down Expand Up @@ -64,10 +67,13 @@
import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate;
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.deployment.DiscoveredImplementationsBuildItem;
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.scheduler.runtime.SchedulerConfig;

/**
*
*/
public class QuartzProcessor {

private static final DotName JOB = DotName.createSimple(Job.class.getName());
Expand All @@ -77,6 +83,22 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.QUARTZ);
}

@BuildStep
SchedulerImplementationBuildItem implementation() {
return new SchedulerImplementationBuildItem(Implementation.QUARTZ, 1);
}

@BuildStep
void transformQuartzScheduler(SchedulerConfig config, DiscoveredImplementationsBuildItem discoveredImplementations,
BuildProducer<AnnotationsTransformerBuildItem> transformer) {
if (config.useCompositeScheduler && discoveredImplementations.hasMultiple()) {
transformer.produce(new AnnotationsTransformerBuildItem(AnnotationTransformation.forClasses()
.whenClass(QuartzSchedulerImpl.class)
.transform(c -> c.add(AnnotationInstance.builder(Constituent.class)
.add("value", Scheduled.Implementation.QUARTZ).build()))));
}
}

@BuildStep
AdditionalBeanBuildItem beans() {
return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.scheduler.runtime.SimpleScheduler;
import io.quarkus.test.QuarkusUnitTest;

public class CompositeSchedulerTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true");

@Constituent
QuartzScheduler quartz;

@Constituent
SimpleScheduler simple;

@Inject
Scheduler composite;

@Test
public void testExecution() throws InterruptedException {
assertTrue(Jobs.SIMPLE_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.QUARTZ_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.AUTO_LATCH.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));
}

static class Jobs {

static final CountDownLatch SIMPLE_LATCH = new CountDownLatch(1);
static final CountDownLatch QUARTZ_LATCH = new CountDownLatch(1);
static final CountDownLatch AUTO_LATCH = new CountDownLatch(1);

@Scheduled(identity = "simple", every = "1s", executeWith = Implementation.SIMPLE)
void simple() {
SIMPLE_LATCH.countDown();
}

@Scheduled(identity = "quartz", every = "1s", executeWith = Implementation.QUARTZ)
void quartz() {
QUARTZ_LATCH.countDown();
}

@Scheduled(identity = "auto", every = "1s")
void auto() {
AUTO_LATCH.countDown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import jakarta.enterprise.event.Reception;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.enterprise.inject.Typed;
import jakarta.inject.Singleton;
import jakarta.interceptor.Interceptor;
import jakarta.transaction.SystemException;
Expand Down Expand Up @@ -71,6 +70,7 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.ScheduledJobPaused;
Expand Down Expand Up @@ -107,7 +107,6 @@
* name of this class can be stored in a Quartz table in the database. See https://github.com/quarkusio/quarkus/issues/29177
* for more information.
*/
@Typed({ QuartzScheduler.class, Scheduler.class })
@Singleton
public class QuartzSchedulerImpl implements QuartzScheduler {

Expand Down Expand Up @@ -198,7 +197,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) {
} else if (!forceStart && context.getScheduledMethods(Implementation.QUARTZ).isEmpty()
&& !context.forceSchedulerStart()) {
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
Expand Down Expand Up @@ -232,10 +232,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}
};

for (ScheduledMethod method : context.getScheduledMethods()) {
for (ScheduledMethod method : context.getScheduledMethods(Implementation.QUARTZ)) {
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
if (!context.matchesImplementation(scheduled, Implementation.QUARTZ)) {
continue;
}
String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity());
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
Expand Down Expand Up @@ -345,6 +348,11 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public Implementation implementation() {
return Implementation.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
Expand Down Expand Up @@ -893,7 +901,7 @@ public Trigger schedule() {
}
scheduled = true;
SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation);
return createJobDefinitionQuartzTrigger(this, scheduled, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,16 @@
*/
String timeZone() default DEFAULT_TIMEZONE;

/**
* The scheduler implementation used to execute this scheduled method.
* <p>
* By default, the implementation is selected automatically. If the selected implementation is not available, then the build
* fails.
*
* @return the implementation
*/
Implementation executeWith() default Implementation.AUTO;

@Retention(RUNTIME)
@Target(METHOD)
@interface Schedules {
Expand All @@ -213,6 +223,25 @@

}

/**
* Represents a scheduler implementation used to execute a scheduled method.
*/
enum Implementation {

/**
* The implementation is selected automatically.
*/
AUTO,
/**
* Simple in-memory implementation provided by the {@code quarkus-scheduler} extension.
*/
SIMPLE,
/**
* Quartz implementation provided by the {@code quarkus-quartz} extension.
*/
QUARTZ
}

/**
* Represents a strategy to handle concurrent execution of a scheduled method.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import jakarta.enterprise.context.Dependent;

import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.smallrye.mutiny.Uni;

Expand Down Expand Up @@ -88,6 +89,12 @@ public interface Scheduler {
*/
Trigger unscheduleJob(String identity);

/**
*
* @return the implementation
*/
Scheduled.Implementation implementation();

/**
* The job definition is a builder-like API that can be used to define a job programmatically.
* <p>
Expand Down Expand Up @@ -177,6 +184,16 @@ interface JobDefinition {
*/
JobDefinition setTimeZone(String timeZone);

/**
* {@link Scheduled#executeWith()}
*
* @param implementation
* @return self
* @throws IllegalArgumentException If the selected implementation is not available
* @see Scheduled#executeWith()
*/
JobDefinition setExecuteWith(Implementation implementation);

/**
*
* @param task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.Scheduler.JobDefinition;
Expand All @@ -28,6 +29,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;
protected Scheduled.Implementation implementation = Implementation.AUTO;

public AbstractJobDefinition(String identity) {
this.identity = identity;
Expand Down Expand Up @@ -88,6 +90,12 @@ public JobDefinition setTimeZone(String timeZone) {
return this;
}

@Override
public JobDefinition setExecuteWith(Implementation implementation) {
this.implementation = implementation;
return this;
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.cronutils.model.CronType;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;

public interface SchedulerContext {

CronType getCronType();
Expand All @@ -13,6 +16,10 @@ public interface SchedulerContext {

boolean forceSchedulerStart();

List<ScheduledMethod> getScheduledMethods(Implementation implementation);

boolean matchesImplementation(Scheduled scheduled, Implementation implementation);

@SuppressWarnings("unchecked")
default ScheduledInvoker createInvoker(String invokerClassName) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public final class SyntheticScheduled extends AnnotationLiteral<Scheduled> imple
private final ConcurrentExecution concurrentExecution;
private final SkipPredicate skipPredicate;
private final String timeZone;
private final Scheduled.Implementation implementation;

public SyntheticScheduled(String identity, String cron, String every, long delay, TimeUnit delayUnit, String delayed,
String overdueGracePeriod, ConcurrentExecution concurrentExecution,
SkipPredicate skipPredicate, String timeZone) {
SkipPredicate skipPredicate, String timeZone, Scheduled.Implementation implementation) {
this.identity = Objects.requireNonNull(identity);
this.cron = Objects.requireNonNull(cron);
this.every = Objects.requireNonNull(every);
Expand All @@ -36,6 +37,7 @@ public SyntheticScheduled(String identity, String cron, String every, long delay
this.concurrentExecution = Objects.requireNonNull(concurrentExecution);
this.skipPredicate = skipPredicate;
this.timeZone = timeZone;
this.implementation = implementation;
}

@Override
Expand Down Expand Up @@ -88,6 +90,11 @@ public String timeZone() {
return timeZone;
}

@Override
public Implementation executeWith() {
return implementation;
}

public String toJson() {
if (skipPredicate != null) {
throw new IllegalStateException("A skipPredicate instance may not be serialized");
Expand All @@ -102,6 +109,7 @@ public String toJson() {
json.put("overdueGracePeriod", overdueGracePeriod);
json.put("concurrentExecution", concurrentExecution.toString());
json.put("timeZone", timeZone);
json.put("executeWith", implementation);
return json.encode();
}

Expand All @@ -110,7 +118,7 @@ public static SyntheticScheduled fromJson(String json) {
return new SyntheticScheduled(jsonObj.getString("identity"), jsonObj.getString("cron"), jsonObj.getString("every"),
jsonObj.getLong("delay"), TimeUnit.valueOf(jsonObj.getString("delayUnit")), jsonObj.getString("delayed"),
jsonObj.getString("overdueGracePeriod"), ConcurrentExecution.valueOf(jsonObj.getString("concurrentExecution")),
null, jsonObj.getString("timeZone"));
null, jsonObj.getString("timeZone"), Scheduled.Implementation.valueOf(jsonObj.getString("executeWith")));
}

@Override
Expand Down
Loading

0 comments on commit ed94790

Please sign in to comment.