Skip to content

Commit

Permalink
rusk: support routing of generic RuesEvent
Browse files Browse the repository at this point in the history
- Replace RuesEventData with existing DataType
- Fix subscriptions with empty entity
- Add location header to emitted RuesEvents
  • Loading branch information
herr-seppia committed Aug 7, 2024
1 parent 7c617cc commit 5c3c230
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 135 deletions.
93 changes: 41 additions & 52 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ use anyhow::Error as AnyhowError;
use hyper_util::rt::TokioIo;
use rand::rngs::OsRng;

use crate::http::event::{FullOrStreamBody, RuesEventData};
use crate::http::event::FullOrStreamBody;
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent};
pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX};
use self::event::{MessageRequest, ResponseData, RuesSubscription, SessionId};
use self::stream::{Listener, Stream};

Expand Down Expand Up @@ -516,7 +516,7 @@ async fn handle_stream_rues<H: HandleRequest>(
}

Some(event) = events.next() => {
let event = match event {
let mut event = match event {
Ok(event) => event,
Err(err) => match err {
Either::Left(_berr) => {
Expand Down Expand Up @@ -548,6 +548,7 @@ async fn handle_stream_rues<H: HandleRequest>(

// If the event is subscribed, we send it to the client.
if is_subscribed {
event.apply_location();
let event = event.to_bytes();

// If the event fails sending we close the socket on the client
Expand Down Expand Up @@ -598,48 +599,12 @@ async fn handle_dispatch<H: HandleRequest>(
}
};

let (data, header) = rsp.into_inner();
match data {
DataType::Binary(bytes) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes.inner),
}))
.await;
}
DataType::Text(text) => {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(text.into_bytes()),
}))
.await;
}
DataType::Json(json) => {
let _ = sender
.send(
serde_json::to_vec(&json)
.map(|bytes| RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
})
.map_err(Into::into),
)
.await;
}
DataType::Channel(channel) => {
for bytes in channel {
let _ = sender
.send(Ok(RuesEvent {
headers: req.headers.clone(),
data: RuesEventData::Other(bytes),
}))
.await;
}
}
DataType::None => {}
}
sender
.send(Ok(RuesEvent {
subscription: sub,
data: rsp,
}))
.await;
}

fn response(
Expand Down Expand Up @@ -770,7 +735,7 @@ where
let path = req.uri().path();

// If the request is a RUES request, we handle it differently.
if path.starts_with("/on") {
if path.starts_with(RUES_LOCATION_PREFIX) {
return handle_request_rues(
req,
sources.clone(),
Expand Down Expand Up @@ -1203,16 +1168,14 @@ mod tests {
let message = stream.read().expect("Event should be received");
let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");
let event = from_bytes(&event_bytes).expect("Event should deserialize");

assert_eq!(at_first_received_event, event, "Event should be the same");

let message = stream.read().expect("Event should be received");
let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");
let event = from_bytes(&event_bytes).expect("Event should deserialize");

assert_eq!(received_event, event, "Event should be the same");

Expand Down Expand Up @@ -1241,11 +1204,37 @@ mod tests {
.expect("Sending event should succeed");

let message = stream.read().expect("Event should be received");

let event_bytes = message.into_data();

let event = RuesEvent::from_bytes(&event_bytes)
.expect("Event should deserialize");
let event = from_bytes(&event_bytes).expect("Event should deserialize");

assert_eq!(received_event, event, "Event should be the same");
}

pub fn from_bytes(data: &[u8]) -> anyhow::Result<RuesEvent> {
let (mut header, data) = crate::http::event::parse_header(data)?;

let path = header
.remove("Content-Location")
.ok_or(anyhow::anyhow!("Content location is not set"))?
.as_str()
.ok_or(anyhow::anyhow!("Content location is not a string"))?
.to_string();

let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let subscription = RuesSubscription::parse_from_path_split(path_split)
.ok_or(anyhow::anyhow!("Invalid location"))?;

let mut data = ResponseData::new(data.to_vec());
for (key, value) in header {
data = data.with_header(key, value);
}
Ok(RuesEvent { data, subscription })
}
}
Loading

0 comments on commit 5c3c230

Please sign in to comment.