From af76df30a64644e12822408715bed5476c4dd36b Mon Sep 17 00:00:00 2001 From: Vaibhav Murkute Date: Thu, 30 Jun 2022 14:03:36 -0400 Subject: [PATCH 1/2] feat: ingest mqtt metrics --- .../mqtt/moquette/ClientDeviceAuthorizer.java | 62 ++++++++++- .../greengrass/mqtt/moquette/MQTTService.java | 10 +- .../metrics/MoquetteMqttMetricsEmmitter.java | 102 ++++++++++++++++++ .../mqtt/moquette/metrics/MqttMetrics.java | 20 ++++ .../moquette/ClientDeviceAuthorizerTest.java | 23 ++-- 5 files changed, 201 insertions(+), 16 deletions(-) create mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java create mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java index 77d94cf4..cb95b874 100644 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java @@ -11,6 +11,11 @@ import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; +import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; +import com.aws.greengrass.mqtt.moquette.metrics.MqttMetrics; +import com.aws.greengrass.telemetry.impl.Metric; +import com.aws.greengrass.telemetry.models.TelemetryAggregation; +import com.aws.greengrass.telemetry.models.TelemetryUnit; import io.moquette.broker.security.IAuthenticator; import io.moquette.broker.security.IAuthorizatorPolicy; import io.moquette.broker.subscriptions.Topic; @@ -19,6 +24,7 @@ import io.moquette.interception.messages.InterceptConnectionLostMessage; import io.moquette.interception.messages.InterceptDisconnectMessage; +import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,15 +37,19 @@ public class ClientDeviceAuthorizer implements IAuthenticator, IAuthorizatorPoli private static final String MQTT_CREDENTIAL = "mqtt"; private final ClientDevicesAuthServiceApi clientDevicesAuthService; + private final MoquetteMqttMetricsEmmitter metricsEmitter; private final Map clientToSessionMap; /** * Constructor. * * @param clientDevicesAuthService Client devices auth service handle + * @param metricsEmmitter MQTT metrics emitter */ - public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService) { + public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService, + MoquetteMqttMetricsEmmitter metricsEmmitter) { this.clientDevicesAuthService = clientDevicesAuthService; + this.metricsEmitter = metricsEmmitter; this.clientToSessionMap = new ConcurrentHashMap<>(); } @@ -80,6 +90,7 @@ public boolean checkValid(String clientId, String username, byte[] password) { } else { LOG.atWarn().kv(CLIENT_ID, clientId).kv(SESSION_ID, sessionId).log("Device isn't authorized to connect"); clientDevicesAuthService.closeClientDeviceAuthSession(sessionId); + emitAuthErrorMetric("mqtt:connect"); } return canConnect; @@ -89,13 +100,19 @@ public boolean checkValid(String clientId, String username, byte[] password) { public boolean canWrite(Topic topic, String user, String client) { String resource = "mqtt:topic:" + topic; boolean canPerform = false; + String publishOp = "mqtt:publish"; try { - canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:publish", resource); + canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), publishOp, resource); } catch (AuthenticationException e) { LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client."); + emitAuthErrorMetric(publishOp); } LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client) .log("MQTT publish request"); + + if (!canPerform) { + emitAuthErrorMetric(publishOp); + } return canPerform; } @@ -103,13 +120,19 @@ public boolean canWrite(Topic topic, String user, String client) { public boolean canRead(Topic topic, String user, String client) { String resource = "mqtt:topicfilter:" + topic; boolean canPerform = false; + String subscribeOp = "mqtt:subscribe"; try { - canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:subscribe", resource); + canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), subscribeOp, resource); } catch (AuthenticationException e) { LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client."); + emitAuthErrorMetric(subscribeOp); } LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client) .log("MQTT subscribe request"); + + if (!canPerform) { + emitAuthErrorMetric(subscribeOp); + } return canPerform; } @@ -201,4 +224,37 @@ private void closeAuthSession(String clientId, String username) { } } } + + /** + * Emits MQTT AuthError metrics for requested MQTT operation. + * Ideally these metrics should be emitted from the Moquette request handlers. + * Emitting metrics from Authorizer integration to avoid broker code change. + * + * @param operation Requested MQTT operation + */ + private void emitAuthErrorMetric(String operation) { + String authErrorMetric; + switch (operation) { + case "mqtt:connect": + authErrorMetric = MqttMetrics.CONNECT_AUTH_ERROR; + break; + case "mqtt:publish" : + authErrorMetric = MqttMetrics.PUBLISH_IN_AUTH_ERROR; + break; + case "mqtt:subscribe" : + authErrorMetric = MqttMetrics.SUBSCRIBE_AUTH_ERROR; + break; + default: + authErrorMetric = MqttMetrics.UNKNOWN_AUTH_ERROR; + break; + } + metricsEmitter.emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(authErrorMetric) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } } diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java index 6a684028..c26568a6 100644 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java @@ -17,6 +17,7 @@ import com.aws.greengrass.dependency.State; import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.lifecyclemanager.PluginService; +import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; import com.aws.greengrass.util.Coerce; import io.moquette.BrokerConstants; import io.moquette.broker.ISslContextCreator; @@ -27,7 +28,7 @@ import java.io.IOException; import java.security.KeyStoreException; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.Properties; import javax.inject.Inject; @@ -43,6 +44,7 @@ public class MQTTService extends PluginService { private final Kernel kernel; private final ClientDeviceTrustManager clientDeviceTrustManager; private final ClientDeviceAuthorizer clientDeviceAuthorizer; + private final MoquetteMqttMetricsEmmitter mqttMetricsEmmitter; private final List interceptHandlers; private final ClientDevicesAuthServiceApi clientDevicesAuthServiceApi; private final GetCertificateRequest serverCertificateRequest; @@ -62,8 +64,10 @@ public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi cli super(topics); this.kernel = kernel; this.clientDeviceTrustManager = new ClientDeviceTrustManager(clientDevicesAuthService); - this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService); - this.interceptHandlers = Collections.singletonList(clientDeviceAuthorizer.new ConnectionTerminationListener()); + this.mqttMetricsEmmitter = new MoquetteMqttMetricsEmmitter(); + this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService, mqttMetricsEmmitter); + this.interceptHandlers = Arrays.asList(clientDeviceAuthorizer.new ConnectionTerminationListener(), + mqttMetricsEmmitter. new MqttMetricsCaptor()); this.clientDevicesAuthServiceApi = clientDevicesAuthService; GetCertificateRequestOptions options = new GetCertificateRequestOptions(); diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java new file mode 100644 index 00000000..f432b244 --- /dev/null +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java @@ -0,0 +1,102 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + +import com.aws.greengrass.telemetry.impl.Metric; +import com.aws.greengrass.telemetry.impl.MetricFactory; +import com.aws.greengrass.telemetry.models.TelemetryAggregation; +import com.aws.greengrass.telemetry.models.TelemetryUnit; +import io.moquette.interception.AbstractInterceptHandler; +import io.moquette.interception.InterceptHandler; +import io.moquette.interception.messages.InterceptConnectMessage; +import io.moquette.interception.messages.InterceptDisconnectMessage; +import io.moquette.interception.messages.InterceptPublishMessage; +import io.moquette.interception.messages.InterceptSubscribeMessage; +import io.moquette.interception.messages.InterceptUnsubscribeMessage; + +import java.time.Instant; + +public class MoquetteMqttMetricsEmmitter { + + private final MetricFactory metricFactory = new MetricFactory(MqttMetrics.MOQUETTE_MQTT_NAMESPACE); + + /** + * Emits Moquette MQTT metrics. + * + * @param metric {@link Metric} to be emitted in Moquette MQTT namespace + */ + public void emitMetric(Metric metric) { + metricFactory.putMetricData(metric); + } + + public class MqttMetricsCaptor extends AbstractInterceptHandler implements InterceptHandler { + + @Override + public String getID() { + return "MoquetteMqttMetricsCaptor"; + } + + @Override + public void onConnect(InterceptConnectMessage msg) { + emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(MqttMetrics.CONNECT_SUCCESS) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } + + @Override + public void onDisconnect(InterceptDisconnectMessage msg) { + emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(MqttMetrics.DISCONNECT) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } + + @Override + public void onPublish(InterceptPublishMessage msg) { + emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(MqttMetrics.PUBLISH_OUT_SUCCESS) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } + + @Override + public void onSubscribe(InterceptSubscribeMessage msg) { + emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(MqttMetrics.SUBSCRIBE_SUCCESS) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } + + @Override + public void onUnsubscribe(InterceptUnsubscribeMessage msg) { + emitMetric(Metric.builder() + .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) + .name(MqttMetrics.UNSUBSCRIBE) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(1) + .timestamp(Instant.now().toEpochMilli()) + .build()); + } + } +} diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java new file mode 100644 index 00000000..9e5461ab --- /dev/null +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java @@ -0,0 +1,20 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + + +public final class MqttMetrics { + public static final String MOQUETTE_MQTT_NAMESPACE = "MoquetteMqtt"; + public static final String CONNECT_SUCCESS = "Connect.Success"; + public static final String CONNECT_AUTH_ERROR = "Connect.AuthError"; + public static final String SUBSCRIBE_SUCCESS = "Subscribe.Success"; + public static final String SUBSCRIBE_AUTH_ERROR = "Subscribe.AuthError"; + public static final String PUBLISH_OUT_SUCCESS = "PublishOut.Success"; + public static final String PUBLISH_IN_AUTH_ERROR = "PublishIn.AuthError"; + public static final String DISCONNECT = "Disconnect"; + public static final String UNSUBSCRIBE = "Unsubscribe"; + public static final String UNKNOWN_AUTH_ERROR = "UnknownAuthError"; +} diff --git a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java index 34368e06..5704644a 100644 --- a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java +++ b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java @@ -9,6 +9,7 @@ import com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi; import com.aws.greengrass.clientdevices.auth.exception.AuthenticationException; import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException; +import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; import com.aws.greengrass.testcommons.testutilities.GGExtension; import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil; import io.moquette.broker.subscriptions.Topic; @@ -31,6 +32,8 @@ public class ClientDeviceAuthorizerTest extends GGServiceTestUtil { @Mock ClientDevicesAuthServiceApi mockClientDevicesAuthService; + @Mock + MoquetteMqttMetricsEmmitter mockMqttMetricsEmmitter; private static final String DEFAULT_SESSION = "SESSION_ID"; private static final String DEFAULT_CLIENT = "clientId"; @@ -72,13 +75,13 @@ void configureSubscribeResponse(boolean doAllow) throws AuthorizationException { @Test void GIVEN_clientDataWithoutCertificate_WHEN_checkValid_THEN_returnsFalse() { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); assertThat(authorizer.checkValid(DEFAULT_CLIENT, EMPTY_PEER_CERT, DEFAULT_PASSWORD), is(false)); } @Test void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalseAndClosesSession() throws Exception { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(false); @@ -92,7 +95,7 @@ void GIVEN_duplicateClientIds_WHEN_checkValid_THEN_firstSessionClosed() throws A AuthorizationException { final String USERNAME1 = "PeerCert1"; final String USERNAME2 = "PeerCert2"; - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn("SESSION1"); configureConnectResponse("SESSION1", DEFAULT_CLIENT, true); @@ -115,7 +118,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext throws AuthenticationException { ignoreExceptionOfType(context, AuthenticationException.class); - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow( new AuthenticationException("Invalid client")); @@ -125,7 +128,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext @Test void GIVEN_authorizedClient_WHEN_checkValid_THEN_returnsTrue() throws Exception { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -138,7 +141,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext throws AuthenticationException { ignoreExceptionOfType(context, AuthenticationException.class); - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow( new AuthenticationException("Invalid client")); @@ -149,7 +152,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext @Test void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -163,7 +166,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws Au @Test void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -178,7 +181,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws Aut @Test void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctSessionIsUsed() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); String session1 = "SESSION_ID1"; String session2 = "SESSION_ID2"; String client1 = "clientId1"; @@ -220,7 +223,7 @@ void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctS @Test void GIVEN_authorizedClient_WHEN_onDisconnect_THEN_closeAuthSession() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); From 2df709aefaee8e0a648d25bfe2a1602a229ff8b8 Mon Sep 17 00:00:00 2001 From: Vaibhav Murkute Date: Fri, 1 Jul 2022 12:09:10 -0400 Subject: [PATCH 2/2] fix: schedule periodic batch metrics emit --- .../mqtt/moquette/ClientDeviceAuthorizer.java | 59 ++++------ .../greengrass/mqtt/moquette/MQTTService.java | 15 +-- .../mqtt/moquette/metrics/MetricsStore.java | 100 +++++++++++++++++ .../metrics/MoquetteMqttMetricsEmmitter.java | 102 ------------------ .../mqtt/moquette/metrics/MqttMetrics.java | 20 ---- .../moquette/metrics/MqttMetricsCaptor.java | 49 +++++++++ .../moquette/metrics/MqttMetricsEmitter.java | 52 +++++++++ .../moquette/ClientDeviceAuthorizerTest.java | 23 ++-- .../mqtt/moquette/MQTTServiceTest.java | 12 +++ .../moquette/metrics/MetricsStoreTest.java | 65 +++++++++++ 10 files changed, 318 insertions(+), 179 deletions(-) create mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStore.java delete mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java delete mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java create mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsCaptor.java create mode 100644 integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsEmitter.java create mode 100644 integration/src/test/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStoreTest.java diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java index cb95b874..a33c7515 100644 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizer.java @@ -11,11 +11,8 @@ import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException; import com.aws.greengrass.logging.api.Logger; import com.aws.greengrass.logging.impl.LogManager; -import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; -import com.aws.greengrass.mqtt.moquette.metrics.MqttMetrics; -import com.aws.greengrass.telemetry.impl.Metric; -import com.aws.greengrass.telemetry.models.TelemetryAggregation; -import com.aws.greengrass.telemetry.models.TelemetryUnit; +import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore; +import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore.MqttMetric; import io.moquette.broker.security.IAuthenticator; import io.moquette.broker.security.IAuthorizatorPolicy; import io.moquette.broker.subscriptions.Topic; @@ -24,7 +21,6 @@ import io.moquette.interception.messages.InterceptConnectionLostMessage; import io.moquette.interception.messages.InterceptDisconnectMessage; -import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,21 +31,20 @@ public class ClientDeviceAuthorizer implements IAuthenticator, IAuthorizatorPoli private static final String CERTIFICATE_PEM = "certificatePem"; private static final String SESSION_ID = "sessionId"; private static final String MQTT_CREDENTIAL = "mqtt"; + private static final String MQTT_CONNECT_OP = "mqtt:connect"; + private static final String MQTT_PUBLISH_OP = "mqtt:publish"; + private static final String MQTT_SUBSCRIBE_OP = "mqtt:subscribe"; private final ClientDevicesAuthServiceApi clientDevicesAuthService; - private final MoquetteMqttMetricsEmmitter metricsEmitter; private final Map clientToSessionMap; /** * Constructor. * * @param clientDevicesAuthService Client devices auth service handle - * @param metricsEmmitter MQTT metrics emitter */ - public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService, - MoquetteMqttMetricsEmmitter metricsEmmitter) { + public ClientDeviceAuthorizer(ClientDevicesAuthServiceApi clientDevicesAuthService) { this.clientDevicesAuthService = clientDevicesAuthService; - this.metricsEmitter = metricsEmmitter; this.clientToSessionMap = new ConcurrentHashMap<>(); } @@ -73,7 +68,7 @@ public boolean checkValid(String clientId, String username, byte[] password) { return false; } - boolean canConnect = canDevicePerform(sessionId, "mqtt:connect", "mqtt:clientId:" + clientId); + boolean canConnect = canDevicePerform(sessionId, MQTT_CONNECT_OP, "mqtt:clientId:" + clientId); // Add mapping from client id to session id for future canRead/canWrite calls if (canConnect) { @@ -90,7 +85,7 @@ public boolean checkValid(String clientId, String username, byte[] password) { } else { LOG.atWarn().kv(CLIENT_ID, clientId).kv(SESSION_ID, sessionId).log("Device isn't authorized to connect"); clientDevicesAuthService.closeClientDeviceAuthSession(sessionId); - emitAuthErrorMetric("mqtt:connect"); + emitAuthErrorMetric(MQTT_CONNECT_OP); } return canConnect; @@ -100,18 +95,16 @@ public boolean checkValid(String clientId, String username, byte[] password) { public boolean canWrite(Topic topic, String user, String client) { String resource = "mqtt:topic:" + topic; boolean canPerform = false; - String publishOp = "mqtt:publish"; try { - canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), publishOp, resource); + canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), MQTT_PUBLISH_OP, resource); } catch (AuthenticationException e) { LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client."); - emitAuthErrorMetric(publishOp); } LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client) .log("MQTT publish request"); if (!canPerform) { - emitAuthErrorMetric(publishOp); + emitAuthErrorMetric(MQTT_PUBLISH_OP); } return canPerform; } @@ -120,18 +113,16 @@ public boolean canWrite(Topic topic, String user, String client) { public boolean canRead(Topic topic, String user, String client) { String resource = "mqtt:topicfilter:" + topic; boolean canPerform = false; - String subscribeOp = "mqtt:subscribe"; try { - canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), subscribeOp, resource); + canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), MQTT_SUBSCRIBE_OP, resource); } catch (AuthenticationException e) { LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client."); - emitAuthErrorMetric(subscribeOp); } LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client) .log("MQTT subscribe request"); if (!canPerform) { - emitAuthErrorMetric(subscribeOp); + emitAuthErrorMetric(MQTT_SUBSCRIBE_OP); } return canPerform; } @@ -233,28 +224,22 @@ private void closeAuthSession(String clientId, String username) { * @param operation Requested MQTT operation */ private void emitAuthErrorMetric(String operation) { - String authErrorMetric; + MqttMetric authErrorMetric; switch (operation) { - case "mqtt:connect": - authErrorMetric = MqttMetrics.CONNECT_AUTH_ERROR; + case MQTT_CONNECT_OP: + authErrorMetric = MqttMetric.CONNECT_AUTH_ERROR; break; - case "mqtt:publish" : - authErrorMetric = MqttMetrics.PUBLISH_IN_AUTH_ERROR; + case MQTT_PUBLISH_OP: + authErrorMetric = MqttMetric.PUBLISH_IN_AUTH_ERROR; break; - case "mqtt:subscribe" : - authErrorMetric = MqttMetrics.SUBSCRIBE_AUTH_ERROR; + case MQTT_SUBSCRIBE_OP: + authErrorMetric = MqttMetric.SUBSCRIBE_AUTH_ERROR; break; default: - authErrorMetric = MqttMetrics.UNKNOWN_AUTH_ERROR; + authErrorMetric = MqttMetric.UNKNOWN_AUTH_ERROR; break; } - metricsEmitter.emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(authErrorMetric) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); + + MetricsStore.getInstance().incrementMetricValue(authErrorMetric); } } diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java index c26568a6..62aa0a96 100644 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/MQTTService.java @@ -17,7 +17,8 @@ import com.aws.greengrass.dependency.State; import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.lifecyclemanager.PluginService; -import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; +import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsCaptor; +import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsEmitter; import com.aws.greengrass.util.Coerce; import io.moquette.BrokerConstants; import io.moquette.broker.ISslContextCreator; @@ -44,7 +45,6 @@ public class MQTTService extends PluginService { private final Kernel kernel; private final ClientDeviceTrustManager clientDeviceTrustManager; private final ClientDeviceAuthorizer clientDeviceAuthorizer; - private final MoquetteMqttMetricsEmmitter mqttMetricsEmmitter; private final List interceptHandlers; private final ClientDevicesAuthServiceApi clientDevicesAuthServiceApi; private final GetCertificateRequest serverCertificateRequest; @@ -58,21 +58,22 @@ public class MQTTService extends PluginService { * @param topics Root Configuration topic for this service * @param kernel Greengrass Nucleus * @param clientDevicesAuthService Client devices auth service handle + * @param mqttMetricsEmitter MQTT metrics emitter */ @Inject - public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi clientDevicesAuthService) { + public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi clientDevicesAuthService, + MqttMetricsEmitter mqttMetricsEmitter) { super(topics); this.kernel = kernel; this.clientDeviceTrustManager = new ClientDeviceTrustManager(clientDevicesAuthService); - this.mqttMetricsEmmitter = new MoquetteMqttMetricsEmmitter(); - this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService, mqttMetricsEmmitter); + this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService); this.interceptHandlers = Arrays.asList(clientDeviceAuthorizer.new ConnectionTerminationListener(), - mqttMetricsEmmitter. new MqttMetricsCaptor()); + new MqttMetricsCaptor()); this.clientDevicesAuthServiceApi = clientDevicesAuthService; - GetCertificateRequestOptions options = new GetCertificateRequestOptions(); options.setCertificateType(GetCertificateRequestOptions.CertificateType.SERVER); serverCertificateRequest = new GetCertificateRequest(SERVICE_NAME, options, this::updateServerCertificate); + mqttMetricsEmitter.schedulePeriodicMetricEmit(); } @Override diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStore.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStore.java new file mode 100644 index 00000000..a76ec94a --- /dev/null +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStore.java @@ -0,0 +1,100 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + + +import com.aws.greengrass.telemetry.impl.Metric; +import com.aws.greengrass.telemetry.models.TelemetryAggregation; +import com.aws.greengrass.telemetry.models.TelemetryUnit; +import lombok.AccessLevel; +import lombok.Getter; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class MetricsStore { + public static final String LOCAL_MQTT_NAMESPACE = "LocalMQTT"; + @Getter(AccessLevel.PACKAGE) + private final Map metrics = new HashMap<>(); + + private MetricsStore() { + init(); + } + + private static class MetricsStoreHelper { + @SuppressWarnings("PMD.AccessorClassGeneration") + private static final MetricsStore INSTANCE = new MetricsStore(); + } + + /** + * Gets the singleton instance of MetricsStore with lazy initialization. + * + * @return {@link MetricsStore} + */ + public static MetricsStore getInstance() { + return MetricsStoreHelper.INSTANCE; + } + + public void incrementMetricValue(MqttMetric metric) { + Optional.ofNullable(metrics.get(metric)).map(AtomicInteger::incrementAndGet); + } + + public List getAndResetTelemetryMetrics() { + return metrics.keySet().stream().map(this::getTelemetryMetricAndReset) + .collect(Collectors.toList()); + } + + private Metric getTelemetryMetricAndReset(MqttMetric metric) { + int metricValue = metrics.get(metric).getAndSet(0); + return Metric.builder() + .namespace(LOCAL_MQTT_NAMESPACE) + .name(metric.toString()) + .unit(TelemetryUnit.Count) + .aggregation(TelemetryAggregation.Sum) + .value(metricValue) + .timestamp(Instant.now().toEpochMilli()) + .build(); + } + + private void init() { + metrics.put(MqttMetric.CONNECT_SUCCESS, new AtomicInteger(0)); + metrics.put(MqttMetric.CONNECT_AUTH_ERROR, new AtomicInteger(0)); + metrics.put(MqttMetric.SUBSCRIBE_SUCCESS, new AtomicInteger(0)); + metrics.put(MqttMetric.SUBSCRIBE_AUTH_ERROR, new AtomicInteger(0)); + metrics.put(MqttMetric.PUBLISH_OUT_SUCCESS, new AtomicInteger(0)); + metrics.put(MqttMetric.PUBLISH_IN_AUTH_ERROR, new AtomicInteger(0)); + metrics.put(MqttMetric.UNSUBSCRIBE, new AtomicInteger(0)); + metrics.put(MqttMetric.DISCONNECT, new AtomicInteger(0)); + metrics.put(MqttMetric.UNKNOWN_AUTH_ERROR, new AtomicInteger(0)); + } + + public enum MqttMetric { + CONNECT_SUCCESS("Connect.Success"), + CONNECT_AUTH_ERROR("Connect.AuthError"), + SUBSCRIBE_SUCCESS("Subscribe.Success"), + SUBSCRIBE_AUTH_ERROR("Subscribe.AuthError"), + PUBLISH_OUT_SUCCESS("PublishOut.Success"), + PUBLISH_IN_AUTH_ERROR("PublishIn.AuthError"), + DISCONNECT("Disconnect"), + UNSUBSCRIBE("Unsubscribe"), + UNKNOWN_AUTH_ERROR("UnknownAuthError"); + + private final String name; + MqttMetric(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } +} diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java deleted file mode 100644 index f432b244..00000000 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MoquetteMqttMetricsEmmitter.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.aws.greengrass.mqtt.moquette.metrics; - -import com.aws.greengrass.telemetry.impl.Metric; -import com.aws.greengrass.telemetry.impl.MetricFactory; -import com.aws.greengrass.telemetry.models.TelemetryAggregation; -import com.aws.greengrass.telemetry.models.TelemetryUnit; -import io.moquette.interception.AbstractInterceptHandler; -import io.moquette.interception.InterceptHandler; -import io.moquette.interception.messages.InterceptConnectMessage; -import io.moquette.interception.messages.InterceptDisconnectMessage; -import io.moquette.interception.messages.InterceptPublishMessage; -import io.moquette.interception.messages.InterceptSubscribeMessage; -import io.moquette.interception.messages.InterceptUnsubscribeMessage; - -import java.time.Instant; - -public class MoquetteMqttMetricsEmmitter { - - private final MetricFactory metricFactory = new MetricFactory(MqttMetrics.MOQUETTE_MQTT_NAMESPACE); - - /** - * Emits Moquette MQTT metrics. - * - * @param metric {@link Metric} to be emitted in Moquette MQTT namespace - */ - public void emitMetric(Metric metric) { - metricFactory.putMetricData(metric); - } - - public class MqttMetricsCaptor extends AbstractInterceptHandler implements InterceptHandler { - - @Override - public String getID() { - return "MoquetteMqttMetricsCaptor"; - } - - @Override - public void onConnect(InterceptConnectMessage msg) { - emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(MqttMetrics.CONNECT_SUCCESS) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); - } - - @Override - public void onDisconnect(InterceptDisconnectMessage msg) { - emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(MqttMetrics.DISCONNECT) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); - } - - @Override - public void onPublish(InterceptPublishMessage msg) { - emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(MqttMetrics.PUBLISH_OUT_SUCCESS) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); - } - - @Override - public void onSubscribe(InterceptSubscribeMessage msg) { - emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(MqttMetrics.SUBSCRIBE_SUCCESS) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); - } - - @Override - public void onUnsubscribe(InterceptUnsubscribeMessage msg) { - emitMetric(Metric.builder() - .namespace(MqttMetrics.MOQUETTE_MQTT_NAMESPACE) - .name(MqttMetrics.UNSUBSCRIBE) - .unit(TelemetryUnit.Count) - .aggregation(TelemetryAggregation.Sum) - .value(1) - .timestamp(Instant.now().toEpochMilli()) - .build()); - } - } -} diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java deleted file mode 100644 index 9e5461ab..00000000 --- a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetrics.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * SPDX-License-Identifier: Apache-2.0 - */ - -package com.aws.greengrass.mqtt.moquette.metrics; - - -public final class MqttMetrics { - public static final String MOQUETTE_MQTT_NAMESPACE = "MoquetteMqtt"; - public static final String CONNECT_SUCCESS = "Connect.Success"; - public static final String CONNECT_AUTH_ERROR = "Connect.AuthError"; - public static final String SUBSCRIBE_SUCCESS = "Subscribe.Success"; - public static final String SUBSCRIBE_AUTH_ERROR = "Subscribe.AuthError"; - public static final String PUBLISH_OUT_SUCCESS = "PublishOut.Success"; - public static final String PUBLISH_IN_AUTH_ERROR = "PublishIn.AuthError"; - public static final String DISCONNECT = "Disconnect"; - public static final String UNSUBSCRIBE = "Unsubscribe"; - public static final String UNKNOWN_AUTH_ERROR = "UnknownAuthError"; -} diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsCaptor.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsCaptor.java new file mode 100644 index 00000000..49850f3c --- /dev/null +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsCaptor.java @@ -0,0 +1,49 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + +import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore.MqttMetric; +import io.moquette.interception.AbstractInterceptHandler; +import io.moquette.interception.InterceptHandler; +import io.moquette.interception.messages.InterceptConnectMessage; +import io.moquette.interception.messages.InterceptDisconnectMessage; +import io.moquette.interception.messages.InterceptPublishMessage; +import io.moquette.interception.messages.InterceptSubscribeMessage; +import io.moquette.interception.messages.InterceptUnsubscribeMessage; + + +public class MqttMetricsCaptor extends AbstractInterceptHandler implements InterceptHandler { + + @Override + public String getID() { + return "MoquetteMqttMetricsCaptor"; + } + + @Override + public void onConnect(InterceptConnectMessage msg) { + MetricsStore.getInstance().incrementMetricValue(MqttMetric.CONNECT_SUCCESS); + } + + @Override + public void onDisconnect(InterceptDisconnectMessage msg) { + MetricsStore.getInstance().incrementMetricValue(MqttMetric.DISCONNECT); + } + + @Override + public void onPublish(InterceptPublishMessage msg) { + MetricsStore.getInstance().incrementMetricValue(MqttMetric.PUBLISH_OUT_SUCCESS); + } + + @Override + public void onSubscribe(InterceptSubscribeMessage msg) { + MetricsStore.getInstance().incrementMetricValue(MqttMetric.SUBSCRIBE_SUCCESS); + } + + @Override + public void onUnsubscribe(InterceptUnsubscribeMessage msg) { + MetricsStore.getInstance().incrementMetricValue(MqttMetric.UNSUBSCRIBE); + } +} diff --git a/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsEmitter.java b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsEmitter.java new file mode 100644 index 00000000..f2c364f2 --- /dev/null +++ b/integration/src/main/java/com/aws/greengrass/mqtt/moquette/metrics/MqttMetricsEmitter.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + +import com.aws.greengrass.telemetry.impl.Metric; +import com.aws.greengrass.telemetry.impl.MetricFactory; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import javax.inject.Inject; + +public class MqttMetricsEmitter { + private static final long DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS = 30; + + private final ScheduledExecutorService ses; + private final MetricFactory metricFactory = new MetricFactory(MetricsStore.LOCAL_MQTT_NAMESPACE); + + private ScheduledFuture emitFuture; + + /** + * Constructor. + * + * @param ses ScheduledExecutorService for periodic metric emission + */ + @Inject + public MqttMetricsEmitter(ScheduledExecutorService ses) { + this.ses = ses; + } + + /** + * Schedules periodic MQTT metric emission. + */ + public void schedulePeriodicMetricEmit() { + if (emitFuture != null) { + emitFuture.cancel(true); + } + + emitFuture = ses.scheduleAtFixedRate(this::emitMetrics, DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS, + DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS, TimeUnit.SECONDS); + } + + private void emitMetrics() { + List metrics = MetricsStore.getInstance().getAndResetTelemetryMetrics(); + metrics.forEach(metricFactory::putMetricData); + } + +} diff --git a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java index 5704644a..34368e06 100644 --- a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java +++ b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/ClientDeviceAuthorizerTest.java @@ -9,7 +9,6 @@ import com.aws.greengrass.clientdevices.auth.api.ClientDevicesAuthServiceApi; import com.aws.greengrass.clientdevices.auth.exception.AuthenticationException; import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException; -import com.aws.greengrass.mqtt.moquette.metrics.MoquetteMqttMetricsEmmitter; import com.aws.greengrass.testcommons.testutilities.GGExtension; import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil; import io.moquette.broker.subscriptions.Topic; @@ -32,8 +31,6 @@ public class ClientDeviceAuthorizerTest extends GGServiceTestUtil { @Mock ClientDevicesAuthServiceApi mockClientDevicesAuthService; - @Mock - MoquetteMqttMetricsEmmitter mockMqttMetricsEmmitter; private static final String DEFAULT_SESSION = "SESSION_ID"; private static final String DEFAULT_CLIENT = "clientId"; @@ -75,13 +72,13 @@ void configureSubscribeResponse(boolean doAllow) throws AuthorizationException { @Test void GIVEN_clientDataWithoutCertificate_WHEN_checkValid_THEN_returnsFalse() { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); assertThat(authorizer.checkValid(DEFAULT_CLIENT, EMPTY_PEER_CERT, DEFAULT_PASSWORD), is(false)); } @Test void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalseAndClosesSession() throws Exception { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(false); @@ -95,7 +92,7 @@ void GIVEN_duplicateClientIds_WHEN_checkValid_THEN_firstSessionClosed() throws A AuthorizationException { final String USERNAME1 = "PeerCert1"; final String USERNAME2 = "PeerCert2"; - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn("SESSION1"); configureConnectResponse("SESSION1", DEFAULT_CLIENT, true); @@ -118,7 +115,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext throws AuthenticationException { ignoreExceptionOfType(context, AuthenticationException.class); - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow( new AuthenticationException("Invalid client")); @@ -128,7 +125,7 @@ void GIVEN_unauthorizedClient_WHEN_checkValid_THEN_returnsFalse(ExtensionContext @Test void GIVEN_authorizedClient_WHEN_checkValid_THEN_returnsTrue() throws Exception { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -141,7 +138,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext throws AuthenticationException { ignoreExceptionOfType(context, AuthenticationException.class); - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenThrow( new AuthenticationException("Invalid client")); @@ -152,7 +149,7 @@ void GIVEN_unknownClient_WHEN_canReadCanWrite_THEN_returnsFalse(ExtensionContext @Test void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -166,7 +163,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsFalse() throws Au @Test void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); @@ -181,7 +178,7 @@ void GIVEN_unauthorizedClient_WHEN_canReadCanWrite_THEN_returnsTrue() throws Aut @Test void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctSessionIsUsed() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); String session1 = "SESSION_ID1"; String session2 = "SESSION_ID2"; String client1 = "clientId1"; @@ -223,7 +220,7 @@ void GIVEN_twoClientsWithDifferingPermissions_WHEN_canReadCanWrite_THEN_correctS @Test void GIVEN_authorizedClient_WHEN_onDisconnect_THEN_closeAuthSession() throws AuthenticationException, AuthorizationException { - ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService, mockMqttMetricsEmmitter); + ClientDeviceAuthorizer authorizer = new ClientDeviceAuthorizer(mockClientDevicesAuthService); when(mockClientDevicesAuthService.getClientDeviceAuthToken(anyString(), anyMap())).thenReturn(DEFAULT_SESSION); configureConnectResponse(true); diff --git a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/MQTTServiceTest.java b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/MQTTServiceTest.java index 4abc82d3..1b434139 100644 --- a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/MQTTServiceTest.java +++ b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/MQTTServiceTest.java @@ -12,6 +12,7 @@ import com.aws.greengrass.lifecyclemanager.GreengrassService; import com.aws.greengrass.lifecyclemanager.Kernel; import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException; +import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsEmitter; import com.aws.greengrass.testcommons.testutilities.GGExtension; import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil; import org.junit.jupiter.api.AfterEach; @@ -36,6 +37,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @ExtendWith({MockitoExtension.class, GGExtension.class}) @@ -50,6 +52,9 @@ public class MQTTServiceTest extends GGServiceTestUtil { @Mock ClientDevicesAuthServiceApi mockCDAServiceApi; + @Mock + MqttMetricsEmitter mockMqttMetricsEmitter; + @BeforeEach void setup() { // Set this property for kernel to scan its own classpath to find plugins @@ -57,6 +62,7 @@ void setup() { kernel = new Kernel(); kernel.getContext().put(ClientDevicesAuthServiceApi.class, mockCDAServiceApi); + kernel.getContext().put(MqttMetricsEmitter.class, mockMqttMetricsEmitter); } @AfterEach @@ -113,4 +119,10 @@ void GIVEN_nonDefaultPort_WHEN_startComponent_THEN_brokerStartsOnConfiguredPort( assertThat(isListeningOnPort(8883), is(false)); assertThat(isListeningOnPort(1883), is(false)); } + + @Test + void GIVEN_mqttMetricsEmitter_WHEN_startComponent_THEN_schedulesPeriodicMetricEmit() throws InterruptedException { + startNucleusWithConfig("config.yaml"); + verify(mockMqttMetricsEmitter, times(1)).schedulePeriodicMetricEmit(); + } } diff --git a/integration/src/test/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStoreTest.java b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStoreTest.java new file mode 100644 index 00000000..ecd20d44 --- /dev/null +++ b/integration/src/test/java/com/aws/greengrass/mqtt/moquette/metrics/MetricsStoreTest.java @@ -0,0 +1,65 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.aws.greengrass.mqtt.moquette.metrics; + +import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore.MqttMetric; +import com.aws.greengrass.telemetry.impl.Metric; +import com.aws.greengrass.testcommons.testutilities.GGExtension; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalToObject; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@ExtendWith({MockitoExtension.class, GGExtension.class}) +class MetricsStoreTest { + + private final MetricsStore metricsStore = MetricsStore.getInstance(); + + @AfterEach + void reset() { + metricsStore.getAndResetTelemetryMetrics(); + } + + @Test + void GIVEN_MetricsStore_WHEN_getInstance_THEN_returnsSingletonMetricsStore() { + MetricsStore anotherStoreRef = MetricsStore.getInstance(); + assertThat(anotherStoreRef, is(notNullValue())); + assertThat(anotherStoreRef, equalToObject(metricsStore)); + } + + @Test + void GIVEN_MetricsStore_WHEN_incrementMetric_and_getAndResetTelemetryMetrics_THEN_returnsUpdatedTelemetryMetric_and_reset_store() { + // verify that the metrics are initialized with zero value + Map metrics = metricsStore.getMetrics(); + assertThat(metrics.get(MqttMetric.CONNECT_SUCCESS).get(), is(0)); + + // verify the metrics value increment + metricsStore.incrementMetricValue(MqttMetric.CONNECT_SUCCESS); + List telemetryMetrics = metricsStore.getAndResetTelemetryMetrics(); + Metric connectSuccessMetric = + lookupTelemetryMetricByName(telemetryMetrics, MqttMetric.CONNECT_SUCCESS.toString()).get(); + assertThat(connectSuccessMetric, is(notNullValue())); + assertThat(connectSuccessMetric.getValue(), is(1)); + + // verify metrics are reset to zero after last getAndResetTelemetryMetrics call + int newConnectSuccessMetricValue = metricsStore.getMetrics().get(MqttMetric.CONNECT_SUCCESS).get(); + assertThat(newConnectSuccessMetricValue, is(0)); + } + + private Optional lookupTelemetryMetricByName(List telemetryMetrics, String name) { + return telemetryMetrics.stream().filter(metric -> metric.getName().equals(name)).findAny(); + } +}