diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 01995cb343..f1c44cd02e 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -65,7 +65,7 @@ use anyhow::Error as AnyhowError; use hyper_util::rt::TokioIo; use rand::rngs::OsRng; -use crate::http::event::{FullOrStreamBody, RuesEventData}; +use crate::http::event::FullOrStreamBody; use crate::VERSION; pub use self::event::{ContractEvent, RuesEvent}; @@ -598,48 +598,12 @@ async fn handle_dispatch( } }; - let (data, header) = rsp.into_inner(); - match data { - DataType::Binary(bytes) => { - let _ = sender - .send(Ok(RuesEvent { - headers: req.headers.clone(), - data: RuesEventData::Other(bytes.inner), - })) - .await; - } - DataType::Text(text) => { - let _ = sender - .send(Ok(RuesEvent { - headers: req.headers.clone(), - data: RuesEventData::Other(text.into_bytes()), - })) - .await; - } - DataType::Json(json) => { - let _ = sender - .send( - serde_json::to_vec(&json) - .map(|bytes| RuesEvent { - headers: req.headers.clone(), - data: RuesEventData::Other(bytes), - }) - .map_err(Into::into), - ) - .await; - } - DataType::Channel(channel) => { - for bytes in channel { - let _ = sender - .send(Ok(RuesEvent { - headers: req.headers.clone(), - data: RuesEventData::Other(bytes), - })) - .await; - } - } - DataType::None => {} - } + sender + .send(Ok(RuesEvent { + subscription: sub, + data: rsp, + })) + .await; } fn response( @@ -1203,18 +1167,20 @@ mod tests { let message = stream.read().expect("Event should be received"); let event_bytes = message.into_data(); - let event = RuesEvent::from_bytes(&event_bytes) - .expect("Event should deserialize"); - - assert_eq!(at_first_received_event, event, "Event should be the same"); + assert_eq!( + at_first_received_event.to_bytes(), + event_bytes, + "Event should be the same" + ); let message = stream.read().expect("Event should be received"); let event_bytes = message.into_data(); - let event = RuesEvent::from_bytes(&event_bytes) - .expect("Event should deserialize"); - - assert_eq!(received_event, event, "Event should be the same"); + assert_eq!( + received_event.to_bytes(), + event_bytes, + "Event should be the same" + ); let response = client .delete(format!( @@ -1241,11 +1207,11 @@ mod tests { .expect("Sending event should succeed"); let message = stream.read().expect("Event should be received"); - let event_bytes = message.into_data(); - - let event = RuesEvent::from_bytes(&event_bytes) - .expect("Event should deserialize"); - assert_eq!(received_event, event, "Event should be the same"); + assert_eq!( + received_event.to_bytes(), + event_bytes, + "Event should be the same" + ); } } diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index d992dc36ff..9ea4abed70 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -354,6 +354,7 @@ impl From> for RequestData { } } +#[derive(Debug, Clone)] pub struct ResponseData { data: DataType, header: serde_json::Map, @@ -396,6 +397,30 @@ pub enum DataType { None, } +impl Clone for DataType { + fn clone(&self) -> Self { + match self { + Self::Binary(b) => Self::Binary(BinaryWrapper { + inner: b.inner.clone(), + }), + Self::Text(s) => Self::Text(s.clone()), + Self::Json(s) => Self::Json(s.clone()), + _ => Self::None, + } + } +} + +impl DataType { + pub fn to_bytes(&self) -> Vec { + match self { + Self::Binary(b) => b.inner.clone(), + Self::Text(s) => s.as_bytes().to_vec(), + Self::Json(s) => s.to_string().as_bytes().to_vec(), + _ => vec![], + } + } +} + impl From for DataType { fn from(value: serde_json::Value) -> Self { Self::Json(value) @@ -714,19 +739,15 @@ impl RuesSubscription { } pub fn matches(&self, event: &RuesEvent) -> bool { - let event = match &event.data { - RuesEventData::Contract(event) => event, - _ => return false, - }; - - if self.component != "contracts" { + let event = &event.subscription; + if self.component != event.component { return false; } - let target = hex::encode(event.target.0.as_bytes()); - - if self.entity != Some(target) { - return false; + if let Some(entity) = &self.entity { + if Some(entity) != event.entity.as_ref() { + return false; + } } if self.topic != event.topic { @@ -768,22 +789,22 @@ impl From for execution_core::Event { } /// A RUES event -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct RuesEvent { - pub headers: serde_json::Map, - pub data: RuesEventData, + pub subscription: RuesSubscription, + pub data: ResponseData, } impl RuesEvent { /// Serialize the event into a vector of bytes. pub fn to_bytes(&self) -> Vec { - let headers_bytes = serde_json::to_vec(&self.headers) + let headers_bytes = serde_json::to_vec(&self.data.header) .expect("Serializing JSON should succeed"); let headers_len = headers_bytes.len() as u32; let headers_len_bytes = headers_len.to_le_bytes(); - let data_bytes = self.data.to_bytes(); + let data_bytes = self.data.data.to_bytes(); let len = headers_len_bytes.len() + headers_bytes.len() + data_bytes.len(); @@ -795,35 +816,17 @@ impl RuesEvent { bytes } - - pub fn from_bytes(bytes: &[u8]) -> Option { - if bytes.len() < 4 { - return None; - } - let (headers_len_bytes, bytes) = bytes.split_at(4); - - let mut headers_len_array = [0u8; 4]; - headers_len_array.copy_from_slice(headers_len_bytes); - - let headers_len = u32::from_le_bytes(headers_len_array) as usize; - if bytes.len() < headers_len { - return None; - } - - let (headers_bytes, data_bytes) = bytes.split_at(headers_len); - let headers = serde_json::from_slice(headers_bytes).ok()?; - - let data = RuesEventData::from_bytes(data_bytes)?; - - Some(Self { headers, data }) - } } impl From for RuesEvent { fn from(event: ContractEvent) -> Self { Self { - headers: serde_json::Map::new(), - data: RuesEventData::Contract(event), + subscription: RuesSubscription { + component: "contracts".into(), + entity: Some(hex::encode(event.target.0.as_bytes())), + topic: event.topic, + }, + data: ResponseData::new(event.data), } } } @@ -837,63 +840,14 @@ impl From for RuesEvent { #[cfg(feature = "node")] impl From for RuesEvent { fn from(value: node_data::events::Event) -> Self { - let mut headers = serde_json::Map::new(); - headers.insert( - "Content-Location".into(), - format!("/on/{}:{}/{}", value.target, value.id, value.topic).into(), - ); - Self { - headers: serde_json::Map::new(), - data: RuesEventData::Other(value.data.unwrap_or_default().into()), - } - } -} - -/// Types of event data that RUES supports. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum RuesEventData { - /// A contract event. - Contract(ContractEvent), - /// An event whose provenance is unknown. - Other(Vec), -} - -impl RuesEventData { - const CONTRACT_TAG: u8 = 1; - const OTHER_TAG: u8 = 255; + subscription: RuesSubscription { + component: value.target.into(), + entity: Some(value.id), + topic: value.topic.into(), + }, - fn to_bytes(&self) -> Vec { - match self { - Self::Contract(event) => { - let mut bytes = serde_json::to_vec(event).expect( - "Serializing contract event to JSON should succeed", - ); - bytes.insert(0, Self::CONTRACT_TAG); - bytes - } - Self::Other(data) => { - let mut bytes = vec![0; data.len() + 1]; - bytes[0] = Self::OTHER_TAG; - bytes[1..].copy_from_slice(data); - bytes - } - } - } - - fn from_bytes(bytes: &[u8]) -> Option { - let (tag, bytes) = bytes.split_first()?; - - match *tag { - Self::CONTRACT_TAG => { - let event = serde_json::from_slice(bytes).ok()?; - Some(Self::Contract(event)) - } - Self::OTHER_TAG => { - let data = bytes.to_vec(); - Some(Self::Other(data)) - } - _ => None, + data: ResponseData::new(value.data.unwrap_or_default()), } } }