From 029e2c8fc1e532ed6ceee1066b5236cc9025f67e Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 6 Aug 2024 14:52:11 +0200 Subject: [PATCH] rusk: change RUES to allow Text or Binary subscription --- rusk/src/lib/http.rs | 39 ++++++++++++++----------------- rusk/src/lib/http/event.rs | 48 +++++++++++++++++++++++++++++++------- 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 636bc1499f..f326417cea 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -537,23 +537,19 @@ async fn handle_stream_rues( }, }; - // The event is subscribed to if it matches any of the subscriptions. - let mut is_subscribed = false; - for sub in &subscription_set { - if sub.matches(&event) { - is_subscribed = true; - break; - } - } + let subscription = subscription_set.iter().find(|s|s.location.matches(&event)); // If the event is subscribed, we send it to the client. - if is_subscribed { + if let Some(subscription) = subscription { event.apply_location(); - let event = event.to_bytes(); + let event = match subscription.binary { + true => Message::Binary(event.to_bytes()), + false => Message::Text(event.to_json()), + }; // If the event fails sending we close the socket on the client // and stop processing further. - if stream.send(Message::Binary(event)).await.is_err() { + if stream.send(event).await.is_err() { let _ = stream.close(Some(CloseFrame { code: CloseCode::Error, reason: Cow::from("Failed sending event"), @@ -601,7 +597,7 @@ async fn handle_dispatch( sender .send(Ok(RuesEvent { - subscription: sub, + location: sub.location, data: rsp, })) .await; @@ -672,16 +668,15 @@ async fn handle_request_rues( Some(sid) => sid, }; - let subscription = - match RuesSubscription::parse_from_path_split(path_split) { - None => { - return response( - StatusCode::NOT_FOUND, - "{{\"error\":\"Invalid URL path\n\"}}", - ); - } - Some(s) => s, - }; + let subscription = match RuesSubscription::parse_from_req(&req) { + None => { + return response( + StatusCode::NOT_FOUND, + "{{\"error\":\"Invalid URL path\n\"}}", + ); + } + Some(s) => s, + }; let action_sender = match sockets_map.read().await.get(&sid) { Some(sender) => sender.clone(), diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index ae9a4a33d0..0d035bcd6b 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -354,7 +354,7 @@ impl From> for RequestData { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResponseData { data: DataType, header: serde_json::Map, @@ -715,7 +715,7 @@ impl SessionId { /// `transactions`, etc...) and an optional entity within the component that /// the event targets. #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] -pub struct RuesSubscription { +pub struct RuesLocation { pub component: String, pub entity: Option, pub topic: String, @@ -723,8 +723,28 @@ pub struct RuesSubscription { pub const RUES_LOCATION_PREFIX: &str = "/on"; +#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] +pub struct RuesSubscription { + pub binary: bool, + pub location: RuesLocation, +} + impl RuesSubscription { - pub fn to_location(&self) -> String { + pub fn parse_from_req(req: &Request) -> Option { + let location = RuesLocation::parse_from_path(req.uri().path())?; + let binary = req + .headers() + .get(ACCEPT) + .and_then(|h| h.to_str().ok()) + .map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY)) + .unwrap_or_default(); + + Some(Self { binary, location }) + } +} + +impl RuesLocation { + pub fn to_path(&self) -> String { let component = &self.component; let entity = self .entity @@ -736,7 +756,13 @@ impl RuesSubscription { format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}") } - pub fn parse_from_path_split(mut path_split: Split) -> Option { + pub fn parse_from_path(path: &str) -> Option { + let mut path_split = path.split('/'); + + // Skip '/on' since we already know its present + path_split.next(); + path_split.next(); + // If the segment contains a `:`, we split the string in two after the // first one - meaning entities with `:` are still possible. // If the segment doesn't contain a `:` then the segment is just a @@ -761,7 +787,7 @@ impl RuesSubscription { } pub fn matches(&self, event: &RuesEvent) -> bool { - let event = &event.subscription; + let event = &event.location; if self.component != event.component { return false; } @@ -810,13 +836,13 @@ impl From for execution_core::Event { /// A RUES event #[derive(Debug, Clone)] pub struct RuesEvent { - pub subscription: RuesSubscription, + pub location: RuesLocation, pub data: ResponseData, } impl RuesEvent { pub fn apply_location(&mut self) { - let location = self.subscription.to_location(); + let location = self.location.to_path(); self.data.add_header("Content-Location", location); } @@ -840,12 +866,16 @@ impl RuesEvent { bytes } + + pub fn to_json(&self) -> String { + serde_json::to_string(&self.data).expect("Data to be json serializable") + } } impl From for RuesEvent { fn from(event: ContractEvent) -> Self { Self { - subscription: RuesSubscription { + location: RuesLocation { component: "contracts".into(), entity: Some(hex::encode(event.target.0.as_bytes())), topic: event.topic, @@ -865,7 +895,7 @@ impl From for RuesEvent { impl From for RuesEvent { fn from(value: node_data::events::Event) -> Self { Self { - subscription: RuesSubscription { + location: RuesLocation { component: value.component.into(), entity: Some(value.entity), topic: value.topic.into(),