From 2e73db4138dd6803693ffe96bad2653906da0912 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 8 Aug 2024 16:38:07 +0200 Subject: [PATCH] rusk: refactor RuesEvent to include headers --- rusk/src/lib/http.rs | 77 ++++++++++++++------------------------ rusk/src/lib/http/event.rs | 54 +++++++++++++++++--------- 2 files changed, 64 insertions(+), 67 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 7ccd6cc45b..9c62064208 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -69,7 +69,7 @@ use crate::http::event::FullOrStreamBody; use crate::VERSION; pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX}; -use self::event::{MessageRequest, ResponseData, RuesSubscription, SessionId}; +use self::event::{MessageRequest, ResponseData, RuesEventUri, SessionId}; use self::stream::{Listener, Stream}; const RUSK_VERSION_HEADER: &str = "Rusk-Version"; @@ -424,12 +424,9 @@ where } enum SubscriptionAction { - Subscribe(RuesSubscription), - Unsubscribe(RuesSubscription), - Dispatch { - sub: RuesSubscription, - body: Incoming, - }, + Subscribe(RuesEventUri), + Unsubscribe(RuesEventUri), + Dispatch { uri: RuesEventUri, body: Incoming }, } async fn handle_stream_rues( @@ -506,11 +503,11 @@ async fn handle_stream_rues( subscription_set.remove(&subscription); }, SubscriptionAction::Dispatch { - sub, + uri, body } => { // TODO figure out if we should subscribe to the event we dispatch - task::spawn(handle_dispatch(sub, body, handler.clone(), dispatch_sender.clone())); + task::spawn(handle_dispatch(uri, body, handler.clone(), dispatch_sender.clone())); } } } @@ -548,7 +545,7 @@ async fn handle_stream_rues( // If the event is subscribed, we send it to the client. if is_subscribed { - event.apply_location(); + event.add_header("Content-Location", event.uri.to_string()); let event = event.to_bytes(); // If the event fails sending we close the socket on the client @@ -570,7 +567,7 @@ async fn handle_stream_rues( } async fn handle_dispatch( - sub: RuesSubscription, + uri: RuesEventUri, body: Incoming, handler: Arc, sender: mpsc::Sender>, @@ -599,12 +596,9 @@ async fn handle_dispatch( } }; - sender - .send(Ok(RuesEvent { - subscription: sub, - data: rsp, - })) - .await; + let (data, headers) = rsp.into_inner(); + + sender.send(Ok(RuesEvent { uri, data, headers })).await; } fn response( @@ -656,11 +650,6 @@ async fn handle_request_rues( Ok(response.map(Into::into)) } else { let headers = req.headers(); - let mut path_split = req.uri().path().split('/'); - - // Skip '/on' since we already know its present - path_split.next(); - path_split.next(); let sid = match SessionId::parse_from_req(&req) { None => { @@ -672,16 +661,15 @@ async fn handle_request_rues( Some(sid) => sid, }; - let subscription = - match RuesSubscription::parse_from_path_split(path_split) { - None => { - return response( - StatusCode::NOT_FOUND, - "{{\"error\":\"Invalid URL path\n\"}}", - ); - } - Some(s) => s, - }; + let uri = match RuesEventUri::parse_from_path(req.uri().path()) { + None => { + return response( + StatusCode::NOT_FOUND, + "{{\"error\":\"Invalid URL path\n\"}}", + ); + } + Some(s) => s, + }; let action_sender = match sockets_map.read().await.get(&sid) { Some(sender) => sender.clone(), @@ -694,10 +682,10 @@ async fn handle_request_rues( }; let action = match *req.method() { - Method::GET => SubscriptionAction::Subscribe(subscription), - Method::DELETE => SubscriptionAction::Unsubscribe(subscription), + Method::GET => SubscriptionAction::Subscribe(uri), + Method::DELETE => SubscriptionAction::Unsubscribe(uri), Method::POST => SubscriptionAction::Dispatch { - sub: subscription, + uri, body: req.into_body(), }, _ => { @@ -1213,28 +1201,19 @@ mod tests { } pub fn from_bytes(data: &[u8]) -> anyhow::Result { - let (mut header, data) = crate::http::event::parse_header(data)?; + let (mut headers, data) = crate::http::event::parse_header(data)?; - let path = header + let path = headers .remove("Content-Location") .ok_or(anyhow::anyhow!("Content location is not set"))? .as_str() .ok_or(anyhow::anyhow!("Content location is not a string"))? .to_string(); - let mut path_split = path.split('/'); - - // Skip '/on' since we already know its present - path_split.next(); - path_split.next(); - - let subscription = RuesSubscription::parse_from_path_split(path_split) + let uri = RuesEventUri::parse_from_path(&path) .ok_or(anyhow::anyhow!("Invalid location"))?; - let mut data = ResponseData::new(data.to_vec()); - for (key, value) in header { - data = data.with_header(key, value); - } - Ok(RuesEvent { data, subscription }) + let data = data.to_vec().into(); + Ok(RuesEvent { data, headers, uri }) } } diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index b9c4201693..dcfe557908 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -732,7 +732,7 @@ impl SessionId { /// `transactions`, etc...) and an optional entity within the component that /// the event targets. #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] -pub struct RuesSubscription { +pub struct RuesEventUri { pub component: String, pub entity: Option, pub topic: String, @@ -740,8 +740,8 @@ pub struct RuesSubscription { pub const RUES_LOCATION_PREFIX: &str = "/on"; -impl RuesSubscription { - pub fn to_location(&self) -> String { +impl Display for RuesEventUri { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let component = &self.component; let entity = self .entity @@ -750,10 +750,23 @@ impl RuesSubscription { .unwrap_or_default(); let topic = &self.topic; - format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}") + write!(f, "{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}") } +} + +impl RuesEventUri { + pub fn parse_from_path(path: &str) -> Option { + if !path.starts_with(RUES_LOCATION_PREFIX) { + return None; + } + // Skip '/on' since we already know its present + let path = &path[RUES_LOCATION_PREFIX.len()..]; + + let mut path_split = path.split('/'); + + // Skip first '/' + path_split.next()?; - pub fn parse_from_path_split(mut path_split: Split) -> Option { // If the segment contains a `:`, we split the string in two after the // first one - meaning entities with `:` are still possible. // If the segment doesn't contain a `:` then the segment is just a @@ -778,7 +791,7 @@ impl RuesSubscription { } pub fn matches(&self, event: &RuesEvent) -> bool { - let event = &event.subscription; + let event = &event.uri; if self.component != event.component { return false; } @@ -827,25 +840,29 @@ impl From for execution_core::Event { /// A RUES event #[derive(Debug, Clone, Eq, PartialEq)] pub struct RuesEvent { - pub subscription: RuesSubscription, - pub data: ResponseData, + pub uri: RuesEventUri, + pub headers: serde_json::Map, + pub data: DataType, } impl RuesEvent { - pub fn apply_location(&mut self) { - let location = self.subscription.to_location(); - self.data.add_header("Content-Location", location); + pub fn add_header, V: Into>( + &mut self, + key: K, + value: V, + ) { + self.headers.insert(key.into(), value.into()); } /// Serialize the event into a vector of bytes. pub fn to_bytes(&self) -> Vec { - let headers_bytes = serde_json::to_vec(&self.data.header) + let headers_bytes = serde_json::to_vec(&self.headers) .expect("Serializing JSON should succeed"); let headers_len = headers_bytes.len() as u32; let headers_len_bytes = headers_len.to_le_bytes(); - let data_bytes = self.data.data.to_bytes(); + let data_bytes = self.data.to_bytes(); let len = headers_len_bytes.len() + headers_bytes.len() + data_bytes.len(); @@ -862,12 +879,13 @@ impl RuesEvent { impl From for RuesEvent { fn from(event: ContractEvent) -> Self { Self { - subscription: RuesSubscription { + uri: RuesEventUri { component: "contracts".into(), entity: Some(hex::encode(event.target.0.as_bytes())), topic: event.topic, }, - data: ResponseData::new(event.data), + data: event.data.into(), + headers: Default::default(), } } } @@ -884,13 +902,13 @@ impl From for RuesEvent { let data = value.data.map_or(DataType::None, DataType::Json); Self { - subscription: RuesSubscription { + uri: RuesEventUri { component: value.component.into(), entity: Some(value.entity), topic: value.topic.into(), }, - - data: ResponseData::new(data), + data, + headers: Default::default(), } } }