Skip to content

Commit

Permalink
wip (to test)
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 30, 2024
1 parent 9005221 commit 984f138
Show file tree
Hide file tree
Showing 2 changed files with 134 additions and 7 deletions.
96 changes: 92 additions & 4 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ use rand::rngs::OsRng;
use crate::http::event::FullOrStreamBody;
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX};
pub use self::event::{
ContractEvent, RuesDispatchEvent, RuesEvent, RUES_LOCATION_PREFIX,
};
use self::event::{MessageRequest, ResponseData, RuesEventUri, SessionId};
use self::stream::{Listener, Stream};

Expand Down Expand Up @@ -147,6 +149,24 @@ impl HandleRequest for DataSources {
}
Err(anyhow::anyhow!("unsupported target type"))
}

fn can_handle_rues(&self, event: &RuesDispatchEvent) -> bool {
self.sources.iter().any(|s| s.can_handle_rues(event))
}

async fn handle_rues(
&self,
event: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
info!("Received event at {}", event.uri);
event.check_rusk_version()?;
for h in &self.sources {
if h.can_handle_rues(event) {
return h.handle_rues(event).await;
}
}
Err(anyhow::anyhow!("unsupported location"))
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -626,10 +646,31 @@ async fn handle_request_rues<H: HandleRequest>(

Ok(response.map(Into::into))
} else if req.method() == Method::POST {
let event = RuesEvent::from_request(req).await?;
let event = RuesDispatchEvent::from_request(req).await?;
let is_binary = event.is_binary();
let mut resp_headers = event.x_headers();
unimplemented!("Handle rues dispatch here");
// TODO: Handle rues dispatch
// handle_dispatch(uri, body, handler, sender)
let (responder, mut receiver) = mpsc::unbounded_channel();
handle_execution_rues(handler, event, responder).await;

let execution_response = receiver
.recv()
.await
.expect("An execution should always return a response");
resp_headers.extend(execution_response.headers.clone());
let mut resp = execution_response.into_http(is_binary)?;

for (k, v) in resp_headers {
let k = HeaderName::from_str(&k)?;
let v = match v {
serde_json::Value::String(s) => HeaderValue::from_str(&s),
serde_json::Value::Null => HeaderValue::from_str(""),
_ => HeaderValue::from_str(&v.to_string()),
}?;
resp.headers_mut().append(k, v);
}

Ok(resp)
} else {
let headers = req.headers();

Expand Down Expand Up @@ -775,14 +816,61 @@ async fn handle_execution<H>(
let _ = responder.send(rsp);
}

async fn handle_execution_rues<H>(
sources: Arc<H>,
event: RuesDispatchEvent,
responder: mpsc::UnboundedSender<EventResponse>,
) where
H: HandleRequest,
{
let mut rsp = sources
.handle_rues(&event)
.await
.map(|data| {
let (data, mut headers) = data.into_inner();
headers.append(&mut event.x_headers());
EventResponse {
data,
error: None,
headers,
}
})
.unwrap_or_else(|e| EventResponse {
headers: event.x_headers(),
data: DataType::None,
error: Some(e.to_string()),
});

rsp.set_header(RUSK_VERSION_HEADER, serde_json::json!(*VERSION));
let _ = responder.send(rsp);
}

#[async_trait]
pub trait HandleRequest: Send + Sync + 'static {
fn can_handle(&self, request: &MessageRequest) -> bool;
async fn handle(
&self,
request: &MessageRequest,
) -> anyhow::Result<ResponseData>;

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
true
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
unimplemented!()
}
}
// #[async_trait]
// pub trait HandleRequestRues: Send + Sync + 'static {
// fn can_handle_rues(&self, request: &RuesEvent) -> bool;
// async fn handle_rues(
// &self,
// request: &RuesEvent,
// ) -> anyhow::Result<ResponseData>;
// }

#[cfg(test)]
mod tests {
Expand Down
45 changes: 42 additions & 3 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,44 @@ pub struct RuesEvent {
pub data: DataType,
}

impl RuesEvent {
/// A RUES Dispatch request event
#[derive(Debug)]
pub struct RuesDispatchEvent {
pub uri: RuesEventUri,
pub headers: serde_json::Map<String, serde_json::Value>,
pub data: RequestData,
}

impl RuesDispatchEvent {
pub fn x_headers(&self) -> serde_json::Map<String, serde_json::Value> {
let mut h = self.headers.clone();
h.retain(|k, _| k.to_lowercase().starts_with("x-"));
h
}

pub fn header(&self, name: &str) -> Option<&serde_json::Value> {
self.headers
.iter()
.find_map(|(k, v)| k.eq_ignore_ascii_case(name).then_some(v))
}

pub fn check_rusk_version(&self) -> anyhow::Result<()> {
if let Some(v) = self.header(RUSK_VERSION_HEADER) {
let req = match v.as_str() {
Some(v) => VersionReq::from_str(v),
None => VersionReq::from_str(&v.to_string()),
}?;

let current = Version::from_str(&crate::VERSION)?;
if !req.matches(&current) {
return Err(anyhow::anyhow!(
"Mismatched rusk version: requested {req} - current {current}",
));
}
}
Ok(())
}

pub fn is_binary(&self) -> bool {
self.headers
.get(CONTENT_TYPE)
Expand Down Expand Up @@ -885,17 +922,19 @@ impl RuesEvent {
let bytes = body.collect().await?.to_bytes().to_vec();
let data = match content_type {
CONTENT_TYPE_BINARY => bytes.into(),
_ => DataType::Text(
_ => RequestData::Text(
String::from_utf8(bytes)
.map_err(|e| anyhow::anyhow!("Invalid utf8"))?,
),
};

let ret = RuesEvent { headers, data, uri };
let ret = RuesDispatchEvent { headers, data, uri };

Ok(ret)
}
}

impl RuesEvent {
pub fn add_header<K: Into<String>, V: Into<serde_json::Value>>(
&mut self,
key: K,
Expand Down

0 comments on commit 984f138

Please sign in to comment.