diff --git a/docs/src/main/asciidoc/virtual-threads.adoc b/docs/src/main/asciidoc/virtual-threads.adoc index e409077fdb982..a147ddf843451 100644 --- a/docs/src/main/asciidoc/virtual-threads.adoc +++ b/docs/src/main/asciidoc/virtual-threads.adoc @@ -447,6 +447,50 @@ quarkus.virtual-threads.name-prefix= ---- +== Inject the virtual thread executor + +In order to run tasks on virtual threads Quarkus manages an internal `ThreadPerTaskExecutor`. +In rare instances where you'd need to access this executor directly you can inject it using the `@VirtualThreads` CDI qualifier: + +IMPORTANT: Injecting the Virtual Thread ExecutorService is experimental and may change in future versions. + +[source,java] +---- +package org.acme; + +import org.acme.fortune.repository.FortuneRepository; + +import java.util.concurrent.ExecutorService; + +import jakarta.enterprise.event.Observes; +import jakarta.inject.Inject; +import jakarta.transaction.Transactional; + +import io.quarkus.logging.Log; +import io.quarkus.runtime.StartupEvent; +import io.quarkus.virtual.threads.VirtualThreads; + +public class MyApplication { + + @Inject + FortuneRepository repository; + + @Inject + @VirtualThreads + ExecutorService vThreads; + + void onEvent(@Observes StartupEvent event) { + vThreads.execute(this::findAll); + } + + @Transactional + void findAll() { + Log.info(repository.findAllBlocking()); + } + +} +---- + == Testing virtual thread applications As mentioned above, virtual threads have a few limitations that can drastically affect your application performance and memory usage. diff --git a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java index 136bfa1f2bff8..f9de96a844644 100644 --- a/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java +++ b/extensions/virtual-threads/deployment/src/main/java/io/quarkus/virtual/threads/VirtualThreadsProcessor.java @@ -1,5 +1,14 @@ package io.quarkus.virtual.threads; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; + +import org.jboss.jandex.AnnotationInstance; + +import io.quarkus.arc.deployment.AdditionalBeanBuildItem; +import io.quarkus.arc.deployment.SyntheticBeanBuildItem; +import io.quarkus.arc.processor.BuiltinScope; +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.annotations.ExecutionTime; import io.quarkus.deployment.annotations.Record; @@ -12,8 +21,19 @@ public class VirtualThreadsProcessor { @Record(ExecutionTime.STATIC_INIT) public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder, ShutdownContextBuildItem shutdownContextBuildItem, - LaunchModeBuildItem launchModeBuildItem) { + LaunchModeBuildItem launchModeBuildItem, + BuildProducer beans, + BuildProducer producer) { + beans.produce(new AdditionalBeanBuildItem(VirtualThreads.class)); recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode()); + producer.produce( + SyntheticBeanBuildItem.configure(ExecutorService.class) + .addType(Executor.class) + .addQualifier(AnnotationInstance.builder(VirtualThreads.class).build()) + .scope(BuiltinScope.APPLICATION.getInfo()) + .setRuntimeInit() + .supplier(recorder.getCurrentSupplier()) + .done()); } } diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java new file mode 100644 index 0000000000000..089515a5f162f --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/DelegatingExecutorService.java @@ -0,0 +1,88 @@ +package io.quarkus.virtual.threads; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination. + */ +class DelegatingExecutorService implements ExecutorService { + private final ExecutorService delegate; + + DelegatingExecutorService(final ExecutorService delegate) { + this.delegate = delegate; + } + + public void execute(final Runnable command) { + delegate.execute(command); + } + + public boolean isShutdown() { + // container managed executors are never shut down from the application's perspective + return false; + } + + public boolean isTerminated() { + // container managed executors are never shut down from the application's perspective + return false; + } + + public boolean awaitTermination(final long timeout, final TimeUnit unit) { + return false; + } + + public void shutdown() { + throw new UnsupportedOperationException("shutdown not allowed on managed executor service"); + } + + public List shutdownNow() { + throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service"); + } + + @Override + public Future submit(Callable task) { + return delegate.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return delegate.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + return delegate.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + return delegate.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return delegate.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + return delegate.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.invokeAny(tasks, timeout, unit); + } + + public String toString() { + return delegate.toString(); + } +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java new file mode 100644 index 0000000000000..01c89c19481f9 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/FallbackVirtualThreadsExecutorService.java @@ -0,0 +1,58 @@ +package io.quarkus.virtual.threads; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.TimeUnit; + +import io.smallrye.mutiny.infrastructure.Infrastructure; +import io.vertx.core.Vertx; +import io.vertx.core.impl.ContextInternal; + +/** + * Fallback executor service implementation in case the virtual threads are disabled or not available on the current platform. + *

+ * Executes tasks on the current Vert.x context worker pool, or when not available, on the Mutiny Infrastructure default worker + * pool + * Shutdown methods are no-op as the executor service is a wrapper around these previous execute methods. + */ +class FallbackVirtualThreadsExecutorService extends AbstractExecutorService { + + @Override + public void execute(Runnable command) { + var context = Vertx.currentContext(); + if (!(context instanceof ContextInternal)) { + Infrastructure.getDefaultWorkerPool().execute(command); + } else { + context.executeBlocking(() -> { + command.run(); + return null; + }, false); + } + } + + @Override + public void shutdown() { + // no-op + } + + @Override + public List shutdownNow() { + return Collections.EMPTY_LIST; + } + + @Override + public boolean isShutdown() { + return false; + } + + @Override + public boolean isTerminated() { + return false; + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) { + return false; + } +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreads.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreads.java new file mode 100644 index 0000000000000..7aa85bb57f7f2 --- /dev/null +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreads.java @@ -0,0 +1,29 @@ +package io.quarkus.virtual.threads; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.inject.Qualifier; + +/** + * Qualifies an injected virtual threads executor service. + */ +@Qualifier +@Target({ FIELD, METHOD, PARAMETER }) +@Retention(RUNTIME) +public @interface VirtualThreads { + + final class Literal extends AnnotationLiteral implements VirtualThreads { + + public static final Literal INSTANCE = new Literal(); + + private static final long serialVersionUID = 1L; + + } +} diff --git a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java index 80fef790f3b1e..16336d8ec8fc9 100644 --- a/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java +++ b/extensions/virtual-threads/runtime/src/main/java/io/quarkus/virtual/threads/VirtualThreadsRecorder.java @@ -5,19 +5,16 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Optional; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.function.Supplier; import org.jboss.logging.Logger; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.ShutdownContext; import io.quarkus.runtime.annotations.Recorder; -import io.smallrye.mutiny.infrastructure.Infrastructure; -import io.vertx.core.Vertx; -import io.vertx.core.impl.ContextInternal; @Recorder public class VirtualThreadsRecorder { @@ -26,9 +23,16 @@ public class VirtualThreadsRecorder { static VirtualThreadsConfig config = new VirtualThreadsConfig(); - private static volatile Executor current; + private static volatile ExecutorService current; private static final Object lock = new Object(); + public static Supplier VIRTUAL_THREADS_EXECUTOR_SUPPLIER = new Supplier() { + @Override + public ExecutorService get() { + return new DelegatingExecutorService(VirtualThreadsRecorder.getCurrent()); + } + }; + public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) { config = c; if (config.enabled) { @@ -36,9 +40,9 @@ public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdown shutdownContext.addLastShutdownTask(new Runnable() { @Override public void run() { - Executor executor = current; - if (executor instanceof ExecutorService) { - ((ExecutorService) executor).shutdownNow(); + ExecutorService service = current; + if (service != null) { + service.shutdownNow(); } current = null; } @@ -47,10 +51,9 @@ public void run() { shutdownContext.addLastShutdownTask(new Runnable() { @Override public void run() { - Executor executor = current; + ExecutorService service = current; current = null; - if (executor instanceof ExecutorService) { - ExecutorService service = (ExecutorService) executor; + if (service != null) { service.shutdown(); final long timeout = config.shutdownTimeout.toNanos(); @@ -82,8 +85,12 @@ public void run() { } } - public static Executor getCurrent() { - Executor executor = current; + public Supplier getCurrentSupplier() { + return VIRTUAL_THREADS_EXECUTOR_SUPPLIER; + } + + public static ExecutorService getCurrent() { + ExecutorService executor = current; if (executor != null) { return executor; } @@ -134,7 +141,7 @@ static ExecutorService newVirtualThreadExecutor() * Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled * using java 11 and executed with a loom-compliant JDK. */ - private static Executor createExecutor() { + private static ExecutorService createExecutor() { if (config.enabled) { try { return new ContextPreservingExecutorService(newVirtualThreadExecutor()); @@ -149,19 +156,6 @@ private static Executor createExecutor() { } } // Fallback to regular worker threads - return new Executor() { - @Override - public void execute(Runnable command) { - var context = Vertx.currentContext(); - if (!(context instanceof ContextInternal)) { - Infrastructure.getDefaultWorkerPool().execute(command); - } else { - context.executeBlocking(() -> { - command.run(); - return null; - }, false); - } - } - }; + return new FallbackVirtualThreadsExecutorService(); } } diff --git a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java index 3999e24058263..13ec9b464056f 100644 --- a/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java +++ b/integration-tests/virtual-threads/rest-client-reactive-virtual-threads/src/main/java/io/quarkus/virtual/rest/RestClientResource.java @@ -1,11 +1,15 @@ package io.quarkus.virtual.rest; +import java.util.concurrent.ExecutorService; + +import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import org.eclipse.microprofile.rest.client.inject.RestClient; import io.quarkus.test.vertx.VirtualThreadsAssertions; +import io.quarkus.virtual.threads.VirtualThreads; import io.smallrye.common.annotation.RunOnVirtualThread; @Path("/") @@ -15,9 +19,14 @@ public class RestClientResource { @RestClient ServiceClient client; + @Inject + @VirtualThreads + ExecutorService executor; + @GET public Greeting test() { VirtualThreadsAssertions.assertEverything(); + assert executor != null; return client.hello(); }