Skip to content

Commit

Permalink
Add initial modbus plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
flosse committed Oct 6, 2021
1 parent 7ca1ebf commit dbac52b
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 2 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ members = [
"crates/msr",
"crates/msr-core",
"crates/msr-legacy",
"crates/msr-plugin"
"crates/msr-plugin",
"crates/msr-plugin-fieldbus-modbus"
]

[patch.crates-io]
msr = { path = "crates/msr" }
msr-core = { path = "crates/msr-core" }
msr-legacy = { path = "crates/msr-legacy" }
msr-plugin = { path = "crates/msr-plugin" }
msr-plugin-fieldbus-modbus = { path = "crates/msr-plugin-fieldbus-modbus" }
2 changes: 1 addition & 1 deletion crates/msr-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ thiserror = "1"
anyhow = { version = "1", optional = true }
bs58 = { version = "0.4", optional = true }
csv = { version = "1", optional = true }
serde = { version = "1", optional = true }
serde = { version = "1", optional = true, features = ["derive"] }
serde_json = { version = "1", optional = true }
uuid = { version = "0.8", optional = true }

Expand Down
12 changes: 12 additions & 0 deletions crates/msr-plugin-fieldbus-modbus/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "msr-plugin-fieldbus-modbus"
version = "0.0.0"
edition = "2018"

[dependencies]
log = "0.4"
msr-core = { path = "../msr-core" }
msr-plugin = { path = "../msr-plugin" }
thiserror = "1"
tokio = { version = "1", default_features = false, features = ["sync", "macros"] }
tokio-modbus = "0.5"
19 changes: 19 additions & 0 deletions crates/msr-plugin-fieldbus-modbus/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::net::SocketAddr;

#[derive(Debug)]
pub enum Message {
Connect(ConnCfg),
Shutdown,
}

#[derive(Debug)]
pub enum ConnCfg {
Tcp(SocketAddr),
}

#[derive(Debug, Clone)]
pub enum Event {
Connecting,
Connected,
ConnectingError(String),
}
94 changes: 94 additions & 0 deletions crates/msr-plugin-fieldbus-modbus/src/internal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::api::{ConnCfg, Event, Message};
use msr_plugin::MessageLoop;
use std::net::SocketAddr;
use tokio::sync::{broadcast, mpsc};

pub struct Context {
conn: Conn,
event_tx: broadcast::Sender<Event>,
}

impl Context {
fn new(event_tx: broadcast::Sender<Event>) -> Self {
Self {
conn: Conn::default(),
event_tx,
}
}
fn send_ev(&self, ev: Event) {
if let Err(ev) = self.event_tx.send(ev) {
log::debug!("No subscribers, dropping event: {:?}", ev);
}
}
async fn connect_tcp(&mut self, addr: SocketAddr) {
match self.conn {
Conn::Disconnected => {
self.conn = Conn::Connecting;
self.send_ev(Event::Connecting);
match tokio_modbus::client::tcp::connect(addr).await {
Ok(mb_ctx) => {
self.conn = Conn::Connected(mb_ctx);
self.send_ev(Event::Connected);
}
Err(err) => {
self.send_ev(Event::ConnectingError(err.to_string()));
}
}
}
Conn::Connecting => {
self.send_ev(Event::ConnectingError("Already connecting".to_string()));
}
Conn::Connected(_) => {
self.send_ev(Event::ConnectingError("Already connected".to_string()));
}
}
}
}

enum Conn {
Disconnected,
Connecting,
Connected(tokio_modbus::client::Context),
}

impl Default for Conn {
fn default() -> Self {
Self::Disconnected
}
}

pub fn create_message_loop(
event_tx: broadcast::Sender<Event>,
) -> (MessageLoop, mpsc::UnboundedSender<Message>) {
let mut ctx = Context::new(event_tx);
let (message_tx, mut message_rx) = mpsc::unbounded_channel();
let message_loop = async move {
log::debug!("Entering modbus plugin message loop");
loop {
tokio::select! {
next_msg = message_rx.recv() => {
if let Some(msg) = next_msg {
log::debug!("Received message: {:?}", msg);
match msg {
Message::Connect(cfg) => {
match cfg {
ConnCfg::Tcp(addr) => {
ctx.connect_tcp(addr).await;
}
}
}
Message::Shutdown => {
break;
}
}
} else {
log::debug!("All message senders have been dropped");
break;
}
}
}
}
log::debug!("Exiting modbus plugin message loop");
};
(Box::pin(message_loop), message_tx)
}
44 changes: 44 additions & 0 deletions crates/msr-plugin-fieldbus-modbus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use msr_plugin::MessageLoop;
use thiserror::Error;
use tokio::sync::{broadcast, mpsc};

mod api;
mod internal;

pub use api::*;

pub struct Plugin {
message_loop: MessageLoop,
message_tx: mpsc::UnboundedSender<Message>,
broadcast_tx: broadcast::Sender<Event>,
}

#[derive(Debug, Error)]
pub enum SetupError {}

impl Plugin {
pub fn setup() -> Result<Self, SetupError> {
let (broadcast_tx, _) = broadcast::channel(100);
let event_tx = broadcast_tx.clone();
let (message_loop, message_tx) = internal::create_message_loop(event_tx);
Ok(Self {
message_tx,
message_loop,
broadcast_tx,
})
}
}

impl msr_plugin::Plugin for Plugin {
type Message = Message;
type Event = Event;
fn message_sender(&self) -> mpsc::UnboundedSender<Self::Message> {
self.message_tx.clone()
}
fn subscribe_events(&self) -> broadcast::Receiver<Self::Event> {
self.broadcast_tx.subscribe()
}
fn run(self) -> MessageLoop {
self.message_loop
}
}

0 comments on commit dbac52b

Please sign in to comment.