Skip to content

Commit

Permalink
Reactive routes: virtual threads support
Browse files Browse the repository at this point in the history
- resolves quarkusio#36430
  • Loading branch information
mkouba committed Oct 13, 2023
1 parent b3a623a commit 14dd05e
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 13 deletions.
8 changes: 7 additions & 1 deletion docs/src/main/asciidoc/reactive-routes.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void blocking(RoutingContext rc) {
// ...
}
----
When `@Blocking` is used, it ignores the `type` attribute of `@Route`.
When `@Blocking` is used, the `type` attribute of the `@Route` is ignored.
====

The `@Route` annotation is repeatable and so you can declare several routes for a single method:
Expand All @@ -164,6 +164,12 @@ String person() {
----
<1> If the `accept` header matches `text/html`, we set the content type automatically to `text/html`.

=== Executing route on a virtual thread

You can annotate a route method with `@io.smallrye.common.annotation.RunOnVirtualThread` in order to execute it on a virtual thread.
However, keep in mind that not everything can run safely on virtual threads.
You should read the xref:virtual-threads.adoc#run-code-on-virtual-threads-using-runonvirtualthread[Virtual thread support reference] carefully and get acquainted with all the details.

=== Handling conflicting routes

You may end up with multiple routes matching a given path.
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/asciidoc/virtual-threads.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ In this scenario, it is worse than useless to have thousands of threads if we ha
Even worse, when running a CPU-bound workload on a virtual thread, the virtual thread monopolizes the carrier thread on which it is mounted.
It will either reduce the chance for the other virtual thread to run or will start creating new carrier threads, leading to high memory usage.

[[run-code-on-virtual-threads-using-runonvirtualthread]]
== Run code on virtual threads using @RunOnVirtualThread

In Quarkus, the support of virtual thread is implemented using the link:{runonvthread}[@RunOnVirtualThread] annotation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.jboss.jandex.Type.Kind;

import io.quarkus.hibernate.validator.spi.BeanValidationAnnotationsBuildItem;
import io.quarkus.vertx.http.runtime.HandlerType;

/**
* Describe a request handler.
Expand All @@ -15,15 +14,15 @@ class HandlerDescriptor {

private final MethodInfo method;
private final BeanValidationAnnotationsBuildItem validationAnnotations;
private final HandlerType handlerType;
private final boolean failureHandler;
private final Type payloadType;
private final String[] contentTypes;

HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, HandlerType handlerType,
HandlerDescriptor(MethodInfo method, BeanValidationAnnotationsBuildItem bvAnnotations, boolean failureHandler,
String[] producedTypes) {
this.method = method;
this.validationAnnotations = bvAnnotations;
this.handlerType = handlerType;
this.failureHandler = failureHandler;
Type returnType = method.returnType();
if (returnType.kind() == Kind.VOID) {
payloadType = null;
Expand Down Expand Up @@ -120,8 +119,8 @@ boolean isPayloadMutinyBuffer() {
return type.name().equals(DotNames.MUTINY_BUFFER);
}

HandlerType getHandlerType() {
return handlerType;
boolean isFailureHandler() {
return failureHandler;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public boolean test(String name) {
if (routeHandler == null) {
String handlerClass = generateHandler(
new HandlerDescriptor(businessMethod.getMethod(), beanValidationAnnotations.orElse(null),
handlerType, produces),
handlerType == HandlerType.FAILURE, produces),
businessMethod.getBean(), businessMethod.getMethod(), classOutput, transformedAnnotations,
routeString, reflectiveHierarchy, produces.length > 0 ? produces[0] : null,
validatorAvailable, index);
Expand All @@ -458,6 +458,13 @@ public boolean test(String name) {
// Wrap the route handler if necessary
// Note that route annotations with the same values share a single handler implementation
routeHandler = recorder.compressRouteHandler(routeHandler, businessMethod.getCompression());
if (businessMethod.getMethod().hasDeclaredAnnotation(DotNames.RUN_ON_VIRTUAL_THREAD)) {
LOGGER.debugf("Route %s#%s() will be executed on a virtual thread",
businessMethod.getMethod().declaringClass().name(), businessMethod.getMethod().name());
routeHandler = recorder.runOnVirtualThread(routeHandler);
// The handler must be executed on the event loop
handlerType = HandlerType.NORMAL;
}

RouteMatcher matcher = new RouteMatcher(path, regex, produces, consumes, methods, order);
matchers.put(matcher, businessMethod.getMethod());
Expand Down Expand Up @@ -489,7 +496,7 @@ public boolean test(String name) {

for (AnnotatedRouteFilterBuildItem filterMethod : routeFilterBusinessMethods) {
String handlerClass = generateHandler(
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), HandlerType.NORMAL,
new HandlerDescriptor(filterMethod.getMethod(), beanValidationAnnotations.orElse(null), false,
new String[0]),
filterMethod.getBean(), filterMethod.getMethod(), classOutput, transformedAnnotations,
filterMethod.getRouteFilter().toString(true), reflectiveHierarchy, null, validatorAvailable, index);
Expand Down Expand Up @@ -785,7 +792,7 @@ void implementInvoke(HandlerDescriptor descriptor, BeanInfo bean, MethodInfo met
defaultProduces == null ? invoke.loadNull() : invoke.load(defaultProduces));

// For failure handlers attempt to match the failure type
if (descriptor.getHandlerType() == HandlerType.FAILURE) {
if (descriptor.isFailureHandler()) {
Type failureType = getFailureType(parameters, index);
if (failureType != null) {
ResultHandle failure = invoke.invokeInterfaceMethod(Methods.FAILURE, routingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public Handler<RoutingContext> createHandler(String handlerClassName) {
}
}

public Handler<RoutingContext> runOnVirtualThread(Handler<RoutingContext> routeHandler) {
return new VirtualThreadsRouteHandler(routeHandler);
}

public Handler<RoutingContext> compressRouteHandler(Handler<RoutingContext> routeHandler, HttpCompression compression) {
if (httpBuildTimeConfig.enableCompression) {
return new HttpCompressionHandler(routeHandler, compression,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.vertx.web.runtime;

import io.quarkus.vertx.core.runtime.VertxCoreRecorder;
import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.quarkus.virtual.threads.VirtualThreadsRecorder;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.ext.web.RoutingContext;

public class VirtualThreadsRouteHandler implements Handler<RoutingContext> {

private final Handler<RoutingContext> routeHandler;

public VirtualThreadsRouteHandler(Handler<RoutingContext> routeHandler) {
this.routeHandler = routeHandler;
}

@Override
public void handle(RoutingContext context) {
Context vertxContext = VertxContext.getOrCreateDuplicatedContext(VertxCoreRecorder.getVertx().get());
VertxContextSafetyToggle.setContextSafe(vertxContext, true);
vertxContext.runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
VirtualThreadsRecorder.getCurrent().execute(new Runnable() {
@Override
public void run() {
routeHandler.handle(context);
}
});
}
});
}

}
3 changes: 2 additions & 1 deletion integration-tests/virtual-threads/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
<module>vertx-event-bus-virtual-threads</module>
<module>scheduler-virtual-threads</module>
<module>quartz-virtual-threads</module>
<module>virtual-threads-disabled</module>
<module>virtual-threads-disabled</module>
<module>reactive-routes-virtual-threads</module>
</modules>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>quarkus-virtual-threads-integration-tests-parent</artifactId>
<groupId>io.quarkus</groupId>
<version>999-SNAPSHOT</version>
</parent>

<artifactId>quarkus-integration-test-virtual-threads-reactive-routes</artifactId>
<name>Quarkus - Integration Tests - Virtual Threads - Reactive Routes</name>

<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus.junit5</groupId>
<artifactId>junit5-virtual-threads</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<!-- Minimal test dependencies to *-deployment artifacts for consistent build order -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.virtual.vertx.web;

import java.lang.reflect.Method;

import io.quarkus.arc.Arc;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Vertx;

public class AssertHelper {

/**
* Asserts that the current method:
* - runs on a duplicated context
* - runs on a virtual thread
* - has the request scope activated
*/
public static void assertEverything() {
assertThatTheRequestScopeIsActive();
assertThatItRunsOnVirtualThread();
assertThatItRunsOnADuplicatedContext();
}

public static void assertThatTheRequestScopeIsActive() {
if (!Arc.container().requestContext().isActive()) {
throw new AssertionError(("Expected the request scope to be active"));
}
}

public static void assertThatItRunsOnADuplicatedContext() {
var context = Vertx.currentContext();
if (context == null) {
throw new AssertionError("The method does not run on a Vert.x context");
}
if (!VertxContext.isOnDuplicatedContext()) {
throw new AssertionError("The method does not run on a Vert.x **duplicated** context");
}
}

public static void assertThatItRunsOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (!virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is not a virtual thread");
}
} catch (Exception e) {
throw new AssertionError(
"Thread " + Thread.currentThread() + " is not a virtual thread - cannot invoke Thread.isVirtual()", e);
}
}

