Skip to content

Commit

Permalink
WebSockets Next: fire CDI event when a connection is opened/closed
Browse files Browse the repository at this point in the history
- resolves quarkusio#40217
  • Loading branch information
mkouba committed Jun 6, 2024
1 parent 75928a5 commit 180487d
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 0 deletions.
20 changes: 20 additions & 0 deletions docs/src/main/asciidoc/websockets-next-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,26 @@ class MyBean {
There are also other convenient methods.
For example, `OpenConnections#findByEndpointId(String)` makes it easy to find connections for a specific endpoint.

=== CDI events

Quarkus fires a CDI event of type `io.quarkus.websockets.next.WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionOpen` asynchronously when a new connection is opened.
Moreover, a CDI event of type `WebSocketConnection` with qualifier `@io.quarkus.websockets.next.ConnectionClosed` is fired asynchronously when a connection is closed.

[source, java]
----
import jakarta.enterprise.event.ObservesAsync;
import io.quarkus.websockets.next.ConnectionOpen;
import io.quarkus.websockets.next.WebSocketConnection;
class MyBean {
void connectionOpened(@ObservesAsync @ConnectionOpen WebSocketConnection connection) { <1>
// This observer method is called when a connection is opened...
}
}
----
<1> An asynchronous observer method is executed using the default blocking executor service.

== Serialization and Deserialization

The WebSocket Next extension supports automatic serialization and deserialization of messages.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package io.quarkus.websockets.next.test.openconnections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.enterprise.event.ObservesAsync;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.quarkus.websockets.next.ConnectionClosed;
import io.quarkus.websockets.next.ConnectionOpen;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.quarkus.websockets.next.test.utils.WSClient;
import io.vertx.core.Vertx;

public class ConnectionEventsTest {

@RegisterExtension
public static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
root.addClasses(Endpoint.class, ObservingBean.class, WSClient.class);
});

@Inject
Vertx vertx;

@TestHTTPResource("endpoint")
URI endUri;

@Test
void testEvents() throws Exception {
String client1ConnectionId;
try (WSClient client1 = WSClient.create(vertx).connect(endUri)) {
client1.waitForMessages(1);
client1ConnectionId = client1.getMessages().get(0).toString();
assertTrue(ObservingBean.OPEN_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.OPEN_CONN.get());
assertEquals(client1ConnectionId, ObservingBean.OPEN_CONN.get().id());
}
assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(ObservingBean.CLOSED_LATCH.await(5, TimeUnit.SECONDS));
assertNotNull(ObservingBean.CLOSED_CONN.get());
assertEquals(client1ConnectionId, ObservingBean.CLOSED_CONN.get().id());
}

@WebSocket(path = "/endpoint")
public static class Endpoint {

static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

@OnOpen
String open(WebSocketConnection connection) {
return connection.id();
}

@OnClose
void close() {
CLOSED_LATCH.countDown();
}

}

@Singleton
public static class ObservingBean {

static final CountDownLatch OPEN_LATCH = new CountDownLatch(1);
static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1);

static final AtomicReference<WebSocketConnection> OPEN_CONN = new AtomicReference<>();
static final AtomicReference<WebSocketConnection> CLOSED_CONN = new AtomicReference<>();

void onOpen(@ObservesAsync @ConnectionOpen WebSocketConnection connection) {
OPEN_CONN.set(connection);
OPEN_LATCH.countDown();
}

