From faf0d3693be7b1b662a9f8109ae118dbf029283c Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 15 May 2024 12:01:17 +0200 Subject: [PATCH] WebSockets Next: provide strategies to process unhandled failures - resolves #40648 - also add WebSocketConnection#closeReason() and WebSocketClientConnection#closeReason() --- .../asciidoc/websockets-next-reference.adoc | 6 +++ .../client/ClientMessageErrorEndpoint.java | 35 ++++++++++++++ .../test/client/ClientOpenErrorEndpoint.java | 37 ++++++++++++++ .../next/test/client/ServerEndpoint.java | 24 ++++++++++ ...dledMessageFailureDefaultStrategyTest.java | 47 ++++++++++++++++++ ...nhandledMessageFailureLogStrategyTest.java | 46 ++++++++++++++++++ ...handledOpenFailureDefaultStrategyTest.java | 46 ++++++++++++++++++ .../UnhandledOpenFailureLogStrategyTest.java | 47 ++++++++++++++++++ .../next/test/errors/EchoMessageError.java | 23 +++++++++ .../next/test/errors/EchoOpenError.java | 25 ++++++++++ ...dledMessageFailureDefaultStrategyTest.java | 46 ++++++++++++++++++ ...nhandledMessageFailureLogStrategyTest.java | 44 +++++++++++++++++ ...handledOpenFailureDefaultStrategyTest.java | 45 +++++++++++++++++ .../UnhandledOpenFailureLogStrategyTest.java | 43 +++++++++++++++++ .../websockets/next/test/utils/WSClient.java | 4 ++ .../quarkus/websockets/next/CloseReason.java | 2 + .../next/UnhandledFailureStrategy.java | 20 ++++++++ .../next/WebSocketClientConnection.java | 8 +++- .../websockets/next/WebSocketConnection.java | 8 +++- .../next/WebSocketsClientRuntimeConfig.java | 8 ++++ .../next/WebSocketsServerRuntimeConfig.java | 8 ++++ .../websockets/next/runtime/Endpoints.java | 48 ++++++++++++++----- .../next/runtime/WebSocketConnectionBase.java | 2 +- .../next/runtime/WebSocketConnectorImpl.java | 1 + .../next/runtime/WebSocketServerRecorder.java | 2 +- 25 files changed, 609 insertions(+), 16 deletions(-) create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientMessageErrorEndpoint.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientOpenErrorEndpoint.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ServerEndpoint.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureDefaultStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureDefaultStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureLogStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoMessageError.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoOpenError.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureDefaultStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureLogStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureDefaultStrategyTest.java create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureLogStrategyTest.java create mode 100644 extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/UnhandledFailureStrategy.java diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index 62039a09f8114..55203bc86b1a2 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -16,6 +16,8 @@ include::_attributes.adoc[] include::{includes}/extension-status.adoc[] +The `quarkus-websockets-next` extension provides a modern declarative API to define WebSocket server and client endpoints. + == The WebSocket protocol The _WebSocket_ protocol, documented in the https://datatracker.ietf.org/doc/html/rfc6455[RFC6455], establishes a standardized method for creating a bidirectional communication channel between a client and a server through a single TCP connection. @@ -457,6 +459,10 @@ The method that declares a most-specific supertype of the actual exception is se NOTE: The `@io.quarkus.websockets.next.OnError` annotation can be also used to declare a global error handler, i.e. a method that is not declared on a WebSocket endpoint. Such a method may not accept `@PathParam` paremeters. Error handlers declared on an endpoint take precedence over the global error handlers. +When an error occurs but no error handler can handle the failure, Quarkus uses the strategy specified by `quarkus.websockets-next.server.unhandled-failure-strategy` and `quarkus.websockets-next.client.unhandled-failure-strategy`, respectively. +By default, the connection is closed. +Alternatively, an error message can be logged or no operation performed. + == Access to the WebSocketConnection The `io.quarkus.websockets.next.WebSocketConnection` object represents the WebSocket connection. diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientMessageErrorEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientMessageErrorEndpoint.java new file mode 100644 index 0000000000000..8de5fa38add05 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientMessageErrorEndpoint.java @@ -0,0 +1,35 @@ +package io.quarkus.websockets.next.test.client; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocketClient; + +@WebSocketClient(path = "/endpoint") +public class ClientMessageErrorEndpoint { + + static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1); + + static final List MESSAGES = new CopyOnWriteArrayList<>(); + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnTextMessage + void message(String message) { + if ("foo".equals(message)) { + throw new IllegalStateException("I cannot do it!"); + } else { + MESSAGES.add(message); + } + MESSAGE_LATCH.countDown(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + +} \ No newline at end of file diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientOpenErrorEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientOpenErrorEndpoint.java new file mode 100644 index 0000000000000..990c85bed80c7 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ClientOpenErrorEndpoint.java @@ -0,0 +1,37 @@ +package io.quarkus.websockets.next.test.client; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocketClient; + +@WebSocketClient(path = "/endpoint") +public class ClientOpenErrorEndpoint { + + static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1); + + static final List MESSAGES = new CopyOnWriteArrayList<>(); + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + void open() { + throw new IllegalStateException("I cannot do it!"); + } + + @OnTextMessage + void message(String message) { + MESSAGES.add(message); + MESSAGE_LATCH.countDown(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + +} \ No newline at end of file diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ServerEndpoint.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ServerEndpoint.java new file mode 100644 index 0000000000000..b2fbcbc19cd53 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/ServerEndpoint.java @@ -0,0 +1,24 @@ +package io.quarkus.websockets.next.test.client; + +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; + +@WebSocket(path = "/endpoint") +public class ServerEndpoint { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnTextMessage + String echo(String message) { + return message; + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + +} \ No newline at end of file diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureDefaultStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureDefaultStrategyTest.java new file mode 100644 index 0000000000000..a1d80c81a021f --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureDefaultStrategyTest.java @@ -0,0 +1,47 @@ +package io.quarkus.websockets.next.test.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; + +public class UnhandledMessageFailureDefaultStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class); + }); + + @Inject + WebSocketConnector connector; + + @TestHTTPResource("/") + URI testUri; + + @Test + void testError() throws InterruptedException { + WebSocketClientConnection connection = connector + .baseUri(testUri) + .connectAndAwait(); + connection.sendTextAndAwait("foo"); + assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(ClientMessageErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(connection.isClosed()); + assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode()); + assertTrue(ClientMessageErrorEndpoint.MESSAGES.isEmpty()); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java new file mode 100644 index 0000000000000..1b047d03e5bd7 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledMessageFailureLogStrategyTest.java @@ -0,0 +1,46 @@ +package io.quarkus.websockets.next.test.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; + +public class UnhandledMessageFailureLogStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(ServerEndpoint.class, ClientMessageErrorEndpoint.class); + }).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log"); + + @Inject + WebSocketConnector connector; + + @TestHTTPResource("/") + URI testUri; + + @Test + void testError() throws InterruptedException { + WebSocketClientConnection connection = connector + .baseUri(testUri) + .connectAndAwait(); + connection.sendTextAndAwait("foo"); + assertFalse(connection.isClosed()); + connection.sendText("bar"); + assertTrue(ClientMessageErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS)); + assertEquals("bar", ClientMessageErrorEndpoint.MESSAGES.get(0)); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureDefaultStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureDefaultStrategyTest.java new file mode 100644 index 0000000000000..decf21f2b1705 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureDefaultStrategyTest.java @@ -0,0 +1,46 @@ +package io.quarkus.websockets.next.test.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; + +public class UnhandledOpenFailureDefaultStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class); + }); + + @Inject + WebSocketConnector connector; + + @TestHTTPResource("/") + URI testUri; + + @Test + void testError() throws InterruptedException { + WebSocketClientConnection connection = connector + .baseUri(testUri) + .connectAndAwait(); + assertTrue(ServerEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(ClientOpenErrorEndpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(connection.isClosed()); + assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), connection.closeReason().getCode()); + assertTrue(ClientOpenErrorEndpoint.MESSAGES.isEmpty()); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureLogStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureLogStrategyTest.java new file mode 100644 index 0000000000000..dc5f6d41504fa --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/client/UnhandledOpenFailureLogStrategyTest.java @@ -0,0 +1,47 @@ +package io.quarkus.websockets.next.test.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; + +public class UnhandledOpenFailureLogStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(ServerEndpoint.class, ClientOpenErrorEndpoint.class); + }).overrideConfigKey("quarkus.websockets-next.client.unhandled-failure-strategy", "log"); + + @Inject + WebSocketConnector connector; + + @TestHTTPResource("/") + URI testUri; + + @Test + void testError() throws InterruptedException { + WebSocketClientConnection connection = connector + .baseUri(testUri) + .connectAndAwait(); + connection.sendTextAndAwait("foo"); + assertFalse(connection.isClosed()); + assertNull(connection.closeReason()); + assertTrue(ClientOpenErrorEndpoint.MESSAGE_LATCH.await(5, TimeUnit.SECONDS)); + assertEquals("foo", ClientOpenErrorEndpoint.MESSAGES.get(0)); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoMessageError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoMessageError.java new file mode 100644 index 0000000000000..3d52df32d1473 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoMessageError.java @@ -0,0 +1,23 @@ +package io.quarkus.websockets.next.test.errors; + +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; + +@WebSocket(path = "/echo") +public class EchoMessageError { + + static final CountDownLatch MESSAGE_FAILURE_CALLED = new CountDownLatch(1); + + @OnTextMessage + String echo(String message) { + if ("foo".equals(message)) { + MESSAGE_FAILURE_CALLED.countDown(); + throw new IllegalStateException("I cannot do it!"); + } else { + return message; + } + } + +} \ No newline at end of file diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoOpenError.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoOpenError.java new file mode 100644 index 0000000000000..7a079a0eb45c2 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/EchoOpenError.java @@ -0,0 +1,25 @@ +package io.quarkus.websockets.next.test.errors; + +import java.util.concurrent.CountDownLatch; + +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; + +@WebSocket(path = "/echo") +public class EchoOpenError { + + static final CountDownLatch OPEN_CALLED = new CountDownLatch(1); + + @OnOpen + void open() { + OPEN_CALLED.countDown(); + throw new IllegalStateException("I cannot do it!"); + } + + @OnTextMessage + String echo(String message) { + return message; + } + +} \ No newline at end of file diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureDefaultStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureDefaultStrategyTest.java new file mode 100644 index 0000000000000..1207e6689277a --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureDefaultStrategyTest.java @@ -0,0 +1,46 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class UnhandledMessageFailureDefaultStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(EchoMessageError.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() throws InterruptedException { + try (WSClient client = WSClient.create(vertx).connect(testUri)) { + client.sendAndAwait("foo"); + assertTrue(EchoMessageError.MESSAGE_FAILURE_CALLED.await(5, TimeUnit.SECONDS)); + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed()); + assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode()); + } + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureLogStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureLogStrategyTest.java new file mode 100644 index 0000000000000..0061937345fcf --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledMessageFailureLogStrategyTest.java @@ -0,0 +1,44 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class UnhandledMessageFailureLogStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(EchoMessageError.class, WSClient.class); + }).overrideConfigKey("quarkus.websockets-next.server.unhandled-failure-strategy", "log"); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testErrorDoesNotCloseConnection() throws InterruptedException { + try (WSClient client = WSClient.create(vertx).connect(testUri)) { + client.sendAndAwait("foo"); + assertTrue(EchoMessageError.MESSAGE_FAILURE_CALLED.await(5, TimeUnit.SECONDS)); + client.sendAndAwait("bar"); + client.waitForMessages(1); + assertEquals("bar", client.getLastMessage().toString()); + } + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureDefaultStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureDefaultStrategyTest.java new file mode 100644 index 0000000000000..61c712d005d86 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureDefaultStrategyTest.java @@ -0,0 +1,45 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class UnhandledOpenFailureDefaultStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(EchoOpenError.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() throws InterruptedException { + try (WSClient client = WSClient.create(vertx).connect(testUri)) { + assertTrue(EchoOpenError.OPEN_CALLED.await(5, TimeUnit.SECONDS)); + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed()); + assertEquals(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code(), client.closeStatusCode()); + } + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureLogStrategyTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureLogStrategyTest.java new file mode 100644 index 0000000000000..b704e8c551cde --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/errors/UnhandledOpenFailureLogStrategyTest.java @@ -0,0 +1,43 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class UnhandledOpenFailureLogStrategyTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(EchoOpenError.class, WSClient.class); + }).overrideConfigKey("quarkus.websockets-next.server.unhandled-failure-strategy", "log"); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testErrorDoesNotCloseConnection() throws InterruptedException { + try (WSClient client = WSClient.create(vertx).connect(testUri)) { + assertTrue(EchoOpenError.OPEN_CALLED.await(5, TimeUnit.SECONDS)); + client.sendAndAwait("foo"); + client.waitForMessages(1); + assertEquals("foo", client.getLastMessage().toString()); + } + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java index 773b9ab8d134f..955eb9c1b315c 100644 --- a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java @@ -126,6 +126,10 @@ public boolean isClosed() { return socket.get().isClosed(); } + public int closeStatusCode() { + return socket.get().closeStatusCode(); + } + @Override public void close() { disconnect(); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/CloseReason.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/CloseReason.java index 55e100a9b9e7d..108c2d150b55b 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/CloseReason.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/CloseReason.java @@ -15,6 +15,8 @@ public class CloseReason { public static final CloseReason NORMAL = new CloseReason(WebSocketCloseStatus.NORMAL_CLOSURE.code()); + public static final CloseReason INTERNAL_SERVER_ERROR = new CloseReason(WebSocketCloseStatus.INTERNAL_SERVER_ERROR.code()); + private final int code; private final String message; diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/UnhandledFailureStrategy.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/UnhandledFailureStrategy.java new file mode 100644 index 0000000000000..bdfb1f17ad2be --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/UnhandledFailureStrategy.java @@ -0,0 +1,20 @@ +package io.quarkus.websockets.next; + +/** + * The strategy used when an error occurs but no error handler can handle the failure. + */ +public enum UnhandledFailureStrategy { + /** + * Close the connection. + */ + CLOSE, + /** + * Log an error message. + */ + LOG, + /** + * No operation. + */ + NOOP; + +} \ No newline at end of file diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketClientConnection.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketClientConnection.java index 5151349c559d8..e262a9839bd44 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketClientConnection.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketClientConnection.java @@ -27,7 +27,7 @@ public interface WebSocketClientConnection extends Sender, BlockingSender { /** * * @param name - * @return the actual value of the path parameter or null + * @return the actual value of the path parameter or {@code null} * @see WebSocketClient#path() */ String pathParam(String name); @@ -42,6 +42,12 @@ public interface WebSocketClientConnection extends Sender, BlockingSender { */ boolean isClosed(); + /** + * + * @return the close reason or {@code null} if the connection is not closed + */ + CloseReason closeReason(); + /** * * @return {@code true} if the WebSocket is open diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java index be8acb1a93539..d8e1a3cd98551 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java @@ -37,7 +37,7 @@ public interface WebSocketConnection extends Sender, BlockingSender { /** * * @param name - * @return the actual value of the path parameter or null + * @return the actual value of the path parameter or {@code null} * @see WebSocket#path() */ String pathParam(String name); @@ -67,6 +67,12 @@ public interface WebSocketConnection extends Sender, BlockingSender { */ boolean isClosed(); + /** + * + * @return the close reason or {@code null} if the connection is not closed + */ + CloseReason closeReason(); + /** * * @return {@code true} if the WebSocket is open diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java index dff4780aa45c7..ecaf0bb169d0d 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java @@ -40,4 +40,12 @@ public interface WebSocketsClientRuntimeConfig { */ Optional autoPingInterval(); + /** + * The strategy used when an error occurs but no error handler can handle the failure. + *

