From 180487d106ce111933c18abe9c5d966662bdacc3 Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 6 Jun 2024 17:12:16 +0200 Subject: [PATCH] WebSockets Next: fire CDI event when a connection is opened/closed - resolves #40217 --- .../asciidoc/websockets-next-reference.adoc | 20 ++++ .../openconnections/ConnectionEventsTest.java | 98 +++++++++++++++++++ .../websockets/next/ConnectionClosed.java | 41 ++++++++ .../websockets/next/ConnectionOpen.java | 41 ++++++++ .../next/runtime/ConnectionManager.java | 25 +++++ .../io/quarkus/qute/StandaloneLinesTest.java | 68 +++++++++++++ 6 files changed, 293 insertions(+) create mode 100644 extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java create mode 100644 extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionClosed.java create mode 100644 extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionOpen.java create mode 100644 independent-projects/qute/core/src/test/java/io/quarkus/qute/StandaloneLinesTest.java diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index fb0879a609c635..222d8d554222ac 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -521,6 +521,26 @@ class MyBean { There are also other convenient methods. For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint. +=== CDI events + +Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionOpen` asynchronously when a new connection is opened. +Moreover, a CDI event of type `WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionClosed` is fired asynchronously when a connection is closed. + +[source, java] +---- +import jakarta.enterprise.event.ObservesAsync; +import io.quarkus.websockets.next.ConnectionOpen; +import io.quarkus.websockets.next.WebSocketConnection; + +class MyBean { + + void connectionOpened(@ObservesAsync @ConnectionOpen WebSocketConnection connection) { <1> + // This observer method is called when a connection is opened... + } +} +---- +<1> An asynchronous observer method is executed using the default blocking executor service. + == Serialization and Deserialization The WebSocket Next extension supports automatic serialization and deserialization of messages. diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java new file mode 100644 index 00000000000000..383c1cfd690377 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java @@ -0,0 +1,98 @@ +package io.quarkus.websockets.next.test.openconnections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.event.ObservesAsync; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.ConnectionClosed; +import io.quarkus.websockets.next.ConnectionOpen; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class ConnectionEventsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("endpoint") + URI endUri; + + @Test + void testEvents() throws Exception { + String client1ConnectionId; + try (WSClient client1 = WSClient.create(vertx).connect(endUri)) { + client1.waitForMessages(1); + client1ConnectionId = client1.getMessages().get(0).toString(); + assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.OPEN_CONN.get()); + assertEquals(client1ConnectionId, ObservingBean.OPEN_CONN.get().id()); + } + assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.CLOSED_CONN.get()); + assertEquals(client1ConnectionId, ObservingBean.CLOSED_CONN.get().id()); + } + + @WebSocket(path = "/endpoint") + public static class Endpoint { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + String open(WebSocketConnection connection) { + return connection.id(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + + } + + @Singleton + public static class ObservingBean { + + static final CountDownLatch OPEN_LATCH = new CountDownLatch(1); + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + static final AtomicReference OPEN_CONN = new AtomicReference<>(); + static final AtomicReference CLOSED_CONN = new AtomicReference<>(); + + void onOpen(@ObservesAsync @ConnectionOpen WebSocketConnection connection) { + OPEN_CONN.set(connection); + OPEN_LATCH.countDown(); + } + + void onClose(@ObservesAsync @ConnectionClosed WebSocketConnection connection) { + CLOSED_CONN.set(connection); + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionClosed.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionClosed.java new file mode 100644 index 00000000000000..544e8fc90500db --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionClosed.java @@ -0,0 +1,41 @@ +package io.quarkus.websockets.next; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.enterprise.event.Event; +import jakarta.enterprise.event.ObservesAsync; +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.inject.Qualifier; + +/** + * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed. + * + * @see ObservesAsync + * @see Event#fireAsync(Object) + */ +@Qualifier +@Documented +@Retention(RUNTIME) +@Target({ METHOD, FIELD, PARAMETER, TYPE }) +public @interface ConnectionClosed { + + /** + * Supports inline instantiation of the {@link ConnectionClosed} qualifier. + */ + public static final class Literal extends AnnotationLiteral implements ConnectionClosed { + + public static final Literal INSTANCE = new Literal(); + + private static final long serialVersionUID = 1L; + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionOpen.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionOpen.java new file mode 100644 index 00000000000000..8ab0c8e09e2f15 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/ConnectionOpen.java @@ -0,0 +1,41 @@ +package io.quarkus.websockets.next; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.enterprise.event.Event; +import jakarta.enterprise.event.ObservesAsync; +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.inject.Qualifier; + +/** + * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened. + * + * @see ObservesAsync + * @see Event#fireAsync(Object) + */ +@Qualifier +@Documented +@Retention(RUNTIME) +@Target({ METHOD, FIELD, PARAMETER, TYPE }) +public @interface ConnectionOpen { + + /** + * Supports inline instantiation of the {@link ConnectionOpen} qualifier. + */ + public static final class Literal extends AnnotationLiteral implements ConnectionOpen { + + public static final Literal INSTANCE = new Literal(); + + private static final long serialVersionUID = 1L; + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java index fe7268a63a8528..036d3bb578c02b 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java @@ -9,10 +9,15 @@ import java.util.stream.Stream; import jakarta.annotation.PreDestroy; +import jakarta.enterprise.event.Event; import jakarta.inject.Singleton; import org.jboss.logging.Logger; +import io.quarkus.arc.Arc; +import io.quarkus.arc.ArcContainer; +import io.quarkus.websockets.next.ConnectionClosed; +import io.quarkus.websockets.next.ConnectionOpen; import io.quarkus.websockets.next.OpenConnections; import io.quarkus.websockets.next.WebSocketConnection; @@ -21,10 +26,24 @@ public class ConnectionManager implements OpenConnections { private static final Logger LOG = Logger.getLogger(ConnectionManager.class); + // generatedEndpointClassName -> open connections private final ConcurrentMap> endpointToConnections = new ConcurrentHashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); + private final Event openEvent; + private final Event closedEvent; + + ConnectionManager(@ConnectionOpen Event openEvent, + @ConnectionClosed Event closedEvent) { + ArcContainer container = Arc.container(); + this.openEvent = container.resolveObserverMethods(WebSocketConnection.class, ConnectionOpen.Literal.INSTANCE).isEmpty() + ? null + : openEvent; + this.closedEvent = container.resolveObserverMethods(WebSocketConnection.class, ConnectionClosed.Literal.INSTANCE) + .isEmpty() ? null : closedEvent; + } + @Override public Iterator iterator() { return stream().iterator(); @@ -38,6 +57,9 @@ public Stream stream() { void add(String endpoint, WebSocketConnection connection) { LOG.debugf("Add connection: %s", connection); if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) { + if (openEvent != null) { + openEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ConnectionListener listener : listeners) { try { @@ -56,6 +78,9 @@ void remove(String endpoint, WebSocketConnection connection) { Set connections = endpointToConnections.get(endpoint); if (connections != null) { if (connections.remove(connection)) { + if (closedEvent != null) { + closedEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ConnectionListener listener : listeners) { try { diff --git a/independent-projects/qute/core/src/test/java/io/quarkus/qute/StandaloneLinesTest.java b/independent-projects/qute/core/src/test/java/io/quarkus/qute/StandaloneLinesTest.java new file mode 100644 index 00000000000000..71a9f6dafb46b4 --- /dev/null +++ b/independent-projects/qute/core/src/test/java/io/quarkus/qute/StandaloneLinesTest.java @@ -0,0 +1,68 @@ +package io.quarkus.qute; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +public class StandaloneLinesTest { + + @Test + public void testStandaloneLines() { + Engine engine = Engine.builder() + .addDefaults() + .addSectionHelper(new UserTagSectionHelper.Factory("htmx", "htmx")) + .addSectionHelper(new UserTagSectionHelper.Factory("email", "email")) + .addParserHook(new ParserHook() { + + @Override + public void beforeParsing(ParserHelper parserHelper) { + parserHelper.addContentFilter(content -> { + return null; + }); + } + }) + .build(); + Template htmx = engine.parse("{! core attributes !}\n" + + "{#if hx-boost.notNull??}hx-boost=\"{hx-boost}\"{/if}\n" + + "{#if hx-get.notNull??}hx-get=\"{hx-get}\"{/if}\n" + + "{#if hx-post.notNull??}hx-post=\"{hx-post}\"{/if}\n" + + "{#if hx-on.notNull??}hx-on=\"{hx-on}\"{/if}\n" + + "{#if hx-push-url.notNull??}hx-push-url=\"{hx-push-url}\"{/if}\n" + + "{#if hx-select.notNull??}hx-select=\"{hx-select}\"{/if}\n" + + "{#if hx-select-oob.notNull??}hx-select-oob=\"{hx-select-oob}\"{/if}\n" + + "{#if hx-swap.notNull??}hx-swap=\"{hx-swap}\"{/if}\n" + + "{#if hx-swap-oob.notNull??}hx-swap-oob=\"{hx-swap-oob}\"{/if}\n" + + "{#if hx-target.notNull??}hx-target=\"{hx-target}\"{/if}\n" + + "{#if hx-trigger.notNull??}hx-trigger=\"{hx-trigger}\"{/if}\n" + + "{#if hx-vals.notNull??}hx-vals=\"{hx-vals}\"{/if}\n" + + "{! additional attributes !}\n" + + "{#if hx-confirm.notNull??}hx-confirm=\"{hx-confirm}\"{/if}\n" + + "{#if hx-delete.notNull??}hx-delete=\"{hx-delete}\"{/if}\n" + + "{#if hx-disable.notNull??}hx-disable=\"{hx-disable}\"{/if}\n" + + "{#if hx-disinherit.notNull??}hx-disinherit=\"{hx-disinherit}\"{/if}\n" + + "{#if hx-encoding.notNull??}hx-encoding=\"{hx-encoding}\"{/if}\n" + + "{#if hx-ext.notNull??}hx-ext=\"{hx-ext}\"{/if}\n" + + "{#if hx-headers.notNull??}hx-headers=\"{hx-headers}\"{/if}\n" + + "{#if hx-history.notNull??}hx-history=\"{hx-history}\"{/if}\n" + + "{#if hx-history-elt.notNull??}hx-history-elt=\"{hx-history-elt}\"{/if}\n" + + "{#if hx-include.notNull??}hx-include=\"{hx-include}\"{/if}\n" + + "{#if hx-indicator.notNull??}hx-indicator=\"{hx-indicator}\"{/if}\n" + + "{#if hx-params.notNull??}hx-params=\"{hx-params}\"{/if}\n" + + "{#if hx-patch.notNull??}hx-patch=\"{hx-patch}\"{/if}\n" + + "{#if hx-preserve.notNull??}hx-preserve=\"{hx-preserve}\"{/if}\n" + + "{#if hx-prompt.notNull??}hx-prompt=\"{hx-prompt}\"{/if}\n" + + "{#if hx-put.notNull??}hx-put=\"{hx-put}\"{/if}\n" + + "{#if hx-replace-url.notNull??}hx-replace-url=\"{hx-replace-url}\"{/if}\n" + + "{#if hx-request.notNull??}hx-request=\"{hx-request}\"{/if}\n" + + "{#if hx-sse.notNull??}hx-sse=\"{hx-sse}\"{/if}\n" + + "{#if hx-sync.notNull??}hx-sync=\"{hx-sync}\"{/if}\n" + + "{#if hx-validate.notNull??}hx-validate=\"{hx-validate}\"{/if}\n" + + "{#if hx-vars.notNull??}hx-vars=\"{hx-vars}\"{/if}\n" + + "{#if hx-ws.notNull??}hx-ws=\"{hx-ws}\"{/if}"); + engine.putTemplate("htmx", htmx); + Template email = engine.parse(""); + engine.putTemplate("email", email); + assertEquals("", engine.parse("{#email hx-get=\"/url\" /}").render()); + } + +}