Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for user/password authentication #46

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@
// // root_ca_certificate_base64: "base64-root-ca-certificate",
// },

////
//// MQTT client authentication related configuration.
////
// auth: {
// ////
// //// dictionary_file: Path to a file containing the MQTT client username/password dictionary.
// ////
// dictionary_file: "/path/to/dictionary-file",
// },

},

////
Expand Down
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The `"mqtt"` part of this same configuration file can also be used in the config
- **`-r, --generalise-sub <String>`** : A list of key expressions to use for generalising the declaration of
the zenoh subscriptions, and thus minimizing the discovery traffic (usable multiple times).
See [this blog](https://zenoh.io/blog/2021-03-23-discovery/#leveraging-resource-generalisation) for more details.
- **`--dictionary-file <FILE>`** : Path to a file containing the MQTT client username/password dictionary.
- **`--server-private-key <FILE>`** : Path to the TLS private key for the MQTT server. If specified a valid certificate for the server must also be provided.
- **`--server-certificate <FILE>`** : Path to the TLS public certificate for the MQTT server. If specified a valid private key for the server must also be provided.
- **`--root-ca-certificate <FILE>`** : Path to the certificate of the certificate authority used to validate clients connecting to the MQTT server. If specified a valid private key and certificate for the server must also be provided.
Expand Down Expand Up @@ -147,6 +148,39 @@ An example configuration file supporting server side authentication would be:
```
The standalone bridge (`zenoh-bridge-mqtt`) also allows the required file to be provided through the **`--root-ca-certificate`** command line argument.

## Username/password authentication

The MQTT plugin and standalone bridge for Eclipse Zenoh supports basic username/password authentication of MQTT clients. Credentials are provided via a dictionary file with each line containing the username and password for a single user in the following format:

```
username:password
```

Username/passord authentication can be configured via the configuration file or, if using the standalone bridge, via command line arguments.

In the configuration file, the required **auth** field for configuring the dictionary file is **dictionary_file**.

An example configuration file supporting username/password authentication would be:

```json
{
"plugins": {
"mqtt": {
"auth": {
"dictionary_file": "/path/to/dictionary-file",
}
}
}
}
```
The standalone bridge (`zenoh-bridge-mqtt`) also allows the required file to be provided through the **`--dictionary-file`** command line argument.

### Security considerations

Usernames and passwords are sent as part of the MQTT `CONNECT` message in clear text. As such, they can potentially be viewed using tools such as [Wireshark](https://www.wireshark.org/).

To prevent this, it is highly recommended that this feature is used in conjunction with the MQTTS feature to ensure credentials are encrypted on the wire.

## How to install it

To install the latest release of either the MQTT plugin for the Zenoh router, either the `zenoh-bridge-mqtt` standalone executable, you can do as follows:
Expand Down
4 changes: 4 additions & 0 deletions zenoh-bridge-mqtt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ r#"-r, --generalise-sub=[String]... 'A list of key expression to use for gener
r#"-w, --generalise-pub=[String]... 'A list of key expression to use for generalising publications (usable multiple times).'"#
))
.arg(Arg::from_usage(
r#"--dictionary-file=[FILE] 'Path to the file containing the MQTT client username/password dictionary.'"#
))
.arg(Arg::from_usage(
r#"--server-private-key=[FILE] 'Path to the TLS private key for the MQTT server. If specified a valid certificate for the server must also be provided.'"#
)
.requires("server-certificate"))
Expand Down Expand Up @@ -175,6 +178,7 @@ r#"--root-ca-certificate=[FILE] 'Path to the certificate of the certificate au
insert_json5!(config, args, "plugins/mqtt/deny", if "deny", );
insert_json5!(config, args, "plugins/mqtt/generalise_pubs", for "generalise-pub", .collect::<Vec<_>>());
insert_json5!(config, args, "plugins/mqtt/generalise_subs", for "generalise-sub", .collect::<Vec<_>>());
insert_json5!(config, args, "plugins/mqtt/auth/dictionary_file", if "dictionary-file", );
insert_json5!(config, args, "plugins/mqtt/tls/server_private_key", if "server-private-key", );
insert_json5!(config, args, "plugins/mqtt/tls/server_certificate", if "server-certificate", );
insert_json5!(config, args, "plugins/mqtt/tls/root_ca_certificate", if "root-ca-certificate", );
Expand Down
8 changes: 8 additions & 0 deletions zenoh-plugin-mqtt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub struct Config {
#[serde(default)]
pub tls: Option<TLSConfig>,
__required__: Option<bool>,
#[serde(default)]
pub auth: Option<AuthConfig>,
#[serde(default, deserialize_with = "deserialize_path")]
__path__: Option<Vec<String>>,
}
Expand All @@ -68,6 +70,12 @@ pub struct TLSConfig {
pub root_ca_certificate_base64: Option<SecretValue>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct AuthConfig {
pub dictionary_file: String,
}

fn default_mqtt_port() -> String {
format!("{DEFAULT_MQTT_INTERFACE}:{DEFAULT_MQTT_PORT}")
}
Expand Down
151 changes: 137 additions & 14 deletions zenoh-plugin-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
use ntex::io::IoBoxed;
use ntex::service::{chain_factory, fn_factory_with_config, fn_service};
use ntex::time::Deadline;
use ntex::util::Ready;
use ntex::util::{ByteString, Bytes, Ready};
use ntex::ServiceFactory;
use ntex_mqtt::{v3, v5, MqttError, MqttServer};
use ntex_tls::rustls::Acceptor;
use rustls::server::AllowAnyAuthenticatedClient;
use rustls::{Certificate, PrivateKey, RootCertStore, ServerConfig};
use secrecy::ExposeSecret;
use serde_json::Value;
use std::collections::HashMap;
use std::env;
use std::io::BufReader;
use std::sync::Arc;
Expand All @@ -41,7 +42,7 @@ extern crate zenoh_core;
pub mod config;
mod mqtt_helpers;
mod mqtt_session_state;
use config::{Config, TLSConfig};
use config::{AuthConfig, Config, TLSConfig};
use mqtt_session_state::MqttSessionState;

macro_rules! ke_for_sure {
Expand All @@ -61,6 +62,10 @@ zenoh_plugin_trait::declare_plugin!(MqttPlugin);

pub struct MqttPlugin;

// Authentication types
type User = Vec<u8>;
type Password = Vec<u8>;

impl ZenohPlugin for MqttPlugin {}
impl Plugin for MqttPlugin {
type StartArgs = Runtime;
Expand Down Expand Up @@ -88,15 +93,25 @@ impl Plugin for MqttPlugin {
None => None,
};

async_std::task::spawn(run(runtime.clone(), config, tls_config));
let auth_dictionary = match config.auth.as_ref() {
Some(auth) => Some(create_auth_dictionary(auth)?),
None => None,
};

async_std::task::spawn(run(runtime.clone(), config, tls_config, auth_dictionary));
Ok(Box::new(MqttPlugin))
}
}

impl PluginControl for MqttPlugin {}
impl RunningPluginTrait for MqttPlugin {}

async fn run(runtime: Runtime, config: Config, tls_config: Option<Arc<ServerConfig>>) {
async fn run(
runtime: Runtime,
config: Config,
tls_config: Option<Arc<ServerConfig>>,
auth_dictionary: Option<HashMap<User, Password>>,
) {
// Try to initiate login.
// Required in case of dynamic lib, otherwise no logs.
// But cannot be done twice in case of static link.
Expand Down Expand Up @@ -131,21 +146,34 @@ async fn run(runtime: Runtime, config: Config, tls_config: Option<Arc<ServerConf
.await
.expect("Failed to create AdminSpace queryable");

if auth_dictionary.is_some() && tls_config.is_none() {
log::warn!("Warning: MQTT client username/password authentication enabled without TLS!");
}

// Start MQTT Server task
let config = Arc::new(config);
let auth_dictionary = Arc::new(auth_dictionary);
ntex::rt::System::new(MqttPlugin::DEFAULT_NAME)
.block_on(async move {
let server = match tls_config {
Some(tls) => {
ntex::server::Server::build().bind("mqtt", config.port.clone(), move |_| {
chain_factory(Acceptor::new(tls.clone()))
.map_err(|err| MqttError::Service(MqttPluginError::from(err)))
.and_then(create_mqtt_server(zsession.clone(), config.clone()))
.and_then(create_mqtt_server(
zsession.clone(),
config.clone(),
auth_dictionary.clone(),
))
})?
}
None => {
ntex::server::Server::build().bind("mqtt", config.port.clone(), move |_| {
create_mqtt_server(zsession.clone(), config.clone())
create_mqtt_server(
zsession.clone(),
config.clone(),
auth_dictionary.clone(),
)
})?
}
};
Expand Down Expand Up @@ -284,9 +312,64 @@ fn load_trust_anchors(bytes: Vec<u8>) -> ZResult<RootCertStore> {
Ok(root_cert_store)
}

fn create_auth_dictionary(config: &AuthConfig) -> ZResult<HashMap<User, Password>> {
let mut dictionary: HashMap<User, Password> = HashMap::new();
let content = std::fs::read_to_string(config.dictionary_file.as_str())
.map_err(|e| zerror!("Invalid user/password dictionary file: {}", e))?;

// Populate the user/password dictionary
// The dictionary file is expected to be in the form of:
// usr1:pwd1
// usr2:pwd2
// usr3:pwd3
for line in content.lines() {
let idx = line
.find(':')
.ok_or_else(|| zerror!("Invalid user/password dictionary file: invalid format"))?;
let user = line[..idx].as_bytes().to_owned();
if user.is_empty() {
return Err(zerror!("Invalid user/password dictionary file: empty user").into());
}
let password = line[idx + 1..].as_bytes().to_owned();
if password.is_empty() {
return Err(zerror!("Invalid user/password dictionary file: empty password").into());
}
dictionary.insert(user, password);
}
Ok(dictionary)
}

fn is_authorized(
dictionary: Option<&HashMap<User, Password>>,
usr: Option<&ByteString>,
pwd: Option<&Bytes>,
) -> Result<(), String> {
match (dictionary, usr, pwd) {
// No user/password dictionary - all clients authorized to connect
(None, _, _) => Ok(()),
// User/password dictionary provided - clients must provide credentials to connect
(Some(dictionary), Some(usr), Some(pwd)) => {
match dictionary.get(&usr.as_bytes().to_vec()) {
Some(expected_pwd) => {
if pwd == expected_pwd {
Ok(())
} else {
Err(format!("Incorrect password for user {usr:?}"))
}
}
None => Err(format!("Unknown user {usr:?}")),
}
}
(Some(_), Some(usr), None) => Err(format!("Missing password for user {usr:?}")),
(Some(_), None, Some(_)) => Err(("Missing user name").to_string()),
(Some(_), None, None) => Err(("Missing user credentials").to_string()),
}
}

fn create_mqtt_server(
session: Arc<Session>,
config: Arc<Config>,
auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> MqttServer<
impl ServiceFactory<
(IoBoxed, Deadline),
Expand All @@ -309,13 +392,16 @@ fn create_mqtt_server(
let zs_v5 = session.clone();
let config_v3 = config.clone();
let config_v5 = config.clone();
let auth_dictionary_v3 = auth_dictionary.clone();
let auth_dictionary_v5 = auth_dictionary.clone();

MqttServer::new()
.v3(v3::MqttServer::new(fn_factory_with_config(move |_| {
let zs = zs_v3.clone();
let config = config_v3.clone();
let auth_dictionary = auth_dictionary_v3.clone();
Ready::Ok::<_, ()>(fn_service(move |h| {
handshake_v3(h, zs.clone(), config.clone())
handshake_v3(h, zs.clone(), config.clone(), auth_dictionary.clone())
}))
}))
.publish(fn_factory_with_config(
Expand All @@ -335,8 +421,9 @@ fn create_mqtt_server(
.v5(v5::MqttServer::new(fn_factory_with_config(move |_| {
let zs = zs_v5.clone();
let config = config_v5.clone();
let auth_dictionary = auth_dictionary_v5.clone();
Ready::Ok::<_, ()>(fn_service(move |h| {
handshake_v5(h, zs.clone(), config.clone())
handshake_v5(h, zs.clone(), config.clone(), auth_dictionary.clone())
}))
}))
.publish(fn_factory_with_config(
Expand Down Expand Up @@ -444,12 +531,30 @@ async fn handshake_v3<'a>(
handshake: v3::Handshake,
zsession: Arc<Session>,
config: Arc<Config>,
auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> Result<v3::HandshakeAck<MqttSessionState<'a>>, MqttPluginError> {
let client_id = handshake.packet().client_id.to_string();
log::info!("MQTT client {} connects using v3", client_id);

let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
Ok(handshake.ack(session, false))
match is_authorized(
(*auth_dictionary).as_ref(),
handshake.packet().username.as_ref(),
handshake.packet().password.as_ref(),
) {
Ok(_) => {
log::info!("MQTT client {} connects using v3", client_id);
let session =
MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
Ok(handshake.ack(session, false))
}
Err(err) => {
log::warn!(
"MQTT client {} connect using v3 rejected: {}",
client_id,
err
);
Ok(handshake.not_authorized())
}
}
}

async fn publish_v3(
Expand Down Expand Up @@ -544,12 +649,30 @@ async fn handshake_v5<'a>(
handshake: v5::Handshake,
zsession: Arc<Session>,
config: Arc<Config>,
auth_dictionary: Arc<Option<HashMap<User, Password>>>,
) -> Result<v5::HandshakeAck<MqttSessionState<'a>>, MqttPluginError> {
let client_id = handshake.packet().client_id.to_string();
log::info!("MQTT client {} connects using v5", client_id);

let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
Ok(handshake.ack(session))
match is_authorized(
(*auth_dictionary).as_ref(),
handshake.packet().username.as_ref(),
handshake.packet().password.as_ref(),
) {
Ok(_) => {
log::info!("MQTT client {} connects using v5", client_id);
let session =
MqttSessionState::new(client_id, zsession, config, handshake.sink().into());
Ok(handshake.ack(session))
}
Err(err) => {
log::warn!(
"MQTT client {} connect using v5 rejected: {}",
client_id,
err
);
Ok(handshake.failed(ntex_mqtt::v5::codec::ConnectAckReason::NotAuthorized))
}
}
}

async fn publish_v5(
Expand Down