Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit mqtt metrics #106

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import com.aws.greengrass.clientdevices.auth.exception.AuthorizationException;
import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore;
import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore.MqttMetric;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.IAuthorizatorPolicy;
import io.moquette.broker.subscriptions.Topic;
Expand All @@ -29,6 +31,9 @@ public class ClientDeviceAuthorizer implements IAuthenticator, IAuthorizatorPoli
private static final String CERTIFICATE_PEM = "certificatePem";
private static final String SESSION_ID = "sessionId";
private static final String MQTT_CREDENTIAL = "mqtt";
private static final String MQTT_CONNECT_OP = "mqtt:connect";
private static final String MQTT_PUBLISH_OP = "mqtt:publish";
private static final String MQTT_SUBSCRIBE_OP = "mqtt:subscribe";

private final ClientDevicesAuthServiceApi clientDevicesAuthService;
private final Map<String, UserSessionPair> clientToSessionMap;
Expand Down Expand Up @@ -63,7 +68,7 @@ public boolean checkValid(String clientId, String username, byte[] password) {
return false;
}

boolean canConnect = canDevicePerform(sessionId, "mqtt:connect", "mqtt:clientId:" + clientId);
boolean canConnect = canDevicePerform(sessionId, MQTT_CONNECT_OP, "mqtt:clientId:" + clientId);

// Add mapping from client id to session id for future canRead/canWrite calls
if (canConnect) {
Expand All @@ -80,6 +85,7 @@ public boolean checkValid(String clientId, String username, byte[] password) {
} else {
LOG.atWarn().kv(CLIENT_ID, clientId).kv(SESSION_ID, sessionId).log("Device isn't authorized to connect");
clientDevicesAuthService.closeClientDeviceAuthSession(sessionId);
emitAuthErrorMetric(MQTT_CONNECT_OP);
}

return canConnect;
Expand All @@ -90,12 +96,16 @@ public boolean canWrite(Topic topic, String user, String client) {
String resource = "mqtt:topic:" + topic;
boolean canPerform = false;
try {
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:publish", resource);
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), MQTT_PUBLISH_OP, resource);
} catch (AuthenticationException e) {
LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client.");
}
LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client)
.log("MQTT publish request");

if (!canPerform) {
emitAuthErrorMetric(MQTT_PUBLISH_OP);
}
return canPerform;
}

Expand All @@ -104,12 +114,16 @@ public boolean canRead(Topic topic, String user, String client) {
String resource = "mqtt:topicfilter:" + topic;
boolean canPerform = false;
try {
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), "mqtt:subscribe", resource);
canPerform = canDevicePerform(getOrCreateSessionForClient(client, user), MQTT_SUBSCRIBE_OP, resource);
} catch (AuthenticationException e) {
LOG.atWarn().cause(e).kv(CLIENT_ID, client).log("Unable to re-authenticate client.");
}
LOG.atDebug().kv("topic", topic).kv("isAllowed", canPerform).kv(CLIENT_ID, client)
.log("MQTT subscribe request");

if (!canPerform) {
emitAuthErrorMetric(MQTT_SUBSCRIBE_OP);
}
return canPerform;
}

Expand Down Expand Up @@ -201,4 +215,31 @@ private void closeAuthSession(String clientId, String username) {
}
}
}

