Skip to content

Commit

Permalink
chore: reapply GG changes to moquette 0.17
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeDombo committed Sep 14, 2023
1 parent 657f725 commit fdb7b0f
Show file tree
Hide file tree
Showing 32 changed files with 420 additions and 313 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
greengrass-build/
integration/target/**
moquette_messages.log
target/
.gradle/
*.iml
2 changes: 1 addition & 1 deletion integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@
<dependency>
<groupId>io.moquette</groupId>
<artifactId>moquette-broker</artifactId>
<version>0.16-gg</version>
<version>0.17-gg</version>
</dependency>
</dependencies>
<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,11 @@ public void onConnectionLost(InterceptConnectionLostMessage msg) {
closeAuthSession(msg.getClientID(), msg.getUsername());
}

@Override
public void onSessionLoopError(Throwable error) {
LOG.atWarn().log("Moquette session error", error);
}

private void closeAuthSession(String clientId, String username) {
UserSessionPair sessionPair = getSessionForClient(clientId, username);
if (sessionPair != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,14 @@ private synchronized void startWithProperties(Properties properties, boolean for
IConfig config = new MemoryConfig(properties);
ISslContextCreator sslContextCreator =
new GreengrassMoquetteSslContextCreator(config, clientDeviceTrustManager);
mqttBroker.startServer(config, interceptHandlers, sslContextCreator, clientDeviceAuthorizer,
clientDeviceAuthorizer);
try {
mqttBroker.startServer(config, interceptHandlers, sslContextCreator, clientDeviceAuthorizer,
clientDeviceAuthorizer);
} catch (IOException e) {
// IO Exception can only be thrown from H2 right now and we do not configure moquette to use h2.
serviceErrored(e);
return;
}
serverRunning = true;
runningProperties = properties;
}
Expand Down Expand Up @@ -182,6 +188,9 @@ private Properties getProperties() {
//Disable plain TCP port
p.setProperty(BrokerConstants.PORT_PROPERTY_NAME, BrokerConstants.DISABLED_PORT_BIND);

// Telemetry is actually deleted from the code base, but just set the flag here to be sure.
p.setProperty(BrokerConstants.ENABLE_TELEMETRY_NAME, "false");

return p;
}
}
6 changes: 3 additions & 3 deletions moquette-0.17/broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<relativePath>../pom.xml</relativePath>
<artifactId>moquette-parent</artifactId>
<groupId>io.moquette</groupId>
<version>0.17</version>
<version>0.17-gg</version>
</parent>

<artifactId>moquette-broker</artifactId>
Expand All @@ -14,12 +14,12 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netty.version>4.1.93.Final</netty.version>
<netty.version>4.1.94.Final</netty.version>
<!-- Check Netty pom.xm to know the right version of tcnative, for example
https://github.com/netty/netty/blob/netty-4.1.93.Final/pom.xml#L625 -->
<netty.tcnative.version>2.0.61.Final</netty.tcnative.version>
<paho.version>1.2.5</paho.version>
<h2.version>2.1.212</h2.version>
<h2.version>2.2.220</h2.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class BrokerConstants {
public static final String KEY_MANAGER_PASSWORD_PROPERTY_NAME = IConfig.KEY_MANAGER_PASSWORD_PROPERTY_NAME;
@Deprecated
public static final String ALLOW_ANONYMOUS_PROPERTY_NAME = IConfig.ALLOW_ANONYMOUS_PROPERTY_NAME;
public static final String PEER_CERTIFICATE_AS_USERNAME = "peer_certificate_as_username";
public static final String REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT = "reauthorize_subscriptions_on_connect";
public static final String ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME = "allow_zero_byte_client_id";
@Deprecated
Expand Down Expand Up @@ -103,6 +104,7 @@ public final class BrokerConstants {
public static final String NETTY_MAX_BYTES_PROPERTY_NAME = IConfig.NETTY_MAX_BYTES_PROPERTY_NAME;
@Deprecated
public static final int DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE = IConfig.DEFAULT_NETTY_MAX_BYTES_IN_MESSAGE;
public static final String NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME = "netty.enabled.tls.protocols";
/**
* @deprecated use the BUFFER_FLUSH_MS_PROPERTY_NAME
* */
Expand All @@ -126,6 +128,11 @@ public final class BrokerConstants {

public static final String STORAGE_CLASS_NAME = "storage_class";

public static final String NETTY_CHANNEL_WRITE_LIMIT_PROPERTY_NAME = "netty.channel.write.limit";
public static final int DEFAULT_NETTY_CHANNEL_WRITE_LIMIT_BYTES = 512 * 1024;
public static final String NETTY_CHANNEL_READ_LIMIT_PROPERTY_NAME = "netty.channel.read.limit";
public static final int DEFAULT_NETTY_CHANNEL_READ_LIMIT_BYTES = 512 * 1024;

@Deprecated
public static final String PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME = IConfig.PERSISTENT_CLIENT_EXPIRATION_PROPERTY_NAME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ class BrokerConfiguration {
private final boolean allowZeroByteClientId;
private final boolean reauthorizeSubscriptionsOnConnect;
private final int bufferFlushMillis;
private final boolean peerCertificateAsUsername;

BrokerConfiguration(IConfig props) {
allowAnonymous = props.boolProp(BrokerConstants.ALLOW_ANONYMOUS_PROPERTY_NAME, true);
allowZeroByteClientId = props.boolProp(BrokerConstants.ALLOW_ZERO_BYTE_CLIENT_ID_PROPERTY_NAME, false);
reauthorizeSubscriptionsOnConnect = props.boolProp(BrokerConstants.REAUTHORIZE_SUBSCRIPTIONS_ON_CONNECT, false);
peerCertificateAsUsername = props.boolProp(BrokerConstants.PEER_CERTIFICATE_AS_USERNAME, false);

// BUFFER_FLUSH_MS_PROPERTY_NAME has precedence over the deprecated IMMEDIATE_BUFFER_FLUSH_PROPERTY_NAME
final String bufferFlushMillisProp = props.getProperty(BrokerConstants.BUFFER_FLUSH_MS_PROPERTY_NAME);
Expand Down Expand Up @@ -65,10 +67,17 @@ class BrokerConfiguration {

public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis) {
this(allowAnonymous, allowZeroByteClientId, reauthorizeSubscriptionsOnConnect, bufferFlushMillis, false);
}

public BrokerConfiguration(boolean allowAnonymous, boolean allowZeroByteClientId,
boolean reauthorizeSubscriptionsOnConnect, int bufferFlushMillis,
boolean peerCertificateAsUsername) {
this.allowAnonymous = allowAnonymous;
this.allowZeroByteClientId = allowZeroByteClientId;
this.reauthorizeSubscriptionsOnConnect = reauthorizeSubscriptionsOnConnect;
this.bufferFlushMillis = bufferFlushMillis;
this.peerCertificateAsUsername = peerCertificateAsUsername;
}

public boolean isAllowAnonymous() {
Expand All @@ -86,4 +95,8 @@ public boolean isReauthorizeSubscriptionsOnConnect() {
public int getBufferFlushMillis() {
return bufferFlushMillis;
}

public boolean isPeerCertificateAsUsername() {
return peerCertificateAsUsername;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@

package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -32,17 +41,8 @@
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Objects;

import io.moquette.BrokerConstants;
import io.moquette.broker.config.IConfig;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Moquette integration implementation to load SSL certificate from local filesystem path configured in
Expand Down Expand Up @@ -89,6 +89,14 @@ public SslContext initSSLContext() {
if (Boolean.valueOf(sNeedsClientAuth)) {
addClientAuthentication(ks, contextBuilder);
}

// if enabled tls protocols are not provided, we use the default
String enabledTLSProtocols = props.getProperty(BrokerConstants.NETTY_ENABLED_TLS_PROTOCOLS_PROPERTY_NAME);
if (enabledTLSProtocols != null) {
LOG.info(String.format("Enabled TLS Protocols: {%s}", enabledTLSProtocols));
contextBuilder.protocols(enabledTLSProtocols.split(";"));
}

contextBuilder.sslProvider(sslProvider);
SslContext sslContext = contextBuilder.build();
LOG.info("The SSL context has been initialized successfully.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,59 @@
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.security.PemUtils;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.*;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLPeerUnverifiedException;

import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

final class MQTTConnection {

Expand Down Expand Up @@ -297,25 +326,46 @@ private void abortConnection(MqttConnectReturnCode returnCode) {
}

private boolean login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
String userName = null;
byte[] pwd = null;

if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
userName = msg.payload().userName();
// MQTT 3.1.2.9 does not mandate that there is a password - let the authenticator determine if it's needed
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().passwordInBytes();
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
}

if (brokerConfig.isPeerCertificateAsUsername()) {
try {
// Use peer cert as username
SslHandler sslhandler = (SslHandler) channel.pipeline().get("ssl");
if (sslhandler != null) {
Certificate[] certificateChain = sslhandler.engine().getSession().getPeerCertificates();
userName = PemUtils.certificatesToPem(certificateChain);
}
} catch (SSLPeerUnverifiedException e) {
LOG.debug("No peer cert provided. CId={}", clientId);
} catch (CertificateEncodingException | IOException e) {
LOG.warn("Unable to decode client certificate. CId={}", clientId);
}
}

if (userName == null || userName.isEmpty()) {
if (brokerConfig.isAllowAnonymous()) {
return true;
} else {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
}
NettyUtils.userName(channel, login);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.info("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
}

if (!authenticator.checkValid(clientId, userName, pwd)) {
LOG.info("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, userName);
return false;
}
NettyUtils.userName(channel, userName);
return true;
}

Expand Down
Loading

0 comments on commit fdb7b0f

Please sign in to comment.