From cf2db5a02b98857f5b4c9134ce8170fb1450250b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Thu, 2 May 2024 22:34:45 +0200 Subject: [PATCH] rusk: process dispatched events --- rusk/src/lib/http.rs | 63 +++++++++++++++++++++++++++----------- rusk/src/lib/http/event.rs | 15 ++++++++- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 84008f758..81aa5e0f7 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -3,6 +3,7 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // // Copyright (c) DUSK NETWORK. All rights reserved. + #![allow(unused)] #[cfg(feature = "node")] @@ -18,6 +19,7 @@ pub(crate) use event::{ BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse, RequestData, Target, }; + use rusk_abi::Event; use tracing::{info, warn}; @@ -43,7 +45,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::StreamExt; use tokio_util::either::Either; -use http_body_util::Full; +use http_body_util::{BodyExt, Full}; use hyper::http::{HeaderName, HeaderValue}; use hyper::service::Service; use hyper::{ @@ -65,7 +67,7 @@ use rand::rngs::OsRng; #[cfg(feature = "node")] use crate::chain::{Rusk, RuskNode}; -use crate::http::event::FullOrStreamBody; +use crate::http::event::{FullOrStreamBody, RuesEventData}; use crate::VERSION; pub use self::event::{ContractEvent, RuesEvent}; @@ -597,7 +599,7 @@ async fn handle_dispatch( let bytes = match body.collect().await { Ok(bytes) => bytes.to_bytes(), Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err.into())).await; return; } }; @@ -605,7 +607,7 @@ async fn handle_dispatch( let req = match MessageRequest::parse(&bytes) { Ok(req) => req, Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err)).await; return; } }; @@ -613,28 +615,53 @@ async fn handle_dispatch( let rsp = match handler.handle(&req).await { Ok(rsp) => rsp, Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err)).await; return; } }; let (data, header) = rsp.into_inner(); match data { - DataType::Binary(_) => {} - DataType::Text(_) => {} - DataType::Json(_) => {} - DataType::Channel(_) => {} + DataType::Binary(bytes) => { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes.inner), + })) + .await; + } + DataType::Text(text) => { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(text.into_bytes()), + })) + .await; + } + DataType::Json(json) => { + let _ = sender + .send( + serde_json::to_vec(&json) + .map(|bytes| RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes), + }) + .map_err(Into::into), + ) + .await; + } + DataType::Channel(channel) => { + for bytes in channel { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes), + })) + .await; + } + } DataType::None => {} } - - todo!( - "\ - Figure out if the subscription is a contract subscription (meaning a \ - contract call) and, if so, parse the body for the arguments and execute, \ - giving somehow passing the resulting events through to the websocket stream - that dispatched the event. - " - ) } fn response( diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 3511e0a04..7d1a01eeb 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -803,7 +803,7 @@ impl RuesEvent { let (headers_len_bytes, bytes) = bytes.split_at(4); let mut headers_len_array = [0u8; 4]; - headers_len_array.copy_from_slice(&headers_len_bytes); + headers_len_array.copy_from_slice(headers_len_bytes); let headers_len = u32::from_le_bytes(headers_len_array) as usize; if bytes.len() < headers_len { @@ -839,10 +839,13 @@ impl From for RuesEvent { pub enum RuesEventData { /// A contract event. Contract(ContractEvent), + /// An event whose provenance is unknown. + Other(Vec), } impl RuesEventData { const CONTRACT_TAG: u8 = 1; + const OTHER_TAG: u8 = 255; fn to_bytes(&self) -> Vec { match self { @@ -853,6 +856,12 @@ impl RuesEventData { bytes.insert(0, Self::CONTRACT_TAG); bytes } + Self::Other(data) => { + let mut bytes = vec![0; data.len() + 1]; + bytes[0] = Self::OTHER_TAG; + bytes[1..].copy_from_slice(data); + bytes + } } } @@ -864,6 +873,10 @@ impl RuesEventData { let event = serde_json::from_slice(bytes).ok()?; Some(Self::Contract(event)) } + Self::OTHER_TAG => { + let data = bytes.to_vec(); + Some(Self::Other(data)) + } _ => None, } }