Skip to content

Commit

Permalink
WebSockets Next: fire CDI event when a connection is opened/closed
Browse files Browse the repository at this point in the history
- resolves quarkusio#40217
  • Loading branch information
mkouba committed Jun 6, 2024
1 parent 75928a5 commit c5663f9
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 0 deletions.
20 changes: 20 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,26 @@ class MyBean {
There are also other convenient methods.
For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint.

=== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionOpen` asynchronously when a new connection is opened.
Moreover, a CDI event of type `WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionClosed` is fired asynchronously when a connection is closed.

[source, java]
----
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.ConnectionOpen;
import io.quarkus.websockets.next.WebSocketConnection;
class MyBean {
void connectionOpened(@ObservesAsync @ConnectionOpen WebSocketConnection connection) { <1>
// This observer method is called when a connection is opened...
}
}
----
<1> An asynchronous observer method is executed using the default blocking executor service.

== Serialization and Deserialization

The WebSocket Next extension supports automatic serialization and deserialization of messages.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.quarkus.websockets.next.test.openconnections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.event.ObservesAsync;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class ConnectionEventsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEvents() throws Exception {
String client1ConnectionId;
try (WSClient client1 = WSClient.create(vertx).connect(endUri)) {
client1.waitForMessages(1);
client1ConnectionId = client1.getMessages().get(0).toString();
assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.OPEN_CONN.get());
assertEquals(client1ConnectionId, ObservingBean.OPEN_CONN.get().id());
}
assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.CLOSED_CONN.get());
assertEquals(client1ConnectionId, ObservingBean.CLOSED_CONN.get().id());
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
String open(WebSocketConnection connection) {
return connection.id();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}

@Singleton
public static class ObservingBean {

static final CountDownLatch OPEN_LATCH = new CountDownLatch(1);
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

static final AtomicReference<WebSocketConnection> OPEN_CONN = new AtomicReference<>();
static final AtomicReference<WebSocketConnection> CLOSED_CONN = new AtomicReference<>();

void onOpen(@ObservesAsync @Open WebSocketConnection connection) {
OPEN_CONN.set(connection);
OPEN_LATCH.countDown();
}

void onClose(@ObservesAsync @Closed WebSocketConnection connection) {
CLOSED_CONN.set(connection);
CLOSED_LATCH.countDown();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.websockets.next;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.ObservesAsync;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
*/
@Qualifier
@Documented
@Retention(RUNTIME)
@Target({ METHOD, FIELD, PARAMETER, TYPE })
public @interface Closed {

/**
* Supports inline instantiation of the {@link Closed} qualifier.
*/
public static final class Literal extends AnnotationLiteral<Closed> implements Closed {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.websockets.next;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.ObservesAsync;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
*/
@Qualifier
@Documented
@Retention(RUNTIME)
@Target({ METHOD, FIELD, PARAMETER, TYPE })
public @interface Open {

/**
* Supports inline instantiation of the {@link Open} qualifier.
*/
public static final class Literal extends AnnotationLiteral<Open> implements Open {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
import java.util.stream.Stream;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.event.Event;
import jakarta.inject.Singleton;

import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.websockets.next.Closed;
import io.quarkus.websockets.next.Open;
import io.quarkus.websockets.next.OpenConnections;
import io.quarkus.websockets.next.WebSocketConnection;

Expand All @@ -21,10 +26,23 @@ public class ConnectionManager implements OpenConnections {

private static final Logger LOG = Logger.getLogger(ConnectionManager.class);

// generatedEndpointClassName -> open connections
private final ConcurrentMap<String, Set<WebSocketConnection>> endpointToConnections = new ConcurrentHashMap<>();

private final List<ConnectionListener> listeners = new CopyOnWriteArrayList<>();

private final Event<WebSocketConnection> openEvent;
private final Event<WebSocketConnection> closedEvent;

ConnectionManager(@Open Event<WebSocketConnection> openEvent, @Closed Event<WebSocketConnection> closedEvent) {
ArcContainer container = Arc.container();
this.openEvent = container.resolveObserverMethods(WebSocketConnection.class, Open.Literal.INSTANCE).isEmpty()
? null
: openEvent;
this.closedEvent = container.resolveObserverMethods(WebSocketConnection.class, Closed.Literal.INSTANCE)
.isEmpty() ? null : closedEvent;
}

@Override
public Iterator<WebSocketConnection> iterator() {
return stream().iterator();
Expand All @@ -38,6 +56,9 @@ public Stream<WebSocketConnection> stream() {
void add(String endpoint, WebSocketConnection connection) {
LOG.debugf("Add connection: %s", connection);
if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) {
if (openEvent != null) {
openEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ConnectionListener listener : listeners) {
try {
Expand All @@ -56,6 +77,9 @@ void remove(String endpoint, WebSocketConnection connection) {
Set<WebSocketConnection> connections = endpointToConnections.get(endpoint);
if (connections != null) {
if (connections.remove(connection)) {
if (closedEvent != null) {
closedEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ConnectionListener listener : listeners) {
try {
Expand Down

0 comments on commit c5663f9

Please sign in to comment.