diff --git a/Cargo.toml b/Cargo.toml index 59208dc..ef2f02e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/msr-legacy", "crates/msr-plugin", "plugins/csv-event-journal-msr-plugin", + "plugins/csv-register-recorder-msr-plugin", ] [patch.crates-io] @@ -13,3 +14,4 @@ msr-core = { path = "crates/msr-core" } msr-legacy = { path = "crates/msr-legacy" } msr-plugin = { path = "crates/msr-plugin" } csv-event-journal-msr-plugin = { path = "plugins/csv-event-journal-msr-plugin" } +csv-register-recorder-msr-plugin = { path = "plugins/csv-register-recorder-msr-plugin" } diff --git a/plugins/csv-register-recorder-msr-plugin/Cargo.toml b/plugins/csv-register-recorder-msr-plugin/Cargo.toml new file mode 100644 index 0000000..6b3c3be --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "csv-register-recorder-msr-plugin" +version = "0.0.0" +description = "slowtec Industrial Automation Runtime - CSV Register Recorder Plugins" +authors = ["slowtec GmbH "] +edition = "2018" +publish = false + +[dependencies] +anyhow = "*" +bs58 = "*" +log = "*" +thiserror = "*" +tokio = { version = "*", default_features = false, features = ["rt-multi-thread"] } + +# Workspace dependencies +msr-core = "*" +msr-plugin = "*" diff --git a/plugins/csv-register-recorder-msr-plugin/src/api.rs b/plugins/csv-register-recorder-msr-plugin/src/api.rs new file mode 100644 index 0000000..15e69b4 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/api.rs @@ -0,0 +1,107 @@ +use msr_plugin::{reply_channel, send_message_receive_result}; + +use super::*; + +pub use super::Message; + +pub async fn command_replace_config( + message_tx: &MessageSender, + new_config: Config, +) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::ReplaceConfig(reply_tx, new_config)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn command_replace_register_group_config( + message_tx: &MessageSender, + register_group_id: RegisterGroupId, + new_config: RegisterGroupConfig, +) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::ReplaceRegisterGroupConfig( + reply_tx, + register_group_id, + new_config, + )); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn command_switch_state( + message_tx: &MessageSender, + new_state: State, +) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::SwitchState(reply_tx, new_state)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn command_record_observed_register_group_values( + message_tx: &MessageSender, + register_group_id: RegisterGroupId, + observed_register_values: ObservedRegisterValues, +) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::RecordObservedRegisterGroupValues( + reply_tx, + register_group_id, + observed_register_values, + )); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn command_shutdown(message_tx: &MessageSender) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::Shutdown(reply_tx)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn command_smoke_test(message_tx: &MessageSender) -> PluginResult<()> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Command(Command::SmokeTest(reply_tx)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn query_config(message_tx: &MessageSender) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Query(Query::Config(reply_tx)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn query_register_group_config( + message_tx: &MessageSender, + register_group_id: RegisterGroupId, +) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Query(Query::RegisterGroupConfig(reply_tx, register_group_id)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn query_status( + message_tx: &MessageSender, + request: QueryStatusRequest, +) -> PluginResult { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Query(Query::Status(reply_tx, request)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn query_recent_records( + message_tx: &MessageSender, + register_group_id: RegisterGroupId, + req: RecentRecordsRequest, +) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Query(Query::RecentRecords(reply_tx, register_group_id, req)); + send_message_receive_result(message, message_tx, reply_rx).await +} + +pub async fn query_filter_records( + message_tx: &MessageSender, + register_group_id: RegisterGroupId, + req: FilterRecordsRequest, +) -> PluginResult> { + let (reply_tx, reply_rx) = reply_channel(); + let message = Message::Query(Query::FilterRecords(reply_tx, register_group_id, req)); + send_message_receive_result(message, message_tx, reply_rx).await +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/context.rs b/plugins/csv-register-recorder-msr-plugin/src/context.rs new file mode 100644 index 0000000..3ce4f1b --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/context.rs @@ -0,0 +1,536 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + fmt, fs, + io::Error as IoError, + num::NonZeroUsize, + path::{Path, PathBuf}, + time::SystemTime, +}; + +use thiserror::Error; + +use msr_core::{ + register::recording::{ + CsvFileRecordStorage, Error as RecordError, RecordPrelude, RecordStorage as _, + StoredRecordPrelude, + }, + storage::RecordStorageBase, + time::SystemTimeInstant, +}; + +pub use msr_core::{ + csv_event_journal::Severity, + register::Index as RegisterIndex, + storage::{ + CreatedAtOffset, Error as StorageError, MemorySize, RecordPreludeFilter, + Result as StorageResult, StorageConfig, StorageSegmentConfig, StorageStatus, TimeInterval, + WritableRecordPrelude, + }, + ScalarType, ScalarValue, +}; + +pub use crate::register::{ + GroupId as RegisterGroupId, ObservedRegisterValues, Record, StoredRecord, Type as RegisterType, + Value as RegisterValue, +}; + +#[derive(Error, Debug)] +pub enum Error { + #[error("register group not configured")] + RegisterGroupUnknown, + + #[error("invalid data format")] + DataFormatInvalid, + + #[error(transparent)] + Io(#[from] IoError), + + #[error(transparent)] + Record(#[from] RecordError), + + #[error(transparent)] + Storage(#[from] StorageError), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub type Result = std::result::Result; + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct PartitionId(String); + +impl PartitionId { + pub fn encode(s: &str) -> Self { + Self(bs58::encode(s).into_string()) + } +} + +impl AsRef for PartitionId { + fn as_ref(&self) -> &str { + let Self(inner) = &self; + inner + } +} + +impl fmt::Display for PartitionId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_ref()) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct RegisterGroupConfig { + pub registers: Vec<(RegisterIndex, RegisterType)>, + pub storage: StorageConfig, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum State { + Inactive, + Active, +} + +#[derive(Debug, Clone)] +pub struct Status { + pub state: State, + pub register_groups: Option>, +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct Config { + pub default_storage: StorageConfig, + pub register_groups: HashMap, +} + +pub struct Context { + data_path: PathBuf, // immutable + + config: Config, + + state: State, + + register_groups: HashMap, + + event_cb: Box, +} + +pub trait ContextEventCallback { + fn data_directory_created(&self, register_group_id: &RegisterGroupId, data_dir: &Path); +} + +struct RegisterGroupContext { + storage: CsvFileRecordStorage, +} + +#[derive(Debug, Clone)] +pub struct RegisterGroupStatus { + pub storage: StorageStatus, +} + +fn partition_id_as_path(partition_id: &PartitionId) -> &Path { + let path = Path::new(partition_id.as_ref()); + debug_assert!(!path.has_root()); + debug_assert!(path.is_relative()); + debug_assert!(path.components().count() == 1); + path +} + +impl RegisterGroupContext { + fn try_new( + register_group_id: &RegisterGroupId, + data_path: &Path, + config: RegisterGroupConfig, + event_cb: &dyn ContextEventCallback, + ) -> Result { + let mut data_path = PathBuf::from(data_path); + let partition_id = PartitionId::encode(register_group_id.as_ref()); + let id_path = partition_id_as_path(&partition_id); + data_path.push(id_path); + if !data_path.is_dir() { + log::info!("Creating non-existent directory {}", data_path.display()); + fs::create_dir_all(&data_path)?; + event_cb.data_directory_created(register_group_id, &data_path); + } + let storage = CsvFileRecordStorage::try_new(config.storage, data_path, config.registers) + .map_err(anyhow::Error::from)?; + let context = Self { storage }; + Ok(context) + } + + fn status(&mut self, with_storage_statistics: bool) -> StorageResult { + let storage_statistics = if with_storage_statistics { + Some(self.storage.report_statistics()?) + } else { + None + }; + let storage = StorageStatus { + descriptor: self.storage.descriptor().clone(), + statistics: storage_statistics, + }; + Ok(RegisterGroupStatus { storage }) + } +} + +pub trait RecordPreludeGenerator { + fn generate_prelude(&self) -> Result<(SystemTimeInstant, RecordPrelude)>; +} + +#[derive(Debug)] +struct DefaultRecordPreludeGenerator; + +impl RecordPreludeGenerator for DefaultRecordPreludeGenerator { + fn generate_prelude(&self) -> Result<(SystemTimeInstant, RecordPrelude)> { + Ok((SystemTimeInstant::now(), Default::default())) + } +} + +pub trait RecordRepo { + fn append_record(&mut self, record: Record) -> Result<()>; + + fn recent_records(&self, limit: NonZeroUsize) -> Result>; + + fn filter_records( + &self, + limit: NonZeroUsize, + filter: RecordPreludeFilter, + ) -> Result>; + + fn total_record_count(&self) -> usize; +} + +fn create_register_group_contexts( + data_path: &Path, + register_group_configs: HashMap, + event_cb: &dyn ContextEventCallback, +) -> Result> { + let mut register_group_contexts = HashMap::with_capacity(register_group_configs.len()); + for (register_group_id, register_group_config) in register_group_configs { + let register_group_context = RegisterGroupContext::try_new( + ®ister_group_id, + data_path, + register_group_config.clone(), + event_cb, + )?; + register_group_contexts.insert(register_group_id, register_group_context); + } + Ok(register_group_contexts) +} + +impl Context { + pub fn try_new( + data_path: PathBuf, + initial_config: Config, + initial_state: State, + event_cb: Box, + ) -> Result { + let register_groups = create_register_group_contexts( + &data_path, + initial_config.register_groups.clone(), + &*event_cb, + )?; + Ok(Self { + data_path, + config: initial_config, + state: initial_state, + register_groups, + event_cb, + }) + } + + pub fn config(&self) -> &Config { + &self.config + } + + pub fn state(&self) -> State { + self.state + } + + pub fn register_group_config(&self, id: &RegisterGroupId) -> Option<&RegisterGroupConfig> { + self.config.register_groups.get(id) + } + + pub fn status( + &mut self, + with_register_groups: bool, + with_storage_statistics: bool, + ) -> Result { + let state = self.state(); + let register_groups = if with_register_groups { + let mut register_groups = HashMap::with_capacity(self.register_groups.len()); + for (id, context) in &mut self.register_groups { + let status = context + .status(with_storage_statistics) + .map_err(Error::Storage)?; + register_groups.insert(id.clone(), status); + } + Some(register_groups) + } else { + None + }; + Ok(Status { + state, + register_groups, + }) + } + + pub fn recent_records( + &mut self, + register_group_id: &RegisterGroupId, + limit: NonZeroUsize, + ) -> Result> { + let context = self + .register_groups + .get_mut(register_group_id) + .ok_or(Error::RegisterGroupUnknown)?; + Ok(context.storage.recent_records(limit)?) + } + + pub fn filter_records( + &mut self, + register_group_id: &RegisterGroupId, + limit: NonZeroUsize, + filter: &RecordPreludeFilter, + ) -> Result> { + let context = self + .register_groups + .get_mut(register_group_id) + .ok_or(Error::RegisterGroupUnknown)?; + Ok(context.storage.filter_records(limit, filter)?) + } + + /// Switch the current configuration + /// + /// Returns the previous configuration. + pub fn replace_config(&mut self, new_config: Config) -> Result { + if self.config == new_config { + return Ok(new_config); + } + log::debug!( + "Replacing configuration: {:?} -> {:?}", + self.config, + new_config + ); + let new_register_groups = create_register_group_contexts( + &self.data_path, + new_config.register_groups.clone(), + &*self.event_cb, + )?; + // Replace atomically + self.register_groups = new_register_groups; + Ok(std::mem::replace(&mut self.config, new_config)) + } + + /// Switch the current configuration of a single register group + /// + /// Returns the previous configuration. + pub fn replace_register_group_config( + &mut self, + register_group_id: RegisterGroupId, + new_config: RegisterGroupConfig, + ) -> Result> { + let entry = self.config.register_groups.entry(register_group_id); + match entry { + Entry::Vacant(vacant) => { + let register_group_id = vacant.key().clone(); + log::debug!( + "Configuring register group {}: {:?}", + register_group_id, + new_config + ); + let register_group_context = RegisterGroupContext::try_new( + ®ister_group_id, + &self.data_path, + new_config.clone(), + &*self.event_cb, + )?; + self.register_groups + .insert(register_group_id, register_group_context); + vacant.insert(new_config); + Ok(None) + } + Entry::Occupied(mut occupied) => { + if occupied.get() == &new_config { + return Ok(Some(new_config)); + } + let register_group_id = occupied.key().clone(); + log::debug!( + "Replacing configuration of register group {}: {:?} -> {:?}", + register_group_id, + occupied.get(), + new_config + ); + let register_group_context = RegisterGroupContext::try_new( + ®ister_group_id, + &self.data_path, + new_config.clone(), + &*self.event_cb, + )?; + self.register_groups + .insert(register_group_id, register_group_context); + let old_config = std::mem::replace(occupied.get_mut(), new_config); + Ok(Some(old_config)) + } + } + } + + /// Switch the current state + /// + /// Returns the previous state. + pub fn switch_state(&mut self, new_state: State) -> Result { + if self.state == new_state { + return Ok(new_state); + } + log::debug!("Switching state: {:?} -> {:?}", self.state, new_state); + Ok(std::mem::replace(&mut self.state, new_state)) + } + + pub fn record_observed_register_group_values( + &mut self, + register_group_id: &RegisterGroupId, + observed_register_values: ObservedRegisterValues, + ) -> Result> { + match self.state { + State::Inactive => { + log::debug!( + "Discarding new observation for register group {} while inactive: {:?}", + register_group_id, + observed_register_values + ); + Ok(None) + } + State::Active => { + if let Some(config) = self.config.register_groups.get(register_group_id) { + let expected_register_count = config.registers.len(); + let actual_register_count = observed_register_values.register_values.len(); + if expected_register_count != actual_register_count { + log::warn!( + "Mismatching number of register values in observation for group {}: expected = {}, actual = {}", + register_group_id, + expected_register_count, + actual_register_count); + return Err(Error::DataFormatInvalid); + } + for ((register_index, expected_type), actual_type) in + config.registers.iter().zip( + observed_register_values + .register_values + .iter() + .map(|v| v.as_ref().map(|v| v.to_type())), + ) + { + if let Some(actual_type) = actual_type { + if *expected_type != actual_type { + log::warn!( + "Mismatching register type for register {} in observation for group {}: expected = {}, actual = {}", + register_index, + register_group_id, + expected_type, + actual_type); + } + } + } + } else { + log::warn!( + "Missing configuration for register group {} - rejecting observation", + register_group_id + ); + return Err(Error::RegisterGroupUnknown); + } + let context = self + .register_groups + .get_mut(register_group_id) + .ok_or(Error::RegisterGroupUnknown)?; + DefaultRecordPreludeGenerator.generate_prelude().and_then( + |(created_at, prelude)| { + let new_record = Record { + prelude, + observation: observed_register_values, + }; + log::debug!( + "Recording new observation for register group {}: {:?}", + register_group_id, + new_record + ); + let prelude = context.storage.append_record(&created_at, new_record)?; + Ok(Some(prelude)) + }, + ) + } + } + } + + // FIXME: Replace with an integration test + pub fn smoke_test(&mut self) -> Result<()> { + let register_group_id = RegisterGroupId::from_value("smoke-test-register-group".into()); + let register_group_config = RegisterGroupConfig { + registers: vec![ + ( + RegisterIndex::new(1), + RegisterType::Scalar(ScalarType::Bool), + ), + (RegisterIndex::new(2), RegisterType::Scalar(ScalarType::I64)), + (RegisterIndex::new(3), RegisterType::Scalar(ScalarType::U64)), + (RegisterIndex::new(4), RegisterType::Scalar(ScalarType::F64)), + (RegisterIndex::new(5), RegisterType::String), + ], + storage: self.config.default_storage.clone(), + }; + let orig_config = + self.replace_register_group_config(register_group_id.clone(), register_group_config)?; + let recorded_observations = vec![ + ObservedRegisterValues { + observed_at: SystemTime::now(), + register_values: vec![ + None, + Some(ScalarValue::I64(0).into()), + Some(ScalarValue::U64(0).into()), + Some(ScalarValue::F64(0.0).into()), + None, + ], + }, + ObservedRegisterValues { + observed_at: SystemTime::now(), + register_values: vec![ + Some(ScalarValue::Bool(false).into()), + Some(ScalarValue::I64(-1).into()), + Some(ScalarValue::U64(1).into()), + Some(ScalarValue::F64(-1.125).into()), + Some(RegisterValue::String("Hello".to_string())), + ], + }, + ObservedRegisterValues { + observed_at: SystemTime::now(), + register_values: vec![ + Some(ScalarValue::Bool(true).into()), + Some(ScalarValue::I64(1).into()), + None, + Some(ScalarValue::F64(1.125).into()), + Some(RegisterValue::String(", world!".to_string())), + ], + }, + ObservedRegisterValues { + observed_at: SystemTime::now(), + register_values: vec![None, None, None, None, None], + }, + ]; + for observation in &recorded_observations { + self.record_observed_register_group_values(®ister_group_id, observation.clone())?; + } + let recent_records = self.recent_records( + ®ister_group_id, + NonZeroUsize::new(recorded_observations.len()).unwrap(), + )?; + assert_eq!(recent_records.len(), recorded_observations.len()); + log::info!( + "Smoke test recorded observations: {:?}", + recorded_observations + ); + log::info!("Smoke test records: {:?}", recent_records); + // Restore configuration + if let Some(orig_config) = orig_config { + self.replace_register_group_config(register_group_id, orig_config)?; + } + Ok(()) + } +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/lib.rs b/plugins/csv-register-recorder-msr-plugin/src/lib.rs new file mode 100644 index 0000000..440304c --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/lib.rs @@ -0,0 +1,157 @@ +#![deny(missing_debug_implementations)] +#![warn(rust_2018_idioms)] + +use std::{ + num::{NonZeroU32, NonZeroU64, NonZeroUsize}, + path::{Path, PathBuf}, + result::Result as StdResult, +}; + +use tokio::sync::mpsc; + +use msr_core::storage::{ + MemorySize, RecordPreludeFilter, StorageConfig, StorageSegmentConfig, TimeInterval, +}; + +pub mod api; + +mod plugin; +pub use self::plugin::{create_plugin, Plugin, PluginConfig, PluginPorts}; + +mod context; +pub use self::context::{ + Config, Error, RegisterGroupConfig, RegisterGroupStatus, Result as NewResult, State, Status, +}; + +pub mod register; +use self::register::{GroupId as RegisterGroupId, ObservedRegisterValues, StoredRecord}; + +pub type Result = StdResult; +pub type PluginError = msr_plugin::PluginError; +pub type PluginResult = msr_plugin::PluginResult; + +pub type MessageSender = mpsc::UnboundedSender; + +type ResultSender = msr_plugin::ResultSender; +pub type ResultReceiver = msr_plugin::ResultReceiver; + +pub type EventPublisherId = String; +pub type PublishedEvent = msr_plugin::PublishedEvent; +pub type EventReceiver = msr_plugin::EventReceiver; +type EventPubSub = msr_plugin::EventPubSub; + +pub fn default_storage_config() -> StorageConfig { + StorageConfig { + retention_time: TimeInterval::Days(NonZeroU32::new(180).unwrap()), // 180 days + segmentation: StorageSegmentConfig { + time_interval: TimeInterval::Days(NonZeroU32::new(1).unwrap()), // daily + size_limit: MemorySize::Bytes(NonZeroU64::new(1_048_576).unwrap()), // 1 MiB + }, + } +} + +#[derive(Debug)] +pub enum Message { + Command(Command), + Query(Query), +} + +/// Commands are sent over a separate channel apart from +/// queries. This allows to handle them differently when +/// needed, e.g. process them with higher priority. +#[derive(Debug)] +pub enum Command { + ReplaceConfig(ResultSender, Config), + ReplaceRegisterGroupConfig( + ResultSender>, + RegisterGroupId, + RegisterGroupConfig, + ), + SwitchState(ResultSender<()>, State), + RecordObservedRegisterGroupValues(ResultSender<()>, RegisterGroupId, ObservedRegisterValues), + Shutdown(ResultSender<()>), + // TODO: Replace pseudo smoke test command with integration test + SmokeTest(ResultSender<()>), +} + +#[derive(Debug, Clone)] +pub struct RecentRecordsRequest { + pub limit: NonZeroUsize, +} + +#[derive(Debug, Clone)] +pub struct FilterRecordsRequest { + pub limit: NonZeroUsize, + pub filter: RecordPreludeFilter, +} + +#[derive(Debug, Clone, Default)] +pub struct QueryStatusRequest { + pub with_register_groups: bool, + pub with_storage_statistics: bool, +} + +#[derive(Debug)] +pub enum Query { + Config(ResultSender), + RegisterGroupConfig(ResultSender>, RegisterGroupId), + Status(ResultSender, QueryStatusRequest), + RecentRecords( + ResultSender>, + RegisterGroupId, + RecentRecordsRequest, + ), + FilterRecords( + ResultSender>, + RegisterGroupId, + FilterRecordsRequest, + ), +} + +#[derive(Debug, Clone)] +pub enum Event { + Lifecycle(LifecycleEvent), + Notification(NotificationEvent), + Incident(IncidentEvent), +} + +/// Common lifecycle events +#[derive(Debug, Clone)] +pub enum LifecycleEvent { + Started, + Stopped, + ConfigChanged(Config), + StateChanged(State), +} + +/// Regular notifications +#[derive(Debug, Clone)] +pub enum NotificationEvent { + DataDirectoryCreated { + register_group_id: RegisterGroupId, + fs_path: PathBuf, + }, +} + +/// Unexpected incidents that might require intervention +#[derive(Debug, Clone)] +pub enum IncidentEvent { + IoWriteError { + os_code: Option, + message: String, + }, +} + +struct ContextEventCallback { + event_pubsub: EventPubSub, +} + +impl context::ContextEventCallback for ContextEventCallback { + fn data_directory_created(&self, register_group_id: &RegisterGroupId, fs_path: &Path) { + let event = Event::Notification(NotificationEvent::DataDirectoryCreated { + register_group_id: register_group_id.to_owned(), + fs_path: fs_path.to_owned(), + }); + self.event_pubsub.publish_event(event) + } +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs b/plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs new file mode 100644 index 0000000..c91d282 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/plugin/handlers.rs @@ -0,0 +1,162 @@ +use msr_plugin::send_reply; + +use super::*; + +pub fn command_replace_config( + context: &mut Context, + event_pubsub: &EventPubSub, + reply_tx: ResultSender, + new_config: Config, +) { + let response = task::block_in_place(|| { + context.replace_config(new_config.clone()).map_err(|err| { + log::warn!("Failed to replace configuration: {}", err); + err + }) + }) + .map(|old_config| { + let event = Event::Lifecycle(LifecycleEvent::ConfigChanged(new_config)); + event_pubsub.publish_event(event); + old_config + }); + send_reply(reply_tx, response); +} + +pub fn command_replace_register_group_config( + context: &mut Context, + event_pubsub: &EventPubSub, + reply_tx: ResultSender>, + register_group_id: RegisterGroupId, + new_config: RegisterGroupConfig, +) { + let response = task::block_in_place(|| { + context + .replace_register_group_config(register_group_id.clone(), new_config) + .map_err(|err| { + log::warn!( + "Failed replace configuration of register group {}: {}", + register_group_id, + err + ); + err + }) + }) + .map(|old_config| { + let event = Event::Lifecycle(LifecycleEvent::ConfigChanged(context.config().clone())); + event_pubsub.publish_event(event); + old_config + }); + send_reply(reply_tx, response); +} + +pub fn command_switch_state( + context: &mut Context, + event_pubsub: &EventPubSub, + reply_tx: ResultSender<()>, + new_state: State, +) { + let response = task::block_in_place(|| { + context.switch_state(new_state).map_err(|err| { + log::warn!("Failed to switch state: {}", err); + err + }) + }) + .map(|_old_state| { + let event = Event::Lifecycle(LifecycleEvent::StateChanged(new_state)); + event_pubsub.publish_event(event); + }); + send_reply(reply_tx, response); +} + +pub fn command_record_observed_register_group_values( + context: &mut Context, + reply_tx: ResultSender<()>, + register_group_id: RegisterGroupId, + observed_register_values: ObservedRegisterValues, +) { + let response = task::block_in_place(|| { + context + .record_observed_register_group_values(®ister_group_id, observed_register_values) + .map(|_| ()) + .map_err(|err| { + log::warn!("Failed record new observation: {}", err); + err + }) + }); + send_reply(reply_tx, response); +} + +pub fn command_shutdown(reply_tx: ResultSender<()>) { + send_reply(reply_tx, Ok(())); +} + +pub fn query_config(context: &Context, reply_tx: ResultSender) { + let response = task::block_in_place(|| Ok(context.config().to_owned())); + send_reply(reply_tx, response); +} + +pub fn query_register_group_config( + context: &Context, + reply_tx: ResultSender>, + register_group_id: &RegisterGroupId, +) { + let response = + task::block_in_place(|| Ok(context.register_group_config(register_group_id).cloned())); + send_reply(reply_tx, response); +} + +pub fn query_status( + context: &mut Context, + reply_tx: ResultSender, + request: QueryStatusRequest, +) { + let response = task::block_in_place(|| { + let QueryStatusRequest { + with_register_groups, + with_storage_statistics, + } = request; + context + .status(with_register_groups, with_storage_statistics) + .map_err(|err| { + log::warn!("Failed to query status: {}", err); + err + }) + }); + send_reply(reply_tx, response); +} + +pub fn query_recent_records( + context: &mut Context, + reply_tx: ResultSender>, + register_group_id: &RegisterGroupId, + request: RecentRecordsRequest, +) { + let response = task::block_in_place(|| { + let RecentRecordsRequest { limit } = request; + context + .recent_records(register_group_id, limit) + .map_err(|err| { + log::warn!("Failed to query recent records: {}", err); + err + }) + }); + send_reply(reply_tx, response); +} + +pub fn query_filter_records( + context: &mut Context, + reply_tx: ResultSender>, + register_group_id: &RegisterGroupId, + request: FilterRecordsRequest, +) { + let response = task::block_in_place(|| { + let FilterRecordsRequest { limit, filter } = request; + context + .filter_records(register_group_id, limit, &filter) + .map_err(|err| { + log::warn!("Failed to query filtered records: {}", err); + err + }) + }); + send_reply(reply_tx, response); +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs b/plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs new file mode 100644 index 0000000..a878b00 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/plugin/mod.rs @@ -0,0 +1,185 @@ +use std::path::{Path, PathBuf}; + +use tokio::task; + +use msr_plugin::{message_channel, send_reply}; + +use crate::register::GroupId as RegisterGroupId; + +use super::{context::Context, *}; + +mod handlers; + +pub const DEFAULT_JOURNAL_SCOPE: &str = "slowrt.plugin.msr.recorder"; + +pub const DEFAULT_EVENT_PUBLISHER_ID: &str = DEFAULT_JOURNAL_SCOPE; + +#[derive(Debug, Clone, PartialEq)] +pub struct PluginConfig { + pub initial_config: Config, + pub event_publisher_id: EventPublisherId, +} + +impl Default for PluginConfig { + fn default() -> Self { + Self { + initial_config: Config { + default_storage: default_storage_config(), + register_groups: Default::default(), + }, + event_publisher_id: DEFAULT_EVENT_PUBLISHER_ID.to_owned(), + } + } +} + +pub type Plugin = msr_plugin::PluginContainer; +pub type PluginPorts = msr_plugin::PluginPorts; + +struct ContextEventCallback { + event_pubsub: EventPubSub, +} + +impl context::ContextEventCallback for ContextEventCallback { + fn data_directory_created(&self, register_group_id: &RegisterGroupId, fs_path: &Path) { + let event = Event::Notification(NotificationEvent::DataDirectoryCreated { + register_group_id: register_group_id.to_owned(), + fs_path: fs_path.to_owned(), + }); + self.event_pubsub.publish_event(event) + } +} + +pub fn create_plugin( + data_path: PathBuf, + initial_config: PluginConfig, + initial_state: State, + event_channel_capacity: usize, +) -> NewResult { + let PluginConfig { + event_publisher_id, + initial_config, + } = initial_config; + let (message_tx, mut message_rx) = message_channel(); + let (event_pubsub, event_subscriber) = + EventPubSub::new(event_publisher_id, event_channel_capacity); + let context_events = ContextEventCallback { + event_pubsub: event_pubsub.clone(), + }; + let mut context = Context::try_new( + data_path, + initial_config, + initial_state, + Box::new(context_events) as _, + )?; + let message_loop = async move { + let mut exit_message_loop = false; + log::info!("Starting message loop"); + event_pubsub.publish_event(Event::Lifecycle(LifecycleEvent::Started)); + while let Some(msg) = message_rx.recv().await { + match msg { + Message::Command(command) => { + log::trace!("Received command {:?}", command); + match command { + Command::ReplaceConfig(reply_tx, new_config) => { + handlers::command_replace_config( + &mut context, + &event_pubsub, + reply_tx, + new_config, + ); + } + Command::ReplaceRegisterGroupConfig( + reply_tx, + register_group_id, + new_config, + ) => { + handlers::command_replace_register_group_config( + &mut context, + &event_pubsub, + reply_tx, + register_group_id, + new_config, + ); + } + Command::SwitchState(reply_tx, new_state) => { + handlers::command_switch_state( + &mut context, + &event_pubsub, + reply_tx, + new_state, + ); + } + Command::RecordObservedRegisterGroupValues( + reply_tx, + register_group_id, + observed_register_values, + ) => { + handlers::command_record_observed_register_group_values( + &mut context, + reply_tx, + register_group_id, + observed_register_values, + ); + } + Command::Shutdown(reply_tx) => { + handlers::command_shutdown(reply_tx); + exit_message_loop = true; + } + Command::SmokeTest(reply_tx) => { + // TODO: Remove + let response = task::block_in_place(|| context.smoke_test()); + send_reply(reply_tx, response); + } + } + } + Message::Query(query) => { + log::debug!("Received query {:?}", query); + match query { + Query::Config(reply_tx) => { + handlers::query_config(&context, reply_tx); + } + Query::RegisterGroupConfig(reply_tx, register_group_id) => { + handlers::query_register_group_config( + &context, + reply_tx, + ®ister_group_id, + ); + } + Query::Status(reply_tx, request) => { + handlers::query_status(&mut context, reply_tx, request); + } + Query::RecentRecords(reply_tx, register_group_id, request) => { + handlers::query_recent_records( + &mut context, + reply_tx, + ®ister_group_id, + request, + ); + } + Query::FilterRecords(reply_tx, register_group_id, request) => { + handlers::query_filter_records( + &mut context, + reply_tx, + ®ister_group_id, + request, + ); + } + } + } + } + if exit_message_loop { + log::info!("Exiting message loop"); + break; + } + } + log::info!("Message loop terminated"); + event_pubsub.publish_event(Event::Lifecycle(LifecycleEvent::Stopped)); + }; + Ok(Plugin { + ports: PluginPorts { + message_tx, + event_subscriber, + }, + message_loop: Box::pin(message_loop), + }) +} diff --git a/plugins/csv-register-recorder-msr-plugin/src/register.rs b/plugins/csv-register-recorder-msr-plugin/src/register.rs new file mode 100644 index 0000000..dfc4253 --- /dev/null +++ b/plugins/csv-register-recorder-msr-plugin/src/register.rs @@ -0,0 +1,56 @@ +use std::fmt; + +use msr_core::Measurement; + +// Re-exports +pub use msr_core::{register::Index, Value, ValueType as Type}; + +// Generic re-exports of msr-core, specialized with concrete value type +pub type ValueMeasurement = Measurement; +pub type IndexedValueMeasurement = msr_core::register::IndexedMeasurement; +pub type ObservedValue = msr_core::register::ObservedValue; +pub type ObservedValues = msr_core::register::ObservedValues; + +pub type ObservedRegisterValues = msr_core::register::recording::ObservedRegisterValues; +pub type Record = msr_core::register::recording::Record; +pub type StoredRecord = msr_core::register::recording::StoredRecord; + +pub type GroupIdValue = String; + +#[derive(Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)] +pub struct GroupId(GroupIdValue); + +impl GroupId { + pub const fn from_value(value: GroupIdValue) -> Self { + Self(value) + } + + pub fn into_value(self) -> GroupIdValue { + let GroupId(value) = self; + value + } +} + +impl From for GroupId { + fn from(from: GroupIdValue) -> Self { + Self::from_value(from) + } +} + +impl From for GroupIdValue { + fn from(from: GroupId) -> Self { + from.into_value() + } +} + +impl AsRef for GroupId { + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for GroupId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_ref()) + } +}