diff --git a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/GeneratedEndpointBuildItem.java b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/GeneratedEndpointBuildItem.java index 2e4aa1aa6ebf3..0c65f2320a52f 100644 --- a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/GeneratedEndpointBuildItem.java +++ b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/GeneratedEndpointBuildItem.java @@ -7,11 +7,13 @@ */ public final class GeneratedEndpointBuildItem extends MultiBuildItem { + public final String endpointId; public final String endpointClassName; public final String generatedClassName; public final String path; - GeneratedEndpointBuildItem(String endpointClassName, String generatedClassName, String path) { + GeneratedEndpointBuildItem(String endpointId, String endpointClassName, String generatedClassName, String path) { + this.endpointId = endpointId; this.endpointClassName = endpointClassName; this.generatedClassName = generatedClassName; this.path = path; diff --git a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketEndpointBuildItem.java b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketEndpointBuildItem.java index ad422611d1d29..d5ff556891e9e 100644 --- a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketEndpointBuildItem.java +++ b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketEndpointBuildItem.java @@ -37,6 +37,7 @@ public final class WebSocketEndpointBuildItem extends MultiBuildItem { public final BeanInfo bean; public final String path; + public final String endpointId; public final WebSocket.ExecutionMode executionMode; public final Callback onOpen; public final Callback onTextMessage; @@ -45,11 +46,13 @@ public final class WebSocketEndpointBuildItem extends MultiBuildItem { public final Callback onClose; public final List onErrors; - WebSocketEndpointBuildItem(BeanInfo bean, String path, WebSocket.ExecutionMode executionMode, Callback onOpen, + WebSocketEndpointBuildItem(BeanInfo bean, String path, String endpointId, WebSocket.ExecutionMode executionMode, + Callback onOpen, Callback onTextMessage, Callback onBinaryMessage, Callback onPongMessage, Callback onClose, List onErrors) { this.bean = bean; this.path = path; + this.endpointId = endpointId; this.executionMode = executionMode; this.onOpen = onOpen; this.onTextMessage = onTextMessage; diff --git a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java index 64c430a41c071..18df7a083df1d 100644 --- a/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java +++ b/extensions/websockets-next/server/deployment/src/main/java/io/quarkus/websockets/next/deployment/WebSocketServerProcessor.java @@ -149,6 +149,7 @@ public void collectEndpoints(BeanArchiveIndexBuildItem beanArchiveIndex, globalErrorHandlers.produce(new GlobalErrorHandlersBuildItem(List.copyOf(globalErrors.values()))); // Collect WebSocket endpoints + Map idToEndpoint = new HashMap<>(); Map pathToEndpoint = new HashMap<>(); for (BeanInfo bean : beanDiscoveryFinished.beanStream().classBeans()) { ClassInfo beanClass = bean.getTarget().get().asClass(); @@ -159,10 +160,23 @@ public void collectEndpoints(BeanArchiveIndexBuildItem beanArchiveIndex, // Sub-websocket - merge the path from the enclosing classes path = mergePath(getPathPrefix(index, beanClass.enclosingClass()), path); } - DotName previous = pathToEndpoint.put(path, beanClass.name()); - if (previous != null) { + DotName prevPath = pathToEndpoint.put(path, beanClass.name()); + if (prevPath != null) { throw new WebSocketServerException( - String.format("Multiple endpoints [%s, %s] define the same path: %s", previous, beanClass, path)); + String.format("Multiple endpoints [%s, %s] define the same path: %s", prevPath, beanClass, path)); + } + String endpointId; + AnnotationValue endpointIdValue = webSocketAnnotation.value("endpointId"); + if (endpointIdValue == null) { + endpointId = beanClass.name().toString(); + } else { + endpointId = endpointIdValue.asString(); + } + DotName prevId = idToEndpoint.put(endpointId, beanClass.name()); + if (prevId != null) { + throw new WebSocketServerException( + String.format("Multiple endpoints [%s, %s] define the same endpoint id: %s", prevId, beanClass, + endpointId)); } Callback onOpen = findCallback(beanArchiveIndex.getIndex(), beanClass, WebSocketDotNames.ON_OPEN, callbackArguments, transformedAnnotations, path); @@ -182,7 +196,7 @@ public void collectEndpoints(BeanArchiveIndexBuildItem beanArchiveIndex, + beanClass); } AnnotationValue executionMode = webSocketAnnotation.value("executionMode"); - endpoints.produce(new WebSocketEndpointBuildItem(bean, path, + endpoints.produce(new WebSocketEndpointBuildItem(bean, path, endpointId, executionMode != null ? WebSocket.ExecutionMode.valueOf(executionMode.asEnum()) : WebSocket.ExecutionMode.SERIAL, onOpen, @@ -234,8 +248,9 @@ public String apply(String name) { String generatedName = generateEndpoint(endpoint, argumentProviders, transformedAnnotations, index.getIndex(), classOutput, globalErrorHandlers); reflectiveClasses.produce(ReflectiveClassBuildItem.builder(generatedName).constructors().build()); - generatedEndpoints.produce(new GeneratedEndpointBuildItem(endpoint.bean.getImplClazz().name().toString(), - generatedName, endpoint.path)); + generatedEndpoints + .produce(new GeneratedEndpointBuildItem(endpoint.endpointId, endpoint.bean.getImplClazz().name().toString(), + generatedName, endpoint.path)); } } @@ -250,7 +265,7 @@ public void registerRoutes(WebSocketServerRecorder recorder, HttpRootPathBuildIt .route(httpRootPath.relativePath(endpoint.path)) .displayOnNotFoundPage("WebSocket Endpoint") .handlerType(HandlerType.NORMAL) - .handler(recorder.createEndpointHandler(endpoint.generatedClassName)); + .handler(recorder.createEndpointHandler(endpoint.generatedClassName, endpoint.endpointId)); routes.produce(builder.build()); } } diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/ConnectionArgumentTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/ConnectionArgumentTest.java index d137c345ac80b..a905cf46153f9 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/ConnectionArgumentTest.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/args/ConnectionArgumentTest.java @@ -38,11 +38,12 @@ public class ConnectionArgumentTest { void testArgument() { String message = "ok"; String header = "fool"; - WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), - testUri); - JsonObject reply = client.sendAndAwaitReply(message).toJsonObject(); - assertEquals(header, reply.getString("header"), reply.toString()); - assertEquals(message, reply.getString("message"), reply.toString()); + try (WSClient client = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader("X-Test", header), + testUri)) { + JsonObject reply = client.sendAndAwaitReply(message).toJsonObject(); + assertEquals(header, reply.getString("header"), reply.toString()); + assertEquals(message, reply.getString("message"), reply.toString()); + } } @WebSocket(path = "/echo") diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/endpoints/AmbiguousEndpointIdTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/endpoints/AmbiguousEndpointIdTest.java new file mode 100644 index 0000000000000..d5d8ec8019cd2 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/endpoints/AmbiguousEndpointIdTest.java @@ -0,0 +1,45 @@ +package io.quarkus.websockets.next.test.endpoints; + +import static org.junit.jupiter.api.Assertions.fail; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketServerException; + +public class AmbiguousEndpointIdTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint1.class, Endpoint2.class); + }) + .setExpectedException(WebSocketServerException.class); + + @Test + public void testEndpointIds() { + fail(); + } + + @WebSocket(path = "/ws1", endpointId = "foo") + public static class Endpoint1 { + + @OnOpen + void open() { + } + + } + + @WebSocket(path = "/ws2", endpointId = "foo") + public static class Endpoint2 { + + @OnOpen + void open() { + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/OpenConnectionsTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/OpenConnectionsTest.java new file mode 100644 index 0000000000000..224f3d3669721 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/openconnections/OpenConnectionsTest.java @@ -0,0 +1,113 @@ +package io.quarkus.websockets.next.test.openconnections; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.net.URI; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.OpenConnections; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; +import io.vertx.core.http.WebSocketConnectOptions; + +public class OpenConnectionsTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("endpoint") + URI endUri; + + @Inject + OpenConnections connections; + + @Test + void testOpenConnections() throws Exception { + String headerName = "X-Test"; + String header2 = "foo"; + String header3 = "bar"; + + for (WebSocketConnection c : connections) { + fail("No connection should be found: " + c); + } + + try (WSClient client1 = WSClient.create(vertx).connect(endUri); + WSClient client2 = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader(headerName, header2), + endUri); + WSClient client3 = WSClient.create(vertx).connect(new WebSocketConnectOptions().addHeader(headerName, header3), + endUri)) { + + client1.waitForMessages(1); + String client1Id = client1.getMessages().get(0).toString(); + + client2.waitForMessages(1); + String client2Id = client2.getMessages().get(0).toString(); + + client3.waitForMessages(1); + String client3Id = client3.getMessages().get(0).toString(); + + assertNotNull(connections.findByConnectionId(client1Id).orElse(null)); + Collection found = connections.stream() + .filter(c -> header3.equals(c.handshakeRequest().header(headerName))) + .toList(); + assertEquals(1, found.size()); + assertEquals(client3Id, found.iterator().next().id()); + + found = connections.listAll(); + assertEquals(3, found.size()); + for (WebSocketConnection c : found) { + assertTrue(c.id().equals(client1Id) || c.id().equals(client2Id) || c.id().equals(client3Id)); + } + + client2.disconnect(); + assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + + assertEquals(2, connections.listAll().size()); + assertNull(connections.stream().filter(c -> c.id().equals(client2Id)).findFirst().orElse(null)); + + found = connections.findByEndpointId("end"); + assertEquals(2, found.size()); + } + } + + @WebSocket(endpointId = "end", path = "/endpoint") + public static class Endpoint { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + String open(WebSocketConnection connection) { + return connection.id(); + } + + @OnClose + void close() { + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java index d0fd4fad46c08..61cb67233a353 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/utils/WSClient.java @@ -14,7 +14,7 @@ import io.vertx.core.http.WebSocketClient; import io.vertx.core.http.WebSocketConnectOptions; -public class WSClient { +public class WSClient implements AutoCloseable { private final WebSocketClient client; private AtomicReference socket = new AtomicReference<>(); @@ -124,4 +124,10 @@ public Buffer sendAndAwaitReply(String message) { public boolean isClosed() { return socket.get().isClosed(); } + + @Override + public void close() { + disconnect(); + } + } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OpenConnections.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OpenConnections.java new file mode 100644 index 0000000000000..b5a6859daafd0 --- /dev/null +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/OpenConnections.java @@ -0,0 +1,55 @@ +package io.quarkus.websockets.next; + +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Stream; + +import io.smallrye.common.annotation.Experimental; + +/** + * Provides convenient access to all open connections from clients to {@link WebSocket} endpoints on the server. + *

+ * Quarkus provides a built-in CDI bean with the {@link jakarta.inject.Singleton} scope that implements this interface. + */ +@Experimental("This API is experimental and may change in the future") +public interface OpenConnections extends Iterable { + + /** + * Returns an immutable snapshot of all open connections at the given time. + * + * @return an immutable collection of all open connections + */ + default Collection listAll() { + return stream().toList(); + } + + /** + * Returns an immutable snapshot of all open connections for the given endpoint id. + * + * @param endpointId + * @return an immutable collection of all open connections for the given endpoint id + * @see WebSocket#endpointId() + */ + default Collection findByEndpointId(String endpointId) { + return stream().filter(c -> c.endpointId().equals(endpointId)).toList(); + } + + /** + * Returns the open connection with the given id. + * + * @param connectionId + * @return the open connection or empty {@link Optional} if no open connection with the given id exists + * @see WebSocketConnection#id() + */ + default Optional findByConnectionId(String connectionId) { + return stream().filter(c -> c.id().equals(connectionId)).findFirst(); + } + + /** + * Returns the stream of all open connections at the given time. + * + * @return the stream of open connections + */ + Stream stream(); + +} diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java index bacb563759b2d..736c080cc92a6 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocket.java @@ -39,11 +39,24 @@ */ public String path(); + /** + * By default, the fully qualified name of the annotated class is used. + * + * @return the endpoint id + * @see WebSocketConnection#endpointId() + */ + public String endpointId() default FCQN_NAME; + /** * The execution mode used to process incoming messages for a specific connection. */ public ExecutionMode executionMode() default ExecutionMode.SERIAL; + /** + * Constant value for {@link #endpointId()} indicating that the fully qualified name of the annotated class should be used. + */ + String FCQN_NAME = "<>"; + /** * Defines the execution mode used to process incoming messages for a specific connection. * diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java index 900e86fce2f2d..63a3b66058697 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketConnection.java @@ -29,6 +29,13 @@ public interface WebSocketConnection extends Sender, BlockingSender { */ String id(); + /** + * + * @return the endpoint id + * @see WebSocket#endpointId() + */ + String endpointId(); + /** * * @param name diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java index 6b8857b1112b2..513b4755dc57d 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ConnectionManager.java @@ -1,20 +1,23 @@ package io.quarkus.websockets.next.runtime; +import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Stream; import jakarta.annotation.PreDestroy; import jakarta.inject.Singleton; import org.jboss.logging.Logger; +import io.quarkus.websockets.next.OpenConnections; import io.quarkus.websockets.next.WebSocketConnection; @Singleton -public class ConnectionManager { +public class ConnectionManager implements OpenConnections { private static final Logger LOG = Logger.getLogger(ConnectionManager.class); @@ -22,6 +25,16 @@ public class ConnectionManager { private final List listeners = new CopyOnWriteArrayList<>(); + @Override + public Iterator iterator() { + return stream().iterator(); + } + + @Override + public Stream stream() { + return endpointToConnections.values().stream().flatMap(Set::stream).filter(WebSocketConnection::isOpen); + } + void add(String endpoint, WebSocketConnection connection) { LOG.debugf("Add connection: %s", connection); if (endpointToConnections.computeIfAbsent(endpoint, e -> ConcurrentHashMap.newKeySet()).add(connection)) { diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java index aa8ac39396031..8b7eda81b1461 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java @@ -26,7 +26,9 @@ class WebSocketConnectionImpl implements WebSocketConnection { - private final String endpoint; + private final String generatedEndpointClass; + + private final String endpointId; private final String identifier; @@ -44,9 +46,11 @@ class WebSocketConnectionImpl implements WebSocketConnection { private final Instant creationTime; - WebSocketConnectionImpl(String endpoint, ServerWebSocket webSocket, ConnectionManager connectionManager, + WebSocketConnectionImpl(String generatedEndpointClass, String endpointClass, ServerWebSocket webSocket, + ConnectionManager connectionManager, Codecs codecs, RoutingContext ctx) { - this.endpoint = endpoint; + this.generatedEndpointClass = generatedEndpointClass; + this.endpointId = endpointClass; this.identifier = UUID.randomUUID().toString(); this.webSocket = Objects.requireNonNull(webSocket); this.connectionManager = Objects.requireNonNull(connectionManager); @@ -62,6 +66,11 @@ public String id() { return identifier; } + @Override + public String endpointId() { + return endpointId; + } + @Override public String pathParam(String name) { return pathParams.get(name); @@ -124,7 +133,7 @@ public boolean isClosed() { @Override public Set getOpenConnections() { - return connectionManager.getConnections(endpoint).stream().filter(WebSocketConnection::isOpen) + return connectionManager.getConnections(generatedEndpointClass).stream().filter(WebSocketConnection::isOpen) .collect(Collectors.toUnmodifiableSet()); } @@ -292,7 +301,7 @@ public Uni sendPong(Buffer data) { } private Uni doSend(BiFunction> function, M message) { - Set connections = connectionManager.getConnections(endpoint); + Set connections = connectionManager.getConnections(generatedEndpointClass); if (connections.isEmpty()) { return Uni.createFrom().voidItem(); } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index c97dfc8107630..a0b98d13b1209 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -57,7 +57,7 @@ public Object get() { }; } - public Handler createEndpointHandler(String endpointClass) { + public Handler createEndpointHandler(String generatedEndpointClass, String endpointId) { ArcContainer container = Arc.container(); ConnectionManager connectionManager = container.instance(ConnectionManager.class).get(); Codecs codecs = container.instance(Codecs.class).get(); @@ -69,9 +69,9 @@ public void handle(RoutingContext ctx) { future.onSuccess(ws -> { Context context = VertxCoreRecorder.getVertx().get().getOrCreateContext(); - WebSocketConnection connection = new WebSocketConnectionImpl(endpointClass, ws, + WebSocketConnection connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws, connectionManager, codecs, ctx); - connectionManager.add(endpointClass, connection); + connectionManager.add(generatedEndpointClass, connection); LOG.debugf("Connnected: %s", connection); // Initialize and capture the session context state that will be activated @@ -83,7 +83,7 @@ public void handle(RoutingContext ctx) { container.requestContext()); // Create an endpoint that delegates callbacks to the @WebSocket bean - WebSocketEndpoint endpoint = createEndpoint(endpointClass, context, connection, codecs, config, + WebSocketEndpoint endpoint = createEndpoint(generatedEndpointClass, context, connection, codecs, config, contextSupport); // A broadcast processor is only needed if Multi is consumed by the callback @@ -228,7 +228,7 @@ public void handle(Void event) { } else { LOG.errorf(r.cause(), "Unable to complete @OnClose callback: %s", connection); } - connectionManager.remove(endpointClass, connection); + connectionManager.remove(generatedEndpointClass, connection); }); } });