Skip to content

Commit

Permalink
rusk: process dispatched events
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Leegwater Simões committed May 8, 2024
1 parent 169accf commit cf2db5a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 19 deletions.
63 changes: 45 additions & 18 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.

#![allow(unused)]

#[cfg(feature = "node")]
Expand All @@ -18,6 +19,7 @@ pub(crate) use event::{
BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse,
RequestData, Target,
};

use rusk_abi::Event;
use tracing::{info, warn};

Expand All @@ -43,7 +45,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
use tokio_util::either::Either;

use http_body_util::Full;
use http_body_util::{BodyExt, Full};
use hyper::http::{HeaderName, HeaderValue};
use hyper::service::Service;
use hyper::{
Expand All @@ -65,7 +67,7 @@ use rand::rngs::OsRng;

#[cfg(feature = "node")]
use crate::chain::{Rusk, RuskNode};
use crate::http::event::FullOrStreamBody;
use crate::http::event::{FullOrStreamBody, RuesEventData};
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent};
Expand Down Expand Up @@ -597,44 +599,69 @@ async fn handle_dispatch<H: HandleRequest>(
let bytes = match body.collect().await {
Ok(bytes) => bytes.to_bytes(),
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err.into())).await;
return;
}
};

let req = match MessageRequest::parse(&bytes) {
Ok(req) => req,
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err)).await;
return;
}
};

let rsp = match handler.handle(&req).await {
Ok(rsp) => rsp,
Err(err) => {
let _ = sender.send(Err(err.into()));
let _ = sender.send(Err(err)).await;
return;
}
};

let (data, header) = rsp.into_inner();
match data {
DataType::Binary(_) => {}
DataType::Text(_) => {}
DataType::Json(_) => {}
DataType::Channel(_) => {}
DataType::Binary(bytes) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes.inner),
}))
.await;
}
DataType::Text(text) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(text.into_bytes()),
}))
.await;
}
DataType::Json(json) => {
let _ = sender
.send(
serde_json::to_vec(&json)
.map(|bytes| RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
})
.map_err(Into::into),
)
.await;
}
DataType::Channel(channel) => {
for bytes in channel {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
}))
.await;
}
}
DataType::None => {}
}

todo!(
"\
Figure out if the subscription is a contract subscription (meaning a \
contract call) and, if so, parse the body for the arguments and execute, \
giving somehow passing the resulting events through to the websocket stream
that dispatched the event.
"
)
}

fn response(
Expand Down
15 changes: 14 additions & 1 deletion rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ impl RuesEvent {
let (headers_len_bytes, bytes) = bytes.split_at(4);

let mut headers_len_array = [0u8; 4];
headers_len_array.copy_from_slice(&headers_len_bytes);
headers_len_array.copy_from_slice(headers_len_bytes);

let headers_len = u32::from_le_bytes(headers_len_array) as usize;
if bytes.len() < headers_len {
Expand Down Expand Up @@ -839,10 +839,13 @@ impl From<rusk_abi::Event> for RuesEvent {
pub enum RuesEventData {
/// A contract event.
Contract(ContractEvent),
/// An event whose provenance is unknown.
Other(Vec<u8>),
}

impl RuesEventData {
const CONTRACT_TAG: u8 = 1;
const OTHER_TAG: u8 = 255;

fn to_bytes(&self) -> Vec<u8> {
match self {
Expand All @@ -853,6 +856,12 @@ impl RuesEventData {
bytes.insert(0, Self::CONTRACT_TAG);
bytes
}
Self::Other(data) => {
let mut bytes = vec![0; data.len() + 1];
bytes[0] = Self::OTHER_TAG;
bytes[1..].copy_from_slice(data);
bytes
}
}
}

Expand All @@ -864,6 +873,10 @@ impl RuesEventData {
let event = serde_json::from_slice(bytes).ok()?;
Some(Self::Contract(event))
}
Self::OTHER_TAG => {
let data = bytes.to_vec();
Some(Self::Other(data))
}
_ => None,
}
}
Expand Down

0 comments on commit cf2db5a

Please sign in to comment.