void onClose(@ObservesAsync @ConnectionClosed WebSocketConnection connection) {
CLOSED_CONN.set(connection);
CLOSED_LATCH.countDown();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.websockets.next;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.ObservesAsync;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a connection is closed.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
*/
@Qualifier
@Documented
@Retention(RUNTIME)
@Target({ METHOD, FIELD, PARAMETER, TYPE })
public @interface ConnectionClosed {

/**
* Supports inline instantiation of the {@link ConnectionClosed} qualifier.
*/
public static final class Literal extends AnnotationLiteral<ConnectionClosed> implements ConnectionClosed {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.quarkus.websockets.next;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.ObservesAsync;
import jakarta.enterprise.util.AnnotationLiteral;
import jakarta.inject.Qualifier;

/**
* A CDI event of type {@link WebSocketConnection} with this qualifier is fired asynchronously when a new connection is opened.
*
* @see ObservesAsync
* @see Event#fireAsync(Object)
*/
@Qualifier
@Documented
@Retention(RUNTIME)
@Target({ METHOD, FIELD, PARAMETER, TYPE })
public @interface ConnectionOpen {

/**
* Supports inline instantiation of the {@link ConnectionOpen} qualifier.
*/
public static final class Literal extends AnnotationLiteral<ConnectionOpen> implements ConnectionOpen {

public static final Literal INSTANCE = new Literal();

private static final long serialVersionUID = 1L;

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@
import java.util.stream.Stream;

import jakarta.annotation.PreDestroy;
import jakarta.enterprise.event.Event;
import jakarta.inject.Singleton;

import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.ArcContainer;
import io.quarkus.websockets.next.ConnectionClosed;
import io.quarkus.websockets.next.ConnectionOpen;
import io.quarkus.websockets.next.OpenConnections;
import io.quarkus.websockets.next.WebSocketConnection;

Expand All @@ -21,10 +26,24 @@ public class ConnectionManager implements OpenConnections {

private static final Logger LOG = Logger.getLogger(ConnectionManager.class);

// generatedEndpointClassName -> open connections
private final ConcurrentMap<String, Set<WebSocketConnection>> endpointToConnections = new ConcurrentHashMap<>();

private final List<ConnectionListener> listeners = new CopyOnWriteArrayList<>();

private final Event<WebSocketConnection> openEvent;
private final Event<WebSocketConnection> closedEvent;

ConnectionManager(@ConnectionOpen Event<WebSocketConnection> openEvent,
@ConnectionClosed Event<WebSocketConnection> closedEvent) {
ArcContainer container = Arc.container();
this.openEvent = container.resolveObserverMethods(WebSocketConnection.class, ConnectionOpen.Literal.INSTANCE).isEmpty()
? null
: openEvent;
this.closedEvent = container.resolveObserverMethods(WebSocketConnection.class, ConnectionClosed.Literal.INSTANCE)
.isEmpty() ? null : closedEvent;
}

@Override
public Iterator<WebSocketConnection> iterator() {
return stream().iterator();
Expand All @@ -38,6 +57,9 @@ public Stream<WebSocketConnection> stream() {
void add(String endpoint, WebSocketConnection connection) {
LOG.debugf("Add connection: %s", connection);
if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) {
if (openEvent != null) {
openEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ConnectionListener listener : listeners) {
try {
Expand All @@ -56,6 +78,9 @@ void remove(String endpoint, WebSocketConnection connection) {
Set<WebSocketConnection> connections = endpointToConnections.get(endpoint);
if (connections != null) {
if (connections.remove(connection)) {
if (closedEvent != null) {
closedEvent.fireAsync(connection);
}
if (!listeners.isEmpty()) {
for (ConnectionListener listener : listeners) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.quarkus.qute;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

public class StandaloneLinesTest {

@Test
public void testStandaloneLines() {
Engine engine = Engine.builder()
.addDefaults()
.addSectionHelper(new UserTagSectionHelper.Factory("htmx", "htmx"))
.addSectionHelper(new UserTagSectionHelper.Factory("email", "email"))
.addParserHook(new ParserHook() {

@Override
public void beforeParsing(ParserHelper parserHelper) {
parserHelper.addContentFilter(content -> {
return null;
});
}
})
.build();
Template htmx = engine.parse("{! core attributes !}\n"
+ "{#if hx-boost.notNull??}hx-boost=\"{hx-boost}\"{/if}\n"
+ "{#if hx-get.notNull??}hx-get=\"{hx-get}\"{/if}\n"
+ "{#if hx-post.notNull??}hx-post=\"{hx-post}\"{/if}\n"
+ "{#if hx-on.notNull??}hx-on=\"{hx-on}\"{/if}\n"
+ "{#if hx-push-url.notNull??}hx-push-url=\"{hx-push-url}\"{/if}\n"
+ "{#if hx-select.notNull??}hx-select=\"{hx-select}\"{/if}\n"
+ "{#if hx-select-oob.notNull??}hx-select-oob=\"{hx-select-oob}\"{/if}\n"
+ "{#if hx-swap.notNull??}hx-swap=\"{hx-swap}\"{/if}\n"
+ "{#if hx-swap-oob.notNull??}hx-swap-oob=\"{hx-swap-oob}\"{/if}\n"
+ "{#if hx-target.notNull??}hx-target=\"{hx-target}\"{/if}\n"
+ "{#if hx-trigger.notNull??}hx-trigger=\"{hx-trigger}\"{/if}\n"
+ "{#if hx-vals.notNull??}hx-vals=\"{hx-vals}\"{/if}\n"
+ "{! additional attributes !}\n"
+ "{#if hx-confirm.notNull??}hx-confirm=\"{hx-confirm}\"{/if}\n"
+ "{#if hx-delete.notNull??}hx-delete=\"{hx-delete}\"{/if}\n"
+ "{#if hx-disable.notNull??}hx-disable=\"{hx-disable}\"{/if}\n"
+ "{#if hx-disinherit.notNull??}hx-disinherit=\"{hx-disinherit}\"{/if}\n"
+ "{#if hx-encoding.notNull??}hx-encoding=\"{hx-encoding}\"{/if}\n"
+ "{#if hx-ext.notNull??}hx-ext=\"{hx-ext}\"{/if}\n"
+ "{#if hx-headers.notNull??}hx-headers=\"{hx-headers}\"{/if}\n"
+ "{#if hx-history.notNull??}hx-history=\"{hx-history}\"{/if}\n"
+ "{#if hx-history-elt.notNull??}hx-history-elt=\"{hx-history-elt}\"{/if}\n"
+ "{#if hx-include.notNull??}hx-include=\"{hx-include}\"{/if}\n"
+ "{#if hx-indicator.notNull??}hx-indicator=\"{hx-indicator}\"{/if}\n"
+ "{#if hx-params.notNull??}hx-params=\"{hx-params}\"{/if}\n"
+ "{#if hx-patch.notNull??}hx-patch=\"{hx-patch}\"{/if}\n"
+ "{#if hx-preserve.notNull??}hx-preserve=\"{hx-preserve}\"{/if}\n"
+ "{#if hx-prompt.notNull??}hx-prompt=\"{hx-prompt}\"{/if}\n"
+ "{#if hx-put.notNull??}hx-put=\"{hx-put}\"{/if}\n"
+ "{#if hx-replace-url.notNull??}hx-replace-url=\"{hx-replace-url}\"{/if}\n"
+ "{#if hx-request.notNull??}hx-request=\"{hx-request}\"{/if}\n"
+ "{#if hx-sse.notNull??}hx-sse=\"{hx-sse}\"{/if}\n"
+ "{#if hx-sync.notNull??}hx-sync=\"{hx-sync}\"{/if}\n"
+ "{#if hx-validate.notNull??}hx-validate=\"{hx-validate}\"{/if}\n"
+ "{#if hx-vars.notNull??}hx-vars=\"{hx-vars}\"{/if}\n"
+ "{#if hx-ws.notNull??}hx-ws=\"{hx-ws}\"{/if}");
engine.putTemplate("htmx", htmx);
Template email = engine.parse("<input type=\"email\" {#htmx _unisolated /}>");
engine.putTemplate("email", email);
assertEquals("<input type=\"email\">", engine.parse("{#email hx-get=\"/url\" /}").render());
}

}

0 comments on commit 180487d

Please sign in to comment.