Skip to content

Commit

Permalink
Add csv-event-journal-msr-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
uklotzde committed Oct 10, 2021
1 parent e8dc49a commit ed111d4
Show file tree
Hide file tree
Showing 17 changed files with 861 additions and 9 deletions.
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ members = [
"crates/msr",
"crates/msr-core",
"crates/msr-legacy",
"crates/msr-plugin"
"crates/msr-plugin",
"plugins/csv-event-journal-msr-plugin",
]

[patch.crates-io]
msr = { path = "crates/msr" }
msr-core = { path = "crates/msr-core" }
msr-legacy = { path = "crates/msr-legacy" }
msr-plugin = { path = "crates/msr-plugin" }
csv-event-journal-msr-plugin = { path = "plugins/csv-event-journal-msr-plugin" }
6 changes: 3 additions & 3 deletions crates/msr-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "msr-core"
description = "Industrial Automation Toolbox - Core"
description = "Industrial Automation Toolbox - Core Components"
license = "MIT/Apache-2.0"
edition = "2018"
version = "0.3.0"
Expand All @@ -24,7 +24,7 @@ tempfile = "3"
[features]
# FIXME: Disable all default features before release!
# Keeping all top-level features enabled by default is convenient during development.
default = ["csv-journaling", "csv-register-recording"]
default = ["csv-event-journal", "csv-register-recording"]
csv-storage = ["csv"]
csv-journaling = ["anyhow", "bs58", "chrono/serde", "csv-storage", "serde", "uuid/v4"]
csv-event-journal = ["anyhow", "bs58", "chrono/serde", "csv-storage", "serde", "uuid/v4"]
csv-register-recording = ["csv-storage", "serde", "serde_json"]
File renamed without changes.
4 changes: 2 additions & 2 deletions crates/msr-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ pub mod register;
pub mod storage;
pub mod time;

#[cfg(feature = "csv-journaling")]
pub mod journal;
#[cfg(feature = "csv-event-journal")]
pub mod csv_event_journal;
3 changes: 2 additions & 1 deletion crates/msr-plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "msr-plugin"
description = "Industrial Automation Toolbox - Plugins"
description = "Industrial Automation Toolbox - Plugin Foundation"
license = "MIT/Apache-2.0"
edition = "2018"
version = "0.3.0"
Expand All @@ -11,4 +11,5 @@ msr-core = "*"

# Other
log = "0.4"
thiserror = "1"
tokio = { version = "1", default_features = false, features = ["sync"] }
90 changes: 88 additions & 2 deletions crates/msr-plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use msr_core::audit::Activity;
use std::{fmt, future::Future, pin::Pin};
use tokio::sync::{broadcast, mpsc, oneshot};
use std::{error::Error as StdError, fmt, future::Future, pin::Pin};
use thiserror::Error;
use tokio::sync::{
broadcast,
mpsc::{self, error::SendError},
oneshot,
};

// ------ -------
// Plugin shape
Expand Down Expand Up @@ -42,6 +47,17 @@ pub struct PluginPorts<M, P, E> {
pub event_subscriber: EventSubscriber<P, E>,
}

#[derive(Error, Debug)]
pub enum PluginError<E: StdError> {
#[error("communication error")]
Communication,

#[error("internal error: {0}")]
Internal(E),
}

pub type PluginResult<T, E> = Result<T, PluginError<E>>;

// ------ -------
// Messages
// ------ -------
Expand Down Expand Up @@ -168,3 +184,73 @@ where
}
}
}

pub fn send_message<M, E>(
message: impl Into<M>,
message_tx: &MessageSender<M>,
) -> PluginResult<(), E>
where
M: fmt::Debug,
E: StdError,
{
message_tx.send(message.into()).map_err(|send_error| {
let SendError(message) = send_error;
log::error!("Unexpected send error: Dropping message {:?}", message);
PluginError::Communication
})
}

