Skip to content

Commit

Permalink
WebSockets Next: error handlers part 4
Browse files Browse the repository at this point in the history
- use error handlers to process Mutiny Multi failures
- update docs
  • Loading branch information
mkouba committed Apr 3, 2024
1 parent e00af30 commit b8502d8
Show file tree
Hide file tree
Showing 15 changed files with 689 additions and 50 deletions.
52 changes: 41 additions & 11 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ This guide is maintained in the main Quarkus repository
and pull requests should be submitted there:
https://github.com/quarkusio/quarkus/tree/main/docs/src/main/asciidoc
////
= WebSockets-Next extension reference guide
[id="websockets-next-reference-guide"]
= WebSockets Next extension reference guide
:extension-status: preview
include::_attributes.adoc[]
:numbered:
Expand All @@ -12,7 +13,8 @@ include::_attributes.adoc[]
:topics: web,websockets
:extensions: io.quarkus:quarkus-websockets-next

IMPORTANT: The `websockets-next` extension is experimental. The proposal API may change in future releases.
The `websockets-next` extension provides an experimental API to define _WebSocket_ endpoints declaratively.
The proposed API may change in future releases.

== The WebSocket protocol

Expand Down Expand Up @@ -193,6 +195,7 @@ A WebSocket endpoint comprises the following components:
* At most one `@OnBinaryMessage` method: Handles the binary messages the connected client sends.
* At most one `@OnOpen` method: Invoked when a client connects to the WebSocket.
* At most one `@OnClose` method: Executed upon the client disconnecting from the WebSocket.
* Any number of `@OnError` methods: Invoked when an error occurs; that is when an endpoint callback throws a runtime error, or when a conversion errors occurs, or when a returned `io.smallrye.mutiny.Uni`/`io.smallrye.mutiny.Multi` receives a failure.

Only some endpoints need to include all methods.
However, it must contain at least `@On[Text|Binary]Message` or `@OnOpen`.
Expand Down Expand Up @@ -222,23 +225,26 @@ Here are the rules governing execution:
* Methods annotated with `@RunOnVirtualThread` are considered blocking and should execute on a virtual thread.
* Blocking methods must execute on a worker thread if not annotated with `@RunOnVirtualThread`.
* When `@RunOnVirtualThread` is employed, each invocation spawns a new virtual thread.
* Methods returning `CompletionStage` and `Uni` are considered non-blocking
* Methods returning `Multi` are considered non-blocking and must be subscribed to, except if they return their own `Multi`.
* Methods returning `CompletionStage`, `Uni` and `Multi` are considered non-blocking.
* Methods returning `void` or plain objects are considered blocking.

=== Parameters
=== Method Parameters

These methods can accept parameters in two formats:
The method must accept exactly one message parameter:

* The message object (of any type).
* A `Multi<X>` with X as the message type.
* Any other parameters should be flagged as errors.

However, it may also accept the following parameters:

* `WebSocketConnection`
* `HandshakeRequest`
* `String` parameters annotated with `@PathParam`

The message object represents the data sent and can be accessed as either raw content (`String`, `JsonObject`, `JsonArray`, `Buffer` or `byte[]`) or deserialized high-level objects, which is the recommended approach.

When receiving a `Multi`, the method is invoked once per connection, and the provided `Multi` receives the items transmitted by this connection.
The method must subscribe to the `Multi` to receive these items (or return a Multi).
Cancelling this subscription closes the associated connection.

=== Allowed Returned Types

Expand Down Expand Up @@ -324,7 +330,7 @@ When a method is intended to produce a message written to the client, it can emi
Emitting `null` signifies no response to be sent to the client, allowing for skipping a response when needed.

=== JsonObject and JsonArray
Vert.x `JSONObject` and `JSONArray` instances bypass the serialization and deserialization mechanisms.
Vert.x `JsonObject` and `JsonArray` instances bypass the serialization and deserialization mechanisms.
Messages are sent as text messages.