/**
* Emits MQTT AuthError metrics for requested MQTT operation.
* Ideally these metrics should be emitted from the Moquette request handlers.
* Emitting metrics from Authorizer integration to avoid broker code change.
*
* @param operation Requested MQTT operation
*/
private void emitAuthErrorMetric(String operation) {
MqttMetric authErrorMetric;
switch (operation) {
case MQTT_CONNECT_OP:
authErrorMetric = MqttMetric.CONNECT_AUTH_ERROR;
break;
case MQTT_PUBLISH_OP:
authErrorMetric = MqttMetric.PUBLISH_IN_AUTH_ERROR;
break;
case MQTT_SUBSCRIBE_OP:
authErrorMetric = MqttMetric.SUBSCRIBE_AUTH_ERROR;
break;
default:
authErrorMetric = MqttMetric.UNKNOWN_AUTH_ERROR;
break;
}

MetricsStore.getInstance().incrementMetricValue(authErrorMetric);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.lifecyclemanager.PluginService;
import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsCaptor;
import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsEmitter;
import com.aws.greengrass.util.Coerce;
import io.moquette.BrokerConstants;
import io.moquette.broker.ISslContextCreator;
Expand All @@ -27,7 +29,7 @@

import java.io.IOException;
import java.security.KeyStoreException;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.inject.Inject;
Expand Down Expand Up @@ -56,19 +58,22 @@ public class MQTTService extends PluginService {
* @param topics Root Configuration topic for this service
* @param kernel Greengrass Nucleus
* @param clientDevicesAuthService Client devices auth service handle
* @param mqttMetricsEmitter MQTT metrics emitter
*/
@Inject
public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi clientDevicesAuthService) {
public MQTTService(Topics topics, Kernel kernel, ClientDevicesAuthServiceApi clientDevicesAuthService,
MqttMetricsEmitter mqttMetricsEmitter) {
super(topics);
this.kernel = kernel;
this.clientDeviceTrustManager = new ClientDeviceTrustManager(clientDevicesAuthService);
this.clientDeviceAuthorizer = new ClientDeviceAuthorizer(clientDevicesAuthService);
this.interceptHandlers = Collections.singletonList(clientDeviceAuthorizer.new ConnectionTerminationListener());
this.interceptHandlers = Arrays.asList(clientDeviceAuthorizer.new ConnectionTerminationListener(),
new MqttMetricsCaptor());
this.clientDevicesAuthServiceApi = clientDevicesAuthService;

GetCertificateRequestOptions options = new GetCertificateRequestOptions();
options.setCertificateType(GetCertificateRequestOptions.CertificateType.SERVER);
serverCertificateRequest = new GetCertificateRequest(SERVICE_NAME, options, this::updateServerCertificate);
mqttMetricsEmitter.schedulePeriodicMetricEmit();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.moquette.metrics;


import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.models.TelemetryAggregation;
import com.aws.greengrass.telemetry.models.TelemetryUnit;
import lombok.AccessLevel;
import lombok.Getter;

import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class MetricsStore {
public static final String LOCAL_MQTT_NAMESPACE = "LocalMQTT";
@Getter(AccessLevel.PACKAGE)
private final Map<MqttMetric, AtomicInteger> metrics = new HashMap<>();

private MetricsStore() {
init();
}

private static class MetricsStoreHelper {
@SuppressWarnings("PMD.AccessorClassGeneration")
private static final MetricsStore INSTANCE = new MetricsStore();
}

/**
* Gets the singleton instance of MetricsStore with lazy initialization.
*
* @return {@link MetricsStore}
*/
public static MetricsStore getInstance() {
return MetricsStoreHelper.INSTANCE;
}

public void incrementMetricValue(MqttMetric metric) {
Optional.ofNullable(metrics.get(metric)).map(AtomicInteger::incrementAndGet);
}

public List<Metric> getAndResetTelemetryMetrics() {
return metrics.keySet().stream().map(this::getTelemetryMetricAndReset)
.collect(Collectors.toList());
}

private Metric getTelemetryMetricAndReset(MqttMetric metric) {
int metricValue = metrics.get(metric).getAndSet(0);
return Metric.builder()
.namespace(LOCAL_MQTT_NAMESPACE)
.name(metric.toString())
.unit(TelemetryUnit.Count)
.aggregation(TelemetryAggregation.Sum)
.value(metricValue)
.timestamp(Instant.now().toEpochMilli())
.build();
}

private void init() {
metrics.put(MqttMetric.CONNECT_SUCCESS, new AtomicInteger(0));
metrics.put(MqttMetric.CONNECT_AUTH_ERROR, new AtomicInteger(0));
metrics.put(MqttMetric.SUBSCRIBE_SUCCESS, new AtomicInteger(0));
metrics.put(MqttMetric.SUBSCRIBE_AUTH_ERROR, new AtomicInteger(0));
metrics.put(MqttMetric.PUBLISH_OUT_SUCCESS, new AtomicInteger(0));
metrics.put(MqttMetric.PUBLISH_IN_AUTH_ERROR, new AtomicInteger(0));
metrics.put(MqttMetric.UNSUBSCRIBE, new AtomicInteger(0));
metrics.put(MqttMetric.DISCONNECT, new AtomicInteger(0));
metrics.put(MqttMetric.UNKNOWN_AUTH_ERROR, new AtomicInteger(0));
}

public enum MqttMetric {
CONNECT_SUCCESS("Connect.Success"),
CONNECT_AUTH_ERROR("Connect.AuthError"),
SUBSCRIBE_SUCCESS("Subscribe.Success"),
SUBSCRIBE_AUTH_ERROR("Subscribe.AuthError"),
PUBLISH_OUT_SUCCESS("PublishOut.Success"),
PUBLISH_IN_AUTH_ERROR("PublishIn.AuthError"),
DISCONNECT("Disconnect"),
UNSUBSCRIBE("Unsubscribe"),
UNKNOWN_AUTH_ERROR("UnknownAuthError");

private final String name;
MqttMetric(String name) {
this.name = name;
}

@Override
public String toString() {
return name;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.moquette.metrics;

import com.aws.greengrass.mqtt.moquette.metrics.MetricsStore.MqttMetric;
import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.interception.messages.InterceptSubscribeMessage;
import io.moquette.interception.messages.InterceptUnsubscribeMessage;


public class MqttMetricsCaptor extends AbstractInterceptHandler implements InterceptHandler {

@Override
public String getID() {
return "MoquetteMqttMetricsCaptor";
}

@Override
public void onConnect(InterceptConnectMessage msg) {
MetricsStore.getInstance().incrementMetricValue(MqttMetric.CONNECT_SUCCESS);
}

@Override
public void onDisconnect(InterceptDisconnectMessage msg) {
MetricsStore.getInstance().incrementMetricValue(MqttMetric.DISCONNECT);
}

@Override
public void onPublish(InterceptPublishMessage msg) {
MetricsStore.getInstance().incrementMetricValue(MqttMetric.PUBLISH_OUT_SUCCESS);
}

@Override
public void onSubscribe(InterceptSubscribeMessage msg) {
MetricsStore.getInstance().incrementMetricValue(MqttMetric.SUBSCRIBE_SUCCESS);
}

@Override
public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
MetricsStore.getInstance().incrementMetricValue(MqttMetric.UNSUBSCRIBE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.mqtt.moquette.metrics;

import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.impl.MetricFactory;

import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.inject.Inject;

public class MqttMetricsEmitter {
private static final long DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS = 30;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this configurable?


private final ScheduledExecutorService ses;
private final MetricFactory metricFactory = new MetricFactory(MetricsStore.LOCAL_MQTT_NAMESPACE);

private ScheduledFuture<?> emitFuture;

/**
* Constructor.
*
* @param ses ScheduledExecutorService for periodic metric emission
*/
@Inject
public MqttMetricsEmitter(ScheduledExecutorService ses) {
this.ses = ses;
}

/**
* Schedules periodic MQTT metric emission.
*/
public void schedulePeriodicMetricEmit() {
if (emitFuture != null) {
emitFuture.cancel(true);
}

emitFuture = ses.scheduleAtFixedRate(this::emitMetrics, DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS,
DEFAULT_METRIC_EMIT_FREQUENCY_SECONDS, TimeUnit.SECONDS);
}

private void emitMetrics() {
List<Metric> metrics = MetricsStore.getInstance().getAndResetTelemetryMetrics();
metrics.forEach(metricFactory::putMetricData);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.aws.greengrass.lifecyclemanager.GreengrassService;
import com.aws.greengrass.lifecyclemanager.Kernel;
import com.aws.greengrass.lifecyclemanager.exceptions.ServiceLoadException;
import com.aws.greengrass.mqtt.moquette.metrics.MqttMetricsEmitter;
import com.aws.greengrass.testcommons.testutilities.GGExtension;
import com.aws.greengrass.testcommons.testutilities.GGServiceTestUtil;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -36,6 +37,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@ExtendWith({MockitoExtension.class, GGExtension.class})
Expand All @@ -50,13 +52,17 @@ public class MQTTServiceTest extends GGServiceTestUtil {
@Mock
ClientDevicesAuthServiceApi mockCDAServiceApi;

@Mock
MqttMetricsEmitter mockMqttMetricsEmitter;

@BeforeEach
void setup() {
// Set this property for kernel to scan its own classpath to find plugins
System.setProperty("aws.greengrass.scanSelfClasspath", "true");

kernel = new Kernel();
kernel.getContext().put(ClientDevicesAuthServiceApi.class, mockCDAServiceApi);
kernel.getContext().put(MqttMetricsEmitter.class, mockMqttMetricsEmitter);
}

@AfterEach
Expand Down Expand Up @@ -113,4 +119,10 @@ void GIVEN_nonDefaultPort_WHEN_startComponent_THEN_brokerStartsOnConfiguredPort(
assertThat(isListeningOnPort(8883), is(false));
assertThat(isListeningOnPort(1883), is(false));
}

@Test
void GIVEN_mqttMetricsEmitter_WHEN_startComponent_THEN_schedulesPeriodicMetricEmit() throws InterruptedException {
startNucleusWithConfig("config.yaml");
verify(mockMqttMetricsEmitter, times(1)).schedulePeriodicMetricEmit();
}
}
Loading