Skip to content

Commit

Permalink
wip: remove RuesEventData
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 6, 2024
1 parent fab8b26 commit d17b4bc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 150 deletions.
78 changes: 22 additions & 56 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use anyhow::Error as AnyhowError;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;

use crate::http::event::{FullOrStreamBody, RuesEventData};
use crate::http::event::FullOrStreamBody;
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent};
Expand Down Expand Up @@ -598,48 +598,12 @@ async fn handle_dispatch<H: HandleRequest>(
}
};

let (data, header) = rsp.into_inner();
match data {
DataType::Binary(bytes) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes.inner),
}))
.await;
}
DataType::Text(text) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(text.into_bytes()),
}))
.await;
}
DataType::Json(json) => {
let _ = sender
.send(
serde_json::to_vec(&json)
.map(|bytes| RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
})
.map_err(Into::into),
)
.await;
}
DataType::Channel(channel) => {
for bytes in channel {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
}))
.await;
}
}
DataType::None => {}
}
sender
.send(Ok(RuesEvent {
subscription: sub,
data: rsp,
}))
.await;
}

fn response(
Expand Down Expand Up @@ -1203,18 +1167,20 @@ mod tests {
let message = stream.read().expect("Event should be received");
let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");

assert_eq!(at_first_received_event, event, "Event should be the same");
assert_eq!(
at_first_received_event.to_bytes(),
event_bytes,
"Event should be the same"
);

let message = stream.read().expect("Event should be received");
let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");

assert_eq!(received_event, event, "Event should be the same");
assert_eq!(
received_event.to_bytes(),
event_bytes,
"Event should be the same"
);

let response = client
.delete(format!(
Expand All @@ -1241,11 +1207,11 @@ mod tests {
.expect("Sending event should succeed");

let message = stream.read().expect("Event should be received");
let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");

assert_eq!(received_event, event, "Event should be the same");
assert_eq!(
received_event.to_bytes(),
event_bytes,
"Event should be the same"
);
}
}
142 changes: 48 additions & 94 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl From<Vec<u8>> for RequestData {
}
}

#[derive(Debug, Clone)]
pub struct ResponseData {
data: DataType,
header: serde_json::Map<String, serde_json::Value>,
Expand Down Expand Up @@ -396,6 +397,30 @@ pub enum DataType {
None,
}

impl Clone for DataType {
fn clone(&self) -> Self {
match self {
Self::Binary(b) => Self::Binary(BinaryWrapper {
inner: b.inner.clone(),
}),
Self::Text(s) => Self::Text(s.clone()),
Self::Json(s) => Self::Json(s.clone()),
_ => Self::None,
}
}
}

impl DataType {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::Binary(b) => b.inner.clone(),
Self::Text(s) => s.as_bytes().to_vec(),
Self::Json(s) => s.to_string().as_bytes().to_vec(),
_ => vec![],
}
}
}

impl From<serde_json::Value> for DataType {
fn from(value: serde_json::Value) -> Self {
Self::Json(value)
Expand Down Expand Up @@ -714,19 +739,15 @@ impl RuesSubscription {
}

pub fn matches(&self, event: &RuesEvent) -> bool {
let event = match &event.data {
RuesEventData::Contract(event) => event,
_ => return false,
};

if self.component != "contracts" {
let event = &event.subscription;
if self.component != event.component {
return false;
}

let target = hex::encode(event.target.0.as_bytes());

if self.entity != Some(target) {
return false;
if let Some(entity) = &self.entity {
if Some(entity) != event.entity.as_ref() {
return false;
}
}

if self.topic != event.topic {
Expand Down Expand Up @@ -768,22 +789,22 @@ impl From<ContractEvent> for execution_core::Event {
}

/// A RUES event
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub struct RuesEvent {
pub headers: serde_json::Map<String, serde_json::Value>,
pub data: RuesEventData,
pub subscription: RuesSubscription,
pub data: ResponseData,
}

impl RuesEvent {
/// Serialize the event into a vector of bytes.
pub fn to_bytes(&self) -> Vec<u8> {
let headers_bytes = serde_json::to_vec(&self.headers)
let headers_bytes = serde_json::to_vec(&self.data.header)
.expect("Serializing JSON should succeed");

let headers_len = headers_bytes.len() as u32;
let headers_len_bytes = headers_len.to_le_bytes();

let data_bytes = self.data.to_bytes();
let data_bytes = self.data.data.to_bytes();

let len =
headers_len_bytes.len() + headers_bytes.len() + data_bytes.len();
Expand All @@ -795,35 +816,17 @@ impl RuesEvent {

bytes
}

pub fn from_bytes(bytes: &[u8]) -> Option<Self> {
if bytes.len() < 4 {
return None;
}
let (headers_len_bytes, bytes) = bytes.split_at(4);

let mut headers_len_array = [0u8; 4];
headers_len_array.copy_from_slice(headers_len_bytes);

let headers_len = u32::from_le_bytes(headers_len_array) as usize;
if bytes.len() < headers_len {
return None;
}

let (headers_bytes, data_bytes) = bytes.split_at(headers_len);
let headers = serde_json::from_slice(headers_bytes).ok()?;

let data = RuesEventData::from_bytes(data_bytes)?;

Some(Self { headers, data })
}
}

impl From<ContractEvent> for RuesEvent {
fn from(event: ContractEvent) -> Self {
Self {
headers: serde_json::Map::new(),
data: RuesEventData::Contract(event),
subscription: RuesSubscription {
component: "contracts".into(),
entity: Some(hex::encode(event.target.0.as_bytes())),
topic: event.topic,
},
data: ResponseData::new(event.data),
}
}
}
Expand All @@ -837,63 +840,14 @@ impl From<execution_core::Event> for RuesEvent {
#[cfg(feature = "node")]
impl From<node_data::events::Event> for RuesEvent {
fn from(value: node_data::events::Event) -> Self {
let mut headers = serde_json::Map::new();
headers.insert(
"Content-Location".into(),
format!("/on/{}:{}/{}", value.target, value.id, value.topic).into(),
);

Self {
headers: serde_json::Map::new(),
data: RuesEventData::Other(value.data.unwrap_or_default().into()),
}
}
}

/// Types of event data that RUES supports.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuesEventData {
/// A contract event.
Contract(ContractEvent),
/// An event whose provenance is unknown.
Other(Vec<u8>),
}

impl RuesEventData {
const CONTRACT_TAG: u8 = 1;
const OTHER_TAG: u8 = 255;
subscription: RuesSubscription {
component: value.target.into(),
entity: Some(value.id),
topic: value.topic.into(),
},

fn to_bytes(&self) -> Vec<u8> {
match self {
Self::Contract(event) => {
let mut bytes = serde_json::to_vec(event).expect(
"Serializing contract event to JSON should succeed",
);
bytes.insert(0, Self::CONTRACT_TAG);
bytes
}
Self::Other(data) => {
let mut bytes = vec![0; data.len() + 1];
bytes[0] = Self::OTHER_TAG;
bytes[1..].copy_from_slice(data);
bytes
}
}
}

fn from_bytes(bytes: &[u8]) -> Option<Self> {
let (tag, bytes) = bytes.split_first()?;

match *tag {
Self::CONTRACT_TAG => {
let event = serde_json::from_slice(bytes).ok()?;
Some(Self::Contract(event))
}
Self::OTHER_TAG => {
let data = bytes.to_vec();
Some(Self::Other(data))
}
_ => None,
data: ResponseData::new(value.data.unwrap_or_default()),
}
}
}
Expand Down

0 comments on commit d17b4bc

Please sign in to comment.