diff --git a/Cargo.lock b/Cargo.lock index fe96463..aa70648 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,12 +220,12 @@ dependencies = [ [[package]] name = "async-channel" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ca33f4bc4ed1babef42cad36cc1f51fa88be00420404e5b1e80ab1b18f7678c" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" dependencies = [ "concurrent-queue", - "event-listener 4.0.0", + "event-listener 5.0.0", "event-listener-strategy", "futures-core", "pin-project-lite 0.2.13", @@ -300,7 +300,7 @@ dependencies = [ "futures-lite", "log", "parking", - "polling", + "polling 2.8.0", "rustix 0.37.25", "slab", "socket2 0.4.9", @@ -316,15 +316,6 @@ dependencies = [ "event-listener 2.5.3", ] -[[package]] -name = "async-oneshot" -version = "0.5.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae47de2a02d543205f3f5457a90b6ecbc9494db70557bd29590ec8f1ddff5463" -dependencies = [ - "futures-micro", -] - [[package]] name = "async-process" version = "1.7.0" @@ -350,7 +341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29479d362e242e320fa8f5c831940a5b83c1679af014068196cd20d4bf497b6b" dependencies = [ "futures-io", - "rustls 0.21.7", + "rustls", ] [[package]] @@ -787,12 +778,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" -[[package]] -name = "convert_case" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" - [[package]] name = "cookie" version = "0.14.4" @@ -923,19 +908,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "derive_more" -version = "0.99.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" -dependencies = [ - "convert_case", - "proc-macro2", - "quote", - "rustc_version 0.4.0", - "syn 1.0.109", -] - [[package]] name = "digest" version = "0.9.0" @@ -1065,13 +1037,24 @@ dependencies = [ "pin-project-lite 0.2.13", ] +[[package]] +name = "event-listener" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72557800024fabbaa2449dd4bf24e37b93702d457a4d4f2b0dd1f0f039f20c1" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite 0.2.13", +] + [[package]] name = "event-listener-strategy" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +checksum = "feedafcaa9b749175d5ac357452a9d41ea2911da598fde46ce1fe02c37751291" dependencies = [ - "event-listener 4.0.0", + "event-listener 5.0.0", "pin-project-lite 0.2.13", ] @@ -1240,15 +1223,6 @@ dependencies = [ "syn 2.0.33", ] -[[package]] -name = "futures-micro" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b460264b3593d68b16a7bc35f7bc226ddfebdf9a1c8db1ed95d5cc6b7168c826" -dependencies = [ - "pin-project-lite 0.2.13", -] - [[package]] name = "futures-sink" version = "0.3.28" @@ -2002,14 +1976,13 @@ dependencies = [ [[package]] name = "ntex" -version = "0.5.31" +version = "0.7.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97cdd157200a9720f6f72af7960f5e1b69cfaf933ab4fb702237a710c09feb65" +checksum = "7e8da0767496674cfc59663af137b9487a1af597809ba952be07d742f4f75fef" dependencies = [ - "async-channel 1.9.0", - "async-oneshot", - "base64 0.13.1", - "bitflags 1.3.2", + "async-channel 2.2.0", + "base64 0.21.4", + "bitflags 2.4.0", "encoding_rs", "httparse", "httpdate", @@ -2029,33 +2002,33 @@ dependencies = [ "ntex-service", "ntex-tls", "ntex-util", - "num_cpus", + "oneshot", "percent-encoding", "pin-project-lite 0.2.13", - "polling", + "polling 3.4.0", "regex", - "rustls 0.20.9", + "rustls", "serde", "serde_json", "serde_urlencoded", "sha-1", - "socket2 0.4.9", + "socket2 0.5.4", "thiserror", - "webpki-roots 0.22.6", + "webpki-roots 0.25.4", ] [[package]] name = "ntex-async-std" -version = "0.1.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed2aee761bc1f2f729c1ff3e66e160f77feb8c3579a8d129ed2eee53989473ed" +checksum = "35eb118c94b5bc332965887346b84ad6a2b7311708dd26e1d21a82baff678956" dependencies = [ - "async-oneshot", "async-std", "log", "ntex-bytes", "ntex-io", "ntex-util", + "oneshot", "pin-project-lite 0.2.13", ] @@ -2082,9 +2055,9 @@ dependencies = [ [[package]] name = "ntex-connect" -version = "0.1.1" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cf521bfca1a19976efb3c0780ed707599f009d1ef0f28a93cf93fd499a4a2e4" +checksum = "fa9c711a3564135ae0e499882da4a5b6cd8dfdd818dcc57e26e146aec541abe7" dependencies = [ "log", "ntex-async-std", @@ -2095,20 +2068,21 @@ dependencies = [ "ntex-service", "ntex-tls", "ntex-util", - "rustls 0.20.9", + "rustls", "thiserror", - "webpki-roots 0.22.6", + "webpki-roots 0.25.4", ] [[package]] name = "ntex-h2" -version = "0.1.6" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3ddb3e44f0b7945a64700afd699adf64162f302d0f13b434664ecd7ed0d30b" +checksum = "138b90775e51cfb9e0a6df6b8f12cfb80ee1d23344a02a138385844061c388fb" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.0", "fxhash", "log", + "nanorand", "ntex-bytes", "ntex-codec", "ntex-connect", @@ -2137,11 +2111,11 @@ dependencies = [ [[package]] name = "ntex-io" -version = "0.1.11" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fe5bb4c3a6f37a91250b0bd55e7dc0a7ff4cd85f074c2e82d4ad885fbdbc043" +checksum = "62c21a70836a8bfd9a673ee86dcf37d35e23b06449faa70bed25ce7c75ca0e81" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.0", "log", "ntex-bytes", "ntex-codec", @@ -2163,18 +2137,17 @@ dependencies = [ [[package]] name = "ntex-mqtt" -version = "0.9.2" +version = "0.12.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d2d3695d454076c89e9a4f7e2796b0c024e6d0cfea52eae514d64e4b957377" +checksum = "5b4a66a0bcd070f5af0709e3959d6f96ef002deaebd3aa69e5ff8905ded10bf9" dependencies = [ - "bitflags 1.3.2", - "derive_more", + "bitflags 2.4.0", "log", "ntex", - "ntex-util", "pin-project-lite 0.2.13", "serde", "serde_json", + "thiserror", ] [[package]] @@ -2196,7 +2169,7 @@ version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0302b642268700f5ac5233d6e2c547f5053efc8807bdc797c1acbad55996e0fb" dependencies = [ - "async-channel 2.1.1", + "async-channel 2.2.0", "async-std", "futures-core", "log", @@ -2205,18 +2178,19 @@ dependencies = [ [[package]] name = "ntex-service" -version = "0.3.3" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302fb630cedac18da0b1e11c3decc63253ab5a77661f6aac8ab64e490a423b9c" +checksum = "00ed5da53199c78416207467c565502e191a2c3b3b8f77edfe0560fef117af86" dependencies = [ "pin-project-lite 0.2.13", + "slab", ] [[package]] name = "ntex-tls" -version = "0.1.7" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33d5feb16c760330dc53795823b704ef682ae78f354566640465324e63b65772" +checksum = "4bfa487eb2ee31f0e3e097f17d6a895add0ee2a1ffd83806e5cca5d40e935536" dependencies = [ "log", "ntex-bytes", @@ -2224,16 +2198,16 @@ dependencies = [ "ntex-service", "ntex-util", "pin-project-lite 0.2.13", - "rustls 0.20.9", + "rustls", ] [[package]] name = "ntex-util" -version = "0.1.19" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de71abede318e4ff8a41d73858205dfb7ab8b55cc48bc1be81e032813c06fb7e" +checksum = "3376de7b99ba5209c1d73bf62bb3642f5158a26dac6347321088ea848c84019b" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.4.0", "futures-core", "futures-sink", "futures-timer", @@ -2699,6 +2673,20 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "polling" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30054e72317ab98eddd8561db0f6524df3367636884b7b21b703e4b280a84a14" +dependencies = [ + "cfg-if 1.0.0", + "concurrent-queue", + "pin-project-lite 0.2.13", + "rustix 0.38.13", + "tracing", + "windows-sys 0.52.0", +] + [[package]] name = "polyval" version = "0.4.5" @@ -2742,7 +2730,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.7", + "rustls", "thiserror", "tokio", "tracing", @@ -2758,7 +2746,7 @@ dependencies = [ "rand 0.8.5", "ring 0.16.20", "rustc-hash", - "rustls 0.21.7", + "rustls", "rustls-native-certs 0.6.3", "slab", "thiserror", @@ -2890,14 +2878,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.5", + "regex-syntax 0.8.2", ] [[package]] @@ -2911,13 +2899,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.8" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -2928,9 +2916,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -3090,18 +3078,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rustls" -version = "0.20.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" -dependencies = [ - "log", - "ring 0.16.20", - "sct", - "webpki", -] - [[package]] name = "rustls" version = "0.21.7" @@ -4381,24 +4357,11 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "webpki" -version = "0.22.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed63aea5ce73d0ff405984102c42de94fc55a6b75765d621c65262469b3c9b53" -dependencies = [ - "ring 0.17.6", - "untrusted 0.9.0", -] - [[package]] name = "webpki-roots" -version = "0.22.6" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87" -dependencies = [ - "webpki", -] +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "webpki-roots" @@ -4804,7 +4767,7 @@ dependencies = [ "futures", "log", "quinn", - "rustls 0.21.7", + "rustls", "rustls-native-certs 0.7.0", "rustls-pemfile 2.0.0", "rustls-webpki 0.102.0", @@ -4845,7 +4808,7 @@ dependencies = [ "base64 0.21.4", "futures", "log", - "rustls 0.21.7", + "rustls", "rustls-pemfile 2.0.0", "rustls-webpki 0.102.0", "secrecy", @@ -4933,6 +4896,7 @@ dependencies = [ name = "zenoh-plugin-mqtt" version = "0.11.0-dev" dependencies = [ + "async-channel 2.2.0", "async-std", "async-trait", "derivative", diff --git a/Cargo.toml b/Cargo.toml index 23c5429..7d76bd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ license = "EPL-2.0 OR Apache-2.0" categories = ["network-programming"] [workspace.dependencies] +async-channel = "2.2.0" async-std = "=1.12.0" async-trait = "0.1.66" clap = "3.2.23" @@ -36,8 +37,8 @@ git-version = "0.3.5" hex = "0.4.3" lazy_static = "1.4.0" log = "0.4.17" -ntex = "0.5.31" -ntex-mqtt = "0.9.2" +ntex = "0.7.17" +ntex-mqtt = "0.12.16" regex = "1.7.1" rustc_version = "0.4" serde = "1.0.154" diff --git a/zenoh-plugin-mqtt/Cargo.toml b/zenoh-plugin-mqtt/Cargo.toml index b0325e5..5335aef 100644 --- a/zenoh-plugin-mqtt/Cargo.toml +++ b/zenoh-plugin-mqtt/Cargo.toml @@ -32,6 +32,7 @@ no_mangle = ["zenoh-plugin-trait/no_mangle"] stats = ["zenoh/stats"] [dependencies] +async-channel = { workspace = true } async-std = { workspace = true, features = ["unstable", "attributes"] } async-trait = { workspace = true } derivative = { workspace = true } diff --git a/zenoh-plugin-mqtt/src/lib.rs b/zenoh-plugin-mqtt/src/lib.rs index a44c2b4..fd6465a 100644 --- a/zenoh-plugin-mqtt/src/lib.rs +++ b/zenoh-plugin-mqtt/src/lib.rs @@ -1,5 +1,5 @@ // -// Copyright (c) 2017, 2020 ADLINK Technology Inc. +// Copyright (c) 2017, 2024 ZettaScale Technology // // This program and the accompanying materials are made available under the // terms of the Eclipse Public License 2.0 which is available at @@ -9,7 +9,7 @@ // SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 // // Contributors: -// ADLINK zenoh team, +// ZettaScale Zenoh Team, // use git_version::git_version; use ntex::service::{fn_factory_with_config, fn_service}; @@ -273,7 +273,7 @@ async fn handshake_v3<'a>( let client_id = handshake.packet().client_id.to_string(); log::info!("MQTT client {} connects using v3", client_id); - let session = MqttSessionState::new(client_id, zsession, config); + let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into()); Ok(handshake.ack(session, false)) } @@ -313,12 +313,8 @@ async fn control_v3( session.client_id, topic ); - match session - .state() - .map_mqtt_subscription(topic, session.sink().clone().into()) - .await - { - Ok(()) => s.confirm(v5::QoS::AtMostOnce), + match session.state().map_mqtt_subscription(topic).await { + Ok(()) => s.confirm(v3::QoS::AtMostOnce), Err(e) => { log::error!("Subscription to '{}' failed: {}", topic, e); s.fail() @@ -377,7 +373,7 @@ async fn handshake_v5<'a>( let client_id = handshake.packet().client_id.to_string(); log::info!("MQTT client {} connects using v5", client_id); - let session = MqttSessionState::new(client_id, zsession, config); + let session = MqttSessionState::new(client_id, zsession, config, handshake.sink().into()); Ok(handshake.ack(session)) } @@ -424,15 +420,11 @@ async fn control_v5( for mut s in msg.iter_mut() { let topic = s.topic().as_str(); log::debug!( - "MQTT client {} subscribes 'to' {}", + "MQTT client {} subscribes to '{}'", session.client_id, topic ); - match session - .state() - .map_mqtt_subscription(topic, session.sink().clone().into()) - .await - { + match session.state().map_mqtt_subscription(topic).await { Ok(()) => s.confirm(v5::QoS::AtMostOnce), Err(e) => { log::error!("Subscription to '{}' failed: {}", topic, e); diff --git a/zenoh-plugin-mqtt/src/mqtt_helpers.rs b/zenoh-plugin-mqtt/src/mqtt_helpers.rs index b137010..92c21a4 100644 --- a/zenoh-plugin-mqtt/src/mqtt_helpers.rs +++ b/zenoh-plugin-mqtt/src/mqtt_helpers.rs @@ -15,10 +15,8 @@ use ntex::util::{ByteString, Bytes}; use ntex_mqtt::{error::SendPacketError, v3, v5}; use std::convert::TryInto; -use std::sync::{Arc, Mutex}; use zenoh::plugins::ZResult; use zenoh::prelude::*; -use zenoh_core::zlock; use crate::config::Config; @@ -119,13 +117,10 @@ pub(crate) fn guess_encoding(payload: &[u8]) -> Encoding { #[derive(Clone, Debug)] pub(crate) enum MqttSink { - V3(Arc>), - V5(Arc>), + V3(v3::MqttSink), + V5(v5::MqttSink), } -unsafe impl Send for MqttSink {} -unsafe impl Sync for MqttSink {} - impl MqttSink { pub(crate) fn publish_at_most_once( &self, @@ -136,28 +131,38 @@ impl MqttSink { ByteString: From, { match self { - MqttSink::V3(s) => { - let guard = zlock!(s); - guard.publish(topic, payload).send_at_most_once() + MqttSink::V3(sink) => sink.publish(topic, payload).send_at_most_once(), + MqttSink::V5(sink) => sink.publish(topic, payload).send_at_most_once(), + } + } + + pub(crate) fn close(&self) { + match self { + MqttSink::V3(sink) => { + sink.close(); } - MqttSink::V5(s) => { - let guard = zlock!(s); - guard.publish(topic, payload).send_at_most_once() + MqttSink::V5(sink) => { + sink.close(); } } } + + pub(crate) fn is_open(&self) -> bool { + match self { + MqttSink::V3(sink) => sink.is_open(), + MqttSink::V5(sink) => sink.is_open(), + } + } } impl From for MqttSink { fn from(s: v3::MqttSink) -> Self { - #[allow(clippy::arc_with_non_send_sync)] // TODO - MqttSink::V3(Arc::new(Mutex::new(s))) + MqttSink::V3(s) } } impl From for MqttSink { fn from(s: v5::MqttSink) -> Self { - #[allow(clippy::arc_with_non_send_sync)] // TODO - MqttSink::V5(Arc::new(Mutex::new(s))) + MqttSink::V5(s) } } diff --git a/zenoh-plugin-mqtt/src/mqtt_session_state.rs b/zenoh-plugin-mqtt/src/mqtt_session_state.rs index 15ced1c..e9c844a 100644 --- a/zenoh-plugin-mqtt/src/mqtt_session_state.rs +++ b/zenoh-plugin-mqtt/src/mqtt_session_state.rs @@ -13,6 +13,7 @@ // use crate::config::Config; use crate::mqtt_helpers::*; +use async_channel::{Receiver, Sender}; use async_std::sync::RwLock; use lazy_static::__Deref; use ntex::util::{ByteString, Bytes}; @@ -28,6 +29,7 @@ pub(crate) struct MqttSessionState<'a> { pub(crate) zsession: Arc, pub(crate) config: Arc, pub(crate) subs: RwLock>>, + pub(crate) tx: Sender<(ByteString, Bytes)>, } impl MqttSessionState<'_> { @@ -35,20 +37,21 @@ impl MqttSessionState<'_> { client_id: String, zsession: Arc, config: Arc, + sink: MqttSink, ) -> MqttSessionState<'a> { + let (tx, rx) = async_channel::bounded::<(ByteString, Bytes)>(1); + spawn_mqtt_publisher(client_id.clone(), rx, sink); + MqttSessionState { client_id, zsession, config, subs: RwLock::new(HashMap::new()), + tx, } } - pub(crate) async fn map_mqtt_subscription<'a>( - &'a self, - topic: &str, - sink: MqttSink, - ) -> ZResult<()> { + pub(crate) async fn map_mqtt_subscription<'a>(&'a self, topic: &str) -> ZResult<()> { let sub_origin = if is_allowed(topic, &self.config) { // if topic is allowed, subscribe to publications coming from anywhere Locality::Any @@ -67,11 +70,12 @@ impl MqttSessionState<'_> { let ke = mqtt_topic_to_ke(topic, &self.config.scope)?; let client_id = self.client_id.clone(); let config = self.config.clone(); + let tx = self.tx.clone(); let sub = self .zsession .declare_subscriber(ke) .callback(move |sample| { - if let Err(e) = route_zenoh_to_mqtt(sample, &client_id, &config, &sink) { + if let Err(e) = route_zenoh_to_mqtt(sample, &client_id, &config, &tx) { log::warn!("{}", e); } }) @@ -136,7 +140,7 @@ fn route_zenoh_to_mqtt( sample: Sample, client_id: &str, config: &Config, - sink: &MqttSink, + tx: &Sender<(ByteString, Bytes)>, ) -> ZResult<()> { let topic = ke_to_mqtt_topic_publish(&sample.key_expr, &config.scope)?; log::trace!( @@ -145,7 +149,7 @@ fn route_zenoh_to_mqtt( sample.key_expr, topic ); - sink.publish_at_most_once(topic, sample.payload.contiguous().to_vec().into()) + tx.send_blocking((topic, sample.payload.contiguous().to_vec().into())) .map_err(|e| { zerror!( "MQTT client {}: error re-publishing on MQTT a Zenoh publication on {}: {}", @@ -156,3 +160,32 @@ fn route_zenoh_to_mqtt( .into() }) } + +fn spawn_mqtt_publisher(client_id: String, rx: Receiver<(ByteString, Bytes)>, sink: MqttSink) { + ntex::rt::spawn(async move { + loop { + match rx.recv().await { + Ok((topic, payload)) => { + if sink.is_open() { + if let Err(e) = sink.publish_at_most_once(topic, payload) { + log::trace!( + "Failed to send MQTT message for client {} - {}", + client_id, + e + ); + sink.close(); + break; + } + } else { + log::trace!("MQTT sink closed for client {}", client_id); + break; + } + } + Err(_) => { + log::trace!("MPSC Channel closed for client {}", client_id); + break; + } + } + } + }); +}