From 373f289fb76766b50549b321c960cc5d616b73f5 Mon Sep 17 00:00:00 2001 From: Meshiest Date: Mon, 2 Dec 2024 02:24:54 -0500 Subject: [PATCH] feat(controlplane): events string parsing --- crates/controlplane/src/events/filter.rs | 40 ++- .../controlplane/src/events/filter_parse.rs | 320 ++++++++++++++++++ crates/controlplane/src/events/mod.rs | 9 +- crates/controlplane/src/events/models.rs | 191 ++++------- crates/controlplane/src/events/stream.rs | 4 + .../src/events/test_filter_parse.rs | 169 +++++++++ crates/controlplane/src/events/traits.rs | 95 ++++++ 7 files changed, 695 insertions(+), 133 deletions(-) create mode 100644 crates/controlplane/src/events/filter_parse.rs create mode 100644 crates/controlplane/src/events/test_filter_parse.rs create mode 100644 crates/controlplane/src/events/traits.rs diff --git a/crates/controlplane/src/events/filter.rs b/crates/controlplane/src/events/filter.rs index 05c3a8c0..a0b70eda 100644 --- a/crates/controlplane/src/events/filter.rs +++ b/crates/controlplane/src/events/filter.rs @@ -1,4 +1,42 @@ -use super::{Event, EventFilter}; +use std::sync::Arc; + +use snops_common::{ + node_targets::NodeTargets, + state::{AgentId, EnvId, InternedId, NodeKey}, +}; + +use super::{Event, EventKindFilter}; + +#[derive(Clone, Debug, PartialEq)] + +pub enum EventFilter { + /// No filter + Unfiltered, + + /// Logical AND of filters + AllOf(Vec), + /// Logical OR of filters + AnyOf(Vec), + /// Logical XOR of filters + OneOf(Vec), + /// Logical NOT of filter + Not(Box), + + /// Filter by agent ID + AgentIs(AgentId), + /// Filter by environment ID + EnvIs(EnvId), + /// Filter by transaction ID + TransactionIs(Arc), + /// Filter by cannon ID + CannonIs(InternedId), + /// Filter by event kind + EventIs(EventKindFilter), + /// Filter by node key + NodeKeyIs(NodeKey), + /// Filter by node target + NodeTargetIs(NodeTargets), +} impl Event { pub fn matches(&self, filter: &EventFilter) -> bool { diff --git a/crates/controlplane/src/events/filter_parse.rs b/crates/controlplane/src/events/filter_parse.rs new file mode 100644 index 00000000..40be55dc --- /dev/null +++ b/crates/controlplane/src/events/filter_parse.rs @@ -0,0 +1,320 @@ +use std::{fmt::Display, str::FromStr, sync::Arc}; + +use snops_common::node_targets::{NodeTarget, NodeTargets}; + +use super::EventFilter; +use crate::events::EventKindFilter; + +/* + +Example EventFilter string representation: + +unfiltered +any-of(agent-connected, agent-disconnected) +all-of(not(agent-is(foo-bar)), env-is(default)) +node-key-is(client/foo) +node-target-is(client/test-*@*) +node-target-is(client/any) +not(unfiltered) + +*/ + +#[derive(Debug, Copy, Clone)] +enum Token<'a> { + OpenParen, + CloseParen, + Comma, + Whitespace, + Text(&'a str), +} + +impl<'a> Token<'a> { + fn label(self) -> &'static str { + match self { + Token::OpenParen => "open paren", + Token::CloseParen => "close paren", + Token::Comma => "comma", + Token::Whitespace => "whitespace", + Token::Text(_) => "text", + } + } + + fn text(self) -> Option<&'a str> { + match self { + Token::Text(s) => Some(s), + _ => None, + } + } + + fn parsed_text(self) -> Option> { + self.text().map(|s| s.trim().parse()) + } + + fn open_paren(self) -> Option<()> { + matches!(self, Token::OpenParen).then(|| ()) + } + + fn close_paren(self) -> Option<()> { + matches!(self, Token::CloseParen).then(|| ()) + } +} + +struct Lexer<'a> { + string: &'a str, + chars: std::iter::Peekable>>, +} + +impl<'a> Lexer<'a> { + fn new(string: &'a str) -> Lexer<'a> { + Lexer { + string, + chars: string.chars().enumerate().peekable(), + } + } +} + +impl<'a> Iterator for Lexer<'a> { + type Item = Token<'a>; + + fn next(&mut self) -> Option { + let (index, c) = self.chars.next()?; + Some(match c { + '(' => Token::OpenParen, + ')' => Token::CloseParen, + ',' => Token::Comma, + c if c.is_whitespace() => { + while let Some((_, c)) = self.chars.peek() { + if !c.is_whitespace() { + break; + } + self.chars.next(); + } + // In the future, we might want to return the whitespace + + // let end = self + // .chars + // .peek() + // .map_or_else(|| self.string.len(), |(i, _)| *i); + // Token::Whitespace(&self.string[index..end]) + + Token::Whitespace + } + _ => { + while let Some((_, c)) = self.chars.peek() { + if c == &'(' || c == &')' || c == &',' { + break; + } + self.chars.next(); + } + let end = self + .chars + .peek() + .map_or_else(|| self.string.len(), |(i, _)| *i); + Token::Text(&self.string[index..end]) + } + }) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum EventFilterParseError { + #[error("invalid filter: {0}")] + InvalidFilter(String), + #[error("expected token {0:?}, received {1}")] + ExpectedToken(EventFilterParsable, String), + #[error("error parsing {0:?}: {1}")] + ParseError(EventFilterParsable, String), + #[error("unexpected trailing tokens")] + TrailingTokens, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum EventFilterParsable { + OpenParen, + CloseParen, + CommaOrCloseParen, + FilterName, + AgentId, + EnvId, + TransactionId, + CannonId, + EventKind, + NodeKey, + NodeTarget, +} + +struct FilterParser<'a> { + tokens: std::iter::Peekable>, +} + +fn expect_token<'a, T>( + token: Option>, + label: EventFilterParsable, + matcher: impl Fn(Token<'a>) -> Option, +) -> Result { + use EventFilterParseError::*; + let token = token.ok_or_else(|| ExpectedToken(label, "EOF".to_string()))?; + matcher(token).ok_or_else(|| ExpectedToken(label, token.label().to_string())) +} + +fn expect_parsed_text( + token: Option, + label: EventFilterParsable, +) -> Result +where + ::Err: Display, +{ + expect_token(token, label, |token| token.parsed_text::())? + .map_err(|e| EventFilterParseError::ParseError(label, e.to_string())) +} + +fn expect_open_paren(token: Option) -> Result<(), EventFilterParseError> { + expect_token(token, EventFilterParsable::OpenParen, |token| { + token.open_paren() + }) +} + +fn expect_close_paren(token: Option) -> Result<(), EventFilterParseError> { + expect_token(token, EventFilterParsable::CloseParen, |token| { + token.close_paren() + }) +} + +impl<'a> FilterParser<'a> { + fn new(str: &'a str) -> Self { + Self { + tokens: Lexer::new(str).peekable(), + } + } + + fn next(&mut self) -> Option> { + self.tokens.next() + } + + fn expect_parens( + &mut self, + filter: impl Fn(&mut Self) -> Result, + ) -> Result { + self.trim_whitespace(); + expect_open_paren(self.next())?; + self.trim_whitespace(); + let filter = filter(self)?; + expect_close_paren(self.next())?; + Ok(filter) + } + + fn expect_filter(&mut self) -> Result { + self.trim_whitespace(); + use EventFilterParsable as P; + use EventFilterParseError::*; + + let filter_name = expect_token(self.next(), P::FilterName, |token| token.text())?; + + match filter_name.trim() { + "unfiltered" => Ok(EventFilter::Unfiltered), + "any-of" => self.expect_parens(|t| t.expect_filter_vec().map(EventFilter::AnyOf)), + "all-of" => self.expect_parens(|t| t.expect_filter_vec().map(EventFilter::AllOf)), + "one-of" => self.expect_parens(|t| t.expect_filter_vec().map(EventFilter::OneOf)), + "not" => self.expect_parens(|t| Ok(EventFilter::Not(Box::new(t.expect_filter()?)))), + "agent-is" => self.expect_parens(|t| { + expect_parsed_text(t.next(), P::AgentId).map(EventFilter::AgentIs) + }), + "env-is" => self + .expect_parens(|t| expect_parsed_text(t.next(), P::EnvId).map(EventFilter::EnvIs)), + "transaction-is" => self.expect_parens(|t| { + Ok(EventFilter::TransactionIs(Arc::new( + expect_token(t.next(), P::TransactionId, |token| token.text())?.to_string(), + ))) + }), + "cannon-is" => self.expect_parens(|t| { + expect_parsed_text(t.next(), P::CannonId).map(EventFilter::CannonIs) + }), + "event-is" => self.expect_parens(|t| { + expect_parsed_text(t.next(), P::EventKind).map(EventFilter::EventIs) + }), + "node-key-is" => self.expect_parens(|t| { + expect_parsed_text(t.next(), P::NodeKey).map(EventFilter::NodeKeyIs) + }), + "node-target-is" => self.expect_parens(|t| { + expect_parsed_text::(t.next(), P::NodeTarget) + .map(|t| EventFilter::NodeTargetIs(NodeTargets::One(t))) + }), + + // Try to parse as an event kind filter as a fallback + unknown => unknown + .parse::() + .map(EventFilter::EventIs) + .map_err(|_| InvalidFilter(unknown.to_string())), + } + } + + fn expect_filter_vec(&mut self) -> Result, EventFilterParseError> { + self.trim_whitespace(); + let mut filters = Vec::new(); + loop { + match self.tokens.peek() { + Some(Token::CloseParen) => break, + Some(_) => { + filters.push(self.expect_filter()?); + self.trim_whitespace(); + + // Expect either a comma or a close paren + match self.tokens.peek() { + // This also supports trailing commas + Some(Token::Comma) => { + self.tokens.next(); + self.trim_whitespace(); + } + Some(Token::CloseParen) => break, + Some(_) => { + return Err(EventFilterParseError::ExpectedToken( + EventFilterParsable::CommaOrCloseParen, + self.tokens.peek().unwrap().label().to_string(), + )) + } + None => { + return Err(EventFilterParseError::ExpectedToken( + EventFilterParsable::CommaOrCloseParen, + "EOF".to_string(), + )) + } + } + } + None => { + return Err(EventFilterParseError::ExpectedToken( + EventFilterParsable::CloseParen, + "EOF".to_string(), + )) + } + } + } + Ok(filters) + } + + /// Remove leading whitespace tokens from the token stream. + fn trim_whitespace(&mut self) { + while let Some(Token::Whitespace) = self.tokens.peek() { + self.tokens.next(); + } + } + + fn trailing_tokens(&mut self) -> Result<(), EventFilterParseError> { + self.trim_whitespace(); + if self.tokens.next().is_some() { + Err(EventFilterParseError::TrailingTokens) + } else { + Ok(()) + } + } +} + +impl FromStr for EventFilter { + type Err = EventFilterParseError; + + fn from_str(s: &str) -> Result { + let mut parser = FilterParser::new(s); + let filter = parser.expect_filter()?; + parser.trailing_tokens()?; + Ok(filter) + } +} diff --git a/crates/controlplane/src/events/mod.rs b/crates/controlplane/src/events/mod.rs index 675d4556..fb60b7b6 100644 --- a/crates/controlplane/src/events/mod.rs +++ b/crates/controlplane/src/events/mod.rs @@ -2,12 +2,15 @@ mod models; pub use models::*; mod stream; pub use stream::*; - +mod filter_parse; +mod traits; +pub use traits::*; mod filter; +pub use filter::*; mod filter_ops; pub mod prelude { - pub use super::models::EventFilter::*; + pub use super::filter::EventFilter::*; pub use super::models::EventKindFilter::*; pub use super::models::*; } @@ -17,4 +20,6 @@ mod test_filter; #[cfg(test)] mod test_filter_ops; #[cfg(test)] +mod test_filter_parse; +#[cfg(test)] mod test_stream; diff --git a/crates/controlplane/src/events/models.rs b/crates/controlplane/src/events/models.rs index 94236ddc..45ff918f 100644 --- a/crates/controlplane/src/events/models.rs +++ b/crates/controlplane/src/events/models.rs @@ -1,21 +1,14 @@ -use std::sync::Arc; +use std::{fmt::Display, str::FromStr, sync::Arc}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use snops_common::{ aot_cmds::Authorization, - node_targets::NodeTargets, rpc::error::ReconcileError, - state::{ - AgentId, AgentState, EnvId, InternedId, LatestBlockInfo, NodeKey, NodeStatus, - ReconcileStatus, - }, + state::{AgentId, EnvId, InternedId, LatestBlockInfo, NodeKey, NodeStatus, ReconcileStatus}, }; -use crate::{ - cannon::{context::ExecutionContext, status::TransactionSendState}, - state::{Agent, GetGlobalState}, -}; +use crate::{cannon::status::TransactionSendState, state::GetGlobalState}; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct Event { @@ -29,14 +22,14 @@ pub struct Event { } #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "type")] +#[serde(tag = "type", rename_all = "snake_case")] pub enum EventKind { Agent(AgentEvent), Transaction(TransactionEvent), } #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "kind")] +#[serde(tag = "kind", rename_all = "snake_case")] pub enum AgentEvent { /// An agent connects to the control plane Connected, @@ -57,7 +50,7 @@ pub enum AgentEvent { } #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "kind")] +#[serde(tag = "kind", rename_all = "snake_case")] pub enum TransactionEvent { /// The authorization was inserted into the cannon AuthorizationReceived(Arc), @@ -85,7 +78,7 @@ pub enum TransactionEvent { } #[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(tag = "reason")] +#[serde(tag = "reason", rename_all = "snake_case")] pub enum TransactionAbortReason { MissingTracker, UnexpectedStatus(TransactionSendState), @@ -93,7 +86,6 @@ pub enum TransactionAbortReason { } #[derive(Clone, Copy, Debug, PartialEq)] -#[repr(u8)] pub enum EventKindFilter { AgentConnected, AgentHandshakeComplete, @@ -145,34 +137,62 @@ impl EventKind { } } -#[derive(Clone, Debug, PartialEq)] -pub enum EventFilter { - /// No filter - Unfiltered, +impl FromStr for EventKindFilter { + type Err = String; + + fn from_str(s: &str) -> Result { + match s { + // kebab-case + "agent-connected" => Ok(Self::AgentConnected), + "agent-handshake-complete" => Ok(Self::AgentHandshakeComplete), + "agent-disconnected" => Ok(Self::AgentDisconnected), + "agent-reconcile-complete" => Ok(Self::AgentReconcileComplete), + "agent-reconcile" => Ok(Self::AgentReconcile), + "agent-reconcile-error" => Ok(Self::AgentReconcileError), + "agent-node-status" => Ok(Self::AgentNodeStatus), + "agent-block-info" => Ok(Self::AgentBlockInfo), + "transaction-authorization-received" => Ok(Self::TransactionAuthorizationReceived), + "transaction-execute-aborted" => Ok(Self::TransactionExecuteAborted), + "transaction-execute-awaiting-compute" => Ok(Self::TransactionExecuteAwaitingCompute), + "transaction-execute-exceeded" => Ok(Self::TransactionExecuteExceeded), + "transaction-execute-failed" => Ok(Self::TransactionExecuteFailed), + "transaction-executing" => Ok(Self::TransactionExecuting), + "transaction-execute-complete" => Ok(Self::TransactionExecuteComplete), + "transaction-broadcasted" => Ok(Self::TransactionBroadcasted), + "transaction-broadcast-exceeded" => Ok(Self::TransactionBroadcastExceeded), + "transaction-confirmed" => Ok(Self::TransactionConfirmed), + _ => Err(format!("invalid event kind: {s}")), + } + } +} - /// Logical AND of filters - AllOf(Vec), - /// Logical OR of filters - AnyOf(Vec), - /// Logical XOR of filters - OneOf(Vec), - /// Logical NOT of filter - Not(Box), +impl Display for EventKindFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use EventKindFilter::*; - /// Filter by agent ID - AgentIs(AgentId), - /// Filter by environment ID - EnvIs(EnvId), - /// Filter by transaction ID - TransactionIs(Arc), - /// Filter by cannon ID - CannonIs(InternedId), - /// Filter by event kind - EventIs(EventKindFilter), - /// Filter by node key - NodeKeyIs(NodeKey), - /// Filter by node target - NodeTargetIs(NodeTargets), + let s = match self { + AgentConnected => "agent-connected", + AgentHandshakeComplete => "agent-handshake-complete", + AgentDisconnected => "agent-disconnected", + AgentReconcileComplete => "agent-reconcile-complete", + AgentReconcile => "agent-reconcile", + AgentReconcileError => "agent-reconcile-error", + AgentNodeStatus => "agent-node-status", + AgentBlockInfo => "agent-block-info", + TransactionAuthorizationReceived => "transaction-authorization-received", + TransactionExecuteAborted => "transaction-execute-aborted", + TransactionExecuteAwaitingCompute => "transaction-execute-awaiting-compute", + TransactionExecuteExceeded => "transaction-execute-exceeded", + TransactionExecuteFailed => "transaction-execute-failed", + TransactionExecuting => "transaction-executing", + TransactionExecuteComplete => "transaction-execute-complete", + TransactionBroadcasted => "transaction-broadcasted", + TransactionBroadcastExceeded => "transaction-broadcast-exceeded", + TransactionConfirmed => "transaction-confirmed", + }; + + write!(f, "{}", s) + } } impl Event { @@ -205,92 +225,3 @@ impl Event { state.global_state().events.emit(self) } } - -impl From for EventFilter { - fn from(kind: EventKindFilter) -> Self { - EventFilter::EventIs(kind) - } -} - -pub trait EventHelpers { - fn event(self) -> Event; - fn with_agent(self, agent: &Agent) -> Event; - fn with_agent_id(self, agent_id: AgentId) -> Event; - fn with_node_key(self, node_key: NodeKey) -> Event; - fn with_env_id(self, env_id: EnvId) -> Event; - fn with_transaction(self, transaction: Arc) -> Event; - fn with_cannon(self, cannon: InternedId) -> Event; - fn with_cannon_ctx(self, ctx: &ExecutionContext, transaction: Arc) -> Event; -} - -impl> EventHelpers for T { - fn event(self) -> Event { - self.into() - } - - fn with_agent(self, agent: &Agent) -> Event { - let mut event = self.into(); - event.agent = Some(agent.id); - if let AgentState::Node(env_id, node) = &agent.state { - event.node_key = Some(node.node_key.clone()); - event.env = Some(*env_id); - } - event - } - - fn with_agent_id(self, agent_id: AgentId) -> Event { - let mut event = self.into(); - event.agent = Some(agent_id); - event - } - - fn with_node_key(self, node_key: NodeKey) -> Event { - let mut event = self.into(); - event.node_key = Some(node_key); - event - } - - fn with_env_id(self, env_id: EnvId) -> Event { - let mut event = self.into(); - event.env = Some(env_id); - event - } - - fn with_transaction(self, transaction: Arc) -> Event { - let mut event = self.into(); - event.transaction = Some(transaction); - event - } - - fn with_cannon(self, cannon: InternedId) -> Event { - let mut event = self.into(); - event.cannon = Some(cannon); - event - } - - fn with_cannon_ctx(self, ctx: &ExecutionContext, transaction: Arc) -> Event { - let mut event = self.into(); - event.cannon = Some(ctx.id); - event.env = Some(ctx.env_id); - event.transaction = Some(transaction); - event - } -} - -impl From for Event { - fn from(kind: EventKind) -> Self { - Self::new(kind) - } -} - -impl From for Event { - fn from(kind: AgentEvent) -> Self { - Self::new(EventKind::Agent(kind)) - } -} - -impl From for Event { - fn from(kind: TransactionEvent) -> Self { - Self::new(EventKind::Transaction(kind)) - } -} diff --git a/crates/controlplane/src/events/stream.rs b/crates/controlplane/src/events/stream.rs index 8e59b905..c1c8c608 100644 --- a/crates/controlplane/src/events/stream.rs +++ b/crates/controlplane/src/events/stream.rs @@ -80,6 +80,10 @@ impl EventSubscriber { } events } + + pub fn set_filter(&mut self, filter: impl Into) { + self.filter = filter.into(); + } } impl Stream for EventSubscriber { diff --git a/crates/controlplane/src/events/test_filter_parse.rs b/crates/controlplane/src/events/test_filter_parse.rs new file mode 100644 index 00000000..186e34c2 --- /dev/null +++ b/crates/controlplane/src/events/test_filter_parse.rs @@ -0,0 +1,169 @@ +use std::sync::Arc; + +use snops_common::{node_targets::NodeTargets, state::InternedId}; + +use super::{ + filter_parse::EventFilterParseError, + EventFilter::{self, *}, + EventKindFilter::*, +}; +use crate::events::filter_parse::EventFilterParsable; + +macro_rules! eq { + ($s:expr, $f:expr) => { + assert_eq!($s.parse::()?, $f); + }; +} + +macro_rules! err { + ($s:expr, $pattern:pat $(if $guard:expr)?) => { + assert!(match $s.parse::() { + $pattern $(if $guard)? => true, + other => { + eprintln!("Received {other:?}"); + false + } + }) + }; +} + +#[test] +fn test_each_filter() -> Result<(), EventFilterParseError> { + eq!("unfiltered", Unfiltered); + eq!("all-of(unfiltered)", AllOf(vec![Unfiltered])); + eq!("any-of(unfiltered)", AnyOf(vec![Unfiltered])); + eq!("one-of(unfiltered)", OneOf(vec![Unfiltered])); + eq!("not(unfiltered)", Not(Box::new(Unfiltered))); + eq!("agent-is(default)", AgentIs(InternedId::default())); + eq!("env-is(default)", EnvIs(InternedId::default())); + eq!( + "transaction-is(foo)", + TransactionIs(Arc::new(String::from("foo"))) + ); + eq!("cannon-is(default)", CannonIs(InternedId::default())); + eq!("event-is(agent-connected)", EventIs(AgentConnected)); + eq!( + "node-key-is(client/foo)", + NodeKeyIs("client/foo".parse().unwrap()) + ); + eq!( + "node-target-is(client/any)", + NodeTargetIs(NodeTargets::One("client/any".parse().unwrap())) + ); + + Ok(()) +} + +#[test] +fn test_array() -> Result<(), EventFilterParseError> { + eq!( + "all-of(unfiltered, unfiltered)", + AllOf(vec![Unfiltered, Unfiltered]) + ); + eq!( + "any-of(unfiltered, unfiltered)", + AnyOf(vec![Unfiltered, Unfiltered]) + ); + eq!( + "one-of(unfiltered, unfiltered)", + OneOf(vec![Unfiltered, Unfiltered]) + ); + + eq!( + "any-of( + unfiltered, + all-of(unfiltered), + any-of(unfiltered), + one-of(unfiltered), + not(unfiltered), + agent-is(default), + env-is(default), + transaction-is(foo), + cannon-is(default), + event-is(agent-connected), + node-key-is(client/foo), + node-target-is(client/any) + )", + AnyOf(vec![ + Unfiltered, + AllOf(vec![Unfiltered]), + AnyOf(vec![Unfiltered]), + OneOf(vec![Unfiltered]), + Not(Box::new(Unfiltered)), + AgentIs(InternedId::default()), + EnvIs(InternedId::default()), + TransactionIs(Arc::new(String::from("foo"))), + CannonIs(InternedId::default()), + EventIs(AgentConnected), + NodeKeyIs("client/foo".parse().unwrap()), + NodeTargetIs(NodeTargets::One("client/any".parse().unwrap())), + ]) + ); + + Ok(()) +} + +#[test] +fn test_whitespace_ignore() -> Result<(), EventFilterParseError> { + eq!( + " all-of ( unfiltered , unfiltered ) ", + AllOf(vec![Unfiltered, Unfiltered]) + ); + Ok(()) +} + +#[test] +fn test_trailing_commas() -> Result<(), EventFilterParseError> { + eq!("all-of(unfiltered,)", AllOf(vec![Unfiltered])); + Ok(()) +} + +#[test] +fn test_deep_nesting() -> Result<(), EventFilterParseError> { + eq!( + "all-of(all-of(all-of(all-of(all-of(all-of(unfiltered))))))", + AllOf(vec![AllOf(vec![AllOf(vec![AllOf(vec![AllOf(vec![ + AllOf(vec![Unfiltered]) + ])])])])]) + ); + + // not + eq!("not(not(not(not(not(not(unfiltered))))))", !!!!!!Unfiltered); + + Ok(()) +} + +#[test] +fn test_invalid() { + err!( + "invalid", + Err(EventFilterParseError::InvalidFilter(e)) if e == "invalid" + ); +} + +#[test] +fn test_expected_parens() { + use EventFilterParsable::*; + + err!( + "all-of", + Err(EventFilterParseError::ExpectedToken(a, b)) if a == OpenParen && b == "EOF" + ); + err!( + "all-of(", + Err(EventFilterParseError::ExpectedToken(a, b)) if a == CloseParen && b == "EOF" + ); + err!( + "all-of(unfiltered", + Err(EventFilterParseError::ExpectedToken(a, b)) if a == CommaOrCloseParen && b == "EOF" + ); +} + +#[test] +fn test_failed_agent_parse() { + err!( + "agent-is(|)", + Err(EventFilterParseError::ParseError(EventFilterParsable::AgentId, e)) + if e.starts_with("invalid InternedId expected pattern") + ); +} diff --git a/crates/controlplane/src/events/traits.rs b/crates/controlplane/src/events/traits.rs new file mode 100644 index 00000000..e41fc5c9 --- /dev/null +++ b/crates/controlplane/src/events/traits.rs @@ -0,0 +1,95 @@ +use std::sync::Arc; + +use snops_common::state::{AgentId, AgentState, EnvId, InternedId, NodeKey}; + +use super::{AgentEvent, Event, EventFilter, EventKind, EventKindFilter, TransactionEvent}; +use crate::{cannon::context::ExecutionContext, state::Agent}; + +impl From for EventFilter { + fn from(kind: EventKindFilter) -> Self { + EventFilter::EventIs(kind) + } +} + +pub trait EventHelpers { + fn event(self) -> Event; + fn with_agent(self, agent: &Agent) -> Event; + fn with_agent_id(self, agent_id: AgentId) -> Event; + fn with_node_key(self, node_key: NodeKey) -> Event; + fn with_env_id(self, env_id: EnvId) -> Event; + fn with_transaction(self, transaction: Arc) -> Event; + fn with_cannon(self, cannon: InternedId) -> Event; + fn with_cannon_ctx(self, ctx: &ExecutionContext, transaction: Arc) -> Event; +} + +impl> EventHelpers for T { + fn event(self) -> Event { + self.into() + } + + fn with_agent(self, agent: &Agent) -> Event { + let mut event = self.into(); + event.agent = Some(agent.id); + if let AgentState::Node(env_id, node) = &agent.state { + event.node_key = Some(node.node_key.clone()); + event.env = Some(*env_id); + } + event + } + + fn with_agent_id(self, agent_id: AgentId) -> Event { + let mut event = self.into(); + event.agent = Some(agent_id); + event + } + + fn with_node_key(self, node_key: NodeKey) -> Event { + let mut event = self.into(); + event.node_key = Some(node_key); + event + } + + fn with_env_id(self, env_id: EnvId) -> Event { + let mut event = self.into(); + event.env = Some(env_id); + event + } + + fn with_transaction(self, transaction: Arc) -> Event { + let mut event = self.into(); + event.transaction = Some(transaction); + event + } + + fn with_cannon(self, cannon: InternedId) -> Event { + let mut event = self.into(); + event.cannon = Some(cannon); + event + } + + fn with_cannon_ctx(self, ctx: &ExecutionContext, transaction: Arc) -> Event { + let mut event = self.into(); + event.cannon = Some(ctx.id); + event.env = Some(ctx.env_id); + event.transaction = Some(transaction); + event + } +} + +impl From for Event { + fn from(kind: EventKind) -> Self { + Self::new(kind) + } +} + +impl From for Event { + fn from(kind: AgentEvent) -> Self { + Self::new(EventKind::Agent(kind)) + } +} + +impl From for Event { + fn from(kind: TransactionEvent) -> Self { + Self::new(EventKind::Transaction(kind)) + } +}