diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index fb0879a609c63..222d8d554222a 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -521,6 +521,26 @@ class MyBean { There are also other convenient methods. For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint. +=== CDI events + +Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionOpen` asynchronously when a new connection is opened. +Moreover, a CDI event of type `WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionClosed` is fired asynchronously when a connection is closed. + +[source, java] +---- +import jakarta.enterprise.event.ObservesAsync; +import io.quarkus.websockets.next.ConnectionOpen; +import io.quarkus.websockets.next.WebSocketConnection; + +class MyBean { + + void connectionOpened(@ObservesAsync @ConnectionOpen WebSocketConnection connection) { <1> + // This observer method is called when a connection is opened... + } +} +---- +<1> An asynchronous observer method is executed using the default blocking executor service. + == Serialization and Deserialization The WebSocket Next extension supports automatic serialization and deserialization of messages. diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java new file mode 100644 index 0000000000000..091b27ee968e6 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/ConnectionEventsTest.java @@ -0,0 +1,98 @@ +package io.quarkus.websockets.next.test.openconnections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import jakarta.enterprise.event.ObservesAsync; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.Closed; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.Open; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class ConnectionEventsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("endpoint") + URI endUri; + + @Test + void testEvents() throws Exception { + String client1ConnectionId; + try (WSClient client1 = WSClient.create(vertx).connect(endUri)) { + client1.waitForMessages(1); + client1ConnectionId = client1.getMessages().get(0).toString(); + assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.OPEN_CONN.get()); + assertEquals(client1ConnectionId, ObservingBean.OPEN_CONN.get().id()); + } + assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertNotNull(ObservingBean.CLOSED_CONN.get()); + assertEquals(client1ConnectionId, ObservingBean.CLOSED_CONN.get().id()); + } + + @WebSocket(path = "/endpoint") + public static class Endpoint { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + String open(WebSocketConnection connection) { + return connection.id(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + + } + + @Singleton + public static class ObservingBean { + + static final CountDownLatch OPEN_LATCH = new CountDownLatch(1); + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + static final AtomicReference OPEN_CONN = new AtomicReference<>(); + static final AtomicReference CLOSED_CONN = new AtomicReference<>(); + + void onOpen(@ObservesAsync @Open WebSocketConnection connection) { + OPEN_CONN.set(connection); + OPEN_LATCH.countDown(); + } + + void onClose(@ObservesAsync @Closed WebSocketConnection connection) { + CLOSED_CONN.set(connection); + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java new file mode 100644 index 0000000000000..ea65c81e00bb9 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Closed.java @@ -0,0 +1,41 @@ +package io.quarkus.websockets.next; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.enterprise.event.Event; +import jakarta.enterprise.event.ObservesAsync; +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.inject.Qualifier; + +/** + * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed. + * + * @see ObservesAsync + * @see Event#fireAsync(Object) + */ +@Qualifier +@Documented +@Retention(RUNTIME) +@Target({ METHOD, FIELD, PARAMETER, TYPE }) +public @interface Closed { + + /** + * Supports inline instantiation of the {@link Closed} qualifier. + */ + public static final class Literal extends AnnotationLiteral implements Closed { + + public static final Literal INSTANCE = new Literal(); + + private static final long serialVersionUID = 1L; + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java new file mode 100644 index 0000000000000..08a17eb71f591 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/Open.java @@ -0,0 +1,41 @@ +package io.quarkus.websockets.next; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.enterprise.event.Event; +import jakarta.enterprise.event.ObservesAsync; +import jakarta.enterprise.util.AnnotationLiteral; +import jakarta.inject.Qualifier; + +/** + * A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened. + * + * @see ObservesAsync + * @see Event#fireAsync(Object) + */ +@Qualifier +@Documented +@Retention(RUNTIME) +@Target({ METHOD, FIELD, PARAMETER, TYPE }) +public @interface Open { + + /** + * Supports inline instantiation of the {@link Open} qualifier. + */ + public static final class Literal extends AnnotationLiteral implements Open { + + public static final Literal INSTANCE = new Literal(); + + private static final long serialVersionUID = 1L; + + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java index fe7268a63a852..1bf4b3fd00348 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java @@ -9,10 +9,15 @@ import java.util.stream.Stream; import jakarta.annotation.PreDestroy; +import jakarta.enterprise.event.Event; import jakarta.inject.Singleton; import org.jboss.logging.Logger; +import io.quarkus.arc.Arc; +import io.quarkus.arc.ArcContainer; +import io.quarkus.websockets.next.Closed; +import io.quarkus.websockets.next.Open; import io.quarkus.websockets.next.OpenConnections; import io.quarkus.websockets.next.WebSocketConnection; @@ -21,10 +26,23 @@ public class ConnectionManager implements OpenConnections { private static final Logger LOG = Logger.getLogger(ConnectionManager.class); + // generatedEndpointClassName -> open connections private final ConcurrentMap> endpointToConnections = new ConcurrentHashMap<>(); private final List listeners = new CopyOnWriteArrayList<>(); + private final Event openEvent; + private final Event closedEvent; + + ConnectionManager(@Open Event openEvent, @Closed Event closedEvent) { + ArcContainer container = Arc.container(); + this.openEvent = container.resolveObserverMethods(WebSocketConnection.class, Open.Literal.INSTANCE).isEmpty() + ? null + : openEvent; + this.closedEvent = container.resolveObserverMethods(WebSocketConnection.class, Closed.Literal.INSTANCE) + .isEmpty() ? null : closedEvent; + } + @Override public Iterator iterator() { return stream().iterator(); @@ -38,6 +56,9 @@ public Stream stream() { void add(String endpoint, WebSocketConnection connection) { LOG.debugf("Add connection: %s", connection); if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) { + if (openEvent != null) { + openEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ConnectionListener listener : listeners) { try { @@ -56,6 +77,9 @@ void remove(String endpoint, WebSocketConnection connection) { Set connections = endpointToConnections.get(endpoint); if (connections != null) { if (connections.remove(connection)) { + if (closedEvent != null) { + closedEvent.fireAsync(connection); + } if (!listeners.isEmpty()) { for (ConnectionListener listener : listeners) { try {