Skip to content

Commit

Permalink
rusk: refactor RuesEvent to include headers
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 9, 2024
1 parent 6ec3a42 commit 2e73db4
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 67 deletions.
77 changes: 28 additions & 49 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use crate::http::event::FullOrStreamBody;
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX};
use self::event::{MessageRequest, ResponseData, RuesSubscription, SessionId};
use self::event::{MessageRequest, ResponseData, RuesEventUri, SessionId};
use self::stream::{Listener, Stream};

const RUSK_VERSION_HEADER: &str = "Rusk-Version";
Expand Down Expand Up @@ -424,12 +424,9 @@ where
}

enum SubscriptionAction {
Subscribe(RuesSubscription),
Unsubscribe(RuesSubscription),
Dispatch {
sub: RuesSubscription,
body: Incoming,
},
Subscribe(RuesEventUri),
Unsubscribe(RuesEventUri),
Dispatch { uri: RuesEventUri, body: Incoming },
}

async fn handle_stream_rues<H: HandleRequest>(
Expand Down Expand Up @@ -506,11 +503,11 @@ async fn handle_stream_rues<H: HandleRequest>(
subscription_set.remove(&subscription);
},
SubscriptionAction::Dispatch {
sub,
uri,
body
} => {
// TODO figure out if we should subscribe to the event we dispatch
task::spawn(handle_dispatch(sub, body, handler.clone(), dispatch_sender.clone()));
task::spawn(handle_dispatch(uri, body, handler.clone(), dispatch_sender.clone()));
}
}
}
Expand Down Expand Up @@ -548,7 +545,7 @@ async fn handle_stream_rues<H: HandleRequest>(

// If the event is subscribed, we send it to the client.
if is_subscribed {
event.apply_location();
event.add_header("Content-Location", event.uri.to_string());
let event = event.to_bytes();

// If the event fails sending we close the socket on the client
Expand All @@ -570,7 +567,7 @@ async fn handle_stream_rues<H: HandleRequest>(
}

async fn handle_dispatch<H: HandleRequest>(
sub: RuesSubscription,
uri: RuesEventUri,
body: Incoming,
handler: Arc<H>,
sender: mpsc::Sender<Result<RuesEvent, AnyhowError>>,
Expand Down Expand Up @@ -599,12 +596,9 @@ async fn handle_dispatch<H: HandleRequest>(
}
};

sender
.send(Ok(RuesEvent {
subscription: sub,
data: rsp,
}))
.await;
let (data, headers) = rsp.into_inner();

sender.send(Ok(RuesEvent { uri, data, headers })).await;
}

fn response(
Expand Down Expand Up @@ -656,11 +650,6 @@ async fn handle_request_rues<H: HandleRequest>(
Ok(response.map(Into::into))
} else {
let headers = req.headers();
let mut path_split = req.uri().path().split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let sid = match SessionId::parse_from_req(&req) {
None => {
Expand All @@ -672,16 +661,15 @@ async fn handle_request_rues<H: HandleRequest>(
Some(sid) => sid,
};

let subscription =
match RuesSubscription::parse_from_path_split(path_split) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};
let uri = match RuesEventUri::parse_from_path(req.uri().path()) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};

let action_sender = match sockets_map.read().await.get(&sid) {
Some(sender) => sender.clone(),
Expand All @@ -694,10 +682,10 @@ async fn handle_request_rues<H: HandleRequest>(
};

let action = match *req.method() {
Method::GET => SubscriptionAction::Subscribe(subscription),
Method::DELETE => SubscriptionAction::Unsubscribe(subscription),
Method::GET => SubscriptionAction::Subscribe(uri),
Method::DELETE => SubscriptionAction::Unsubscribe(uri),
Method::POST => SubscriptionAction::Dispatch {
sub: subscription,
uri,
body: req.into_body(),
},
_ => {
Expand Down Expand Up @@ -1213,28 +1201,19 @@ mod tests {
}

pub fn from_bytes(data: &[u8]) -> anyhow::Result<RuesEvent> {
let (mut header, data) = crate::http::event::parse_header(data)?;
let (mut headers, data) = crate::http::event::parse_header(data)?;

let path = header
let path = headers
.remove("Content-Location")
.ok_or(anyhow::anyhow!("Content location is not set"))?
.as_str()
.ok_or(anyhow::anyhow!("Content location is not a string"))?
.to_string();

let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let subscription = RuesSubscription::parse_from_path_split(path_split)
let uri = RuesEventUri::parse_from_path(&path)
.ok_or(anyhow::anyhow!("Invalid location"))?;

let mut data = ResponseData::new(data.to_vec());
for (key, value) in header {
data = data.with_header(key, value);
}
Ok(RuesEvent { data, subscription })
let data = data.to_vec().into();
Ok(RuesEvent { data, headers, uri })
}
}
54 changes: 36 additions & 18 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,16 +732,16 @@ impl SessionId {
/// `transactions`, etc...) and an optional entity within the component that
/// the event targets.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct RuesSubscription {
pub struct RuesEventUri {
pub component: String,
pub entity: Option<String>,
pub topic: String,
}

pub const RUES_LOCATION_PREFIX: &str = "/on";

impl RuesSubscription {
pub fn to_location(&self) -> String {
impl Display for RuesEventUri {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let component = &self.component;
let entity = self
.entity
Expand All @@ -750,10 +750,23 @@ impl RuesSubscription {
.unwrap_or_default();
let topic = &self.topic;

format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}")
write!(f, "{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}")
}
}

impl RuesEventUri {
pub fn parse_from_path(path: &str) -> Option<Self> {
if !path.starts_with(RUES_LOCATION_PREFIX) {
return None;
}
// Skip '/on' since we already know its present
let path = &path[RUES_LOCATION_PREFIX.len()..];

let mut path_split = path.split('/');

// Skip first '/'
path_split.next()?;

pub fn parse_from_path_split(mut path_split: Split<char>) -> Option<Self> {
// If the segment contains a `:`, we split the string in two after the
// first one - meaning entities with `:` are still possible.
// If the segment doesn't contain a `:` then the segment is just a
Expand All @@ -778,7 +791,7 @@ impl RuesSubscription {
}

pub fn matches(&self, event: &RuesEvent) -> bool {
let event = &event.subscription;
let event = &event.uri;
if self.component != event.component {
return false;
}
Expand Down Expand Up @@ -827,25 +840,29 @@ impl From<ContractEvent> for execution_core::Event {
/// A RUES event
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RuesEvent {
pub subscription: RuesSubscription,
pub data: ResponseData,
pub uri: RuesEventUri,
pub headers: serde_json::Map<String, serde_json::Value>,
pub data: DataType,
}

impl RuesEvent {
pub fn apply_location(&mut self) {
let location = self.subscription.to_location();
self.data.add_header("Content-Location", location);
pub fn add_header<K: Into<String>, V: Into<serde_json::Value>>(
&mut self,
key: K,
value: V,
) {
self.headers.insert(key.into(), value.into());
}

/// Serialize the event into a vector of bytes.
pub fn to_bytes(&self) -> Vec<u8> {
let headers_bytes = serde_json::to_vec(&self.data.header)
let headers_bytes = serde_json::to_vec(&self.headers)
.expect("Serializing JSON should succeed");

let headers_len = headers_bytes.len() as u32;
let headers_len_bytes = headers_len.to_le_bytes();

let data_bytes = self.data.data.to_bytes();
let data_bytes = self.data.to_bytes();

let len =
headers_len_bytes.len() + headers_bytes.len() + data_bytes.len();
Expand All @@ -862,12 +879,13 @@ impl RuesEvent {
impl From<ContractEvent> for RuesEvent {
fn from(event: ContractEvent) -> Self {
Self {
subscription: RuesSubscription {
uri: RuesEventUri {
component: "contracts".into(),
entity: Some(hex::encode(event.target.0.as_bytes())),
topic: event.topic,
},
data: ResponseData::new(event.data),
data: event.data.into(),
headers: Default::default(),
}
}
}
Expand All @@ -884,13 +902,13 @@ impl From<node_data::events::Event> for RuesEvent {
let data = value.data.map_or(DataType::None, DataType::Json);

Self {
subscription: RuesSubscription {
uri: RuesEventUri {
component: value.component.into(),
entity: Some(value.entity),
topic: value.topic.into(),
},

data: ResponseData::new(data),
data,
headers: Default::default(),
}
}
}
Expand Down

0 comments on commit 2e73db4

Please sign in to comment.