=== Broadcasting
Expand All @@ -341,6 +347,8 @@ String emitToAll(String message) {

The same principle applies to methods returning instances of `Multi` or `Uni`.

NOTE: If you need to select the connected clients that should receive the message, you can use `WebSocketConnection.broadcast().filter().sendText()`.

== OnOpen and OnClose methods

The WebSocket endpoint can also be notified when a client connects or disconnects.
Expand Down Expand Up @@ -369,8 +377,11 @@ These methods have access to the _session-scoped_ `WebSocketConnection` bean.

=== Parameters

Methods annotated with `@OnOpen` and `@OnClose` do not accept any parameters.
If such methods declare parameters, they will be flagged as errors and reported at build time.
Methods annotated with `@OnOpen` and `@OnClose` may accept the following parameters:

* `WebSocketConnection`
* `HandshakeRequest`
* `String` parameters annotated with `@PathParam`

=== Allowed Returned Types

Expand Down Expand Up @@ -425,6 +436,25 @@ String onOpen() {
}
----

== Error Handling

The WebSocket endpoint can also be notified when an error occurs.
A WebSocket endpoint method annotated with `@io.quarkus.websockets.next.OnError` is invoked when an endpoint callback throws a runtime error, or when a conversion errors occurs,
or when a returned `io.smallrye.mutiny.Uni`/`io.smallrye.mutiny.Multi` receives a failure.

The method must accept exactly one "error" parameter, i.e. a parameter that is assignable from `java.lang.Throwable`.
The method may also accept the following parameters:

* `WebSocketConnection`
* `HandshakeRequest`
* `String` parameters annotated with `@PathParam`

An endpoint may declare multiple methods annotated with `@io.quarkus.websockets.next.OnError`.
However, each method must declare a different error parameter.
The method that declares a most-specific supertype of the actual exception is selected.

NOTE: The `@io.quarkus.websockets.next.OnError` annotation can be also used to declare a global error handler, i.e. a method that is not declared on a WebSocket endpoint. Such a method may not accept `@PathParam` paremeters. Error handlers declared on an endpoint take precedence over the global error handlers.

== Access to the WebSocketConnection

The `io.quarkus.websockets.next.WebSocketConnection` object represents the WebSocket connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint,
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
doOnOpen.getThis(), doOnOpen.load(endpoint.bean.getIdentifier()));
// Call the business method
TryBlock tryBlock = onErrorTryBlock(doOnOpen);
TryBlock tryBlock = onErrorTryBlock(doOnOpen, doOnOpen.getThis());
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args);
encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
Expand All @@ -488,7 +488,7 @@ private String generateEndpoint(WebSocketEndpointBuildItem endpoint,
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
doOnClose.getThis(), doOnClose.load(endpoint.bean.getIdentifier()));
// Call the business method
TryBlock tryBlock = onErrorTryBlock(doOnClose);
TryBlock tryBlock = onErrorTryBlock(doOnClose, doOnClose.getThis());
ResultHandle[] args = callback.generateArguments(tryBlock.getThis(), tryBlock, transformedAnnotations, index);
ResultHandle ret = tryBlock.invokeVirtualMethod(MethodDescriptor.of(callback.method), beanInstance, args);
encodeAndReturnResult(tryBlock.getThis(), tryBlock, callback, globalErrorHandlers, endpoint, ret);
Expand Down Expand Up @@ -632,7 +632,7 @@ private void generateOnMessage(ClassCreator endpointCreator, WebSocketEndpointBu
MethodCreator doOnMessage = endpointCreator.getMethodCreator("doOn" + messageType + "Message", Uni.class,
methodParameterType);

TryBlock tryBlock = onErrorTryBlock(doOnMessage);
TryBlock tryBlock = onErrorTryBlock(doOnMessage, doOnMessage.getThis());
// Foo foo = beanInstance("foo");
ResultHandle beanInstance = tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "beanInstance", Object.class, String.class),
Expand Down Expand Up @@ -673,13 +673,13 @@ private TryBlock uniFailureTryBlock(BytecodeCreator method) {
return tryBlock;
}

private TryBlock onErrorTryBlock(BytecodeCreator method) {
private TryBlock onErrorTryBlock(BytecodeCreator method, ResultHandle endpointThis) {
TryBlock tryBlock = method.tryBlock();
CatchBlockCreator catchBlock = tryBlock.addCatch(Throwable.class);
// return doOnError(t);
catchBlock.returnValue(catchBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class, "doOnError", Uni.class, Throwable.class),
catchBlock.getThis(), catchBlock.getCaughtException()));
endpointThis, catchBlock.getCaughtException()));
return tryBlock;
}

