diff --git a/Cargo.toml b/Cargo.toml index 87308b4..426e265 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,8 @@ members = [ "crates/msr", "crates/msr-core", "crates/msr-legacy", - "crates/msr-plugin" + "crates/msr-plugin", + "crates/msr-plugin-fieldbus-modbus" ] [patch.crates-io] @@ -11,3 +12,4 @@ msr = { path = "crates/msr" } msr-core = { path = "crates/msr-core" } msr-legacy = { path = "crates/msr-legacy" } msr-plugin = { path = "crates/msr-plugin" } +msr-plugin-fieldbus-modbus = { path = "crates/msr-plugin-fieldbus-modbus" } diff --git a/crates/msr-core/Cargo.toml b/crates/msr-core/Cargo.toml index 6837595..e70a92d 100644 --- a/crates/msr-core/Cargo.toml +++ b/crates/msr-core/Cargo.toml @@ -14,7 +14,7 @@ thiserror = "1" anyhow = { version = "1", optional = true } bs58 = { version = "0.4", optional = true } csv = { version = "1", optional = true } -serde = { version = "1", optional = true } +serde = { version = "1", optional = true, features = ["derive"] } serde_json = { version = "1", optional = true } uuid = { version = "0.8", optional = true } diff --git a/crates/msr-plugin-fieldbus-modbus/Cargo.toml b/crates/msr-plugin-fieldbus-modbus/Cargo.toml new file mode 100644 index 0000000..5fcf59b --- /dev/null +++ b/crates/msr-plugin-fieldbus-modbus/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "msr-plugin-fieldbus-modbus" +version = "0.0.0" +edition = "2018" + +[dependencies] +log = "0.4" +msr-core = { path = "../msr-core" } +msr-plugin = { path = "../msr-plugin" } +thiserror = "1" +tokio = { version = "1", default_features = false, features = ["sync", "macros"] } +tokio-modbus = "0.5" diff --git a/crates/msr-plugin-fieldbus-modbus/src/api.rs b/crates/msr-plugin-fieldbus-modbus/src/api.rs new file mode 100644 index 0000000..03f29e4 --- /dev/null +++ b/crates/msr-plugin-fieldbus-modbus/src/api.rs @@ -0,0 +1,19 @@ +use std::net::SocketAddr; + +#[derive(Debug)] +pub enum Message { + Connect(ConnCfg), + Shutdown, +} + +#[derive(Debug)] +pub enum ConnCfg { + Tcp(SocketAddr), +} + +#[derive(Debug, Clone)] +pub enum Event { + Connecting, + Connected, + ConnectingError(String), +} diff --git a/crates/msr-plugin-fieldbus-modbus/src/internal.rs b/crates/msr-plugin-fieldbus-modbus/src/internal.rs new file mode 100644 index 0000000..9e6246e --- /dev/null +++ b/crates/msr-plugin-fieldbus-modbus/src/internal.rs @@ -0,0 +1,94 @@ +use crate::api::{ConnCfg, Event, Message}; +use msr_plugin::MessageLoop; +use std::net::SocketAddr; +use tokio::sync::{broadcast, mpsc}; + +pub struct Context { + conn: Conn, + event_tx: broadcast::Sender, +} + +impl Context { + fn new(event_tx: broadcast::Sender) -> Self { + Self { + conn: Conn::default(), + event_tx, + } + } + fn send_ev(&self, ev: Event) { + if let Err(ev) = self.event_tx.send(ev) { + log::debug!("No subscribers, dropping event: {:?}", ev); + } + } + async fn connect_tcp(&mut self, addr: SocketAddr) { + match self.conn { + Conn::Disconnected => { + self.conn = Conn::Connecting; + self.send_ev(Event::Connecting); + match tokio_modbus::client::tcp::connect(addr).await { + Ok(mb_ctx) => { + self.conn = Conn::Connected(mb_ctx); + self.send_ev(Event::Connected); + } + Err(err) => { + self.send_ev(Event::ConnectingError(err.to_string())); + } + } + } + Conn::Connecting => { + self.send_ev(Event::ConnectingError("Already connecting".to_string())); + } + Conn::Connected(_) => { + self.send_ev(Event::ConnectingError("Already connected".to_string())); + } + } + } +} + +enum Conn { + Disconnected, + Connecting, + Connected(tokio_modbus::client::Context), +} + +impl Default for Conn { + fn default() -> Self { + Self::Disconnected + } +} + +pub fn create_message_loop( + event_tx: broadcast::Sender, +) -> (MessageLoop, mpsc::UnboundedSender) { + let mut ctx = Context::new(event_tx); + let (message_tx, mut message_rx) = mpsc::unbounded_channel(); + let message_loop = async move { + log::debug!("Entering modbus plugin message loop"); + loop { + tokio::select! { + next_msg = message_rx.recv() => { + if let Some(msg) = next_msg { + log::debug!("Received message: {:?}", msg); + match msg { + Message::Connect(cfg) => { + match cfg { + ConnCfg::Tcp(addr) => { + ctx.connect_tcp(addr).await; + } + } + } + Message::Shutdown => { + break; + } + } + } else { + log::debug!("All message senders have been dropped"); + break; + } + } + } + } + log::debug!("Exiting modbus plugin message loop"); + }; + (Box::pin(message_loop), message_tx) +} diff --git a/crates/msr-plugin-fieldbus-modbus/src/lib.rs b/crates/msr-plugin-fieldbus-modbus/src/lib.rs new file mode 100644 index 0000000..9577a1c --- /dev/null +++ b/crates/msr-plugin-fieldbus-modbus/src/lib.rs @@ -0,0 +1,44 @@ +use msr_plugin::MessageLoop; +use thiserror::Error; +use tokio::sync::{broadcast, mpsc}; + +mod api; +mod internal; + +pub use api::*; + +pub struct Plugin { + message_loop: MessageLoop, + message_tx: mpsc::UnboundedSender, + broadcast_tx: broadcast::Sender, +} + +#[derive(Debug, Error)] +pub enum SetupError {} + +impl Plugin { + pub fn setup() -> Result { + let (broadcast_tx, _) = broadcast::channel(100); + let event_tx = broadcast_tx.clone(); + let (message_loop, message_tx) = internal::create_message_loop(event_tx); + Ok(Self { + message_tx, + message_loop, + broadcast_tx, + }) + } +} + +impl msr_plugin::Plugin for Plugin { + type Message = Message; + type Event = Event; + fn message_sender(&self) -> mpsc::UnboundedSender { + self.message_tx.clone() + } + fn subscribe_events(&self) -> broadcast::Receiver { + self.broadcast_tx.subscribe() + } + fn run(self) -> MessageLoop { + self.message_loop + } +}