Skip to content

Commit

Permalink
Identify event publishers by an usize index
Browse files Browse the repository at this point in the history
...and restructure the csv-register-recorder-msr-plugin.
  • Loading branch information
uklotzde committed Oct 11, 2021
1 parent ce54709 commit ea306ae
Show file tree
Hide file tree
Showing 22 changed files with 569 additions and 481 deletions.
14 changes: 14 additions & 0 deletions crates/msr-core/src/csv_event_journal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{
convert::{TryFrom, TryInto},
fmt,
num::NonZeroUsize,
path::PathBuf,
time::SystemTime,
Expand Down Expand Up @@ -160,6 +161,19 @@ impl From<Scope> for ScopeValue {
}
}

impl AsRef<ScopeValue> for Scope {
fn as_ref(&self) -> &ScopeValue {
let Self(inner) = self;
inner
}
}

impl fmt::Display for Scope {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

pub type CodeValue = i32;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
Expand Down
115 changes: 78 additions & 37 deletions crates/msr-plugin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
use msr_core::audit::Activity;
use std::{error::Error as StdError, fmt, future::Future, pin::Pin};

use thiserror::Error;
use tokio::sync::{
broadcast,
mpsc::{self, error::SendError},
oneshot,
};

use msr_core::audit::Activity;

// ------ -------
// Plugin shape
// ------ -------

pub trait Plugin {
type Message;
type Event;
fn message_sender(&self) -> mpsc::UnboundedSender<Self::Message>;
fn subscribe_events(&self) -> broadcast::Receiver<Self::Event>;
fn message_sender(&self) -> MessageSender<Self::Message>;
fn subscribe_events(&self) -> BroadcastReceiver<Self::Event>;
fn run(self) -> MessageLoop;
}

#[allow(missing_debug_implementations)]
pub struct PluginContainer<M, P, E> {
pub ports: PluginPorts<M, P, E>,
pub struct PluginContainer<M, E> {
pub ports: PluginPorts<M, E>,
pub message_loop: MessageLoop,
}

impl<M, P, E> Plugin for PluginContainer<M, P, E> {
impl<M, E> Plugin for PluginContainer<M, E> {
type Message = M;
type Event = PublishedEvent<P, E>;
fn message_sender(&self) -> mpsc::UnboundedSender<Self::Message> {
type Event = PublishedEvent<E>;

fn message_sender(&self) -> MessageSender<Self::Message> {
self.ports.message_tx.clone()
}
fn subscribe_events(&self) -> broadcast::Receiver<Self::Event> {
fn subscribe_events(&self) -> BroadcastReceiver<Self::Event> {
self.ports.event_subscriber.subscribe()
}
fn run(self) -> MessageLoop {
Expand All @@ -42,9 +45,9 @@ impl<M, P, E> Plugin for PluginContainer<M, P, E> {
pub type MessageLoop = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

#[allow(missing_debug_implementations)]
pub struct PluginPorts<M, P, E> {
pub struct PluginPorts<M, E> {
pub message_tx: MessageSender<M>,
pub event_subscriber: EventSubscriber<P, E>,
pub event_subscriber: EventSubscriber<E>,
}

#[derive(Error, Debug)]
Expand All @@ -63,8 +66,8 @@ pub type PluginResult<T, E> = Result<T, PluginError<E>>;
// ------ -------

// TODO: Use bounded channels for backpressure?
type MessageSender<T> = mpsc::UnboundedSender<T>;
type MessageReceiver<T> = mpsc::UnboundedReceiver<T>;
pub type MessageSender<T> = mpsc::UnboundedSender<T>;
pub type MessageReceiver<T> = mpsc::UnboundedReceiver<T>;

pub fn message_channel<T>() -> (MessageSender<T>, MessageReceiver<T>) {
mpsc::unbounded_channel()
Expand Down Expand Up @@ -119,49 +122,84 @@ where
// Events
// ----- ------

/// Internal index into a lookup table with event publisher metadata
pub type EventPublisherIndexValue = usize;

/// Numeric identifier of an event publisher control cycle
///
/// Uniquely identifies an event publisher in the system at runtime.
///
/// The value is supposed to be used as a key or index to retrieve
/// extended metadata for an event publisher that does not need to
/// be sent with every event. This metadata is probably immutable.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct EventPublisherIndex(EventPublisherIndexValue);

impl EventPublisherIndex {
pub const fn from_value(value: EventPublisherIndexValue) -> Self {
Self(value)
}

pub const fn to_value(self) -> EventPublisherIndexValue {
self.0
}
}

impl From<EventPublisherIndexValue> for EventPublisherIndex {
fn from(from: EventPublisherIndexValue) -> Self {
Self::from_value(from)
}
}

impl From<EventPublisherIndex> for EventPublisherIndexValue {
fn from(from: EventPublisherIndex) -> Self {
from.to_value()
}
}

#[derive(Debug, Clone)]
pub struct PublishedEvent<P, T> {
pub published: Activity<P>,
pub payload: T,
pub struct PublishedEvent<E> {
pub published: Activity<EventPublisherIndex>,
pub payload: E,
}

pub type EventSender<P, T> = broadcast::Sender<PublishedEvent<P, T>>;
pub type EventReceiver<P, T> = broadcast::Receiver<PublishedEvent<P, T>>;
pub type EventSubscriber<P, T> = BroadcastSubscriber<PublishedEvent<P, T>>;
pub type EventSender<E> = broadcast::Sender<PublishedEvent<E>>;
pub type EventReceiver<E> = broadcast::Receiver<PublishedEvent<E>>;
pub type EventSubscriber<E> = BroadcastSubscriber<PublishedEvent<E>>;

pub fn event_channel<P, T>(channel_capacity: usize) -> (EventSender<P, T>, EventSubscriber<P, T>)
pub fn event_channel<E>(channel_capacity: usize) -> (EventSender<E>, EventSubscriber<E>)
where
P: Clone,
T: Clone,
E: Clone,
{
broadcast_channel(channel_capacity)
}

#[derive(Debug, Clone)]
pub struct EventPubSub<P, E> {
publisher: P,
event_tx: EventSender<P, E>,
pub struct EventPubSub<E> {
publisher_index: EventPublisherIndex,
event_tx: EventSender<E>,
}

impl<P, T> EventPubSub<P, T>
impl<E> EventPubSub<E>
where
P: fmt::Debug + Clone,
T: fmt::Debug + Clone,
E: fmt::Debug + Clone,
{
pub fn new(publisher: impl Into<P>, channel_capacity: usize) -> (Self, EventSubscriber<P, T>) {
pub fn new(
publisher_index: impl Into<EventPublisherIndex>,
channel_capacity: usize,
) -> (Self, EventSubscriber<E>) {
let (event_tx, event_subscriber) = event_channel(channel_capacity);
(
Self {
publisher_index: publisher_index.into(),
event_tx,
publisher: publisher.into(),
},
event_subscriber,
)
}

pub fn publish_event(&self, payload: T) {
let publisher = self.publisher.clone();
let published = Activity::now(publisher);
pub fn publish_event(&self, payload: E) {
let published = Activity::now(self.publisher_index);
let event = PublishedEvent { published, payload };
self.dispatch_event(event);
}
Expand All @@ -171,12 +209,11 @@ pub trait EventDispatcher<E> {
fn dispatch_event(&self, event: E);
}

impl<P, T> EventDispatcher<PublishedEvent<P, T>> for EventPubSub<P, T>
impl<E> EventDispatcher<PublishedEvent<E>> for EventPubSub<E>
where
P: fmt::Debug + Clone,
T: fmt::Debug + Clone,
E: fmt::Debug + Clone,
{
fn dispatch_event(&self, event: PublishedEvent<P, T>) {
fn dispatch_event(&self, event: PublishedEvent<E>) {
if let Err(event) = self.event_tx.send(event) {
// Ignore all send errors that are expected if no subscribers
// are connected.
Expand All @@ -185,6 +222,10 @@ where
}
}

// --------- -----------
// Utility functions
// --------- -----------

pub fn send_message<M, E>(
message: impl Into<M>,
message_tx: &MessageSender<M>,
Expand Down
4 changes: 3 additions & 1 deletion plugins/csv-event-journal-msr-plugin/src/api/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use msr_core::csv_event_journal::Entry;

use super::{Config, RecordEntryOutcome, ResultSender, State};
use crate::ResultSender;

use super::{Config, RecordEntryOutcome, State};

#[derive(Debug)]
pub enum Command {
Expand Down
5 changes: 3 additions & 2 deletions plugins/csv-event-journal-msr-plugin/src/api/controller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use msr_core::csv_event_journal::{Entry, StoredRecord};

use msr_plugin::{reply_channel, send_message_receive_result};

use crate::internal::MessageSender;
use crate::{MessageSender, PluginResult};

use super::{query, Command, Config, PluginResult, Query, RecordEntryOutcome, State, Status};
use super::{query, Command, Config, Query, RecordEntryOutcome, State, Status};

/// Remote controller for the plugin
///
Expand Down
2 changes: 0 additions & 2 deletions plugins/csv-event-journal-msr-plugin/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use super::*;

// Re-export internal types that are used in the public API
pub use crate::internal::context::{
Config, EntryNotRecorded, EntryRecorded, RecordEntryOutcome, State, Status,
Expand Down
4 changes: 3 additions & 1 deletion plugins/csv-event-journal-msr-plugin/src/api/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::num::NonZeroUsize;

use msr_core::csv_event_journal::{RecordFilter, StoredRecord};

use super::{Config, ResultSender, Status};
use crate::ResultSender;

use super::{Config, Status};

#[derive(Debug)]
pub enum Query {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::time::SystemTime;

use tokio::task;

use msr_core::csv_event_journal::{Entry, Error, Scope, Severity, StoredRecord};
use msr_core::csv_event_journal::{Entry, Error, StoredRecord};

use msr_plugin::send_reply;

Expand All @@ -11,7 +9,7 @@ use crate::{
event::{IncidentEvent, LifecycleEvent},
query, Config, Event, RecordEntryOutcome, State, Status,
},
EventPubSub, JournalCodes, ResultSender,
EventPubSub, ResultSender,
};

use super::context::Context;
Expand Down Expand Up @@ -79,20 +77,7 @@ pub fn command_record_entry(
send_reply(reply_tx, result.map_err(Into::into));
}

pub fn command_shutdown(context: &mut Context, reply_tx: ResultSender<()>, journal_scope: &str) {
let _ = task::block_in_place(|| {
let new_entry = Entry {
occurred_at: SystemTime::now(),
scope: Scope(journal_scope.to_string()),
code: JournalCodes::STOPPING,
severity: Severity::Information,
text: Some("Stopping".to_string()),
json: None,
};
context.record_entry(new_entry).map_err(|err| {
log::warn!("Failed to record entry about stopping: {}", err);
})
});
pub fn command_shutdown(_context: &mut Context, reply_tx: ResultSender<()>) {
send_reply(reply_tx, Ok(()));
}

Expand Down
Loading

0 comments on commit ea306ae

Please sign in to comment.