Skip to content

Commit

Permalink
Scheduler: make sure an exception never slips through an invokers chain
Browse files Browse the repository at this point in the history
- related to quarkusio#41240
  • Loading branch information
mkouba committed Jul 25, 2024
1 parent 8a19237 commit 0e075ee
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
Expand All @@ -13,6 +14,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.scheduler.SuccessfulExecution;
Expand All @@ -39,6 +41,10 @@ public void testExecution() {
} else {
fail("Jobs were not executed in 10 seconds!");
}

assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.FAILURE_COUNTER.get() > 0);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
Expand All @@ -50,8 +56,11 @@ static class Jobs {
static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final AtomicInteger COUNTER = new AtomicInteger(0);
static final AtomicInteger FAILING_COUNTER = new AtomicInteger(0);
static final AtomicInteger SUCCESS_COUNTER = new AtomicInteger(0);
static final AtomicInteger FAILURE_COUNTER = new AtomicInteger(0);
static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);
static final CountDownLatch FAILED_LATCH = new CountDownLatch(1);

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
Expand All @@ -61,12 +70,24 @@ void nonconcurrent() throws InterruptedException {
}
}

@Scheduled(every = "1s", concurrentExecution = SKIP)
void failing() {
if (FAILING_COUNTER.incrementAndGet() > 2) {
FAILED_LATCH.countDown();
}
throw new IllegalStateException();
}

void onSkip(@Observes SkippedExecution event) {
SKIPPED_LATCH.countDown();
}

void onSuccess(@Observes SuccessfulExecution event) {
SUCCESS_COUNTER.incrementAndGet();
}

void onFailure(@Observes FailedExecution event) {
FAILURE_COUNTER.incrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.arc.Arc;
Expand All @@ -24,17 +25,18 @@ public CompletionStage<Void> invoke(ScheduledExecution execution) throws Excepti
return invokeBean(execution).whenComplete((v, t) -> {
requestContext.destroy(state);
});
} catch (RuntimeException e) {
// Just terminate the context and rethrow the exception if something goes really wrong
} catch (Throwable e) {
// Terminate the context and return a failed stage if something goes really wrong
requestContext.terminate();
throw e;
return CompletableFuture.failedStage(e);
} finally {
// Always deactivate the context
requestContext.deactivate();
}
}
}

// This method is generated and should never throw an exception
protected abstract CompletionStage<Void> invokeBean(ScheduledExecution execution);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;

abstract class DelegateInvoker implements ScheduledInvoker {

protected final ScheduledInvoker delegate;
Expand All @@ -17,4 +22,12 @@ public boolean isBlocking() {
public boolean isRunningOnVirtualThread() {
return delegate.isRunningOnVirtualThread();
}

protected CompletionStage<Void> invokeDelegate(ScheduledExecution execution) {
try {
return delegate.invoke(execution);
} catch (Throwable e) {
return CompletableFuture.failedStage(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.quarkus.scheduler.common.runtime;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.quarkus.scheduler.ScheduledExecution;
Expand All @@ -26,11 +25,7 @@ public CompletionStage<Void> invoke(ScheduledExecution execution) throws Excepti

@Override
public CompletionStage<Void> executeJob() {
try {
return delegate.invoke(execution);
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
return invokeDelegate(execution);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public SkipConcurrentExecutionInvoker(ScheduledInvoker delegate, Event<SkippedEx
@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
if (running.compareAndSet(false, true)) {
return delegate.invoke(execution).whenComplete((r, t) -> running.set(false));
return invokeDelegate(execution).whenComplete((r, t) -> running.set(false));
}
LOG.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName());
SkippedExecution payload = new SkippedExecution(execution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public CompletionStage<Void> invoke(ScheduledExecution execution) throws Excepti
event.fireAsync(payload);
return CompletableFuture.completedStage(null);
} else {
return delegate.invoke(execution);
return invokeDelegate(execution);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public StatusEmitterInvoker(ScheduledInvoker delegate, Event<SuccessfulExecution

@Override
public CompletionStage<Void> invoke(ScheduledExecution execution) throws Exception {
return delegate.invoke(execution).whenComplete((v, t) -> {
return invokeDelegate(execution).whenComplete((v, t) -> {
if (t != null) {
LOG.errorf(t, "Error occurred while executing task for trigger %s", execution.getTrigger());
Events.fire(failedEvent, new FailedExecution(execution, t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.concurrent.CountDownLatch;
Expand All @@ -13,6 +14,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.SkippedExecution;
import io.quarkus.scheduler.SuccessfulExecution;
Expand All @@ -39,6 +41,10 @@ public void testExecution() {
} else {
fail("Jobs were not executed in 10 seconds!");
}

assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.FAILURE_COUNTER.get() > 0);

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
Expand All @@ -50,8 +56,11 @@ static class Jobs {
static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1);

static final AtomicInteger COUNTER = new AtomicInteger(0);
static final AtomicInteger FAILING_COUNTER = new AtomicInteger(0);
static final AtomicInteger SUCCESS_COUNTER = new AtomicInteger(0);
static final AtomicInteger FAILURE_COUNTER = new AtomicInteger(0);
static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1);
static final CountDownLatch FAILED_LATCH = new CountDownLatch(1);

@Scheduled(every = "1s", concurrentExecution = SKIP)
void nonconcurrent() throws InterruptedException {
Expand All @@ -61,12 +70,24 @@ void nonconcurrent() throws InterruptedException {
}
}

@Scheduled(every = "1s", concurrentExecution = SKIP)
void failing() {
if (FAILING_COUNTER.incrementAndGet() > 2) {
FAILED_LATCH.countDown();
}
throw new IllegalStateException();
}

void onSkip(@Observes SkippedExecution event) {
SKIPPED_LATCH.countDown();
}

void onSuccess(@Observes SuccessfulExecution event) {
SUCCESS_COUNTER.incrementAndGet();
}

void onFailure(@Observes FailedExecution event) {
FAILURE_COUNTER.incrementAndGet();
}
}
}

0 comments on commit 0e075ee

Please sign in to comment.