From 6a1d47a1ecb015231cf4b596a012df11de1a4c17 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 18 Apr 2024 21:11:56 +0200 Subject: [PATCH] WebSockets Next: configuration updates - remove unimplemented timeout - add compression support/level - add max message size - also invoke error handlers for connection exceptions - resolves #39590 --- .../maxmessagesize/MaxMessageSizeTest.java | 63 +++++++++++++++++++ .../next/WebSocketsRuntimeConfig.java | 27 +++++--- .../next/runtime/ContextSupport.java | 2 +- .../WebSocketHttpServerOptionsCustomizer.java | 13 +++- .../next/runtime/WebSocketServerRecorder.java | 15 +++++ 5 files changed, 111 insertions(+), 9 deletions(-) create mode 100644 extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java new file mode 100644 index 0000000000000..2ffe0778d69f7 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java @@ -0,0 +1,63 @@ +package io.quarkus.websockets.next.test.maxmessagesize; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class MaxMessageSizeTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }).overrideConfigKey("quarkus.websockets-next.max-message-size", "10"); + + @Inject + Vertx vertx; + + @TestHTTPResource("/echo") + URI echoUri; + + @Test + void testMaxMessageSize() { + WSClient client = WSClient.create(vertx).connect(echoUri); + String msg = "foo".repeat(10); + String reply = client.sendAndAwaitReply(msg).toString(); + assertNotEquals(msg, reply); + assertTrue(Echo.ISE_THROWN.get()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + static final AtomicBoolean ISE_THROWN = new AtomicBoolean(); + + @OnTextMessage + String process(String message) { + return message; + } + + @OnError + String onError(IllegalStateException ise) { + ISE_THROWN.set(true); + return ise.getMessage(); + } + + } + +} diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java index ff38df72391d6..e1c76dc33dde3 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java @@ -1,12 +1,14 @@ package io.quarkus.websockets.next; -import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.vertx.core.http.HttpServerOptions; @ConfigMapping(prefix = "quarkus.websockets-next") @ConfigRoot(phase = ConfigPhase.RUN_TIME) @@ -14,16 +16,27 @@ public interface WebSocketsRuntimeConfig { /** * See The WebSocket Protocol - * - * @return the supported subprotocols */ Optional> supportedSubprotocols(); /** - * TODO Not implemented yet. - * - * The default timeout to complete processing of a message. + * Compression Extensions for WebSocket are supported by default. + *

+ * See also RFC 7692 */ - Optional timeout(); + @WithDefault("true") + boolean perMessageCompressionSupported(); + + /** + * The compression level must be a value between 0 and 9. The default value is + * {@value HttpServerOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL}. + */ + OptionalInt compressionLevel(); + + /** + * The maximum size of a message in bytes. The default values is + * {@value HttpServerOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE}. + */ + OptionalInt maxMessageSize(); } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java index 6edb66693f906..9e4b7a4e81525 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java @@ -69,7 +69,7 @@ void endSession() { } ContextState currentRequestContextState() { - return requestContext.getState(); + return requestContext.getStateIfActive(); } static Context createNewDuplicatedContext(Context context, WebSocketConnection connection) { diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java index 5018b1aee2b35..5233fd4a1cc34 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java @@ -17,12 +17,23 @@ public class WebSocketHttpServerOptionsCustomizer implements HttpServerOptionsCu @Override public void customizeHttpServer(HttpServerOptions options) { - config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol); + customize(options); } @Override public void customizeHttpsServer(HttpServerOptions options) { + customize(options); + } + + private void customize(HttpServerOptions options) { config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol); + options.setPerMessageWebSocketCompressionSupported(config.perMessageCompressionSupported()); + if (config.compressionLevel().isPresent()) { + options.setWebSocketCompressionLevel(config.compressionLevel().getAsInt()); + } + if (config.maxMessageSize().isPresent()) { + options.setMaxWebSocketMessageSize(config.maxMessageSize().getAsInt()); + } } } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index a0b98d13b1209..c53d15645b01d 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -234,6 +234,21 @@ public void handle(Void event) { }); } }); + + ws.exceptionHandler(new Handler() { + @Override + public void handle(Throwable t) { + ContextSupport.createNewDuplicatedContext(context, connection).runOnContext(new Handler() { + @Override + public void handle(Void event) { + endpoint.doOnError(t).subscribe().with( + v -> LOG.debugf("Error [%s] processed: %s", t.getClass(), connection), + t -> LOG.errorf(t, "Unhandled error occured: %s", t.toString(), + connection)); + } + }); + } + }); }); } };