From ef4def494864970a6ff4b3c49330444fcc8f51d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Wed, 1 May 2024 22:42:01 +0200 Subject: [PATCH 1/5] rusk: add scaffold for RUES dispatch --- rusk/Cargo.toml | 2 +- rusk/src/lib/http.rs | 81 +++++++++++++++++++++++++++++++++++++------- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index 986edeced7..fdeaacd10c 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -55,7 +55,7 @@ tungstenite = "0.20" hyper-tungstenite = "0.11" hyper = { version = "0.14", features = ["server", "stream", "http1", "http2"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["rt"] } tokio-rustls = "0.25" rustls-pemfile = "2" diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 48fbf90c00..63a870469a 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -19,7 +19,6 @@ pub(crate) use event::{ BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse, RequestData, Target, }; -use hyper::http::{HeaderName, HeaderValue}; use rusk_abi::Event; use tracing::{info, warn}; @@ -31,6 +30,7 @@ use std::net::SocketAddr; use std::path::Path; use std::pin::Pin; use std::str::FromStr; +use std::sync::mpsc as std_mpsc; use std::sync::Arc; use std::task::{Context, Poll}; @@ -40,7 +40,11 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::ToSocketAddrs; use tokio::sync::{broadcast, mpsc, oneshot, RwLock}; use tokio::{io, task}; +use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; +use tokio_stream::StreamExt; +use tokio_util::either::Either; +use hyper::http::{HeaderName, HeaderValue}; use hyper::server::conn::Http; use hyper::service::Service; use hyper::{body, Body, Method, Request, Response, StatusCode}; @@ -50,7 +54,9 @@ use tungstenite::protocol::frame::coding::CloseCode; use tungstenite::protocol::{CloseFrame, Message}; use futures_util::stream::iter as stream_iter; -use futures_util::{SinkExt, StreamExt}; +use futures_util::{SinkExt, TryStreamExt}; + +use anyhow::Error as AnyhowError; use rand::rngs::OsRng; #[cfg(feature = "node")] @@ -402,13 +408,14 @@ where enum SubscriptionAction { Subscribe(RuesSubscription), Unsubscribe(RuesSubscription), + Dispatch { sub: RuesSubscription, body: Body }, } async fn handle_stream_rues( sid: SessionId, websocket: HyperWebsocket, mut subscriptions: mpsc::Receiver, - mut events: broadcast::Receiver, + events: broadcast::Receiver, mut shutdown: broadcast::Receiver, sockets_map: Arc< RwLock>>, @@ -429,7 +436,21 @@ async fn handle_stream_rues( return; } + // FIXME make this a configuration parameter + const DISPATCH_BUFFER_SIZE: usize = 16; + let mut subscription_set = HashSet::new(); + let (dispatch_sender, dispatch_events) = + mpsc::channel(DISPATCH_BUFFER_SIZE); + + // Join the two event receivers together, allowing for reusing the exact + // same code when handling them either of them. + let mut events = BroadcastStream::new(events); + let mut dispatch_events = ReceiverStream::new(dispatch_events); + + let mut events = events + .map_err(Either::Left) + .merge(dispatch_events.map_err(Either::Right)); loop { tokio::select! { @@ -456,26 +477,41 @@ async fn handle_stream_rues( }; match subscription { - SubscriptionAction::Subscribe(subscription) => { + SubscriptionAction::Subscribe(subscription) => { subscription_set.insert(subscription); }, SubscriptionAction::Unsubscribe(subscription) => { subscription_set.remove(&subscription); }, + SubscriptionAction::Dispatch { + sub, + body + } => { + // TODO figure out if we should subscribe to the event we dispatch + task::spawn(handle_dispatch(sub, body, dispatch_sender.clone())); + } } } - event = events.recv() => { + Some(event) = events.next() => { let event = match event { Ok(event) => event, - Err(err) => { - // If the event channel is closed, it means the server has stopped - // producing events, so we should inform the client and stop. - let _ = stream.close(Some(CloseFrame { - code: CloseCode::Away, - reason: Cow::from("Shutting down"), - })).await; - break; + Err(err) => match err { + Either::Left(_berr) => { + // If the event channel is closed, it means the + // server has stopped producing events, so we + // should inform the client and stop. + let _ = stream.close(Some(CloseFrame { + code: CloseCode::Away, + reason: Cow::from("Shutting down"), + })).await; + break; + + } + Either::Right(_eerr) => { + // TODO handle execution error + continue; + }, }, }; @@ -518,6 +554,21 @@ async fn handle_stream_rues( sockets.remove(&sid); } +async fn handle_dispatch( + sub: RuesSubscription, + body: Body, + sender: mpsc::Sender>, +) { + todo!( + "\ + Figure out if the subscription is a contract subscription (meaning a \ + contract call) and, if so, parse the body for the arguments and execute, \ + giving somehow passing the resulting events through to the websocket stream + that dispatched the event. + " + ) +} + fn response( status: StatusCode, body: impl Into, @@ -605,6 +656,10 @@ async fn handle_request_rues( let action = match *req.method() { Method::GET => SubscriptionAction::Subscribe(subscription), Method::DELETE => SubscriptionAction::Unsubscribe(subscription), + Method::POST => SubscriptionAction::Dispatch { + sub: subscription, + body: req.into_body(), + }, _ => { return response( StatusCode::METHOD_NOT_ALLOWED, From 6cf5b079e5515a0b64bccad5f8e71b9cb49a37bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Thu, 2 May 2024 21:20:55 +0200 Subject: [PATCH 2/5] rusk: upgrade `hyper` and `tungstenite` --- rusk/Cargo.toml | 11 ++-- rusk/src/lib/http.rs | 107 +++++++++++++++++++++++------------ rusk/src/lib/http/event.rs | 110 ++++++++++++++++++++++++++++-------- rusk/src/lib/http/stream.rs | 2 +- 4 files changed, 164 insertions(+), 66 deletions(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index fdeaacd10c..f6ab262b2e 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -51,9 +51,12 @@ dusk-bytes = "0.1" kadcast = "0.6.0-rc" dusk-wallet-core = "0.25.0-phoenix.0.26" phoenix-core = { version = "0.26", default-features = false, features = ["rkyv-impl", "alloc"] } -tungstenite = "0.20" -hyper-tungstenite = "0.11" -hyper = { version = "0.14", features = ["server", "stream", "http1", "http2"] } +pin-project = "1" +tungstenite = "0.21" +hyper-tungstenite = "0.13" +hyper = { version = "1", features = ["server", "http1", "http2"] } +hyper-util = { version = "0.1", features = ["server", "http1", "http2"] } +http-body-util = "0.1" tokio-stream = { version = "0.1", features = ["sync"] } tokio-util = { version = "0.7", features = ["rt"] } @@ -86,7 +89,7 @@ futures = { version = "0.3", optional = true } [dev-dependencies] test-context = "0.1" -reqwest = "0.11" +reqwest = "0.12" rusk-recovery = { version = "0.6", path = "../rusk-recovery", features = ["state"] } ff = { version = "0.13", default-features = false } rusk-prover = { version = "0.3", path = "../rusk-prover", features = ["no_random"] } diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 63a870469a..3d31d97397 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -44,11 +44,15 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::StreamExt; use tokio_util::either::Either; +use http_body_util::Full; use hyper::http::{HeaderName, HeaderValue}; -use hyper::server::conn::Http; use hyper::service::Service; -use hyper::{body, Body, Method, Request, Response, StatusCode}; +use hyper::{ + body::{self, Body, Bytes, Incoming}, + Method, Request, Response, StatusCode, +}; use hyper_tungstenite::{tungstenite, HyperWebsocket}; +use hyper_util::server::conn::auto::Builder as HttpBuilder; use tungstenite::protocol::frame::coding::CloseCode; use tungstenite::protocol::{CloseFrame, Message}; @@ -57,10 +61,12 @@ use futures_util::stream::iter as stream_iter; use futures_util::{SinkExt, TryStreamExt}; use anyhow::Error as AnyhowError; +use hyper_util::rt::TokioIo; use rand::rngs::OsRng; #[cfg(feature = "node")] use crate::chain::{Rusk, RuskNode}; +use crate::http::event::FullOrStreamBody; use crate::VERSION; pub use self::event::ContractEvent; @@ -155,6 +161,19 @@ impl HandleRequest for DataSources { } } +#[derive(Clone)] +struct TokioExecutor; + +impl hyper::rt::Executor for TokioExecutor +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, fut: F) { + task::spawn(fut); + } +} + async fn listening_loop( handler: H, listener: Listener, @@ -165,9 +184,16 @@ async fn listening_loop( H: HandleRequest, { let sources = Arc::new(handler); - let http = Http::new(); let sockets_map = Arc::new(RwLock::new(HashMap::new())); + let service = ExecutionService { + sources: sources.clone(), + sockets_map: sockets_map.clone(), + events: events.resubscribe(), + shutdown: shutdown.resubscribe(), + ws_event_channel_cap, + }; + loop { tokio::select! { _ = shutdown.recv() => { @@ -179,16 +205,15 @@ async fn listening_loop( Err(_) => break, }; - let service = ExecutionService { - sources: sources.clone(), - sockets_map: sockets_map.clone(), - events: events.resubscribe(), - shutdown: shutdown.resubscribe(), - ws_event_channel_cap, - }; - let conn = http.serve_connection(stream, service).with_upgrades(); + let http = HttpBuilder::new(TokioExecutor); + + let stream = TokioIo::new(stream); + let service = service.clone(); - task::spawn(conn); + task::spawn(async move { + let conn = http.serve_connection_with_upgrades(stream, service); + conn.await + }); } } } @@ -352,11 +377,23 @@ struct ExecutionService { ws_event_channel_cap: usize, } -impl Service> for ExecutionService +impl Clone for ExecutionService { + fn clone(&self) -> Self { + Self { + sources: self.sources.clone(), + sockets_map: self.sockets_map.clone(), + events: self.events.resubscribe(), + shutdown: self.shutdown.resubscribe(), + ws_event_channel_cap: self.ws_event_channel_cap, + } + } +} + +impl Service> for ExecutionService where H: HandleRequest, { - type Response = Response; + type Response = Response; type Error = Infallible; type Future = Pin< Box< @@ -366,19 +403,12 @@ where >, >; - fn poll_ready( - &mut self, - _: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - /// Handle the HTTP request. /// /// A request may be a "normal" request, or a WebSocket upgrade request. In /// the former case, the request is handled on the spot, while in the /// latter task running the stream handler loop is spawned. - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&self, mut req: Request) -> Self::Future { let sources = self.sources.clone(); let sockets_map = self.sockets_map.clone(); let events = self.events.resubscribe(); @@ -395,10 +425,10 @@ where ws_event_channel_cap, ) .await; - response.or_else(|error| { + response.map(Into::into).or_else(|error| { Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(error.to_string())) + .body(Full::new(error.to_string().into()).into()) .expect("Failed to build response")) }) }) @@ -408,7 +438,10 @@ where enum SubscriptionAction { Subscribe(RuesSubscription), Unsubscribe(RuesSubscription), - Dispatch { sub: RuesSubscription, body: Body }, + Dispatch { + sub: RuesSubscription, + body: Incoming, + }, } async fn handle_stream_rues( @@ -556,7 +589,7 @@ async fn handle_stream_rues( async fn handle_dispatch( sub: RuesSubscription, - body: Body, + body: Incoming, sender: mpsc::Sender>, ) { todo!( @@ -571,23 +604,23 @@ async fn handle_dispatch( fn response( status: StatusCode, - body: impl Into, -) -> Result, ExecutionError> { + body: impl Into, +) -> Result, ExecutionError> { Ok(Response::builder() .status(status) - .body(body.into()) + .body(Full::new(body.into()).into()) .expect("Failed to build response")) } async fn handle_request_rues( - mut req: Request, + mut req: Request, sockets_map: Arc< RwLock>>, >, events: broadcast::Receiver, shutdown: broadcast::Receiver, ws_event_channel_cap: usize, -) -> Result, ExecutionError> { +) -> Result, ExecutionError> { if hyper_tungstenite::is_upgrade_request(&req) { let (subscription_sender, subscriptions) = mpsc::channel(ws_event_channel_cap); @@ -613,7 +646,7 @@ async fn handle_request_rues( sockets_map.clone(), )); - Ok(response) + Ok(response.map(Into::into)) } else { let headers = req.headers(); let mut path_split = req.uri().path().split('/'); @@ -680,7 +713,7 @@ async fn handle_request_rues( } async fn handle_request( - mut req: Request, + mut req: Request, sources: Arc, sockets_map: Arc< RwLock>>, @@ -688,7 +721,7 @@ async fn handle_request( events: broadcast::Receiver, shutdown: broadcast::Receiver, ws_event_channel_cap: usize, -) -> Result, ExecutionError> +) -> Result, ExecutionError> where H: HandleRequest, { @@ -712,7 +745,7 @@ where let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?; task::spawn(handle_stream(sources, websocket, target, shutdown)); - Ok(response) + Ok(response.map(Into::into)) } else { let (execution_request, binary_resp) = MessageRequest::from_request(req).await?; @@ -852,7 +885,7 @@ mod tests { let client = reqwest::Client::new(); let response = client .post(format!("http://{}/01/target", server.local_addr)) - .body(Body::from(request)) + .body(request) .send() .await .expect("Requesting should succeed"); @@ -914,7 +947,7 @@ mod tests { "https://localhost:{}/01/target", server.local_addr.port() )) - .body(Body::from(request)) + .body(request) .send() .await .expect("Requesting should succeed"); diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index e8a298e71d..77cbf6b394 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -8,9 +8,16 @@ use super::RUSK_VERSION_HEADER; use base64::engine::{general_purpose::STANDARD as BASE64, Engine}; use bytecheck::CheckBytes; -use futures_util::{stream, StreamExt}; +use futures_util::stream::Iter as StreamIter; +use futures_util::{stream, Stream, StreamExt}; +use http_body_util::{BodyExt, Either, Full, StreamBody}; +use hyper::body::{Buf, Frame}; use hyper::header::{InvalidHeaderName, InvalidHeaderValue}; -use hyper::{Body, Request, Response}; +use hyper::{ + body::{Body, Bytes, Incoming}, + Request, Response, +}; +use pin_project::pin_project; use rand::distributions::{Distribution, Standard}; use rand::Rng; use rkyv::Archive; @@ -19,12 +26,13 @@ use semver::{Version, VersionReq}; use serde::de::{Error, MapAccess, Unexpected, Visitor}; use serde::ser::SerializeMap; use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use serde_with::serde_as; use std::collections::HashMap; use std::fmt::{Display, Formatter}; +use std::pin::Pin; use std::str::FromStr; use std::str::Split; use std::sync::mpsc; +use std::task::{Context, Poll}; use tungstenite::http::HeaderValue; /// A request sent by the websocket client. @@ -133,7 +141,7 @@ impl MessageRequest { } pub async fn from_request( - req: hyper::Request, + req: Request, ) -> anyhow::Result<(Self, bool)> { let headers = req .headers() @@ -198,11 +206,11 @@ impl MessageResponse { pub fn into_http( self, is_binary: bool, - ) -> anyhow::Result> { + ) -> anyhow::Result> { if let Some(error) = &self.error { return Ok(hyper::Response::builder() .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(Body::from(error.to_string()))?); + .body(Full::new(error.to_string().into()).into())?); } let mut headers = HashMap::new(); @@ -214,30 +222,29 @@ impl MessageResponse { true => wrapper.inner, false => hex::encode(wrapper.inner).as_bytes().to_vec(), }; - Body::from(data) + Full::from(Bytes::from(data)).into() } - DataType::Text(text) => Body::from(text), + DataType::Text(text) => Full::from(Bytes::from(text)).into(), DataType::Json(value) => { headers.insert(CONTENT_TYPE, CONTENT_TYPE_JSON.clone()); - Body::from(value.to_string()) + Full::from(Bytes::from(value.to_string())).into() } - DataType::Channel(channel) => { - Body::wrap_stream(stream::iter(channel).map(move |e| { - match is_binary { - true => Ok::<_, anyhow::Error>(e), - false => Ok::<_, anyhow::Error>( - hex::encode(e).as_bytes().to_vec(), - ), - } - })) - } - DataType::None => Body::empty(), + DataType::Channel(receiver) => FullOrStreamBody { + either: Either::Right(StreamBody::new( + BinaryOrTextStream { + is_binary, + stream: stream::iter(receiver), + }, + )), + }, + DataType::None => Full::new(Bytes::new()).into(), } }; - let mut response = hyper::Response::new(body); + let mut response = Response::new(body); for (k, v) in headers { response.headers_mut().insert(k, v); } + Ok(response) } @@ -256,6 +263,61 @@ impl MessageResponse { } } +#[pin_project] +pub struct FullOrStreamBody { + #[pin] + either: Either, StreamBody>, +} + +impl From> for FullOrStreamBody { + fn from(body: Full) -> Self { + Self { + either: Either::Left(body), + } + } +} + +impl Body for FullOrStreamBody { + type Data = + , StreamBody> as Body>::Data; + type Error = + , StreamBody> as Body>::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + let this = self.project(); + this.either.poll_frame(cx) + } +} + +#[pin_project] +pub struct BinaryOrTextStream { + is_binary: bool, + #[pin] + stream: StreamIter<> as IntoIterator>::IntoIter>, +} + +impl Stream for BinaryOrTextStream { + type Item = anyhow::Result>; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + this.stream.poll_next(cx).map(|next| { + next.map(|x| match this.is_binary { + true => Ok(Frame::data(Bytes::from(x))), + false => Ok(Frame::data(Bytes::from( + hex::encode(x).as_bytes().to_vec(), + ))), + }) + }) + } +} + #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum RequestData { @@ -378,7 +440,7 @@ impl Event { }) } pub async fn from_request( - req: hyper::Request, + req: Request, ) -> anyhow::Result<(Self, bool)> { let (parts, req_body) = req.into_parts(); // HTTP REQUEST @@ -391,7 +453,7 @@ impl Event { let target = parts.uri.path().try_into()?; - let body = hyper::body::to_bytes(req_body).await?; + let body = req_body.collect().await?.to_bytes(); let mut event = match binary_request { true => Event::parse(&body) @@ -671,7 +733,7 @@ impl RuesSubscription { } /// A contract event that is sent to a websocket client. -#[serde_as] +#[serde_with::serde_as] #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub struct ContractEvent { pub target: WrappedContractId, diff --git a/rusk/src/lib/http/stream.rs b/rusk/src/lib/http/stream.rs index e1993b2732..411886b7e2 100644 --- a/rusk/src/lib/http/stream.rs +++ b/rusk/src/lib/http/stream.rs @@ -13,7 +13,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use hyper::body::HttpBody; +use hyper::body::Body; use rustls_pemfile::{certs, pkcs8_private_keys}; use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; From 58658307bf9cced61b54777b22981c7003705d43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Mon, 6 May 2024 16:57:22 +0200 Subject: [PATCH 3/5] rusk: (de)serialize RUES events properly This is achieved by introducing a formal `RuesEvent` struct and using it as a wrapper for contract events. This also allows the system to be more extensible - beyond contract events - and include more types of data, such as chain data, in the future. --- rusk/src/lib/chain.rs | 4 +- rusk/src/lib/chain/rusk.rs | 4 +- rusk/src/lib/http.rs | 110 +++++++++++++++++++++++++------------ rusk/src/lib/http/event.rs | 109 +++++++++++++++++++++++++++++++++++- 4 files changed, 186 insertions(+), 41 deletions(-) diff --git a/rusk/src/lib/chain.rs b/rusk/src/lib/chain.rs index 87f34edf41..19b0680212 100644 --- a/rusk/src/lib/chain.rs +++ b/rusk/src/lib/chain.rs @@ -19,7 +19,7 @@ use node::network::Kadcast; use rusk_abi::dusk::{dusk, Dusk}; use rusk_abi::VM; -use crate::http::ContractEvent; +use crate::http::RuesEvent; pub const MINIMUM_STAKE: Dusk = dusk(1000.0); @@ -36,7 +36,7 @@ pub struct Rusk { dir: PathBuf, pub(crate) generation_timeout: Option, pub(crate) feeder_gas_limit: u64, - pub(crate) event_sender: broadcast::Sender, + pub(crate) event_sender: broadcast::Sender, } #[derive(Clone)] diff --git a/rusk/src/lib/chain/rusk.rs b/rusk/src/lib/chain/rusk.rs index afe0e2c616..8686e5d1f1 100644 --- a/rusk/src/lib/chain/rusk.rs +++ b/rusk/src/lib/chain/rusk.rs @@ -30,7 +30,7 @@ use rusk_profile::to_rusk_state_id_path; use tokio::sync::broadcast; use super::{coinbase_value, emission_amount, Rusk, RuskTip}; -use crate::http::ContractEvent; +use crate::http::RuesEvent; use crate::{Error, Result}; pub static DUSK_KEY: LazyLock = LazyLock::new(|| { @@ -44,7 +44,7 @@ impl Rusk { dir: P, generation_timeout: Option, feeder_gas_limit: u64, - event_sender: broadcast::Sender, + event_sender: broadcast::Sender, ) -> Result { let dir = dir.as_ref(); let commit_id_path = to_rusk_state_id_path(dir); diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 3d31d97397..84008f7587 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -3,7 +3,6 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // // Copyright (c) DUSK NETWORK. All rights reserved. - #![allow(unused)] #[cfg(feature = "node")] @@ -69,7 +68,7 @@ use crate::chain::{Rusk, RuskNode}; use crate::http::event::FullOrStreamBody; use crate::VERSION; -pub use self::event::ContractEvent; +pub use self::event::{ContractEvent, RuesEvent}; use self::event::{MessageRequest, ResponseData, RuesSubscription, SessionId}; use self::stream::{Listener, Stream}; @@ -84,7 +83,7 @@ pub struct HttpServer { impl HttpServer { pub async fn bind( handler: H, - event_receiver: broadcast::Receiver, + event_receiver: broadcast::Receiver, ws_event_channel_cap: usize, addr: A, cert_and_key: Option<(P1, P2)>, @@ -177,7 +176,7 @@ where async fn listening_loop( handler: H, listener: Listener, - events: broadcast::Receiver, + events: broadcast::Receiver, mut shutdown: broadcast::Receiver, ws_event_channel_cap: usize, ) where @@ -372,7 +371,7 @@ struct ExecutionService { sources: Arc, sockets_map: Arc>>>, - events: broadcast::Receiver, + events: broadcast::Receiver, shutdown: broadcast::Receiver, ws_event_channel_cap: usize, } @@ -444,12 +443,13 @@ enum SubscriptionAction { }, } -async fn handle_stream_rues( +async fn handle_stream_rues( sid: SessionId, websocket: HyperWebsocket, + events: broadcast::Receiver, mut subscriptions: mpsc::Receiver, - events: broadcast::Receiver, mut shutdown: broadcast::Receiver, + handler: Arc, sockets_map: Arc< RwLock>>, >, @@ -521,7 +521,7 @@ async fn handle_stream_rues( body } => { // TODO figure out if we should subscribe to the event we dispatch - task::spawn(handle_dispatch(sub, body, dispatch_sender.clone())); + task::spawn(handle_dispatch(sub, body, handler.clone(), dispatch_sender.clone())); } } } @@ -559,19 +559,20 @@ async fn handle_stream_rues( // If the event is subscribed, we send it to the client. if is_subscribed { - let event = match serde_json::to_string(&event) { - Ok(event) => event, - // If we fail to serialize the event, we log the error - // and continue processing further. - Err(err) => { - warn!("Failed serializing event: {err}"); - continue; - } - }; + let event = event.to_bytes(); + //let event = match serde_json::to_string(&event) { + // Ok(event) => event, + // // If we fail to serialize the event, we log the error + // // and continue processing further. + // Err(err) => { + // warn!("Failed serializing event: {err}"); + // continue; + // } + //}; // If the event fails sending we close the socket on the client // and stop processing further. - if stream.send(Message::Text(event)).await.is_err() { + if stream.send(Message::Binary(event)).await.is_err() { let _ = stream.close(Some(CloseFrame { code: CloseCode::Error, reason: Cow::from("Failed sending event"), @@ -587,11 +588,45 @@ async fn handle_stream_rues( sockets.remove(&sid); } -async fn handle_dispatch( +async fn handle_dispatch( sub: RuesSubscription, body: Incoming, - sender: mpsc::Sender>, + handler: Arc, + sender: mpsc::Sender>, ) { + let bytes = match body.collect().await { + Ok(bytes) => bytes.to_bytes(), + Err(err) => { + let _ = sender.send(Err(err.into())); + return; + } + }; + + let req = match MessageRequest::parse(&bytes) { + Ok(req) => req, + Err(err) => { + let _ = sender.send(Err(err.into())); + return; + } + }; + + let rsp = match handler.handle(&req).await { + Ok(rsp) => rsp, + Err(err) => { + let _ = sender.send(Err(err.into())); + return; + } + }; + + let (data, header) = rsp.into_inner(); + match data { + DataType::Binary(_) => {} + DataType::Text(_) => {} + DataType::Json(_) => {} + DataType::Channel(_) => {} + DataType::None => {} + } + todo!( "\ Figure out if the subscription is a contract subscription (meaning a \ @@ -612,12 +647,13 @@ fn response( .expect("Failed to build response")) } -async fn handle_request_rues( +async fn handle_request_rues( mut req: Request, + handler: Arc, sockets_map: Arc< RwLock>>, >, - events: broadcast::Receiver, + events: broadcast::Receiver, shutdown: broadcast::Receiver, ws_event_channel_cap: usize, ) -> Result, ExecutionError> { @@ -640,9 +676,10 @@ async fn handle_request_rues( task::spawn(handle_stream_rues( sid, websocket, - subscriptions, events, + subscriptions, shutdown, + handler.clone(), sockets_map.clone(), )); @@ -718,7 +755,7 @@ async fn handle_request( sockets_map: Arc< RwLock>>, >, - events: broadcast::Receiver, + events: broadcast::Receiver, shutdown: broadcast::Receiver, ws_event_channel_cap: usize, ) -> Result, ExecutionError> @@ -731,6 +768,7 @@ where if path.starts_with("/on") { return handle_request_rues( req, + sources.clone(), sockets_map, events, shutdown, @@ -1119,26 +1157,26 @@ mod tests { assert_eq!(response.status(), StatusCode::OK); // This event is subscribed to, so it should be received - let received_event = ContractEvent { + let received_event = RuesEvent::from(ContractEvent { target: SUB_CONTRACT_ID, topic: TOPIC.into(), data: b"hello, events".to_vec(), - }; + }); // This event is at first subscribed to, so it should be received the // first time - let at_first_received_event = ContractEvent { + let at_first_received_event = RuesEvent::from(ContractEvent { target: MAYBE_SUB_CONTRACT_ID, topic: TOPIC.into(), data: b"hello, events".to_vec(), - }; + }); // This event is not subscribed to, so it should not be received - let non_received_event = ContractEvent { + let non_received_event = RuesEvent::from(ContractEvent { target: NON_SUB_CONTRACT_ID, topic: TOPIC.into(), data: b"hello, events".to_vec(), - }; + }); event_sender .send(non_received_event.clone()) @@ -1153,17 +1191,17 @@ mod tests { .expect("Sending event should succeed"); let message = stream.read().expect("Event should be received"); - let event_text = message.into_text().expect("Event should be text"); + let event_bytes = message.into_data(); - let event: ContractEvent = serde_json::from_str(&event_text) + let event = RuesEvent::from_bytes(&event_bytes) .expect("Event should deserialize"); assert_eq!(at_first_received_event, event, "Event should be the same"); let message = stream.read().expect("Event should be received"); - let event_text = message.into_text().expect("Event should be text"); + let event_bytes = message.into_data(); - let event: ContractEvent = serde_json::from_str(&event_text) + let event = RuesEvent::from_bytes(&event_bytes) .expect("Event should deserialize"); assert_eq!(received_event, event, "Event should be the same"); @@ -1193,9 +1231,9 @@ mod tests { .expect("Sending event should succeed"); let message = stream.read().expect("Event should be received"); - let event_text = message.into_text().expect("Event should be text"); + let event_bytes = message.into_data(); - let event: ContractEvent = serde_json::from_str(&event_text) + let event = RuesEvent::from_bytes(&event_bytes) .expect("Event should deserialize"); assert_eq!(received_event, event, "Event should be the same"); diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 77cbf6b394..3511e0a049 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -713,7 +713,12 @@ impl RuesSubscription { }) } - pub fn matches(&self, event: &ContractEvent) -> bool { + pub fn matches(&self, event: &RuesEvent) -> bool { + let event = match &event.data { + RuesEventData::Contract(event) => event, + _ => return false, + }; + if self.component != "contracts" { return false; } @@ -762,6 +767,108 @@ impl From for rusk_abi::Event { } } +/// A RUES event +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RuesEvent { + pub headers: serde_json::Map, + pub data: RuesEventData, +} + +impl RuesEvent { + /// Serialize the event into a vector of bytes. + pub fn to_bytes(&self) -> Vec { + let headers_bytes = serde_json::to_vec(&self.headers) + .expect("Serializing JSON should succeed"); + + let headers_len = headers_bytes.len() as u32; + let headers_len_bytes = headers_len.to_le_bytes(); + + let data_bytes = self.data.to_bytes(); + + let len = + headers_len_bytes.len() + headers_bytes.len() + data_bytes.len(); + let mut bytes = Vec::with_capacity(len); + + bytes.extend(headers_len_bytes); + bytes.extend(headers_bytes); + bytes.extend(data_bytes); + + bytes + } + + pub fn from_bytes(bytes: &[u8]) -> Option { + if bytes.len() < 4 { + return None; + } + let (headers_len_bytes, bytes) = bytes.split_at(4); + + let mut headers_len_array = [0u8; 4]; + headers_len_array.copy_from_slice(&headers_len_bytes); + + let headers_len = u32::from_le_bytes(headers_len_array) as usize; + if bytes.len() < headers_len { + return None; + } + + let (headers_bytes, data_bytes) = bytes.split_at(headers_len); + let headers = serde_json::from_slice(headers_bytes).ok()?; + + let data = RuesEventData::from_bytes(data_bytes)?; + + Some(Self { headers, data }) + } +} + +impl From for RuesEvent { + fn from(event: ContractEvent) -> Self { + Self { + headers: serde_json::Map::new(), + data: RuesEventData::Contract(event), + } + } +} + +impl From for RuesEvent { + fn from(event: rusk_abi::Event) -> Self { + Self::from(ContractEvent::from(event)) + } +} + +/// Types of event data that RUES supports. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RuesEventData { + /// A contract event. + Contract(ContractEvent), +} + +impl RuesEventData { + const CONTRACT_TAG: u8 = 1; + + fn to_bytes(&self) -> Vec { + match self { + Self::Contract(event) => { + let mut bytes = serde_json::to_vec(event).expect( + "Serializing contract event to JSON should succeed", + ); + bytes.insert(0, Self::CONTRACT_TAG); + bytes + } + } + } + + fn from_bytes(bytes: &[u8]) -> Option { + let (tag, bytes) = bytes.split_first()?; + + match *tag { + Self::CONTRACT_TAG => { + let event = serde_json::from_slice(bytes).ok()?; + Some(Self::Contract(event)) + } + _ => None, + } + } +} + #[cfg(test)] mod tests { From bdf80b30b29f1739f9d01b7b16712f32a8f63a62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Thu, 2 May 2024 22:34:45 +0200 Subject: [PATCH 4/5] rusk: process dispatched events --- rusk/src/lib/http.rs | 63 +++++++++++++++++++++++++++----------- rusk/src/lib/http/event.rs | 15 ++++++++- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 84008f7587..81aa5e0f70 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -3,6 +3,7 @@ // file, You can obtain one at http://mozilla.org/MPL/2.0/. // // Copyright (c) DUSK NETWORK. All rights reserved. + #![allow(unused)] #[cfg(feature = "node")] @@ -18,6 +19,7 @@ pub(crate) use event::{ BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse, RequestData, Target, }; + use rusk_abi::Event; use tracing::{info, warn}; @@ -43,7 +45,7 @@ use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; use tokio_stream::StreamExt; use tokio_util::either::Either; -use http_body_util::Full; +use http_body_util::{BodyExt, Full}; use hyper::http::{HeaderName, HeaderValue}; use hyper::service::Service; use hyper::{ @@ -65,7 +67,7 @@ use rand::rngs::OsRng; #[cfg(feature = "node")] use crate::chain::{Rusk, RuskNode}; -use crate::http::event::FullOrStreamBody; +use crate::http::event::{FullOrStreamBody, RuesEventData}; use crate::VERSION; pub use self::event::{ContractEvent, RuesEvent}; @@ -597,7 +599,7 @@ async fn handle_dispatch( let bytes = match body.collect().await { Ok(bytes) => bytes.to_bytes(), Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err.into())).await; return; } }; @@ -605,7 +607,7 @@ async fn handle_dispatch( let req = match MessageRequest::parse(&bytes) { Ok(req) => req, Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err)).await; return; } }; @@ -613,28 +615,53 @@ async fn handle_dispatch( let rsp = match handler.handle(&req).await { Ok(rsp) => rsp, Err(err) => { - let _ = sender.send(Err(err.into())); + let _ = sender.send(Err(err)).await; return; } }; let (data, header) = rsp.into_inner(); match data { - DataType::Binary(_) => {} - DataType::Text(_) => {} - DataType::Json(_) => {} - DataType::Channel(_) => {} + DataType::Binary(bytes) => { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes.inner), + })) + .await; + } + DataType::Text(text) => { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(text.into_bytes()), + })) + .await; + } + DataType::Json(json) => { + let _ = sender + .send( + serde_json::to_vec(&json) + .map(|bytes| RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes), + }) + .map_err(Into::into), + ) + .await; + } + DataType::Channel(channel) => { + for bytes in channel { + let _ = sender + .send(Ok(RuesEvent { + headers: req.headers.clone(), + data: RuesEventData::Other(bytes), + })) + .await; + } + } DataType::None => {} } - - todo!( - "\ - Figure out if the subscription is a contract subscription (meaning a \ - contract call) and, if so, parse the body for the arguments and execute, \ - giving somehow passing the resulting events through to the websocket stream - that dispatched the event. - " - ) } fn response( diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 3511e0a049..7d1a01eeb2 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -803,7 +803,7 @@ impl RuesEvent { let (headers_len_bytes, bytes) = bytes.split_at(4); let mut headers_len_array = [0u8; 4]; - headers_len_array.copy_from_slice(&headers_len_bytes); + headers_len_array.copy_from_slice(headers_len_bytes); let headers_len = u32::from_le_bytes(headers_len_array) as usize; if bytes.len() < headers_len { @@ -839,10 +839,13 @@ impl From for RuesEvent { pub enum RuesEventData { /// A contract event. Contract(ContractEvent), + /// An event whose provenance is unknown. + Other(Vec), } impl RuesEventData { const CONTRACT_TAG: u8 = 1; + const OTHER_TAG: u8 = 255; fn to_bytes(&self) -> Vec { match self { @@ -853,6 +856,12 @@ impl RuesEventData { bytes.insert(0, Self::CONTRACT_TAG); bytes } + Self::Other(data) => { + let mut bytes = vec![0; data.len() + 1]; + bytes[0] = Self::OTHER_TAG; + bytes[1..].copy_from_slice(data); + bytes + } } } @@ -864,6 +873,10 @@ impl RuesEventData { let event = serde_json::from_slice(bytes).ok()?; Some(Self::Contract(event)) } + Self::OTHER_TAG => { + let data = bytes.to_vec(); + Some(Self::Other(data)) + } _ => None, } } From 63d0b6ec2cad6d33d2ba77bd3ab908d926cdedbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Eduardo=20Leegwater=20Sim=C3=B5es?= Date: Wed, 15 May 2024 15:32:52 +0200 Subject: [PATCH 5/5] rusk: remove errant comment --- rusk/src/lib/http.rs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index 81aa5e0f70..c8e9669e7c 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -562,15 +562,6 @@ async fn handle_stream_rues( // If the event is subscribed, we send it to the client. if is_subscribed { let event = event.to_bytes(); - //let event = match serde_json::to_string(&event) { - // Ok(event) => event, - // // If we fail to serialize the event, we log the error - // // and continue processing further. - // Err(err) => { - // warn!("Failed serializing event: {err}"); - // continue; - // } - //}; // If the event fails sending we close the socket on the client // and stop processing further.