Skip to content

Commit

Permalink
rusk: process dispatched events
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Leegwater Simões committed May 8, 2024
1 parent 2e34f0c commit cfb706c
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
59 changes: 42 additions & 17 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use rand::rngs::OsRng;

#[cfg(feature = "node")]
use crate::chain::{Rusk, RuskNode};
use crate::http::event::FullOrStreamBody;
use crate::http::event::{FullOrStreamBody, RuesEventData};
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent};
Expand Down Expand Up @@ -597,44 +597,69 @@ async fn handle_dispatch<H: HandleRequest>(
let bytes = match body.collect().await {
Ok(bytes) => bytes.to_bytes(),
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err.into())).await;
return;
}
};

let req = match MessageRequest::parse(&bytes) {
Ok(req) => req,
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err)).await;
return;
}
};

let rsp = match handler.handle(&req).await {
Ok(rsp) => rsp,
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err)).await;
return;
}
};

let (data, header) = rsp.into_inner();
match data {
DataType::Binary(_) => {}
DataType::Text(_) => {}
DataType::Json(_) => {}
DataType::Channel(_) => {}
DataType::Binary(bytes) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes.inner),
}))
.await;
}
DataType::Text(text) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(text.into_bytes()),
}))
.await;
}
DataType::Json(json) => {
let _ = sender
.send(
serde_json::to_vec(&json)
.map(|bytes| RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
})
.map_err(Into::into),
)
.await;
}
DataType::Channel(channel) => {
for bytes in channel {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
}))
.await;
}
}
DataType::None => {}
}

todo!(
"\
Figure out if the subscription is a contract subscription (meaning a \
contract call) and, if so, parse the body for the arguments and execute, \
giving somehow passing the resulting events through to the websocket stream
that dispatched the event.
"
)
}

fn response(
Expand Down
13 changes: 13 additions & 0 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,10 +839,13 @@ impl From<rusk_abi::Event> for RuesEvent {
pub enum RuesEventData {
/// A contract event.
Contract(ContractEvent),
/// An event whose provenance is unknown.
Other(Vec<u8>),
}

impl RuesEventData {
const CONTRACT_TAG: u8 = 1;
const OTHER_TAG: u8 = 255;

fn to_bytes(&self) -> Vec<u8> {
match self {
Expand All @@ -853,6 +856,12 @@ impl RuesEventData {
bytes.insert(0, Self::CONTRACT_TAG);
bytes
}
Self::Other(data) => {
let mut bytes = vec![0; data.len() + 1];
bytes[0] = Self::OTHER_TAG;
bytes[1..].copy_from_slice(data);
bytes
}
}
}

Expand All @@ -864,6 +873,10 @@ impl RuesEventData {
let event = serde_json::from_slice(bytes).ok()?;
Some(Self::Contract(event))
}
Self::OTHER_TAG => {
let data = bytes.to_vec();
Some(Self::Other(data))
}
_ => None,
}
}
Expand Down

0 comments on commit cfb706c

Please sign in to comment.