Expand Down Expand Up @@ -810,23 +810,28 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me
return uniOnFailureDoOnError(endpointThis, method, callback, uniChain, endpoint, globalErrorHandlers);
}
} else if (callback.isReturnTypeMulti()) {
// return multiBinary(multi, broadcast, m -> {
// Buffer buffer = encodeBuffer(m);
// return sendBinary(buffer,broadcast);
//});
// try {
// Buffer buffer = encodeBuffer(m);
// return sendBinary(buffer,broadcast);
// } catch(Throwable t) {
// return doOnError(t);
// }
FunctionCreator fun = method.createFunction(Function.class);
BytecodeCreator funBytecode = fun.getBytecode();
ResultHandle buffer = encodeBuffer(funBytecode, callback.returnType().asParameterizedType().arguments().get(0),
funBytecode.getMethodParam(0), endpointThis, callback);
funBytecode.returnValue(funBytecode.invokeVirtualMethod(
// This checkcast should not be necessary but we need to use the endpoint in the function bytecode
// otherwise gizmo does not access the endpoint reference correcly
ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class);
TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase);
ResultHandle buffer = encodeBuffer(tryBlock, callback.returnType().asParameterizedType().arguments().get(0),
tryBlock.getMethodParam(0), endpointThis, callback);
tryBlock.returnValue(tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"sendBinary", Uni.class, Buffer.class, boolean.class),
endpointThis, buffer,
funBytecode.load(callback.broadcast())));
tryBlock.load(callback.broadcast())));
return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"multiBinary", Uni.class, Multi.class, boolean.class, Function.class), endpointThis,
"multiBinary", Uni.class, Multi.class, Function.class), endpointThis,
value,
method.load(callback.broadcast()),
fun.getInstance());
} else {
// return sendBinary(buffer,broadcast);
Expand Down Expand Up @@ -865,22 +870,29 @@ private ResultHandle encodeMessage(ResultHandle endpointThis, BytecodeCreator me
}
} else if (callback.isReturnTypeMulti()) {
// return multiText(multi, broadcast, m -> {
// String text = encodeText(m);
// return sendText(buffer,broadcast);
// try {
// String text = encodeText(m);
// return sendText(buffer,broadcast);
// } catch(Throwable t) {
// return doOnError(t);
// }
//});
FunctionCreator fun = method.createFunction(Function.class);
BytecodeCreator funBytecode = fun.getBytecode();
ResultHandle text = encodeText(funBytecode, callback.returnType().asParameterizedType().arguments().get(0),
funBytecode.getMethodParam(0), endpointThis, callback);
funBytecode.returnValue(funBytecode.invokeVirtualMethod(
// This checkcast should not be necessary but we need to use the endpoint in the function bytecode
// otherwise gizmo does not access the endpoint reference correcly
ResultHandle endpointBase = funBytecode.checkCast(endpointThis, WebSocketEndpointBase.class);
TryBlock tryBlock = onErrorTryBlock(fun.getBytecode(), endpointBase);
ResultHandle text = encodeText(tryBlock, callback.returnType().asParameterizedType().arguments().get(0),
tryBlock.getMethodParam(0), endpointThis, callback);
tryBlock.returnValue(tryBlock.invokeVirtualMethod(
MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"sendText", Uni.class, String.class, boolean.class),
endpointThis, text,
funBytecode.load(callback.broadcast())));
tryBlock.load(callback.broadcast())));
return method.invokeVirtualMethod(MethodDescriptor.ofMethod(WebSocketEndpointBase.class,
"multiText", Uni.class, Multi.class, boolean.class, Function.class), endpointThis,
"multiText", Uni.class, Multi.class, Function.class), endpointThis,
value,
method.load(callback.broadcast()),
fun.getInstance());
} else {
// return sendText(text,broadcast);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.websockets.next.test.args;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.PathParam;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketConnectOptions;

public class OnClosePathParamConnectionArgumentTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(MontyEcho.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo/monty/and/foo")
URI testUri;

@Test
void testArguments() throws InterruptedException {
String header = "fool";
WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), testUri);
client.disconnect();
assertTrue(MontyEcho.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertEquals("foo:monty:fool", MontyEcho.CLOSED_MESSAGE.get());
}

@WebSocket(path = "/echo/{grail}/and/{life}")
public static class MontyEcho {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);
static final AtomicReference<String> CLOSED_MESSAGE = new AtomicReference<>();

@OnOpen
void open() {
}

@OnClose
void close(@PathParam String life, @PathParam String grail, WebSocketConnection connection) {
CLOSED_MESSAGE.set(life + ":" + grail + ":" + connection.handshakeRequest().header("X-Test"));
CLOSED_LATCH.countDown();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.quarkus.websockets.next.test.args;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.net.URI;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.PathParam;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;
import io.vertx.core.http.WebSocketConnectOptions;

public class OnOpenPathParamConnectionArgumentTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(MontyEcho.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("echo/monty/and/foo")
URI testUri;

@Test
void testArguments() {
String header = "fool";
WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), testUri);
client.waitForMessages(1);
assertEquals("foo:monty:fool", client.getMessages().get(0).toString());
}

@WebSocket(path = "/echo/{grail}/and/{life}")
public static class MontyEcho {

@OnOpen
String process(@PathParam String life, @PathParam String grail, WebSocketConnection connection) {
return life + ":" + grail + ":" + connection.handshakeRequest().header("X-Test");
}

}

}
Loading

0 comments on commit b8502d8

Please sign in to comment.