diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 9c62064208..719fa06172 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -17,7 +17,7 @@ mod stream; pub(crate) use event::{ BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse, - RequestData, Target, + RequestData, RuesSubscription, Target, }; use execution_core::Event; @@ -424,8 +424,8 @@ where } enum SubscriptionAction { - Subscribe(RuesEventUri), - Unsubscribe(RuesEventUri), + Subscribe(RuesSubscription), + Unsubscribe(RuesSubscription), Dispatch { uri: RuesEventUri, body: Incoming }, } @@ -534,23 +534,19 @@ async fn handle_stream_rues( }, }; - // The event is subscribed to if it matches any of the subscriptions. - let mut is_subscribed = false; - for sub in &subscription_set { - if sub.matches(&event) { - is_subscribed = true; - break; - } - } + let subscription = subscription_set.iter().find(|s|s.uri.matches(&event)); // If the event is subscribed, we send it to the client. - if is_subscribed { + if let Some(subscription) = subscription { event.add_header("Content-Location", event.uri.to_string()); - let event = event.to_bytes(); + let event = match subscription.binary { + true => Message::Binary(event.to_bytes()), + false => Message::Text(event.to_json()), + }; // If the event fails sending we close the socket on the client // and stop processing further. - if stream.send(Message::Binary(event)).await.is_err() { + if stream.send(event).await.is_err() { let _ = stream.close(Some(CloseFrame { code: CloseCode::Error, reason: Cow::from("Failed sending event"), @@ -661,7 +657,7 @@ async fn handle_request_rues( Some(sid) => sid, }; - let uri = match RuesEventUri::parse_from_path(req.uri().path()) { + let subscription = match RuesSubscription::parse_from_req(&req) { None => { return response( StatusCode::NOT_FOUND, @@ -682,10 +678,10 @@ async fn handle_request_rues( }; let action = match *req.method() { - Method::GET => SubscriptionAction::Subscribe(uri), - Method::DELETE => SubscriptionAction::Unsubscribe(uri), + Method::GET => SubscriptionAction::Subscribe(subscription), + Method::DELETE => SubscriptionAction::Unsubscribe(subscription), Method::POST => SubscriptionAction::Dispatch { - uri, + uri: subscription.uri, body: req.into_body(), }, _ => { @@ -811,7 +807,7 @@ mod tests { use std::{fs, thread}; use super::*; - use event::Event as EventRequest; + use event::{Event as EventRequest, RuesEventUri}; use crate::http::event::WrappedContractId; use execution_core::ContractId; @@ -1101,6 +1097,7 @@ mod tests { server.local_addr )) .header("Rusk-Session-Id", sid.to_string()) + .header("Accept", "application/octet-stream".to_string()) .send() .await .expect("Requesting should succeed"); @@ -1113,6 +1110,7 @@ mod tests { server.local_addr )) .header("Rusk-Session-Id", sid.to_string()) + .header("Accept", "application/octet-stream".to_string()) .send() .await .expect("Requesting should succeed"); @@ -1167,11 +1165,13 @@ mod tests { assert_eq!(received_event, event, "Event should be the same"); + let unsubscribe_location = format!( + "http://{}{}", + server.local_addr, at_first_received_event.uri + ); + let response = client - .delete(format!( - "http://{}/on/contracts:{maybe_sub_contract_id_hex}/{TOPIC}", - server.local_addr - )) + .delete(unsubscribe_location) .header("Rusk-Session-Id", sid.to_string()) .send() .await diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 9ae9caa940..1e1051e0e4 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -354,7 +354,7 @@ impl From> for RequestData { } } -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct ResponseData { data: DataType, header: serde_json::Map, @@ -414,11 +414,11 @@ impl Eq for DataType {} impl PartialEq for DataType { fn eq(&self, other: &Self) -> bool { match (self, other) { - (Self::Channel(_), Self::Channel(_)) => true, (Self::Text(a), Self::Text(b)) => a == b, (Self::Json(a), Self::Json(b)) => a == b, (Self::Binary(a), Self::Binary(b)) => a == b, (Self::None, Self::None) => true, + // Two mpsc::Receiver are always different _ => false, } } @@ -742,6 +742,42 @@ pub struct RuesEventUri { pub const RUES_LOCATION_PREFIX: &str = "/on"; +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct RuesSubscription { + pub binary: bool, + pub uri: RuesEventUri, +} + +// We won't to take into consideration the "binary" flag while inserting into +// subscriptions_set +impl core::hash::Hash for RuesSubscription { + fn hash(&self, state: &mut H) { + self.uri.hash(state); + } +} + +impl PartialEq for RuesSubscription { + fn eq(&self, other: &Self) -> bool { + self.uri == other.uri + } +} + +impl Eq for RuesSubscription {} + +impl RuesSubscription { + pub fn parse_from_req(req: &Request) -> Option { + let uri = RuesEventUri::parse_from_path(req.uri().path())?; + let binary = req + .headers() + .get(ACCEPT) + .and_then(|h| h.to_str().ok()) + .map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY)) + .unwrap_or_default(); + + Some(Self { binary, uri }) + } +} + impl Display for RuesEventUri { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let component = &self.component; @@ -876,6 +912,10 @@ impl RuesEvent { bytes } + + pub fn to_json(&self) -> String { + serde_json::to_string(&self.data).expect("Data to be json serializable") + } } impl From for RuesEvent {