diff --git a/docs/src/main/asciidoc/websockets-next-reference.adoc b/docs/src/main/asciidoc/websockets-next-reference.adoc index 537ddf3a996dc0..4eae1989108172 100644 --- a/docs/src/main/asciidoc/websockets-next-reference.adoc +++ b/docs/src/main/asciidoc/websockets-next-reference.adoc @@ -765,6 +765,29 @@ public class ExampleHttpUpgradeCheck implements HttpUpgradeCheck { TIP: You can choose WebSocket endpoints to which the `HttpUpgradeCheck` is applied with the `HttpUpgradeCheck#appliesTo` method. +[[traffic-logging]] +== Traffic logging + +Quarkus can log the messages sent and received for debugging purposes. +To enable traffic logging, set the `quarkus.websockets-next.server.traffic-logging.enabled` configuration property to `true`. +The payload of text messages is logged as well. +However, the number of logged characters is limited. +The default limit is 100, but you can change this limit with the `quarkus.websockets-next.server.traffic-logging.text-payload-limit` configuration property. + +TIP: The messages are only logged if the `DEBUG` level is enabled for the logger `io.quarkus.websockets.next.traffic`. + +.Example configuration +[source, properties] +---- +quarkus.websockets-next.server.traffic-logging.enabled=true <1> +quarkus.websockets-next.server.traffic-logging.text-payload-limit=50 <2> + +quarkus.log.category."io.quarkus.websockets.next.traffic".level=DEBUG <3> +---- +<1> Enables traffic logging. +<2> Set the number of characters of a text message payload which will be logged. +<3> Enable `DEBUG` level is for the logger `io.quarkus.websockets.next.traffic`. + [[websocket-next-configuration-reference]] == Configuration reference diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/BasicConnectorTrafficLoggerTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/BasicConnectorTrafficLoggerTest.java new file mode 100644 index 00000000000000..500bee65d83b1b --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/BasicConnectorTrafficLoggerTest.java @@ -0,0 +1,56 @@ +package io.quarkus.websockets.next.test.traffic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.websockets.next.BasicWebSocketConnector; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.vertx.core.Context; +import jakarta.inject.Inject; + +public class BasicConnectorTrafficLoggerTest extends TrafficLoggerTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class); + TrafficLoggerTest.addApplicationProperties(root, false); + }) + .setLogRecordPredicate(TrafficLoggerTest::isTrafficLogRecord) + .assertLogRecords(logRecordsConsumer(true)); + + @Inject + BasicWebSocketConnector connector; + + @Test + public void testTrafficLogger() throws InterruptedException { + List messages = new CopyOnWriteArrayList<>(); + CountDownLatch closedLatch = new CountDownLatch(1); + CountDownLatch messageLatch = new CountDownLatch(1); + WebSocketClientConnection conn = connector + .baseUri(endUri) + .path("end") + .onTextMessage((c, m) -> { + assertTrue(Context.isOnWorkerThread()); + messages.add(m); + messageLatch.countDown(); + }) + .onClose((c, s) -> closedLatch.countDown()) + .connectAndAwait(); + conn.sendTextAndAwait("dummy"); + assertTrue(messageLatch.await(5, TimeUnit.SECONDS)); + assertEquals("ok", messages.get(0)); + conn.closeAndAwait(); + assertTrue(closedLatch.await(5, TimeUnit.SECONDS)); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ClientEndpointTrafficLoggerTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ClientEndpointTrafficLoggerTest.java new file mode 100644 index 00000000000000..598b66748a3c9a --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ClientEndpointTrafficLoggerTest.java @@ -0,0 +1,72 @@ +package io.quarkus.websockets.next.test.traffic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocketClient; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnector; + +public class ClientEndpointTrafficLoggerTest extends TrafficLoggerTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, Client.class); + TrafficLoggerTest.addApplicationProperties(root, false); + }) + .setLogRecordPredicate( + TrafficLoggerTest::isTrafficLogRecord) + .assertLogRecords(logRecordsConsumer(true)); + + @Inject + WebSocketConnector connector; + + @Test + public void testTrafficLogger() throws InterruptedException { + WebSocketClientConnection conn = connector + .baseUri(endUri) + .connectAndAwait(); + assertTrue(Client.MESSAGE_LATCH.await(5, TimeUnit.SECONDS)); + assertEquals("ok", Client.MESSAGES.get(0)); + conn.closeAndAwait(); + assertTrue(Client.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + } + + @WebSocketClient(path = "/end") + public static class Client { + + static final List MESSAGES = new CopyOnWriteArrayList<>(); + + static final CountDownLatch MESSAGE_LATCH = new CountDownLatch(1); + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnTextMessage + void onMessage(String message) { + MESSAGES.add(message); + MESSAGE_LATCH.countDown(); + } + + @OnClose + void onClose() { + CLOSED_LATCH.countDown(); + } + + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ServerTrafficLoggerTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ServerTrafficLoggerTest.java new file mode 100644 index 00000000000000..e2832d6bbac64e --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/ServerTrafficLoggerTest.java @@ -0,0 +1,42 @@ +package io.quarkus.websockets.next.test.traffic; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class ServerTrafficLoggerTest extends TrafficLoggerTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Endpoint.class, WSClient.class); + TrafficLoggerTest.addApplicationProperties(root, true); + }) + .setLogRecordPredicate(TrafficLoggerTest::isTrafficLogRecord) + .assertLogRecords(logRecordsConsumer(false)); + + @Inject + Vertx vertx; + + @Test + public void testTrafficLogger() throws InterruptedException, ExecutionException { + try (WSClient client = new WSClient(vertx)) { + client.connect(WSClient.toWS(endUri, "end")); + client.waitForMessages(1); + assertEquals("ok", client.getMessages().get(0).toString()); + } + assertTrue(Endpoint.CLOSED_LATCH.await(5, TimeUnit.SECONDS)); + } + +} diff --git a/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/TrafficLoggerTest.java b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/TrafficLoggerTest.java new file mode 100644 index 00000000000000..a42dad47a93114 --- /dev/null +++ b/extensions/websockets-next/deployment/src/test/java/io/quarkus/websockets/next/test/traffic/TrafficLoggerTest.java @@ -0,0 +1,66 @@ +package io.quarkus.websockets.next.test.traffic; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; +import java.util.logging.Level; +import java.util.logging.LogRecord; + +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; + +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnClose; +import io.quarkus.websockets.next.OnOpen; +import io.quarkus.websockets.next.WebSocket; + +public abstract class TrafficLoggerTest { + + @TestHTTPResource("/") + URI endUri; + + @WebSocket(path = "/end") + public static class Endpoint { + + static final CountDownLatch CLOSED_LATCH = new CountDownLatch(1); + + @OnOpen + public String open() { + return "ok"; + } + + @OnClose + public void close() { + CLOSED_LATCH.countDown(); + } + + } + + static void addApplicationProperties(JavaArchive archive, boolean server) { + archive.addAsResource(new StringAsset( + "quarkus.websockets-next." + (server ? "server" : "client") + ".traffic-logging.enabled=true\n" + + "quarkus.log.category.\"io.quarkus.websockets.next.traffic\".level=DEBUG"), + "application.properties"); + } + + static Consumer> logRecordsConsumer(boolean received) { + return recs -> { + assertTrue(recs.stream() + .anyMatch(r -> r.getMessage().contains("%s connection opened:"))); + assertTrue(recs.stream() + .anyMatch(r -> r.getMessage() + .contains("%s " + (received ? "received" : "sent") + " text message, Connection[%s]"))); + assertTrue(recs.stream() + .anyMatch(r -> r.getMessage().contains("%s connection closed,"))); + }; + } + + static boolean isTrafficLogRecord(LogRecord r) { + return r.getLevel().equals(Level.FINE) + && r.getLoggerName().equals("io.quarkus.websockets.next.traffic"); + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TrafficLoggingConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TrafficLoggingConfig.java new file mode 100644 index 00000000000000..7411390eb6d9df --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/TrafficLoggingConfig.java @@ -0,0 +1,20 @@ +package io.quarkus.websockets.next; + +import io.smallrye.config.WithDefault; + +public interface TrafficLoggingConfig { + + /** + * If set to true then binary/text messages received/sent are logged if the {@code DEBUG} level is enabled for the + * logger {@code io.quarkus.websockets.next.traffic}. + */ + @WithDefault("false") + public boolean enabled(); + + /** + * The number of characters of a text message which will be logged if traffic logging is enabled. The payload of a + * binary message is never logged. + */ + @WithDefault("100") + public int textPayloadLimit(); +} \ No newline at end of file diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java index b79a9de8578536..90c84b47a90c99 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsClientRuntimeConfig.java @@ -58,4 +58,9 @@ public interface WebSocketsClientRuntimeConfig { */ Optional tlsConfigurationName(); + /** + * Traffic logging config. + */ + TrafficLoggingConfig trafficLogging(); + } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java index a6bd6679836f3e..3d4b71a427dd00 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsServerRuntimeConfig.java @@ -59,6 +59,11 @@ public interface WebSocketsServerRuntimeConfig { */ Security security(); + /** + * Traffic logging config. + */ + TrafficLoggingConfig trafficLogging(); + interface Security { /** diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java index d47df577837d3c..fb33732ca6c471 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/BasicWebSocketConnectorImpl.java @@ -147,12 +147,15 @@ public Uni connect() { return UniHelper.toUni(client.connect(connectOptions)) .map(ws -> { String clientId = BasicWebSocketConnector.class.getName(); + TrafficLogger trafficLogger = TrafficLogger.forClient(config); WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientId, ws, codecs, pathParams, serverEndpointUri, - headers); - LOG.debugf("Client connection created: %s", connection); + headers, trafficLogger); + if (trafficLogger != null) { + trafficLogger.connectionOpened(connection); + } connectionManager.add(BasicWebSocketConnectorImpl.class.getName(), connection); if (openHandler != null) { @@ -162,8 +165,11 @@ public Uni connect() { if (textMessageHandler != null) { ws.textMessageHandler(new Handler() { @Override - public void handle(String event) { - doExecute(connection, event, textMessageHandler); + public void handle(String message) { + if (trafficLogger != null) { + trafficLogger.textMessageReceived(connection, message); + } + doExecute(connection, message, textMessageHandler); } }); } @@ -172,8 +178,11 @@ public void handle(String event) { ws.binaryMessageHandler(new Handler() { @Override - public void handle(Buffer event) { - doExecute(connection, event, binaryMessageHandler); + public void handle(Buffer message) { + if (trafficLogger != null) { + trafficLogger.binaryMessageReceived(connection, message); + } + doExecute(connection, message, binaryMessageHandler); } }); } @@ -202,6 +211,9 @@ public void handle(Throwable event) { @Override public void handle(Void event) { + if (trafficLogger != null) { + trafficLogger.connectionClosed(connection); + } if (closeHandler != null) { doExecute(connection, new CloseReason(ws.closeStatusCode(), ws.closeReason()), closeHandler); } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java index 15980876612be3..0440b3c0b7fb0f 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/Endpoints.java @@ -31,7 +31,8 @@ class Endpoints { static void initialize(Vertx vertx, ArcContainer container, Codecs codecs, WebSocketConnectionBase connection, WebSocketBase ws, String generatedEndpointClass, Optional autoPingInterval, - SecuritySupport securitySupport, UnhandledFailureStrategy unhandledFailureStrategy, Runnable onClose) { + SecuritySupport securitySupport, UnhandledFailureStrategy unhandledFailureStrategy, TrafficLogger trafficLogger, + Runnable onClose) { Context context = vertx.getOrCreateContext(); @@ -113,6 +114,9 @@ public void handle(Void event) { if (textBroadcastProcessor == null) { // Multi not consumed - invoke @OnTextMessage callback for each message received textMessageHandler(connection, endpoint, ws, onOpenContext, m -> { + if (trafficLogger != null) { + trafficLogger.textMessageReceived(connection, m); + } endpoint.onTextMessage(m).onComplete(r -> { if (r.succeeded()) { LOG.debugf("@OnTextMessage callback consumed text message: %s", connection); @@ -128,6 +132,9 @@ public void handle(Void event) { contextSupport.start(); securitySupport.start(); try { + if (trafficLogger != null) { + trafficLogger.textMessageReceived(connection, m); + } textBroadcastProcessor.onNext(endpoint.decodeTextMultiItem(m)); LOG.debugf("Text message >> Multi: %s", connection); } catch (Throwable throwable) { @@ -144,6 +151,9 @@ public void handle(Void event) { if (binaryBroadcastProcessor == null) { // Multi not consumed - invoke @OnBinaryMessage callback for each message received binaryMessageHandler(connection, endpoint, ws, onOpenContext, m -> { + if (trafficLogger != null) { + trafficLogger.binaryMessageReceived(connection, m); + } endpoint.onBinaryMessage(m).onComplete(r -> { if (r.succeeded()) { LOG.debugf("@OnBinaryMessage callback consumed binary message: %s", connection); @@ -159,6 +169,9 @@ public void handle(Void event) { contextSupport.start(); securitySupport.start(); try { + if (trafficLogger != null) { + trafficLogger.binaryMessageReceived(connection, m); + } binaryBroadcastProcessor.onNext(endpoint.decodeBinaryMultiItem(m)); LOG.debugf("Binary message >> Multi: %s", connection); } catch (Throwable throwable) { @@ -209,6 +222,9 @@ public void handle(Void event) { connection); } securitySupport.onClose(); + if (trafficLogger != null) { + trafficLogger.connectionClosed(connection); + } onClose.run(); if (timerId != null) { vertx.cancelTimer(timerId); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/TrafficLogger.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/TrafficLogger.java new file mode 100644 index 00000000000000..4a2a86dd9eadf9 --- /dev/null +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/TrafficLogger.java @@ -0,0 +1,137 @@ +package io.quarkus.websockets.next.runtime; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.jboss.logging.Logger; + +import io.quarkus.websockets.next.HandshakeRequest; +import io.quarkus.websockets.next.WebSocketClientConnection; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.WebSocketsClientRuntimeConfig; +import io.quarkus.websockets.next.WebSocketsServerRuntimeConfig; +import io.vertx.core.buffer.Buffer; + +class TrafficLogger { + + static TrafficLogger forClient(WebSocketsClientRuntimeConfig config) { + return config.trafficLogging().enabled() ? new TrafficLogger(Type.CLIENT, config.trafficLogging().textPayloadLimit()) + : null; + } + + static TrafficLogger forServer(WebSocketsServerRuntimeConfig config) { + return config.trafficLogging().enabled() ? new TrafficLogger(Type.SERVER, config.trafficLogging().textPayloadLimit()) + : null; + } + + private static final Logger LOG = Logger.getLogger("io.quarkus.websockets.next.traffic"); + + private final Type type; + + private final int textPayloadLimit; + + private TrafficLogger(Type type, int textPayloadLimit) { + this.type = type; + this.textPayloadLimit = textPayloadLimit; + } + + void connectionOpened(WebSocketConnectionBase connection) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s connection opened: %s, Connection[%s], Handshake headers[%s]", + typeToString(), + connection.handshakeRequest().path(), + connectionToString(connection), + headersToString(connection.handshakeRequest())); + } + } + + void textMessageReceived(WebSocketConnectionBase connection, String payload) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s received text message, Connection[%s], Payload: \n%s", + typeToString(), + connectionToString(connection), + payloadToString(payload)); + } + } + + void textMessageSent(WebSocketConnectionBase connection, String payload) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s sent text message, Connection[%s], Payload: \n%s", + typeToString(), + connectionToString(connection), + payloadToString(payload)); + } + } + + void binaryMessageReceived(WebSocketConnectionBase connection, Buffer payload) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s received binary message, Connection[%s], Payload[%s bytes]", + typeToString(), + connectionToString(connection), + payload.length()); + } + } + + void binaryMessageSent(WebSocketConnectionBase connection, Buffer payload) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s sent binary message, Connection[%s], Payload[%s bytes]", + typeToString(), + connectionToString(connection), + payload.length()); + } + } + + void connectionClosed(WebSocketConnectionBase connection) { + if (LOG.isDebugEnabled()) { + LOG.debugf("%s connection closed, Connection[%s]", + typeToString(), + connectionToString(connection)); + } + } + + private String payloadToString(String payload) { + if (payload == null || payload.isBlank()) { + return "n/a"; + } else if (textPayloadLimit < 0 || payload.length() <= textPayloadLimit) { + return payload; + } else { + return payload.substring(0, payload.length()) + "..."; + } + } + + private String headersToString(HandshakeRequest request) { + Map> headers = request.headers(); + if (headers.isEmpty()) { + return ""; + } + StringBuilder builder = new StringBuilder(); + for (Entry> e : headers.entrySet()) { + for (String value : e.getValue()) { + builder.append(" ").append(e.getKey()).append("=").append(value); + } + } + return builder.toString(); + } + + private String typeToString() { + return type == Type.SERVER ? "[server]" : "[client]"; + } + + private String connectionToString(WebSocketConnectionBase connection) { + StringBuilder builder = new StringBuilder(); + if (connection instanceof WebSocketConnection) { + builder.append("endpointId=").append(((WebSocketConnection) connection).endpointId()); + } else { + builder.append("clientId=").append(((WebSocketClientConnection) connection).clientId()); + } + builder.append(", id=").append(connection.id()); + return builder.toString(); + } + + enum Type { + SERVER, + CLIENT + } + +} diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketClientConnectionImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketClientConnectionImpl.java index 83b9745ab7cf5a..040f2df87e097b 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketClientConnectionImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketClientConnectionImpl.java @@ -19,8 +19,9 @@ class WebSocketClientConnectionImpl extends WebSocketConnectionBase implements W private final WebSocket webSocket; WebSocketClientConnectionImpl(String clientId, WebSocket webSocket, Codecs codecs, - Map pathParams, URI serverEndpointUri, Map> headers) { - super(Map.copyOf(pathParams), codecs, new ClientHandshakeRequestImpl(serverEndpointUri, headers)); + Map pathParams, URI serverEndpointUri, Map> headers, + TrafficLogger trafficLogger) { + super(Map.copyOf(pathParams), codecs, new ClientHandshakeRequestImpl(serverEndpointUri, headers), trafficLogger); this.clientId = clientId; this.webSocket = Objects.requireNonNull(webSocket); } diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java index 00ae0dc9e0d1f2..15433958a20458 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionBase.java @@ -32,12 +32,16 @@ public abstract class WebSocketConnectionBase { protected final Instant creationTime; - WebSocketConnectionBase(Map pathParams, Codecs codecs, HandshakeRequest handshakeRequest) { + protected final TrafficLogger trafficLogger; + + WebSocketConnectionBase(Map pathParams, Codecs codecs, HandshakeRequest handshakeRequest, + TrafficLogger trafficLogger) { this.identifier = UUID.randomUUID().toString(); this.pathParams = pathParams; this.codecs = codecs; this.handshakeRequest = handshakeRequest; this.creationTime = Instant.now(); + this.trafficLogger = trafficLogger; } abstract WebSocketBase webSocket(); @@ -51,11 +55,15 @@ public String pathParam(String name) { } public Uni sendText(String message) { - return UniHelper.toUni(webSocket().writeTextMessage(message)); + Uni uni = UniHelper.toUni(webSocket().writeTextMessage(message)); + return trafficLogger == null ? uni : uni.invoke(() -> { + trafficLogger.textMessageSent(this, message); + }); } public Uni sendBinary(Buffer message) { - return UniHelper.toUni(webSocket().writeBinaryMessage(message)); + Uni uni = UniHelper.toUni(webSocket().writeBinaryMessage(message)); + return trafficLogger == null ? uni : uni.invoke(() -> trafficLogger.binaryMessageSent(this, message)); } public Uni sendText(M message) { diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java index 124fc48bdab6b3..d1d4cad07638e1 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectionImpl.java @@ -33,8 +33,8 @@ class WebSocketConnectionImpl extends WebSocketConnectionBase implements WebSock WebSocketConnectionImpl(String generatedEndpointClass, String endpointClass, ServerWebSocket webSocket, ConnectionManager connectionManager, - Codecs codecs, RoutingContext ctx) { - super(Map.copyOf(ctx.pathParams()), codecs, new HandshakeRequestImpl(webSocket, ctx)); + Codecs codecs, RoutingContext ctx, TrafficLogger trafficLogger) { + super(Map.copyOf(ctx.pathParams()), codecs, new HandshakeRequestImpl(webSocket, ctx), trafficLogger); this.generatedEndpointClass = generatedEndpointClass; this.endpointId = endpointClass; this.webSocket = Objects.requireNonNull(webSocket); @@ -70,7 +70,7 @@ public String subprotocol() { @Override public String toString() { - return "WebSocket connection [id=" + identifier + ", path=" + webSocket.path() + "]"; + return "WebSocket connection [endpointId=" + endpointId + ", path=" + webSocket.path() + ", id=" + identifier + "]"; } @Override diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java index be39e41799564c..359a400f5160af 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketConnectorImpl.java @@ -13,7 +13,6 @@ import jakarta.enterprise.inject.spi.InjectionPoint; import org.eclipse.microprofile.config.ConfigProvider; -import org.jboss.logging.Logger; import io.quarkus.arc.Arc; import io.quarkus.tls.TlsConfigurationRegistry; @@ -34,8 +33,6 @@ public class WebSocketConnectorImpl extends WebSocketConnectorBase> implements WebSocketConnector { - private static final Logger LOG = Logger.getLogger(WebSocketConnectorImpl.class); - // derived properties private final ClientEndpoint clientEndpoint; @@ -97,16 +94,19 @@ public Uni connect() { return UniHelper.toUni(client.connect(connectOptions)) .map(ws -> { + TrafficLogger trafficLogger = TrafficLogger.forClient(config); WebSocketClientConnectionImpl connection = new WebSocketClientConnectionImpl(clientEndpoint.clientId, ws, codecs, pathParams, - serverEndpointUri, headers); - LOG.debugf("Client connection created: %s", connection); + serverEndpointUri, headers, trafficLogger); + if (trafficLogger != null) { + trafficLogger.connectionOpened(connection); + } connectionManager.add(clientEndpoint.generatedEndpointClass, connection); Endpoints.initialize(vertx, Arc.container(), codecs, connection, ws, clientEndpoint.generatedEndpointClass, config.autoPingInterval(), SecuritySupport.NOOP, - config.unhandledFailureStrategy(), + config.unhandledFailureStrategy(), trafficLogger, () -> { connectionManager.remove(clientEndpoint.generatedEndpointClass, connection); client.close(); diff --git a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index 4a14a2cd80d401..6deedde0f64091 100644 --- a/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -8,8 +8,6 @@ import jakarta.enterprise.inject.Instance; -import org.jboss.logging.Logger; - import io.quarkus.arc.Arc; import io.quarkus.arc.ArcContainer; import io.quarkus.runtime.annotations.Recorder; @@ -36,8 +34,6 @@ @Recorder public class WebSocketServerRecorder { - private static final Logger LOG = Logger.getLogger(WebSocketServerRecorder.class); - private final WebSocketsServerRuntimeConfig config; public WebSocketServerRecorder(WebSocketsServerRuntimeConfig config) { @@ -94,6 +90,7 @@ public Handler createEndpointHandler(String generatedEndpointCla ConnectionManager connectionManager = container.instance(ConnectionManager.class).get(); Codecs codecs = container.instance(Codecs.class).get(); HttpUpgradeCheck[] httpUpgradeChecks = getHttpUpgradeChecks(endpointId, container); + TrafficLogger trafficLogger = TrafficLogger.forServer(config); return new Handler() { @Override @@ -121,14 +118,16 @@ private void httpUpgrade(RoutingContext ctx) { Vertx vertx = VertxCoreRecorder.getVertx().get(); WebSocketConnectionImpl connection = new WebSocketConnectionImpl(generatedEndpointClass, endpointId, ws, - connectionManager, codecs, ctx); + connectionManager, codecs, ctx, trafficLogger); connectionManager.add(generatedEndpointClass, connection); - LOG.debugf("Connection created: %s", connection); + if (trafficLogger != null) { + trafficLogger.connectionOpened(connection); + } SecuritySupport securitySupport = initializeSecuritySupport(container, ctx, vertx, connection); Endpoints.initialize(vertx, container, codecs, connection, ws, generatedEndpointClass, - config.autoPingInterval(), securitySupport, config.unhandledFailureStrategy(), + config.autoPingInterval(), securitySupport, config.unhandledFailureStrategy(), trafficLogger, () -> connectionManager.remove(generatedEndpointClass, connection)); }); }