diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 7ccd6cc45b..0b8bdeefa2 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -537,23 +537,19 @@ async fn handle_stream_rues( }, }; - // The event is subscribed to if it matches any of the subscriptions. - let mut is_subscribed = false; - for sub in &subscription_set { - if sub.matches(&event) { - is_subscribed = true; - break; - } - } + let subscription = subscription_set.iter().find(|s|s.location.matches(&event)); // If the event is subscribed, we send it to the client. - if is_subscribed { + if let Some(subscription) = subscription { event.apply_location(); - let event = event.to_bytes(); + let event = match subscription.binary { + true => Message::Binary(event.to_bytes()), + false => Message::Text(event.to_json()), + }; // If the event fails sending we close the socket on the client // and stop processing further. - if stream.send(Message::Binary(event)).await.is_err() { + if stream.send(event).await.is_err() { let _ = stream.close(Some(CloseFrame { code: CloseCode::Error, reason: Cow::from("Failed sending event"), @@ -601,7 +597,7 @@ async fn handle_dispatch( sender .send(Ok(RuesEvent { - subscription: sub, + location: sub.location, data: rsp, })) .await; @@ -655,13 +651,6 @@ async fn handle_request_rues( Ok(response.map(Into::into)) } else { - let headers = req.headers(); - let mut path_split = req.uri().path().split('/'); - - // Skip '/on' since we already know its present - path_split.next(); - path_split.next(); - let sid = match SessionId::parse_from_req(&req) { None => { return response( @@ -672,16 +661,15 @@ async fn handle_request_rues( Some(sid) => sid, }; - let subscription = - match RuesSubscription::parse_from_path_split(path_split) { - None => { - return response( - StatusCode::NOT_FOUND, - "{{\"error\":\"Invalid URL path\n\"}}", - ); - } - Some(s) => s, - }; + let subscription = match RuesSubscription::parse_from_req(&req) { + None => { + return response( + StatusCode::NOT_FOUND, + "{{\"error\":\"Invalid URL path\n\"}}", + ); + } + Some(s) => s, + }; let action_sender = match sockets_map.read().await.get(&sid) { Some(sender) => sender.clone(), @@ -823,7 +811,7 @@ mod tests { use std::{fs, thread}; use super::*; - use event::Event as EventRequest; + use event::{Event as EventRequest, RuesLocation}; use crate::http::event::WrappedContractId; use execution_core::ContractId; @@ -1113,6 +1101,7 @@ mod tests { server.local_addr )) .header("Rusk-Session-Id", sid.to_string()) + .header("Accept", "application/octet-stream".to_string()) .send() .await .expect("Requesting should succeed"); @@ -1125,6 +1114,7 @@ mod tests { server.local_addr )) .header("Rusk-Session-Id", sid.to_string()) + .header("Accept", "application/octet-stream".to_string()) .send() .await .expect("Requesting should succeed"); @@ -1179,11 +1169,14 @@ mod tests { assert_eq!(received_event, event, "Event should be the same"); + let unsubscribe_location = format!( + "http://{}{}", + server.local_addr, + at_first_received_event.location.to_path() + ); + let response = client - .delete(format!( - "http://{}/on/contracts:{maybe_sub_contract_id_hex}/{TOPIC}", - server.local_addr - )) + .delete(unsubscribe_location) .header("Rusk-Session-Id", sid.to_string()) .send() .await @@ -1222,19 +1215,13 @@ mod tests { .ok_or(anyhow::anyhow!("Content location is not a string"))? .to_string(); - let mut path_split = path.split('/'); - - // Skip '/on' since we already know its present - path_split.next(); - path_split.next(); - - let subscription = RuesSubscription::parse_from_path_split(path_split) + let location = RuesLocation::parse_from_path(&path) .ok_or(anyhow::anyhow!("Invalid location"))?; let mut data = ResponseData::new(data.to_vec()); for (key, value) in header { data = data.with_header(key, value); } - Ok(RuesEvent { data, subscription }) + Ok(RuesEvent { data, location }) } } diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 47bb9d405c..20c397ddd1 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -354,7 +354,7 @@ impl From> for RequestData { } } -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct ResponseData { data: DataType, header: serde_json::Map, @@ -414,11 +414,11 @@ impl Eq for DataType {} impl PartialEq for DataType { fn eq(&self, other: &Self) -> bool { match (self, other) { - (Self::Channel(_), Self::Channel(_)) => true, (Self::Text(a), Self::Text(b)) => a == b, (Self::Json(a), Self::Json(b)) => a == b, (Self::Binary(a), Self::Binary(b)) => a == b, (Self::None, Self::None) => true, + // Two mpsc::Receiver are always different _ => false, } } @@ -734,7 +734,7 @@ impl SessionId { /// `transactions`, etc...) and an optional entity within the component that /// the event targets. #[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)] -pub struct RuesSubscription { +pub struct RuesLocation { pub component: String, pub entity: Option, pub topic: String, @@ -742,8 +742,44 @@ pub struct RuesSubscription { pub const RUES_LOCATION_PREFIX: &str = "/on"; +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RuesSubscription { + pub binary: bool, + pub location: RuesLocation, +} + +// We won't to take into consideration the "binary" flag while inserting into +// subscriptions_set +impl core::hash::Hash for RuesSubscription { + fn hash(&self, state: &mut H) { + self.location.hash(state); + } +} + +impl PartialEq for RuesSubscription { + fn eq(&self, other: &Self) -> bool { + self.location == other.location + } +} + +impl Eq for RuesSubscription {} + impl RuesSubscription { - pub fn to_location(&self) -> String { + pub fn parse_from_req(req: &Request) -> Option { + let location = RuesLocation::parse_from_path(req.uri().path())?; + let binary = req + .headers() + .get(ACCEPT) + .and_then(|h| h.to_str().ok()) + .map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY)) + .unwrap_or_default(); + + Some(Self { binary, location }) + } +} + +impl RuesLocation { + pub fn to_path(&self) -> String { let component = &self.component; let entity = self .entity @@ -755,7 +791,13 @@ impl RuesSubscription { format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}") } - pub fn parse_from_path_split(mut path_split: Split) -> Option { + pub fn parse_from_path(path: &str) -> Option { + let mut path_split = path.split('/'); + + // Skip '/on' since we already know its present + path_split.next(); + path_split.next(); + // If the segment contains a `:`, we split the string in two after the // first one - meaning entities with `:` are still possible. // If the segment doesn't contain a `:` then the segment is just a @@ -780,7 +822,7 @@ impl RuesSubscription { } pub fn matches(&self, event: &RuesEvent) -> bool { - let event = &event.subscription; + let event = &event.location; if self.component != event.component { return false; } @@ -829,13 +871,13 @@ impl From for execution_core::Event { /// A RUES event #[derive(Debug, Clone, Eq, PartialEq)] pub struct RuesEvent { - pub subscription: RuesSubscription, + pub location: RuesLocation, pub data: ResponseData, } impl RuesEvent { pub fn apply_location(&mut self) { - let location = self.subscription.to_location(); + let location = self.location.to_path(); self.data.add_header("Content-Location", location); } @@ -859,12 +901,16 @@ impl RuesEvent { bytes } + + pub fn to_json(&self) -> String { + serde_json::to_string(&self.data).expect("Data to be json serializable") + } } impl From for RuesEvent { fn from(event: ContractEvent) -> Self { Self { - subscription: RuesSubscription { + location: RuesLocation { component: "contracts".into(), entity: Some(hex::encode(event.target.0.as_bytes())), topic: event.topic, @@ -884,7 +930,7 @@ impl From for RuesEvent { impl From for RuesEvent { fn from(value: node_data::events::Event) -> Self { Self { - subscription: RuesSubscription { + location: RuesLocation { component: value.component.into(), entity: Some(value.entity), topic: value.topic.into(),