+ * By default, the connection is closed when an unhandled failure occurs. + */ + @WithDefault("close") + UnhandledFailureStrategy unhandledFailureStrategy(); + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java index 28e9d284c2fce..43beffda35600 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java @@ -46,4 +46,12 @@ public interface WebSocketsServerRuntimeConfig { */ Optional autoPingInterval(); + /** + * The strategy used when an error occurs but no error handler can handle the failure. + *

+ * By default, the connection is closed when an unhandled failure occurs. + */ + @WithDefault("close") + UnhandledFailureStrategy unhandledFailureStrategy(); + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java index e8ed61d23620c..ce4d2c096628d 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java @@ -13,6 +13,8 @@ import io.quarkus.security.AuthenticationFailedException; import io.quarkus.security.ForbiddenException; import io.quarkus.security.UnauthorizedException; +import io.quarkus.websockets.next.CloseReason; +import io.quarkus.websockets.next.UnhandledFailureStrategy; import io.quarkus.websockets.next.WebSocketException; import io.quarkus.websockets.next.runtime.WebSocketSessionContext.SessionContextState; import io.smallrye.mutiny.Multi; @@ -29,7 +31,7 @@ class Endpoints { static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSocketConnectionBase connection, WebSocketBase ws, String generatedEndpointClass, Optional autoPingInterval, - SecuritySupport securitySupport, Runnable onClose) { + SecuritySupport securitySupport, UnhandledFailureStrategy unhandledFailureStrategy, Runnable onClose) { Context context = vertx.getOrCreateContext(); @@ -75,7 +77,7 @@ public void handle(Void event) { LOG.debugf("@OnTextMessage callback consuming Multi completed: %s", connection); } else { - logFailure(r.cause(), + handleFailure(unhandledFailureStrategy, r.cause(), "Unable to complete @OnTextMessage callback consuming Multi", connection); } @@ -93,7 +95,7 @@ public void handle(Void event) { LOG.debugf("@OnBinaryMessage callback consuming Multi completed: %s", connection); } else { - logFailure(r.cause(), + handleFailure(unhandledFailureStrategy, r.cause(), "Unable to complete @OnBinaryMessage callback consuming Multi", connection); } @@ -102,7 +104,7 @@ public void handle(Void event) { }); } } else { - logFailure(r.cause(), "Unable to complete @OnOpen callback", connection); + handleFailure(unhandledFailureStrategy, r.cause(), "Unable to complete @OnOpen callback", connection); } }); } @@ -115,7 +117,8 @@ public void handle(Void event) { if (r.succeeded()) { LOG.debugf("@OnTextMessage callback consumed text message: %s", connection); } else { - logFailure(r.cause(), "Unable to consume text message in @OnTextMessage callback", + handleFailure(unhandledFailureStrategy, r.cause(), + "Unable to consume text message in @OnTextMessage callback", connection); } }); @@ -130,7 +133,8 @@ public void handle(Void event) { } catch (Throwable throwable) { endpoint.doOnError(throwable).subscribe().with( v -> LOG.debugf("Text message >> Multi: %s", connection), - t -> LOG.errorf(t, "Unable to send text message to Multi: %s", connection)); + t -> handleFailure(unhandledFailureStrategy, t, "Unable to send text message to Multi", + connection)); } finally { contextSupport.end(false); } @@ -144,7 +148,8 @@ public void handle(Void event) { if (r.succeeded()) { LOG.debugf("@OnBinaryMessage callback consumed binary message: %s", connection); } else { - logFailure(r.cause(), "Unable to consume binary message in @OnBinaryMessage callback", + handleFailure(unhandledFailureStrategy, r.cause(), + "Unable to consume binary message in @OnBinaryMessage callback", connection); } }); @@ -159,7 +164,8 @@ public void handle(Void event) { } catch (Throwable throwable) { endpoint.doOnError(throwable).subscribe().with( v -> LOG.debugf("Binary message >> Multi: %s", connection), - t -> LOG.errorf(t, "Unable to send binary message to Multi: %s", connection)); + t -> handleFailure(unhandledFailureStrategy, t, "Unable to send binary message to Multi", + connection)); } finally { contextSupport.end(false); } @@ -171,7 +177,8 @@ public void handle(Void event) { if (r.succeeded()) { LOG.debugf("@OnPongMessage callback consumed text message: %s", connection); } else { - logFailure(r.cause(), "Unable to consume text message in @OnPongMessage callback", connection); + handleFailure(unhandledFailureStrategy, r.cause(), + "Unable to consume text message in @OnPongMessage callback", connection); } }); }); @@ -198,7 +205,8 @@ public void handle(Void event) { if (r.succeeded()) { LOG.debugf("@OnClose callback completed: %s", connection); } else { - logFailure(r.cause(), "Unable to complete @OnClose callback", connection); + handleFailure(unhandledFailureStrategy, r.cause(), "Unable to complete @OnClose callback", + connection); } onClose.run(); if (timerId != null) { @@ -218,14 +226,30 @@ public void handle(Throwable t) { public void handle(Void event) { endpoint.doOnError(t).subscribe().with( v -> LOG.debugf("Error [%s] processed: %s", t.getClass(), connection), - t -> LOG.errorf(t, "Unhandled error occurred: %s", t.toString(), - connection)); + t -> handleFailure(unhandledFailureStrategy, t, "Unhandled error occurred", connection)); } }); } }); } + private static void handleFailure(UnhandledFailureStrategy strategy, Throwable cause, String message, + WebSocketConnectionBase connection) { + switch (strategy) { + case CLOSE -> closeConnection(cause, connection); + case LOG -> logFailure(cause, message, connection); + case NOOP -> LOG.tracef("Unhandled failure ignored: %s", connection); + default -> throw new IllegalArgumentException("Unexpected strategy: " + strategy); + } + } + + private static void closeConnection(Throwable cause, WebSocketConnectionBase connection) { + connection.close(CloseReason.INTERNAL_SERVER_ERROR).subscribe().with( + v -> LOG.debugf("Connection closed due to unhandled failure %s: %s", cause, connection), + t -> LOG.errorf("Unable to close connection [%s] due to unhandled failure [%s]: %s", connection.id(), cause, + t)); + } + private static void logFailure(Throwable throwable, String message, WebSocketConnectionBase connection) { if (isWebSocketIsClosedFailure(throwable, connection)) { LOG.debugf(throwable, diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java index e722da795ede8..00ae0dc9e0d1f 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java @@ -125,6 +125,6 @@ public CloseReason closeReason() { if (ws.isClosed()) { return new CloseReason(ws.closeStatusCode(), ws.closeReason()); } - throw new IllegalStateException("Connection is not closed"); + return null; } } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java index d6281e5da71f4..8b8781ccac2ed 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java @@ -116,6 +116,7 @@ public Uni connect() { Endpoints.initialize(vertx, Arc.container(), codecs, connection, ws, clientEndpoint.generatedEndpointClass, config.autoPingInterval(), SecuritySupport.NOOP, + config.unhandledFailureStrategy(), () -> { connectionManager.remove(clientEndpoint.generatedEndpointClass, connection); client.close(); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index 9384f8d60fc47..35bdae2ca2206 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -102,7 +102,7 @@ public void handle(RoutingContext ctx) { LOG.debugf("Connection created: %s", connection); Endpoints.initialize(vertx, container, codecs, connection, ws, generatedEndpointClass, - config.autoPingInterval(), securitySupport, + config.autoPingInterval(), securitySupport, config.unhandledFailureStrategy(), () -> connectionManager.remove(generatedEndpointClass, connection)); }); }