pub fn send_reply<R>(reply_tx: ReplySender<R>, reply: impl Into<R>)
where
R: fmt::Debug,
{
if let Err(reply) = reply_tx.send(reply.into()) {
// Not an error, may occur if the receiver side has already been dropped
log::info!("Unexpected send error: Dropping reply {:?}", reply);
}
}

pub async fn receive_reply<R, E>(reply_rx: ReplyReceiver<R>) -> PluginResult<R, E>
where
E: StdError,
{
reply_rx.await.map_err(|receive_error| {
log::error!("No reply received: {}", receive_error);
PluginError::Communication
})
}

pub async fn send_message_receive_reply<M, R, E>(
message: impl Into<M>,
message_tx: &MessageSender<M>,
reply_rx: ReplyReceiver<R>,
) -> PluginResult<R, E>
where
M: fmt::Debug,
E: StdError,
{
send_message(message, message_tx)?;
receive_reply(reply_rx).await
}

pub async fn receive_result<R, E>(result_rx: ResultReceiver<R, E>) -> PluginResult<R, E>
where
E: StdError,
{
receive_reply(result_rx)
.await?
.map_err(PluginError::Internal)
}

pub async fn send_message_receive_result<M, R, E>(
message: impl Into<M>,
message_tx: &MessageSender<M>,
result_rx: ResultReceiver<R, E>,
) -> PluginResult<R, E>
where
M: fmt::Debug,
E: StdError,
{
send_message(message, message_tx)?;
receive_result(result_rx).await
}
17 changes: 17 additions & 0 deletions plugins/csv-event-journal-msr-plugin/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "csv-event-journal-msr-plugin"
version = "0.0.0"
description = "Industrial Automation Toolbox - CSV Event Journal Plugin"
authors = ["slowtec GmbH <[email protected]>"]
edition = "2018"
publish = false

[dependencies]
anyhow = "1"
log = "0.4"
thiserror = "1"
tokio = { version = "*", default_features = false, features = ["rt-multi-thread", "sync"] }

# Workspace dependencies
msr-core = { version = "*", default_features = false, features = ["csv-event-journal"] }
msr-plugin = "*"
11 changes: 11 additions & 0 deletions plugins/csv-event-journal-msr-plugin/src/api/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use msr_core::csv_event_journal::Entry;

use super::{Config, RecordEntryOutcome, ResultSender, State};

#[derive(Debug)]
pub enum Command {
ReplaceConfig(ResultSender<Config>, Config),
SwitchState(ResultSender<()>, State),
RecordEntry(ResultSender<RecordEntryOutcome>, Entry),
Shutdown(ResultSender<()>),
}
77 changes: 77 additions & 0 deletions plugins/csv-event-journal-msr-plugin/src/api/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use msr_core::csv_event_journal::{Entry, StoredRecord};
use msr_plugin::{reply_channel, send_message_receive_result};

use crate::internal::MessageSender;

use super::{query, Command, Config, PluginResult, Query, RecordEntryOutcome, State, Status};

/// Remote controller for the plugin
///
/// Wraps the message-based communication with the plugin
/// into asynchronous functions.
#[derive(Debug, Clone)]
pub struct Controller {
message_tx: MessageSender,
}

