Skip to content

Commit

Permalink
Added synthetic bean with @VirtualThreads qualifier for the managed E…
Browse files Browse the repository at this point in the history
…xecutorService backed by virtual threads
  • Loading branch information
ozangunalp committed Oct 15, 2023
1 parent 2265840 commit bd26805
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 29 deletions.
44 changes: 44 additions & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,50 @@ quarkus.virtual-threads.name-prefix=
----

== Inject the virtual thread executor

In order to run tasks on virtual threads Quarkus manages an internal `ThreadPerTaskExecutor`.
In rare instances where you'd need to access this executor directly you can inject it using the `@VirtualThreads` CDI qualifier:

IMPORTANT: Injecting the Virtual Thread ExecutorService is experimental and may change in future versions.

[source,java]
----
package org.acme;
import org.acme.fortune.repository.FortuneRepository;
import java.util.concurrent.ExecutorService;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import io.quarkus.virtual.threads.VirtualThreads;
public class MyApplication {
@Inject
FortuneRepository repository;
@Inject
@VirtualThreads
ExecutorService vThreads;
void onEvent(@Observes StartupEvent event) {
vThreads.execute(this::findAll);
}
@Transactional
void findAll() {
Log.info(repository.findAllBlocking());
}
}
----

== Testing virtual thread applications

As mentioned above, virtual threads have a few limitations that can drastically affect your application performance and memory usage.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
package io.quarkus.virtual.threads;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import org.jboss.jandex.AnnotationInstance;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.arc.processor.BuiltinScope;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
Expand All @@ -12,8 +21,19 @@ public class VirtualThreadsProcessor {
@Record(ExecutionTime.STATIC_INIT)
public void setup(VirtualThreadsConfig config, VirtualThreadsRecorder recorder,
ShutdownContextBuildItem shutdownContextBuildItem,
LaunchModeBuildItem launchModeBuildItem) {
LaunchModeBuildItem launchModeBuildItem,
BuildProducer<AdditionalBeanBuildItem> beans,
BuildProducer<SyntheticBeanBuildItem> producer) {
beans.produce(new AdditionalBeanBuildItem(VirtualThreads.class));
recorder.setupVirtualThreads(config, shutdownContextBuildItem, launchModeBuildItem.getLaunchMode());
producer.produce(
SyntheticBeanBuildItem.configure(ExecutorService.class)
.addType(Executor.class)
.addQualifier(AnnotationInstance.builder(VirtualThreads.class).build())
.scope(BuiltinScope.APPLICATION.getInfo())
.setRuntimeInit()
.supplier(recorder.getCurrentSupplier())
.done());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.quarkus.virtual.threads;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* An implementation of {@code ExecutorService} that delegates to the real executor, while disallowing termination.
*/
class DelegatingExecutorService implements ExecutorService {
private final ExecutorService delegate;

DelegatingExecutorService(final ExecutorService delegate) {
this.delegate = delegate;
}

public void execute(final Runnable command) {
delegate.execute(command);
}

public boolean isShutdown() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean isTerminated() {
// container managed executors are never shut down from the application's perspective
return false;
}

public boolean awaitTermination(final long timeout, final TimeUnit unit) {
return false;
}

public void shutdown() {
throw new UnsupportedOperationException("shutdown not allowed on managed executor service");
}

public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException("shutdownNow not allowed on managed executor service");
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public String toString() {
return delegate.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.quarkus.virtual.threads;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

/**
* Fallback executor service implementation in case the virtual threads are disabled or not available on the current platform.
* <p>
* Executes tasks on the current Vert.x context worker pool, or when not available, on the Mutiny Infrastructure default worker
* pool
* Shutdown methods are no-op as the executor service is a wrapper around these previous execute methods.
*/
class FallbackVirtualThreadsExecutorService extends AbstractExecutorService {

@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}

@Override
public void shutdown() {
// no-op
}

@Override
public List<Runnable> shutdownNow() {
return Collections.EMPTY_LIST;
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public boolean isTerminated() {
return false;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.quarkus.virtual.threads;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* Qualifies an injected virtual threads executor service.
*/
@Qualifier
@Target({ FIELD, METHOD, PARAMETER })
@Retention(RUNTIME)
public @interface VirtualThreads {

final class Literal extends AnnotationLiteral<VirtualThreads> implements VirtualThreads {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,16 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;

import org.jboss.logging.Logger;

import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;

@Recorder
public class VirtualThreadsRecorder {
Expand All @@ -26,19 +23,26 @@ public class VirtualThreadsRecorder {

static VirtualThreadsConfig config = new VirtualThreadsConfig();

private static volatile Executor current;
private static volatile ExecutorService current;
private static final Object lock = new Object();

public static Supplier<ExecutorService> VIRTUAL_THREADS_EXECUTOR_SUPPLIER = new Supplier<ExecutorService>() {
@Override
public ExecutorService get() {
return new DelegatingExecutorService(VirtualThreadsRecorder.getCurrent());
}
};

public void setupVirtualThreads(VirtualThreadsConfig c, ShutdownContext shutdownContext, LaunchMode launchMode) {
config = c;
if (config.enabled) {
if (launchMode == LaunchMode.DEVELOPMENT) {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdownNow();
ExecutorService service = current;
if (service != null) {
service.shutdownNow();
}
current = null;
}
Expand All @@ -47,10 +51,9 @@ public void run() {
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Executor executor = current;
ExecutorService service = current;
current = null;
if (executor instanceof ExecutorService) {
ExecutorService service = (ExecutorService) executor;
if (service != null) {
service.shutdown();

final long timeout = config.shutdownTimeout.toNanos();
Expand Down Expand Up @@ -82,8 +85,12 @@ public void run() {
}
}

public static Executor getCurrent() {
Executor executor = current;
public Supplier<ExecutorService> getCurrentSupplier() {
return VIRTUAL_THREADS_EXECUTOR_SUPPLIER;
}

public static ExecutorService getCurrent() {
ExecutorService executor = current;
if (executor != null) {
return executor;
}
Expand Down Expand Up @@ -134,7 +141,7 @@ static ExecutorService newVirtualThreadExecutor()
* Since we try to load the "Loom-preview" classes/methods at runtime, the application can even be compiled
* using java 11 and executed with a loom-compliant JDK.
*/
private static Executor createExecutor() {
private static ExecutorService createExecutor() {
if (config.enabled) {
try {
return new ContextPreservingExecutorService(newVirtualThreadExecutor());
Expand All @@ -149,19 +156,6 @@ private static Executor createExecutor() {
}
}
// Fallback to regular worker threads
return new Executor() {
@Override
public void execute(Runnable command) {
var context = Vertx.currentContext();
if (!(context instanceof ContextInternal)) {
Infrastructure.getDefaultWorkerPool().execute(command);
} else {
context.executeBlocking(() -> {
command.run();
return null;
}, false);
}
}
};
return new FallbackVirtualThreadsExecutorService();
}
}
Loading

0 comments on commit bd26805

Please sign in to comment.