From 984f1381cb99ea85f95141c1c9cf69caa74b63ad Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Fri, 30 Aug 2024 15:22:49 +0200 Subject: [PATCH] wip (to test) --- rusk/src/lib/http.rs | 96 ++++++++++++++++++++++++++++++++++++-- rusk/src/lib/http/event.rs | 45 ++++++++++++++++-- 2 files changed, 134 insertions(+), 7 deletions(-) diff --git a/rusk/src/lib/http.rs b/rusk/src/lib/http.rs index d9ceca07a3..d3431c0f52 100644 --- a/rusk/src/lib/http.rs +++ b/rusk/src/lib/http.rs @@ -68,7 +68,9 @@ use rand::rngs::OsRng; use crate::http::event::FullOrStreamBody; use crate::VERSION; -pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX}; +pub use self::event::{ + ContractEvent, RuesDispatchEvent, RuesEvent, RUES_LOCATION_PREFIX, +}; use self::event::{MessageRequest, ResponseData, RuesEventUri, SessionId}; use self::stream::{Listener, Stream}; @@ -147,6 +149,24 @@ impl HandleRequest for DataSources { } Err(anyhow::anyhow!("unsupported target type")) } + + fn can_handle_rues(&self, event: &RuesDispatchEvent) -> bool { + self.sources.iter().any(|s| s.can_handle_rues(event)) + } + + async fn handle_rues( + &self, + event: &RuesDispatchEvent, + ) -> anyhow::Result { + info!("Received event at {}", event.uri); + event.check_rusk_version()?; + for h in &self.sources { + if h.can_handle_rues(event) { + return h.handle_rues(event).await; + } + } + Err(anyhow::anyhow!("unsupported location")) + } } #[derive(Clone)] @@ -626,10 +646,31 @@ async fn handle_request_rues( Ok(response.map(Into::into)) } else if req.method() == Method::POST { - let event = RuesEvent::from_request(req).await?; + let event = RuesDispatchEvent::from_request(req).await?; + let is_binary = event.is_binary(); + let mut resp_headers = event.x_headers(); unimplemented!("Handle rues dispatch here"); - // TODO: Handle rues dispatch - // handle_dispatch(uri, body, handler, sender) + let (responder, mut receiver) = mpsc::unbounded_channel(); + handle_execution_rues(handler, event, responder).await; + + let execution_response = receiver + .recv() + .await + .expect("An execution should always return a response"); + resp_headers.extend(execution_response.headers.clone()); + let mut resp = execution_response.into_http(is_binary)?; + + for (k, v) in resp_headers { + let k = HeaderName::from_str(&k)?; + let v = match v { + serde_json::Value::String(s) => HeaderValue::from_str(&s), + serde_json::Value::Null => HeaderValue::from_str(""), + _ => HeaderValue::from_str(&v.to_string()), + }?; + resp.headers_mut().append(k, v); + } + + Ok(resp) } else { let headers = req.headers(); @@ -775,6 +816,35 @@ async fn handle_execution( let _ = responder.send(rsp); } +async fn handle_execution_rues( + sources: Arc, + event: RuesDispatchEvent, + responder: mpsc::UnboundedSender, +) where + H: HandleRequest, +{ + let mut rsp = sources + .handle_rues(&event) + .await + .map(|data| { + let (data, mut headers) = data.into_inner(); + headers.append(&mut event.x_headers()); + EventResponse { + data, + error: None, + headers, + } + }) + .unwrap_or_else(|e| EventResponse { + headers: event.x_headers(), + data: DataType::None, + error: Some(e.to_string()), + }); + + rsp.set_header(RUSK_VERSION_HEADER, serde_json::json!(*VERSION)); + let _ = responder.send(rsp); +} + #[async_trait] pub trait HandleRequest: Send + Sync + 'static { fn can_handle(&self, request: &MessageRequest) -> bool; @@ -782,7 +852,25 @@ pub trait HandleRequest: Send + Sync + 'static { &self, request: &MessageRequest, ) -> anyhow::Result; + + fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool { + true + } + async fn handle_rues( + &self, + request: &RuesDispatchEvent, + ) -> anyhow::Result { + unimplemented!() + } } +// #[async_trait] +// pub trait HandleRequestRues: Send + Sync + 'static { +// fn can_handle_rues(&self, request: &RuesEvent) -> bool; +// async fn handle_rues( +// &self, +// request: &RuesEvent, +// ) -> anyhow::Result; +// } #[cfg(test)] mod tests { diff --git a/rusk/src/lib/http/event.rs b/rusk/src/lib/http/event.rs index 73e5048843..208f1873ff 100644 --- a/rusk/src/lib/http/event.rs +++ b/rusk/src/lib/http/event.rs @@ -845,7 +845,44 @@ pub struct RuesEvent { pub data: DataType, } -impl RuesEvent { +/// A RUES Dispatch request event +#[derive(Debug)] +pub struct RuesDispatchEvent { + pub uri: RuesEventUri, + pub headers: serde_json::Map, + pub data: RequestData, +} + +impl RuesDispatchEvent { + pub fn x_headers(&self) -> serde_json::Map { + let mut h = self.headers.clone(); + h.retain(|k, _| k.to_lowercase().starts_with("x-")); + h + } + + pub fn header(&self, name: &str) -> Option<&serde_json::Value> { + self.headers + .iter() + .find_map(|(k, v)| k.eq_ignore_ascii_case(name).then_some(v)) + } + + pub fn check_rusk_version(&self) -> anyhow::Result<()> { + if let Some(v) = self.header(RUSK_VERSION_HEADER) { + let req = match v.as_str() { + Some(v) => VersionReq::from_str(v), + None => VersionReq::from_str(&v.to_string()), + }?; + + let current = Version::from_str(&crate::VERSION)?; + if !req.matches(¤t) { + return Err(anyhow::anyhow!( + "Mismatched rusk version: requested {req} - current {current}", + )); + } + } + Ok(()) + } + pub fn is_binary(&self) -> bool { self.headers .get(CONTENT_TYPE) @@ -885,17 +922,19 @@ impl RuesEvent { let bytes = body.collect().await?.to_bytes().to_vec(); let data = match content_type { CONTENT_TYPE_BINARY => bytes.into(), - _ => DataType::Text( + _ => RequestData::Text( String::from_utf8(bytes) .map_err(|e| anyhow::anyhow!("Invalid utf8"))?, ), }; - let ret = RuesEvent { headers, data, uri }; + let ret = RuesDispatchEvent { headers, data, uri }; Ok(ret) } +} +impl RuesEvent { pub fn add_header, V: Into>( &mut self, key: K,