impl Controller {
pub const fn new(message_tx: MessageSender) -> Self {
Self { message_tx }
}

pub async fn command_replace_config(&self, new_config: Config) -> PluginResult<Config> {
let (reply_tx, reply_rx) = reply_channel();
let command = Command::ReplaceConfig(reply_tx, new_config);

send_message_receive_result(command, &self.message_tx, reply_rx).await
}

pub async fn command_switch_state(&self, new_state: State) -> PluginResult<()> {
let (reply_tx, reply_rx) = reply_channel();
let command = Command::SwitchState(reply_tx, new_state);
send_message_receive_result(command, &self.message_tx, reply_rx).await
}

pub async fn command_record_entry(&self, new_entry: Entry) -> PluginResult<RecordEntryOutcome> {
let (reply_tx, reply_rx) = reply_channel();
let command = Command::RecordEntry(reply_tx, new_entry);

send_message_receive_result(command, &self.message_tx, reply_rx).await
}

pub async fn command_shutdown(&self) -> PluginResult<()> {
let (reply_tx, reply_rx) = reply_channel();
let command = Command::Shutdown(reply_tx);
send_message_receive_result(command, &self.message_tx, reply_rx).await
}

pub async fn query_config(&self) -> PluginResult<Config> {
let (reply_tx, reply_rx) = reply_channel();
let query = Query::Config(reply_tx);
send_message_receive_result(query, &self.message_tx, reply_rx).await
}

pub async fn query_status(&self, request: query::StatusRequest) -> PluginResult<Status> {
let (reply_tx, reply_rx) = reply_channel();
let query = Query::Status(reply_tx, request);
send_message_receive_result(query, &self.message_tx, reply_rx).await
}

pub async fn query_recent_records(
&self,
request: query::RecentRecordsRequest,
) -> PluginResult<Vec<StoredRecord>> {
let (reply_tx, reply_rx) = reply_channel();
let query = Query::RecentRecords(reply_tx, request);
send_message_receive_result(query, &self.message_tx, reply_rx).await
}

pub async fn query_filter_records(
&self,
request: query::FilterRecordsRequest,
) -> PluginResult<Vec<StoredRecord>> {
let (reply_tx, reply_rx) = reply_channel();
let query = Query::FilterRecords(reply_tx, request);
send_message_receive_result(query, &self.message_tx, reply_rx).await
}
}
32 changes: 32 additions & 0 deletions plugins/csv-event-journal-msr-plugin/src/api/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::*;

#[derive(Debug, Clone)]
pub enum Event {
Lifecycle(LifecycleEvent),
Notification(NotificationEvent),
Incident(IncidentEvent),
}

/// Common lifecycle events
#[derive(Debug, Clone)]
pub enum LifecycleEvent {
Started,
Stopped,
ConfigChanged(Config),
StateChanged(State),
}

/// Regular notifications for informational purposes
#[derive(Debug, Clone)]
pub struct NotificationEvent {
// Empty placeholder
}

/// Unexpected incidents that might require (manual) intervention
#[derive(Debug, Clone)]
pub enum IncidentEvent {
IoWriteError {
os_code: Option<i32>,
message: String,
},
}
36 changes: 36 additions & 0 deletions plugins/csv-event-journal-msr-plugin/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use super::*;

// Re-export internal types that are used in the public API
pub use crate::internal::context::{
Config, EntryNotRecorded, EntryRecorded, RecordEntryOutcome, State, Status,
};

pub mod controller;
pub use self::controller::Controller;

pub mod command;
pub use self::command::Command;

pub mod query;
pub use self::query::Query;

pub mod event;
pub use self::event::Event;

#[derive(Debug)]
pub enum Message {
Command(Command),
Query(Query),
}

impl From<Command> for Message {
fn from(command: Command) -> Self {
Self::Command(command)
}
}

impl From<Query> for Message {
fn from(query: Query) -> Self {
Self::Query(query)
}
}
29 changes: 29 additions & 0 deletions plugins/csv-event-journal-msr-plugin/src/api/query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::num::NonZeroUsize;

use msr_core::csv_event_journal::{RecordFilter, StoredRecord};

use super::{Config, ResultSender, Status};

#[derive(Debug)]
pub enum Query {
Config(ResultSender<Config>),
Status(ResultSender<Status>, StatusRequest),
RecentRecords(ResultSender<Vec<StoredRecord>>, RecentRecordsRequest),
FilterRecords(ResultSender<Vec<StoredRecord>>, FilterRecordsRequest),
}

#[derive(Debug, Clone)]
pub struct StatusRequest {
pub with_storage_statistics: bool,
}

#[derive(Debug, Clone)]
pub struct RecentRecordsRequest {
pub limit: NonZeroUsize,
}

#[derive(Debug, Clone)]
pub struct FilterRecordsRequest {
pub limit: NonZeroUsize,
pub filter: RecordFilter,
}
Loading

0 comments on commit ed111d4

Please sign in to comment.