public static void assertNotOnVirtualThread() {
// We cannot depend on a Java 20.
try {
Method isVirtual = Thread.class.getMethod("isVirtual");
isVirtual.setAccessible(true);
boolean virtual = (Boolean) isVirtual.invoke(Thread.currentThread());
if (virtual) {
throw new AssertionError("Thread " + Thread.currentThread() + " is a virtual thread");
}
} catch (Exception e) {
// Trying using Thread name.
var name = Thread.currentThread().toString();
if (name.toLowerCase().contains("virtual")) {
throw new AssertionError("Thread " + Thread.currentThread() + " seems to be a virtual thread");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.quarkus.virtual.vertx.web;

import io.quarkus.vertx.web.Route;
import io.smallrye.common.annotation.RunOnVirtualThread;

public class Routes {

@RunOnVirtualThread
@Route
String hello() {
AssertHelper.assertEverything();
// Quarkus specific - each VT has a unique name
return Thread.currentThread().getName();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quarkus.native.additional-build-args=--enable-preview

quarkus.package.quiltflower.enabled=true
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.virtual.mail;
package io.quarkus.virtual.vertx.web;

import io.quarkus.test.junit.QuarkusIntegrationTest;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.quarkus.virtual.vertx.web;

import static io.restassured.RestAssured.get;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit5.virtual.ShouldNotPin;
import io.quarkus.test.junit5.virtual.VirtualThreadUnit;

@QuarkusTest
@VirtualThreadUnit
@ShouldNotPin
class RunOnVirtualThreadTest {

@Test
void testRoute() {
String bodyStr = get("/hello").then().statusCode(200).extract().asString();
// Each VT has a unique name in quarkus
assertNotEquals(bodyStr, get("/hello").then().statusCode(200).extract().asString());

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.virtual.scheduler;

import io.quarkus.test.junit.QuarkusIntegrationTest;

@QuarkusIntegrationTest
class RunOnVirtualThreadIT extends RunOnVirtualThreadTest {

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.quarkus.virtual.mail;
package io.quarkus.virtual.scheduler;

import java.time.Duration;
import java.util.List;
Expand Down

0 comments on commit 14dd05e

Please sign in to comment.