Skip to content

Commit

Permalink
rusk: change RUES to allow Text or Binary subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 7, 2024
1 parent 5c3c230 commit d17087d
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 40 deletions.
53 changes: 22 additions & 31 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,23 +537,19 @@ async fn handle_stream_rues<H: HandleRequest>(
},
};

// The event is subscribed to if it matches any of the subscriptions.
let mut is_subscribed = false;
for sub in &subscription_set {
if sub.matches(&event) {
is_subscribed = true;
break;
}
}
let subscription = subscription_set.iter().find(|s|s.location.matches(&event));

// If the event is subscribed, we send it to the client.
if is_subscribed {
if let Some(subscription) = subscription {
event.apply_location();
let event = event.to_bytes();
let event = match subscription.binary {
true => Message::Binary(event.to_bytes()),
false => Message::Text(event.to_json()),
};

// If the event fails sending we close the socket on the client
// and stop processing further.
if stream.send(Message::Binary(event)).await.is_err() {
if stream.send(event).await.is_err() {
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Error,
reason: Cow::from("Failed sending event"),
Expand Down Expand Up @@ -601,7 +597,7 @@ async fn handle_dispatch<H: HandleRequest>(

sender
.send(Ok(RuesEvent {
subscription: sub,
location: sub.location,
data: rsp,
}))
.await;
Expand Down Expand Up @@ -672,16 +668,15 @@ async fn handle_request_rues<H: HandleRequest>(
Some(sid) => sid,
};

let subscription =
match RuesSubscription::parse_from_path_split(path_split) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};
let subscription = match RuesSubscription::parse_from_req(&req) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};

let action_sender = match sockets_map.read().await.get(&sid) {
Some(sender) => sender.clone(),
Expand Down Expand Up @@ -823,7 +818,7 @@ mod tests {
use std::{fs, thread};

use super::*;
use event::Event as EventRequest;
use event::{Event as EventRequest, RuesLocation};

use crate::http::event::WrappedContractId;
use execution_core::ContractId;
Expand Down Expand Up @@ -1113,6 +1108,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand All @@ -1125,6 +1121,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand Down Expand Up @@ -1222,19 +1219,13 @@ mod tests {
.ok_or(anyhow::anyhow!("Content location is not a string"))?
.to_string();

let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let subscription = RuesSubscription::parse_from_path_split(path_split)
let location = RuesLocation::parse_from_path(&path)
.ok_or(anyhow::anyhow!("Invalid location"))?;

let mut data = ResponseData::new(data.to_vec());
for (key, value) in header {
data = data.with_header(key, value);
}
Ok(RuesEvent { data, subscription })
Ok(RuesEvent { data, location })
}
}
48 changes: 39 additions & 9 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl From<Vec<u8>> for RequestData {
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ResponseData {
data: DataType,
header: serde_json::Map<String, serde_json::Value>,
Expand Down Expand Up @@ -734,16 +734,36 @@ impl SessionId {
/// `transactions`, etc...) and an optional entity within the component that
/// the event targets.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct RuesSubscription {
pub struct RuesLocation {
pub component: String,
pub entity: Option<String>,
pub topic: String,
}

pub const RUES_LOCATION_PREFIX: &str = "/on";

#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct RuesSubscription {
pub binary: bool,
pub location: RuesLocation,
}

impl RuesSubscription {
pub fn to_location(&self) -> String {
pub fn parse_from_req<B>(req: &Request<B>) -> Option<Self> {
let location = RuesLocation::parse_from_path(req.uri().path())?;
let binary = req
.headers()
.get(ACCEPT)
.and_then(|h| h.to_str().ok())
.map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY))
.unwrap_or_default();

Some(Self { binary, location })
}
}

impl RuesLocation {
pub fn to_path(&self) -> String {
let component = &self.component;
let entity = self
.entity
Expand All @@ -755,7 +775,13 @@ impl RuesSubscription {
format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}")
}

pub fn parse_from_path_split(mut path_split: Split<char>) -> Option<Self> {
pub fn parse_from_path(path: &str) -> Option<Self> {
let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

// If the segment contains a `:`, we split the string in two after the
// first one - meaning entities with `:` are still possible.
// If the segment doesn't contain a `:` then the segment is just a
Expand All @@ -780,7 +806,7 @@ impl RuesSubscription {
}

pub fn matches(&self, event: &RuesEvent) -> bool {
let event = &event.subscription;
let event = &event.location;
if self.component != event.component {
return false;
}
Expand Down Expand Up @@ -829,13 +855,13 @@ impl From<ContractEvent> for execution_core::Event {
/// A RUES event
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RuesEvent {
pub subscription: RuesSubscription,
pub location: RuesLocation,
pub data: ResponseData,
}

impl RuesEvent {
pub fn apply_location(&mut self) {
let location = self.subscription.to_location();
let location = self.location.to_path();
self.data.add_header("Content-Location", location);
}

Expand All @@ -859,12 +885,16 @@ impl RuesEvent {

bytes
}

pub fn to_json(&self) -> String {
serde_json::to_string(&self.data).expect("Data to be json serializable")
}
}

impl From<ContractEvent> for RuesEvent {
fn from(event: ContractEvent) -> Self {
Self {
subscription: RuesSubscription {
location: RuesLocation {
component: "contracts".into(),
entity: Some(hex::encode(event.target.0.as_bytes())),
topic: event.topic,
Expand All @@ -884,7 +914,7 @@ impl From<execution_core::Event> for RuesEvent {
impl From<node_data::events::Event> for RuesEvent {
fn from(value: node_data::events::Event) -> Self {
Self {
subscription: RuesSubscription {
location: RuesLocation {
component: value.component.into(),
entity: Some(value.entity),
topic: value.topic.into(),
Expand Down

0 comments on commit d17087d

Please sign in to comment.