From ea306aebf82904399cb63ccef7e28e09f5d98149 Mon Sep 17 00:00:00 2001 From: Uwe Klotz Date: Mon, 11 Oct 2021 17:02:46 +0200 Subject: [PATCH] Identify event publishers by an usize index ...and restructure the csv-register-recorder-msr-plugin. --- crates/msr-core/src/csv_event_journal.rs | 14 ++ crates/msr-plugin/src/lib.rs | 115 ++++++---- .../src/api/command.rs | 4 +- .../src/api/controller.rs | 5 +- .../src/api/mod.rs | 2 - .../src/api/query.rs | 4 +- ...rs => invoke_context_from_message_loop.rs} | 21 +- .../src/internal/message_loop.rs | 29 ++- .../src/internal/mod.rs | 7 +- .../csv-event-journal-msr-plugin/src/lib.rs | 78 +++---- .../src/api.rs | 107 --------- .../src/api/command.rs | 20 ++ .../src/api/controller.rs | 112 ++++++++++ .../src/api/event.rs | 37 ++++ .../src/api/mod.rs | 38 ++++ .../src/api/query.rs | 41 ++++ .../src/{ => internal}/context.rs | 76 ++----- .../invoke_context_from_message_loop.rs} | 28 ++- .../mod.rs => internal/message_loop.rs} | 92 +++----- .../src/internal/mod.rs | 5 + .../src/{ => internal}/register.rs | 8 - .../src/lib.rs | 207 ++++++++---------- 22 files changed, 569 insertions(+), 481 deletions(-) rename plugins/csv-event-journal-msr-plugin/src/internal/{invoke_context_from_plugin.rs => invoke_context_from_message_loop.rs} (83%) delete mode 100644 plugins/csv-register-recorder-msr-plugin/src/api.rs create mode 100644 plugins/csv-register-recorder-msr-plugin/src/api/command.rs create mode 100644 plugins/csv-register-recorder-msr-plugin/src/api/controller.rs create mode 100644 plugins/csv-register-recorder-msr-plugin/src/api/event.rs create mode 100644 plugins/csv-register-recorder-msr-plugin/src/api/mod.rs create mode 100644 plugins/csv-register-recorder-msr-plugin/src/api/query.rs rename plugins/csv-register-recorder-msr-plugin/src/{ => internal}/context.rs (91%) rename plugins/csv-register-recorder-msr-plugin/src/{plugin/handlers.rs => internal/invoke_context_from_message_loop.rs} (87%) rename plugins/csv-register-recorder-msr-plugin/src/{plugin/mod.rs => internal/message_loop.rs} (69%) create mode 100644 plugins/csv-register-recorder-msr-plugin/src/internal/mod.rs rename plugins/csv-register-recorder-msr-plugin/src/{ => internal}/register.rs (75%) diff --git a/crates/msr-core/src/csv_event_journal.rs b/crates/msr-core/src/csv_event_journal.rs index 25d54da..e3e7e93 100644 --- a/crates/msr-core/src/csv_event_journal.rs +++ b/crates/msr-core/src/csv_event_journal.rs @@ -2,6 +2,7 @@ use std::{ convert::{TryFrom, TryInto}, + fmt, num::NonZeroUsize, path::PathBuf, time::SystemTime, @@ -160,6 +161,19 @@ impl From for ScopeValue { } } +impl AsRef for Scope { + fn as_ref(&self) -> &ScopeValue { + let Self(inner) = self; + inner + } +} + +impl fmt::Display for Scope { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + pub type CodeValue = i32; #[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)] diff --git a/crates/msr-plugin/src/lib.rs b/crates/msr-plugin/src/lib.rs index 8b09ed7..ab28fcc 100644 --- a/crates/msr-plugin/src/lib.rs +++ b/crates/msr-plugin/src/lib.rs @@ -1,5 +1,5 @@ -use msr_core::audit::Activity; use std::{error::Error as StdError, fmt, future::Future, pin::Pin}; + use thiserror::Error; use tokio::sync::{ broadcast, @@ -7,6 +7,8 @@ use tokio::sync::{ oneshot, }; +use msr_core::audit::Activity; + // ------ ------- // Plugin shape // ------ ------- @@ -14,24 +16,25 @@ use tokio::sync::{ pub trait Plugin { type Message; type Event; - fn message_sender(&self) -> mpsc::UnboundedSender; - fn subscribe_events(&self) -> broadcast::Receiver; + fn message_sender(&self) -> MessageSender; + fn subscribe_events(&self) -> BroadcastReceiver; fn run(self) -> MessageLoop; } #[allow(missing_debug_implementations)] -pub struct PluginContainer { - pub ports: PluginPorts, +pub struct PluginContainer { + pub ports: PluginPorts, pub message_loop: MessageLoop, } -impl Plugin for PluginContainer { +impl Plugin for PluginContainer { type Message = M; - type Event = PublishedEvent; - fn message_sender(&self) -> mpsc::UnboundedSender { + type Event = PublishedEvent; + + fn message_sender(&self) -> MessageSender { self.ports.message_tx.clone() } - fn subscribe_events(&self) -> broadcast::Receiver { + fn subscribe_events(&self) -> BroadcastReceiver { self.ports.event_subscriber.subscribe() } fn run(self) -> MessageLoop { @@ -42,9 +45,9 @@ impl Plugin for PluginContainer { pub type MessageLoop = Pin + Send + 'static>>; #[allow(missing_debug_implementations)] -pub struct PluginPorts { +pub struct PluginPorts { pub message_tx: MessageSender, - pub event_subscriber: EventSubscriber, + pub event_subscriber: EventSubscriber, } #[derive(Error, Debug)] @@ -63,8 +66,8 @@ pub type PluginResult = Result>; // ------ ------- // TODO: Use bounded channels for backpressure? -type MessageSender = mpsc::UnboundedSender; -type MessageReceiver = mpsc::UnboundedReceiver; +pub type MessageSender = mpsc::UnboundedSender; +pub type MessageReceiver = mpsc::UnboundedReceiver; pub fn message_channel() -> (MessageSender, MessageReceiver) { mpsc::unbounded_channel() @@ -119,49 +122,84 @@ where // Events // ----- ------ +/// Internal index into a lookup table with event publisher metadata +pub type EventPublisherIndexValue = usize; + +/// Numeric identifier of an event publisher control cycle +/// +/// Uniquely identifies an event publisher in the system at runtime. +/// +/// The value is supposed to be used as a key or index to retrieve +/// extended metadata for an event publisher that does not need to +/// be sent with every event. This metadata is probably immutable. +#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)] +pub struct EventPublisherIndex(EventPublisherIndexValue); + +impl EventPublisherIndex { + pub const fn from_value(value: EventPublisherIndexValue) -> Self { + Self(value) + } + + pub const fn to_value(self) -> EventPublisherIndexValue { + self.0 + } +} + +impl From for EventPublisherIndex { + fn from(from: EventPublisherIndexValue) -> Self { + Self::from_value(from) + } +} + +impl From for EventPublisherIndexValue { + fn from(from: EventPublisherIndex) -> Self { + from.to_value() + } +} + #[derive(Debug, Clone)] -pub struct PublishedEvent { - pub published: Activity

, - pub payload: T, +pub struct PublishedEvent { + pub published: Activity, + pub payload: E, } -pub type EventSender = broadcast::Sender>; -pub type EventReceiver = broadcast::Receiver>; -pub type EventSubscriber = BroadcastSubscriber>; +pub type EventSender = broadcast::Sender>; +pub type EventReceiver = broadcast::Receiver>; +pub type EventSubscriber = BroadcastSubscriber>; -pub fn event_channel(channel_capacity: usize) -> (EventSender, EventSubscriber) +pub fn event_channel(channel_capacity: usize) -> (EventSender, EventSubscriber) where - P: Clone, - T: Clone, + E: Clone, { broadcast_channel(channel_capacity) } #[derive(Debug, Clone)] -pub struct EventPubSub { - publisher: P, - event_tx: EventSender, +pub struct EventPubSub { + publisher_index: EventPublisherIndex, + event_tx: EventSender, } -impl EventPubSub +impl EventPubSub where - P: fmt::Debug + Clone, - T: fmt::Debug + Clone, + E: fmt::Debug + Clone, { - pub fn new(publisher: impl Into

, channel_capacity: usize) -> (Self, EventSubscriber) { + pub fn new( + publisher_index: impl Into, + channel_capacity: usize, + ) -> (Self, EventSubscriber) { let (event_tx, event_subscriber) = event_channel(channel_capacity); ( Self { + publisher_index: publisher_index.into(), event_tx, - publisher: publisher.into(), }, event_subscriber, ) } - pub fn publish_event(&self, payload: T) { - let publisher = self.publisher.clone(); - let published = Activity::now(publisher); + pub fn publish_event(&self, payload: E) { + let published = Activity::now(self.publisher_index); let event = PublishedEvent { published, payload }; self.dispatch_event(event); } @@ -171,12 +209,11 @@ pub trait EventDispatcher { fn dispatch_event(&self, event: E); } -impl EventDispatcher> for EventPubSub +impl EventDispatcher> for EventPubSub where - P: fmt::Debug + Clone, - T: fmt::Debug + Clone, + E: fmt::Debug + Clone, { - fn dispatch_event(&self, event: PublishedEvent) { + fn dispatch_event(&self, event: PublishedEvent) { if let Err(event) = self.event_tx.send(event) { // Ignore all send errors that are expected if no subscribers // are connected. @@ -185,6 +222,10 @@ where } } +// --------- ----------- +// Utility functions +// --------- ----------- + pub fn send_message( message: impl Into, message_tx: &MessageSender, diff --git a/plugins/csv-event-journal-msr-plugin/src/api/command.rs b/plugins/csv-event-journal-msr-plugin/src/api/command.rs index 03fecd7..94c17af 100644 --- a/plugins/csv-event-journal-msr-plugin/src/api/command.rs +++ b/plugins/csv-event-journal-msr-plugin/src/api/command.rs @@ -1,6 +1,8 @@ use msr_core::csv_event_journal::Entry; -use super::{Config, RecordEntryOutcome, ResultSender, State}; +use crate::ResultSender; + +use super::{Config, RecordEntryOutcome, State}; #[derive(Debug)] pub enum Command { diff --git a/plugins/csv-event-journal-msr-plugin/src/api/controller.rs b/plugins/csv-event-journal-msr-plugin/src/api/controller.rs index fb5dba0..5e6be03 100644 --- a/plugins/csv-event-journal-msr-plugin/src/api/controller.rs +++ b/plugins/csv-event-journal-msr-plugin/src/api/controller.rs @@ -1,9 +1,10 @@ use msr_core::csv_event_journal::{Entry, StoredRecord}; + use msr_plugin::{reply_channel, send_message_receive_result}; -use crate::internal::MessageSender; +use crate::{MessageSender, PluginResult}; -use super::{query, Command, Config, PluginResult, Query, RecordEntryOutcome, State, Status}; +use super::{query, Command, Config, Query, RecordEntryOutcome, State, Status}; /// Remote controller for the plugin /// diff --git a/plugins/csv-event-journal-msr-plugin/src/api/mod.rs b/plugins/csv-event-journal-msr-plugin/src/api/mod.rs index 10c707d..05d7641 100644 --- a/plugins/csv-event-journal-msr-plugin/src/api/mod.rs +++ b/plugins/csv-event-journal-msr-plugin/src/api/mod.rs @@ -1,5 +1,3 @@ -use super::*; - // Re-export internal types that are used in the public API pub use crate::internal::context::{ Config, EntryNotRecorded, EntryRecorded, RecordEntryOutcome, State, Status, diff --git a/plugins/csv-event-journal-msr-plugin/src/api/query.rs b/plugins/csv-event-journal-msr-plugin/src/api/query.rs index 9b019d9..b3b551f 100644 --- a/plugins/csv-event-journal-msr-plugin/src/api/query.rs +++ b/plugins/csv-event-journal-msr-plugin/src/api/query.rs @@ -2,7 +2,9 @@ use std::num::NonZeroUsize; use msr_core::csv_event_journal::{RecordFilter, StoredRecord}; -use super::{Config, ResultSender, Status}; +use crate::ResultSender; + +use super::{Config, Status}; #[derive(Debug)] pub enum Query { diff --git a/plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_plugin.rs b/plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_message_loop.rs similarity index 83% rename from plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_plugin.rs rename to plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_message_loop.rs index 9d3cb03..c5211aa 100644 --- a/plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_plugin.rs +++ b/plugins/csv-event-journal-msr-plugin/src/internal/invoke_context_from_message_loop.rs @@ -1,8 +1,6 @@ -use std::time::SystemTime; - use tokio::task; -use msr_core::csv_event_journal::{Entry, Error, Scope, Severity, StoredRecord}; +use msr_core::csv_event_journal::{Entry, Error, StoredRecord}; use msr_plugin::send_reply; @@ -11,7 +9,7 @@ use crate::{ event::{IncidentEvent, LifecycleEvent}, query, Config, Event, RecordEntryOutcome, State, Status, }, - EventPubSub, JournalCodes, ResultSender, + EventPubSub, ResultSender, }; use super::context::Context; @@ -79,20 +77,7 @@ pub fn command_record_entry( send_reply(reply_tx, result.map_err(Into::into)); } -pub fn command_shutdown(context: &mut Context, reply_tx: ResultSender<()>, journal_scope: &str) { - let _ = task::block_in_place(|| { - let new_entry = Entry { - occurred_at: SystemTime::now(), - scope: Scope(journal_scope.to_string()), - code: JournalCodes::STOPPING, - severity: Severity::Information, - text: Some("Stopping".to_string()), - json: None, - }; - context.record_entry(new_entry).map_err(|err| { - log::warn!("Failed to record entry about stopping: {}", err); - }) - }); +pub fn command_shutdown(_context: &mut Context, reply_tx: ResultSender<()>) { send_reply(reply_tx, Ok(())); } diff --git a/plugins/csv-event-journal-msr-plugin/src/internal/message_loop.rs b/plugins/csv-event-journal-msr-plugin/src/internal/message_loop.rs index 80661ab..db40d99 100644 --- a/plugins/csv-event-journal-msr-plugin/src/internal/message_loop.rs +++ b/plugins/csv-event-journal-msr-plugin/src/internal/message_loop.rs @@ -1,21 +1,21 @@ +use std::path::PathBuf; + use msr_plugin::{message_channel, MessageLoop}; use crate::{ api::{event::LifecycleEvent, Command, Config, Event, Message, Query, State}, - Environment, EventPubSub, Result, + EventPubSub, MessageSender, Result, }; -use super::{context::Context, invoke_context_from_plugin, MessageSender}; +use super::{context::Context, invoke_context_from_message_loop}; pub fn create_message_loop( - environment: Environment, + data_dir: PathBuf, + event_pubsub: EventPubSub, initial_config: Config, initial_state: State, - journal_scope: String, - event_pubsub: EventPubSub, ) -> Result<(MessageLoop, MessageSender)> { let (message_tx, mut message_rx) = message_channel(); - let Environment { data_dir } = environment; let mut context = Context::try_new(data_dir, initial_config, initial_state)?; let message_loop = async move { let mut exit_message_loop = false; @@ -27,7 +27,7 @@ pub fn create_message_loop( log::trace!("Received command {:?}", command); match command { Command::ReplaceConfig(reply_tx, new_config) => { - invoke_context_from_plugin::command_replace_config( + invoke_context_from_message_loop::command_replace_config( &mut context, &event_pubsub, reply_tx, @@ -35,7 +35,7 @@ pub fn create_message_loop( ); } Command::SwitchState(reply_tx, new_state) => { - invoke_context_from_plugin::command_switch_state( + invoke_context_from_message_loop::command_switch_state( &mut context, &event_pubsub, reply_tx, @@ -43,7 +43,7 @@ pub fn create_message_loop( ); } Command::RecordEntry(reply_tx, new_entry) => { - invoke_context_from_plugin::command_record_entry( + invoke_context_from_message_loop::command_record_entry( &mut context, &event_pubsub, reply_tx, @@ -51,10 +51,9 @@ pub fn create_message_loop( ); } Command::Shutdown(reply_tx) => { - invoke_context_from_plugin::command_shutdown( + invoke_context_from_message_loop::command_shutdown( &mut context, reply_tx, - &journal_scope, ); exit_message_loop = true; } @@ -64,24 +63,24 @@ pub fn create_message_loop( log::debug!("Received query {:?}", query); match query { Query::Config(reply_tx) => { - invoke_context_from_plugin::query_config(&context, reply_tx); + invoke_context_from_message_loop::query_config(&context, reply_tx); } Query::Status(reply_tx, request) => { - invoke_context_from_plugin::query_status( + invoke_context_from_message_loop::query_status( &mut context, reply_tx, request, ); } Query::RecentRecords(reply_tx, request) => { - invoke_context_from_plugin::query_recent_records( + invoke_context_from_message_loop::query_recent_records( &mut context, reply_tx, request, ); } Query::FilterRecords(reply_tx, request) => { - invoke_context_from_plugin::query_filter_records( + invoke_context_from_message_loop::query_filter_records( &mut context, reply_tx, request, diff --git a/plugins/csv-event-journal-msr-plugin/src/internal/mod.rs b/plugins/csv-event-journal-msr-plugin/src/internal/mod.rs index c440239..6cb324a 100644 --- a/plugins/csv-event-journal-msr-plugin/src/internal/mod.rs +++ b/plugins/csv-event-journal-msr-plugin/src/internal/mod.rs @@ -1,9 +1,4 @@ -use tokio::sync::mpsc; - -use crate::api::Message; - pub mod context; -pub mod invoke_context_from_plugin; pub mod message_loop; -pub type MessageSender = mpsc::UnboundedSender; +mod invoke_context_from_message_loop; diff --git a/plugins/csv-event-journal-msr-plugin/src/lib.rs b/plugins/csv-event-journal-msr-plugin/src/lib.rs index ceb1c43..2043684 100644 --- a/plugins/csv-event-journal-msr-plugin/src/lib.rs +++ b/plugins/csv-event-journal-msr-plugin/src/lib.rs @@ -10,35 +10,23 @@ use std::{ use thiserror::Error; use msr_core::{ - csv_event_journal::{Code, Severity}, + csv_event_journal::Severity, storage::{MemorySize, StorageConfig, StorageSegmentConfig, TimeInterval}, }; +use msr_plugin::EventPublisherIndex; + pub mod api; -use self::api::{Config, State}; mod internal; use self::internal::message_loop::create_message_loop; -pub const DEFAULT_JOURNAL_SCOPE: &str = "plugin.msr.csv-event-journal"; - -pub const DEFAULT_EVENT_PUBLISHER_ID: &str = DEFAULT_JOURNAL_SCOPE; - -pub const DEFAULT_SEVERITY_THRESHOLD: Severity = Severity::Information; - -#[derive(Debug)] -pub struct JournalCodes; - -impl JournalCodes { - const STOPPING: Code = Code(1); -} +#[derive(Debug, Clone)] +pub struct Environment { + pub event_publisher_index: EventPublisherIndex, -#[derive(Debug, Clone, PartialEq)] -pub struct PluginSetup { - pub initial_config: Config, - pub initial_state: State, - pub journal_scope: String, - pub event_publisher_id: EventPublisherId, + /// Directory for storing CSV data + pub data_dir: PathBuf, } pub fn default_storage_config() -> StorageConfig { @@ -51,23 +39,24 @@ pub fn default_storage_config() -> StorageConfig { } } -pub fn default_config() -> Config { - Config { - severity_threshold: DEFAULT_SEVERITY_THRESHOLD, +pub fn default_config() -> api::Config { + api::Config { + severity_threshold: Severity::Information, storage: default_storage_config(), } } +#[derive(Debug, Clone, PartialEq)] +pub struct PluginSetup { + pub initial_config: api::Config, + pub initial_state: api::State, +} + impl Default for PluginSetup { fn default() -> Self { Self { - initial_config: Config { - severity_threshold: DEFAULT_SEVERITY_THRESHOLD, - storage: default_storage_config(), - }, - initial_state: State::Inactive, - journal_scope: DEFAULT_JOURNAL_SCOPE.to_owned(), - event_publisher_id: DEFAULT_EVENT_PUBLISHER_ID.to_owned(), + initial_config: default_config(), + initial_state: api::State::Inactive, } } } @@ -92,24 +81,22 @@ pub enum Error { } pub type Result = std::result::Result; + pub type PluginError = msr_plugin::PluginError; pub type PluginResult = msr_plugin::PluginResult; +pub type MessageSender = msr_plugin::MessageSender; +pub type MessageReceiver = msr_plugin::MessageReceiver; + pub type ResultSender = msr_plugin::ResultSender; pub type ResultReceiver = msr_plugin::ResultReceiver; -pub type EventPublisherId = String; -pub type PublishedEvent = msr_plugin::PublishedEvent; -pub type EventReceiver = msr_plugin::EventReceiver; -type EventPubSub = msr_plugin::EventPubSub; - -pub type Plugin = msr_plugin::PluginContainer; -pub type PluginPorts = msr_plugin::PluginPorts; +pub type PublishedEvent = msr_plugin::PublishedEvent; +pub type EventReceiver = msr_plugin::EventReceiver; +type EventPubSub = msr_plugin::EventPubSub; -#[derive(Debug, Clone)] -pub struct Environment { - pub data_dir: PathBuf, -} +pub type Plugin = msr_plugin::PluginContainer; +pub type PluginPorts = msr_plugin::PluginPorts; pub fn create_plugin( environment: Environment, @@ -119,17 +106,14 @@ pub fn create_plugin( let PluginSetup { initial_config, initial_state, - journal_scope, - event_publisher_id, } = plugin_setup; let (event_pubsub, event_subscriber) = - EventPubSub::new(event_publisher_id, event_channel_capacity); + EventPubSub::new(environment.event_publisher_index, event_channel_capacity); let (message_loop, message_tx) = create_message_loop( - environment, + environment.data_dir, + event_pubsub, initial_config, initial_state, - journal_scope, - event_pubsub, )?; Ok(Plugin { ports: PluginPorts { diff --git a/plugins/csv-register-recorder-msr-plugin/src/api.rs b/plugins/csv-register-recorder-msr-plugin/src/api.rs deleted file mode 100644 index 15e69b4..0000000 --- a/plugins/csv-register-recorder-msr-plugin/src/api.rs +++ /dev/null @@ -1,107 +0,0 @@ -use msr_plugin::{reply_channel, send_message_receive_result}; - -use super::*; - -pub use super::Message; - -pub async fn command_replace_config( - message_tx: &MessageSender, - new_config: Config, -) -> PluginResult { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::ReplaceConfig(reply_tx, new_config)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn command_replace_register_group_config( - message_tx: &MessageSender, - register_group_id: RegisterGroupId, - new_config: RegisterGroupConfig, -) -> PluginResult> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::ReplaceRegisterGroupConfig( - reply_tx, - register_group_id, - new_config, - )); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn command_switch_state( - message_tx: &MessageSender, - new_state: State, -) -> PluginResult<()> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::SwitchState(reply_tx, new_state)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn command_record_observed_register_group_values( - message_tx: &MessageSender, - register_group_id: RegisterGroupId, - observed_register_values: ObservedRegisterValues, -) -> PluginResult<()> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::RecordObservedRegisterGroupValues( - reply_tx, - register_group_id, - observed_register_values, - )); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn command_shutdown(message_tx: &MessageSender) -> PluginResult<()> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::Shutdown(reply_tx)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn command_smoke_test(message_tx: &MessageSender) -> PluginResult<()> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Command(Command::SmokeTest(reply_tx)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn query_config(message_tx: &MessageSender) -> PluginResult { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Query(Query::Config(reply_tx)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn query_register_group_config( - message_tx: &MessageSender, - register_group_id: RegisterGroupId, -) -> PluginResult> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Query(Query::RegisterGroupConfig(reply_tx, register_group_id)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn query_status( - message_tx: &MessageSender, - request: QueryStatusRequest, -) -> PluginResult { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Query(Query::Status(reply_tx, request)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn query_recent_records( - message_tx: &MessageSender, - register_group_id: RegisterGroupId, - req: RecentRecordsRequest, -) -> PluginResult> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Query(Query::RecentRecords(reply_tx, register_group_id, req)); - send_message_receive_result(message, message_tx, reply_rx).await -} - -pub async fn query_filter_records( - message_tx: &MessageSender, - register_group_id: RegisterGroupId, - req: FilterRecordsRequest, -) -> PluginResult> { - let (reply_tx, reply_rx) = reply_channel(); - let message = Message::Query(Query::FilterRecords(reply_tx, register_group_id, req)); - send_message_receive_result(message, message_tx, reply_rx).await -} diff --git a/plugins/csv-register-recorder-msr-plugin/src/api/command.rs b/plugins/csv-register-recorder-msr-plugin/src/api/command.rs new file mode 100644 index 0000000..9ad34df --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api/command.rs @@ -0,0 +1,20 @@ +use crate::ResultSender; + +use super::{ObservedRegisterValues, RegisterGroupId}; + +#[derive(Debug)] +pub enum Command { + ReplaceConfig(ResultSender, Config), + ReplaceRegisterGroupConfig( + ResultSender>, + RegisterGroupId, + RegisterGroupConfig, + ), + SwitchState(ResultSender<()>, State), + RecordObservedRegisterGroupValues(ResultSender<()>, RegisterGroupId, ObservedRegisterValues), + Shutdown(ResultSender<()>), + // TODO: Replace pseudo smoke test command with integration test + SmokeTest(ResultSender<()>), +} + +use super::{Config, RegisterGroupConfig, State}; diff --git a/plugins/csv-register-recorder-msr-plugin/src/api/controller.rs b/plugins/csv-register-recorder-msr-plugin/src/api/controller.rs new file mode 100644 index 0000000..8644926 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api/controller.rs @@ -0,0 +1,112 @@ +use msr_plugin::{reply_channel, send_message_receive_result}; + +use crate::{MessageSender, PluginResult}; + +use super::{ + query, Command, Config, ObservedRegisterValues, Query, RegisterGroupConfig, RegisterGroupId, + State, Status, StoredRegisterRecord, +}; + +/// Remote controller for the plugin +/// +/// Wraps the message-based communication with the plugin +/// into asynchronous functions. +#[derive(Debug, Clone)] +pub struct Controller { + message_tx: MessageSender, +} + +impl Controller { + pub const fn new(message_tx: MessageSender) -> Self { + Self { message_tx } + } + + pub async fn command_replace_config(&self, new_config: Config) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::ReplaceConfig(reply_tx, new_config); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn command_replace_register_group_config( + &self, + register_group_id: RegisterGroupId, + new_config: RegisterGroupConfig, + ) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::ReplaceRegisterGroupConfig(reply_tx, register_group_id, new_config); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn command_switch_state(&self, new_state: State) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::SwitchState(reply_tx, new_state); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn command_record_observed_register_group_values( + &self, + register_group_id: RegisterGroupId, + observed_register_values: ObservedRegisterValues, + ) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::RecordObservedRegisterGroupValues( + reply_tx, + register_group_id, + observed_register_values, + ); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn command_shutdown(&self) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::Shutdown(reply_tx); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn command_smoke_test(&self) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let command = Command::SmokeTest(reply_tx); + send_message_receive_result(command, &self.message_tx, reply_rx).await + } + + pub async fn query_config(&self) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let query = Query::Config(reply_tx); + send_message_receive_result(query, &self.message_tx, reply_rx).await + } + + pub async fn query_register_group_config( + &self, + register_group_id: RegisterGroupId, + ) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let query = Query::RegisterGroupConfig(reply_tx, register_group_id); + send_message_receive_result(query, &self.message_tx, reply_rx).await + } + + pub async fn query_status(&self, request: query::StatusRequest) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let query = Query::Status(reply_tx, request); + send_message_receive_result(query, &self.message_tx, reply_rx).await + } + + pub async fn query_recent_records( + &self, + register_group_id: RegisterGroupId, + req: query::RecentRecordsRequest, + ) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let query = Query::RecentRecords(reply_tx, register_group_id, req); + send_message_receive_result(query, &self.message_tx, reply_rx).await + } + + pub async fn query_filter_records( + &self, + register_group_id: RegisterGroupId, + req: query::FilterRecordsRequest, + ) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let query = Query::FilterRecords(reply_tx, register_group_id, req); + send_message_receive_result(query, &self.message_tx, reply_rx).await + } +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/api/event.rs b/plugins/csv-register-recorder-msr-plugin/src/api/event.rs new file mode 100644 index 0000000..7f149dd --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api/event.rs @@ -0,0 +1,37 @@ +use std::path::PathBuf; + +use super::{Config, RegisterGroupId, State}; + +#[derive(Debug, Clone)] +pub enum Event { + Lifecycle(LifecycleEvent), + Notification(NotificationEvent), + Incident(IncidentEvent), +} + +/// Common lifecycle events +#[derive(Debug, Clone)] +pub enum LifecycleEvent { + Started, + Stopped, + ConfigChanged(Config), + StateChanged(State), +} + +/// Regular notifications +#[derive(Debug, Clone)] +pub enum NotificationEvent { + DataDirectoryCreated { + register_group_id: RegisterGroupId, + fs_path: PathBuf, + }, +} + +/// Unexpected incidents that might require intervention +#[derive(Debug, Clone)] +pub enum IncidentEvent { + IoWriteError { + os_code: Option, + message: String, + }, +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/api/mod.rs b/plugins/csv-register-recorder-msr-plugin/src/api/mod.rs new file mode 100644 index 0000000..e8b1344 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api/mod.rs @@ -0,0 +1,38 @@ +// Re-export internal types that are used in the public API +pub use crate::internal::{ + context::{Config, RegisterGroupConfig, RegisterGroupStatus, State, Status}, + register::{ + GroupId as RegisterGroupId, GroupIdValue as RegisterGroupIdValue, ObservedRegisterValues, Record as RegisterRecord, + StoredRecord as StoredRegisterRecord, Type as RegisterType, Value as RegisterValue, + }, +}; + +pub mod controller; +pub use self::controller::Controller; + +pub mod command; +pub use self::command::Command; + +pub mod query; +pub use self::query::Query; + +pub mod event; +pub use self::event::Event; + +#[derive(Debug)] +pub enum Message { + Command(Command), + Query(Query), +} + +impl From for Message { + fn from(command: Command) -> Self { + Self::Command(command) + } +} + +impl From for Message { + fn from(query: Query) -> Self { + Self::Query(query) + } +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/api/query.rs b/plugins/csv-register-recorder-msr-plugin/src/api/query.rs new file mode 100644 index 0000000..1d9a1ca --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api/query.rs @@ -0,0 +1,41 @@ +use std::num::NonZeroUsize; + +use msr_core::storage::RecordPreludeFilter; + +use crate::ResultSender; + +use super::{Config, RegisterGroupConfig, RegisterGroupId, Status, StoredRegisterRecord}; + +#[derive(Debug, Clone)] +pub struct RecentRecordsRequest { + pub limit: NonZeroUsize, +} + +#[derive(Debug, Clone)] +pub struct FilterRecordsRequest { + pub limit: NonZeroUsize, + pub filter: RecordPreludeFilter, +} + +#[derive(Debug, Clone, Default)] +pub struct StatusRequest { + pub with_register_groups: bool, + pub with_storage_statistics: bool, +} + +#[derive(Debug)] +pub enum Query { + Config(ResultSender), + RegisterGroupConfig(ResultSender>, RegisterGroupId), + Status(ResultSender, StatusRequest), + RecentRecords( + ResultSender>, + RegisterGroupId, + RecentRecordsRequest, + ), + FilterRecords( + ResultSender>, + RegisterGroupId, + FilterRecordsRequest, + ), +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/context.rs b/plugins/csv-register-recorder-msr-plugin/src/internal/context.rs similarity index 91% rename from plugins/csv-register-recorder-msr-plugin/src/context.rs rename to plugins/csv-register-recorder-msr-plugin/src/internal/context.rs index 3ce4f1b..af6c23c 100644 --- a/plugins/csv-register-recorder-msr-plugin/src/context.rs +++ b/plugins/csv-register-recorder-msr-plugin/src/internal/context.rs @@ -1,62 +1,34 @@ use std::{ collections::{hash_map::Entry, HashMap}, fmt, fs, - io::Error as IoError, num::NonZeroUsize, path::{Path, PathBuf}, time::SystemTime, }; -use thiserror::Error; - use msr_core::{ - register::recording::{ - CsvFileRecordStorage, Error as RecordError, RecordPrelude, RecordStorage as _, - StoredRecordPrelude, + register::{ + recording::{ + CsvFileRecordStorage, RecordPrelude, RecordStorage as _, + StoredRecordPrelude as StoredRegisterRecordPrelude, + }, + Index as RegisterIndex, }, - storage::RecordStorageBase, - time::SystemTimeInstant, -}; - -pub use msr_core::{ - csv_event_journal::Severity, - register::Index as RegisterIndex, storage::{ - CreatedAtOffset, Error as StorageError, MemorySize, RecordPreludeFilter, - Result as StorageResult, StorageConfig, StorageSegmentConfig, StorageStatus, TimeInterval, - WritableRecordPrelude, + RecordPreludeFilter, RecordStorageBase, Result as StorageResult, StorageConfig, + StorageStatus, }, + time::SystemTimeInstant, ScalarType, ScalarValue, }; -pub use crate::register::{ - GroupId as RegisterGroupId, ObservedRegisterValues, Record, StoredRecord, Type as RegisterType, - Value as RegisterValue, +use crate::{ + api::{ + ObservedRegisterValues, RegisterGroupId, RegisterRecord, RegisterType, StoredRegisterRecord, + }, + Error, Result, }; -#[derive(Error, Debug)] -pub enum Error { - #[error("register group not configured")] - RegisterGroupUnknown, - - #[error("invalid data format")] - DataFormatInvalid, - - #[error(transparent)] - Io(#[from] IoError), - - #[error(transparent)] - Record(#[from] RecordError), - - #[error(transparent)] - Storage(#[from] StorageError), - - #[error(transparent)] - Other(#[from] anyhow::Error), -} - -pub type Result = std::result::Result; - #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub struct PartitionId(String); @@ -186,15 +158,15 @@ impl RecordPreludeGenerator for DefaultRecordPreludeGenerator { } pub trait RecordRepo { - fn append_record(&mut self, record: Record) -> Result<()>; + fn append_record(&mut self, record: RegisterRecord) -> Result<()>; - fn recent_records(&self, limit: NonZeroUsize) -> Result>; + fn recent_records(&self, limit: NonZeroUsize) -> Result>; fn filter_records( &self, limit: NonZeroUsize, filter: RecordPreludeFilter, - ) -> Result>; + ) -> Result>; fn total_record_count(&self) -> usize; } @@ -261,7 +233,7 @@ impl Context { for (id, context) in &mut self.register_groups { let status = context .status(with_storage_statistics) - .map_err(Error::Storage)?; + .map_err(Error::MsrStorage)?; register_groups.insert(id.clone(), status); } Some(register_groups) @@ -278,7 +250,7 @@ impl Context { &mut self, register_group_id: &RegisterGroupId, limit: NonZeroUsize, - ) -> Result> { + ) -> Result> { let context = self .register_groups .get_mut(register_group_id) @@ -291,7 +263,7 @@ impl Context { register_group_id: &RegisterGroupId, limit: NonZeroUsize, filter: &RecordPreludeFilter, - ) -> Result> { + ) -> Result> { let context = self .register_groups .get_mut(register_group_id) @@ -389,7 +361,7 @@ impl Context { &mut self, register_group_id: &RegisterGroupId, observed_register_values: ObservedRegisterValues, - ) -> Result> { + ) -> Result> { match self.state { State::Inactive => { log::debug!( @@ -443,7 +415,7 @@ impl Context { .ok_or(Error::RegisterGroupUnknown)?; DefaultRecordPreludeGenerator.generate_prelude().and_then( |(created_at, prelude)| { - let new_record = Record { + let new_record = RegisterRecord { prelude, observation: observed_register_values, }; @@ -496,7 +468,7 @@ impl Context { Some(ScalarValue::I64(-1).into()), Some(ScalarValue::U64(1).into()), Some(ScalarValue::F64(-1.125).into()), - Some(RegisterValue::String("Hello".to_string())), + Some("Hello".to_owned().into()), ], }, ObservedRegisterValues { @@ -506,7 +478,7 @@ impl Context { Some(ScalarValue::I64(1).into()), None, Some(ScalarValue::F64(1.125).into()), - Some(RegisterValue::String(", world!".to_string())), + Some(", world!".to_owned().into()), ], }, ObservedRegisterValues { diff --git a/plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs b/plugins/csv-register-recorder-msr-plugin/src/internal/invoke_context_from_message_loop.rs similarity index 87% rename from plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs rename to plugins/csv-register-recorder-msr-plugin/src/internal/invoke_context_from_message_loop.rs index c91d282..bb9609c 100644 --- a/plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs +++ b/plugins/csv-register-recorder-msr-plugin/src/internal/invoke_context_from_message_loop.rs @@ -1,6 +1,16 @@ +use tokio::task; + use msr_plugin::send_reply; -use super::*; +use crate::{ + api::{ + event::LifecycleEvent, query, Config, Event, ObservedRegisterValues, RegisterGroupConfig, + RegisterGroupId, State, Status, StoredRegisterRecord, + }, + EventPubSub, ResultSender, +}; + +use super::context::Context; pub fn command_replace_config( context: &mut Context, @@ -108,10 +118,10 @@ pub fn query_register_group_config( pub fn query_status( context: &mut Context, reply_tx: ResultSender, - request: QueryStatusRequest, + request: query::StatusRequest, ) { let response = task::block_in_place(|| { - let QueryStatusRequest { + let query::StatusRequest { with_register_groups, with_storage_statistics, } = request; @@ -127,12 +137,12 @@ pub fn query_status( pub fn query_recent_records( context: &mut Context, - reply_tx: ResultSender>, + reply_tx: ResultSender>, register_group_id: &RegisterGroupId, - request: RecentRecordsRequest, + request: query::RecentRecordsRequest, ) { let response = task::block_in_place(|| { - let RecentRecordsRequest { limit } = request; + let query::RecentRecordsRequest { limit } = request; context .recent_records(register_group_id, limit) .map_err(|err| { @@ -145,12 +155,12 @@ pub fn query_recent_records( pub fn query_filter_records( context: &mut Context, - reply_tx: ResultSender>, + reply_tx: ResultSender>, register_group_id: &RegisterGroupId, - request: FilterRecordsRequest, + request: query::FilterRecordsRequest, ) { let response = task::block_in_place(|| { - let FilterRecordsRequest { limit, filter } = request; + let query::FilterRecordsRequest { limit, filter } = request; context .filter_records(register_group_id, limit, &filter) .map_err(|err| { diff --git a/plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs b/plugins/csv-register-recorder-msr-plugin/src/internal/message_loop.rs similarity index 69% rename from plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs rename to plugins/csv-register-recorder-msr-plugin/src/internal/message_loop.rs index a878b00..f0187fe 100644 --- a/plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs +++ b/plugins/csv-register-recorder-msr-plugin/src/internal/message_loop.rs @@ -2,38 +2,20 @@ use std::path::{Path, PathBuf}; use tokio::task; -use msr_plugin::{message_channel, send_reply}; +use msr_plugin::{message_channel, send_reply, MessageLoop}; -use crate::register::GroupId as RegisterGroupId; +use crate::{ + api::{ + event::{LifecycleEvent, NotificationEvent}, + Command, Config, Event, Message, Query, RegisterGroupId, State, + }, + EventPubSub, MessageSender, Result, +}; -use super::{context::Context, *}; - -mod handlers; - -pub const DEFAULT_JOURNAL_SCOPE: &str = "slowrt.plugin.msr.recorder"; - -pub const DEFAULT_EVENT_PUBLISHER_ID: &str = DEFAULT_JOURNAL_SCOPE; - -#[derive(Debug, Clone, PartialEq)] -pub struct PluginConfig { - pub initial_config: Config, - pub event_publisher_id: EventPublisherId, -} - -impl Default for PluginConfig { - fn default() -> Self { - Self { - initial_config: Config { - default_storage: default_storage_config(), - register_groups: Default::default(), - }, - event_publisher_id: DEFAULT_EVENT_PUBLISHER_ID.to_owned(), - } - } -} - -pub type Plugin = msr_plugin::PluginContainer; -pub type PluginPorts = msr_plugin::PluginPorts; +use super::{ + context::{self, Context}, + invoke_context_from_message_loop, +}; struct ContextEventCallback { event_pubsub: EventPubSub, @@ -49,24 +31,18 @@ impl context::ContextEventCallback for ContextEventCallback { } } -pub fn create_plugin( - data_path: PathBuf, - initial_config: PluginConfig, +pub fn create_message_loop( + data_dir: PathBuf, + event_pubsub: EventPubSub, + initial_config: Config, initial_state: State, - event_channel_capacity: usize, -) -> NewResult { - let PluginConfig { - event_publisher_id, - initial_config, - } = initial_config; +) -> Result<(MessageLoop, MessageSender)> { let (message_tx, mut message_rx) = message_channel(); - let (event_pubsub, event_subscriber) = - EventPubSub::new(event_publisher_id, event_channel_capacity); let context_events = ContextEventCallback { event_pubsub: event_pubsub.clone(), }; let mut context = Context::try_new( - data_path, + data_dir, initial_config, initial_state, Box::new(context_events) as _, @@ -81,7 +57,7 @@ pub fn create_plugin( log::trace!("Received command {:?}", command); match command { Command::ReplaceConfig(reply_tx, new_config) => { - handlers::command_replace_config( + invoke_context_from_message_loop::command_replace_config( &mut context, &event_pubsub, reply_tx, @@ -93,7 +69,7 @@ pub fn create_plugin( register_group_id, new_config, ) => { - handlers::command_replace_register_group_config( + invoke_context_from_message_loop::command_replace_register_group_config( &mut context, &event_pubsub, reply_tx, @@ -102,7 +78,7 @@ pub fn create_plugin( ); } Command::SwitchState(reply_tx, new_state) => { - handlers::command_switch_state( + invoke_context_from_message_loop::command_switch_state( &mut context, &event_pubsub, reply_tx, @@ -114,7 +90,7 @@ pub fn create_plugin( register_group_id, observed_register_values, ) => { - handlers::command_record_observed_register_group_values( + invoke_context_from_message_loop::command_record_observed_register_group_values( &mut context, reply_tx, register_group_id, @@ -122,7 +98,7 @@ pub fn create_plugin( ); } Command::Shutdown(reply_tx) => { - handlers::command_shutdown(reply_tx); + invoke_context_from_message_loop::command_shutdown(reply_tx); exit_message_loop = true; } Command::SmokeTest(reply_tx) => { @@ -136,20 +112,24 @@ pub fn create_plugin( log::debug!("Received query {:?}", query); match query { Query::Config(reply_tx) => { - handlers::query_config(&context, reply_tx); + invoke_context_from_message_loop::query_config(&context, reply_tx); } Query::RegisterGroupConfig(reply_tx, register_group_id) => { - handlers::query_register_group_config( + invoke_context_from_message_loop::query_register_group_config( &context, reply_tx, ®ister_group_id, ); } Query::Status(reply_tx, request) => { - handlers::query_status(&mut context, reply_tx, request); + invoke_context_from_message_loop::query_status( + &mut context, + reply_tx, + request, + ); } Query::RecentRecords(reply_tx, register_group_id, request) => { - handlers::query_recent_records( + invoke_context_from_message_loop::query_recent_records( &mut context, reply_tx, ®ister_group_id, @@ -157,7 +137,7 @@ pub fn create_plugin( ); } Query::FilterRecords(reply_tx, register_group_id, request) => { - handlers::query_filter_records( + invoke_context_from_message_loop::query_filter_records( &mut context, reply_tx, ®ister_group_id, @@ -175,11 +155,5 @@ pub fn create_plugin( log::info!("Message loop terminated"); event_pubsub.publish_event(Event::Lifecycle(LifecycleEvent::Stopped)); }; - Ok(Plugin { - ports: PluginPorts { - message_tx, - event_subscriber, - }, - message_loop: Box::pin(message_loop), - }) + Ok((Box::pin(message_loop), message_tx)) } diff --git a/plugins/csv-register-recorder-msr-plugin/src/internal/mod.rs b/plugins/csv-register-recorder-msr-plugin/src/internal/mod.rs new file mode 100644 index 0000000..bb9c658 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/internal/mod.rs @@ -0,0 +1,5 @@ +pub mod context; +pub mod message_loop; +pub mod register; + +mod invoke_context_from_message_loop; diff --git a/plugins/csv-register-recorder-msr-plugin/src/register.rs b/plugins/csv-register-recorder-msr-plugin/src/internal/register.rs similarity index 75% rename from plugins/csv-register-recorder-msr-plugin/src/register.rs rename to plugins/csv-register-recorder-msr-plugin/src/internal/register.rs index dfc4253..b94cac5 100644 --- a/plugins/csv-register-recorder-msr-plugin/src/register.rs +++ b/plugins/csv-register-recorder-msr-plugin/src/internal/register.rs @@ -1,16 +1,8 @@ use std::fmt; -use msr_core::Measurement; - // Re-exports pub use msr_core::{register::Index, Value, ValueType as Type}; -// Generic re-exports of msr-core, specialized with concrete value type -pub type ValueMeasurement = Measurement; -pub type IndexedValueMeasurement = msr_core::register::IndexedMeasurement; -pub type ObservedValue = msr_core::register::ObservedValue; -pub type ObservedValues = msr_core::register::ObservedValues; - pub type ObservedRegisterValues = msr_core::register::recording::ObservedRegisterValues; pub type Record = msr_core::register::recording::Record; pub type StoredRecord = msr_core::register::recording::StoredRecord; diff --git a/plugins/csv-register-recorder-msr-plugin/src/lib.rs b/plugins/csv-register-recorder-msr-plugin/src/lib.rs index 440304c..74f6d75 100644 --- a/plugins/csv-register-recorder-msr-plugin/src/lib.rs +++ b/plugins/csv-register-recorder-msr-plugin/src/lib.rs @@ -2,43 +2,35 @@ #![warn(rust_2018_idioms)] use std::{ - num::{NonZeroU32, NonZeroU64, NonZeroUsize}, - path::{Path, PathBuf}, - result::Result as StdResult, + io::Error as IoError, + num::{NonZeroU32, NonZeroU64}, + path::PathBuf, }; -use tokio::sync::mpsc; +use thiserror::Error; -use msr_core::storage::{ - MemorySize, RecordPreludeFilter, StorageConfig, StorageSegmentConfig, TimeInterval, -}; - -pub mod api; - -mod plugin; -pub use self::plugin::{create_plugin, Plugin, PluginConfig, PluginPorts}; - -mod context; -pub use self::context::{ - Config, Error, RegisterGroupConfig, RegisterGroupStatus, Result as NewResult, State, Status, +use msr_core::{ + register::recording::Error as MsrRecordError, + storage::{ + Error as MsrStorageError, MemorySize, StorageConfig, StorageSegmentConfig, TimeInterval, + }, }; -pub mod register; -use self::register::{GroupId as RegisterGroupId, ObservedRegisterValues, StoredRecord}; +use msr_plugin::EventPublisherIndex; -pub type Result = StdResult; -pub type PluginError = msr_plugin::PluginError; -pub type PluginResult = msr_plugin::PluginResult; +pub mod api; +use self::api::Config; -pub type MessageSender = mpsc::UnboundedSender; +mod internal; +use self::internal::message_loop::create_message_loop; -type ResultSender = msr_plugin::ResultSender; -pub type ResultReceiver = msr_plugin::ResultReceiver; +#[derive(Debug, Clone)] +pub struct Environment { + pub event_publisher_index: EventPublisherIndex, -pub type EventPublisherId = String; -pub type PublishedEvent = msr_plugin::PublishedEvent; -pub type EventReceiver = msr_plugin::EventReceiver; -type EventPubSub = msr_plugin::EventPubSub; + /// Directory for storing CSV data + pub data_dir: PathBuf, +} pub fn default_storage_config() -> StorageConfig { StorageConfig { @@ -50,108 +42,89 @@ pub fn default_storage_config() -> StorageConfig { } } -#[derive(Debug)] -pub enum Message { - Command(Command), - Query(Query), +pub fn default_config() -> Config { + Config { + default_storage: default_storage_config(), + register_groups: Default::default(), + } } -/// Commands are sent over a separate channel apart from -/// queries. This allows to handle them differently when -/// needed, e.g. process them with higher priority. -#[derive(Debug)] -pub enum Command { - ReplaceConfig(ResultSender, Config), - ReplaceRegisterGroupConfig( - ResultSender>, - RegisterGroupId, - RegisterGroupConfig, - ), - SwitchState(ResultSender<()>, State), - RecordObservedRegisterGroupValues(ResultSender<()>, RegisterGroupId, ObservedRegisterValues), - Shutdown(ResultSender<()>), - // TODO: Replace pseudo smoke test command with integration test - SmokeTest(ResultSender<()>), +#[derive(Debug, Clone, PartialEq)] +pub struct PluginSetup { + pub initial_config: api::Config, + pub initial_state: api::State, } -#[derive(Debug, Clone)] -pub struct RecentRecordsRequest { - pub limit: NonZeroUsize, +impl Default for PluginSetup { + fn default() -> Self { + Self { + initial_config: default_config(), + initial_state: api::State::Inactive, + } + } } -#[derive(Debug, Clone)] -pub struct FilterRecordsRequest { - pub limit: NonZeroUsize, - pub filter: RecordPreludeFilter, -} +#[derive(Error, Debug)] +pub enum Error { + #[error("register group not configured")] + RegisterGroupUnknown, -#[derive(Debug, Clone, Default)] -pub struct QueryStatusRequest { - pub with_register_groups: bool, - pub with_storage_statistics: bool, -} + #[error("invalid data format")] + DataFormatInvalid, -#[derive(Debug)] -pub enum Query { - Config(ResultSender), - RegisterGroupConfig(ResultSender>, RegisterGroupId), - Status(ResultSender, QueryStatusRequest), - RecentRecords( - ResultSender>, - RegisterGroupId, - RecentRecordsRequest, - ), - FilterRecords( - ResultSender>, - RegisterGroupId, - FilterRecordsRequest, - ), -} + #[error(transparent)] + Io(#[from] IoError), -#[derive(Debug, Clone)] -pub enum Event { - Lifecycle(LifecycleEvent), - Notification(NotificationEvent), - Incident(IncidentEvent), -} + #[error(transparent)] + MsrRecord(#[from] MsrRecordError), -/// Common lifecycle events -#[derive(Debug, Clone)] -pub enum LifecycleEvent { - Started, - Stopped, - ConfigChanged(Config), - StateChanged(State), -} + #[error(transparent)] + MsrStorage(#[from] MsrStorageError), -/// Regular notifications -#[derive(Debug, Clone)] -pub enum NotificationEvent { - DataDirectoryCreated { - register_group_id: RegisterGroupId, - fs_path: PathBuf, - }, + #[error(transparent)] + Other(#[from] anyhow::Error), } -/// Unexpected incidents that might require intervention -#[derive(Debug, Clone)] -pub enum IncidentEvent { - IoWriteError { - os_code: Option, - message: String, - }, -} +pub type Result = std::result::Result; -struct ContextEventCallback { - event_pubsub: EventPubSub, -} +pub type PluginError = msr_plugin::PluginError; +pub type PluginResult = msr_plugin::PluginResult; -impl context::ContextEventCallback for ContextEventCallback { - fn data_directory_created(&self, register_group_id: &RegisterGroupId, fs_path: &Path) { - let event = Event::Notification(NotificationEvent::DataDirectoryCreated { - register_group_id: register_group_id.to_owned(), - fs_path: fs_path.to_owned(), - }); - self.event_pubsub.publish_event(event) - } +pub type MessageSender = msr_plugin::MessageSender; +pub type MessageReceiver = msr_plugin::MessageReceiver; + +pub type ResultSender = msr_plugin::ResultSender; +pub type ResultReceiver = msr_plugin::ResultReceiver; + +pub type PublishedEvent = msr_plugin::PublishedEvent; +pub type EventReceiver = msr_plugin::EventReceiver; +type EventPubSub = msr_plugin::EventPubSub; + +pub type Plugin = msr_plugin::PluginContainer; +pub type PluginPorts = msr_plugin::PluginPorts; + +pub fn create_plugin( + environment: Environment, + plugin_setup: PluginSetup, + event_channel_capacity: usize, +) -> Result { + let PluginSetup { + initial_config, + initial_state, + } = plugin_setup; + let (event_pubsub, event_subscriber) = + EventPubSub::new(environment.event_publisher_index, event_channel_capacity); + let (message_loop, message_tx) = create_message_loop( + environment.data_dir, + event_pubsub, + initial_config, + initial_state, + )?; + Ok(Plugin { + ports: PluginPorts { + message_tx, + event_subscriber, + }, + message_loop, + }) }