Skip to content

Commit

Permalink
rusk: change RUES to allow Text or Binary subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 7, 2024
1 parent 5c3c230 commit f519cee
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 52 deletions.
71 changes: 29 additions & 42 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,23 +537,19 @@ async fn handle_stream_rues<H: HandleRequest>(
},
};

// The event is subscribed to if it matches any of the subscriptions.
let mut is_subscribed = false;
for sub in &subscription_set {
if sub.matches(&event) {
is_subscribed = true;
break;
}
}
let subscription = subscription_set.iter().find(|s|s.location.matches(&event));

// If the event is subscribed, we send it to the client.
if is_subscribed {
if let Some(subscription) = subscription {
event.apply_location();
let event = event.to_bytes();
let event = match subscription.binary {
true => Message::Binary(event.to_bytes()),
false => Message::Text(event.to_json()),
};

// If the event fails sending we close the socket on the client
// and stop processing further.
if stream.send(Message::Binary(event)).await.is_err() {
if stream.send(event).await.is_err() {
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Error,
reason: Cow::from("Failed sending event"),
Expand Down Expand Up @@ -601,7 +597,7 @@ async fn handle_dispatch<H: HandleRequest>(

sender
.send(Ok(RuesEvent {
subscription: sub,
location: sub.location,
data: rsp,
}))
.await;
Expand Down Expand Up @@ -655,13 +651,6 @@ async fn handle_request_rues<H: HandleRequest>(

Ok(response.map(Into::into))
} else {
let headers = req.headers();
let mut path_split = req.uri().path().split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let sid = match SessionId::parse_from_req(&req) {
None => {
return response(
Expand All @@ -672,16 +661,15 @@ async fn handle_request_rues<H: HandleRequest>(
Some(sid) => sid,
};

let subscription =
match RuesSubscription::parse_from_path_split(path_split) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};
let subscription = match RuesSubscription::parse_from_req(&req) {
None => {
return response(
StatusCode::NOT_FOUND,
"{{\"error\":\"Invalid URL path\n\"}}",
);
}
Some(s) => s,
};

let action_sender = match sockets_map.read().await.get(&sid) {
Some(sender) => sender.clone(),
Expand Down Expand Up @@ -823,7 +811,7 @@ mod tests {
use std::{fs, thread};

use super::*;
use event::Event as EventRequest;
use event::{Event as EventRequest, RuesLocation};

use crate::http::event::WrappedContractId;
use execution_core::ContractId;
Expand Down Expand Up @@ -1113,6 +1101,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand All @@ -1125,6 +1114,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand Down Expand Up @@ -1179,11 +1169,14 @@ mod tests {

assert_eq!(received_event, event, "Event should be the same");

let unsubscribe_location = format!(
"http://{}{}",
server.local_addr,
at_first_received_event.location.to_path()
);

let response = client
.delete(format!(
"http://{}/on/contracts:{maybe_sub_contract_id_hex}/{TOPIC}",
server.local_addr
))
.delete(unsubscribe_location)
.header("Rusk-Session-Id", sid.to_string())
.send()
.await
Expand Down Expand Up @@ -1222,19 +1215,13 @@ mod tests {
.ok_or(anyhow::anyhow!("Content location is not a string"))?
.to_string();

let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

let subscription = RuesSubscription::parse_from_path_split(path_split)
let location = RuesLocation::parse_from_path(&path)
.ok_or(anyhow::anyhow!("Invalid location"))?;

let mut data = ResponseData::new(data.to_vec());
for (key, value) in header {
data = data.with_header(key, value);
}
Ok(RuesEvent { data, subscription })
Ok(RuesEvent { data, location })
}
}
66 changes: 56 additions & 10 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl From<Vec<u8>> for RequestData {
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ResponseData {
data: DataType,
header: serde_json::Map<String, serde_json::Value>,
Expand Down Expand Up @@ -414,11 +414,11 @@ impl Eq for DataType {}
impl PartialEq for DataType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Channel(_), Self::Channel(_)) => true,
(Self::Text(a), Self::Text(b)) => a == b,
(Self::Json(a), Self::Json(b)) => a == b,
(Self::Binary(a), Self::Binary(b)) => a == b,
(Self::None, Self::None) => true,
// Two mpsc::Receiver are always different
_ => false,
}
}
Expand Down Expand Up @@ -734,16 +734,52 @@ impl SessionId {
/// `transactions`, etc...) and an optional entity within the component that
/// the event targets.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct RuesSubscription {
pub struct RuesLocation {
pub component: String,
pub entity: Option<String>,
pub topic: String,
}

pub const RUES_LOCATION_PREFIX: &str = "/on";

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RuesSubscription {
pub binary: bool,
pub location: RuesLocation,
}

// We won't to take into consideration the "binary" flag while inserting into
// subscriptions_set
impl core::hash::Hash for RuesSubscription {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.location.hash(state);
}
}

impl PartialEq for RuesSubscription {
fn eq(&self, other: &Self) -> bool {
self.location == other.location
}
}

impl Eq for RuesSubscription {}

impl RuesSubscription {
pub fn to_location(&self) -> String {
pub fn parse_from_req<B>(req: &Request<B>) -> Option<Self> {
let location = RuesLocation::parse_from_path(req.uri().path())?;
let binary = req
.headers()
.get(ACCEPT)
.and_then(|h| h.to_str().ok())
.map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY))
.unwrap_or_default();

Some(Self { binary, location })
}
}

impl RuesLocation {
pub fn to_path(&self) -> String {
let component = &self.component;
let entity = self
.entity
Expand All @@ -755,7 +791,13 @@ impl RuesSubscription {
format!("{RUES_LOCATION_PREFIX}/{component}{entity}/{topic}")
}

pub fn parse_from_path_split(mut path_split: Split<char>) -> Option<Self> {
pub fn parse_from_path(path: &str) -> Option<Self> {
let mut path_split = path.split('/');

// Skip '/on' since we already know its present
path_split.next();
path_split.next();

// If the segment contains a `:`, we split the string in two after the
// first one - meaning entities with `:` are still possible.
// If the segment doesn't contain a `:` then the segment is just a
Expand All @@ -780,7 +822,7 @@ impl RuesSubscription {
}

pub fn matches(&self, event: &RuesEvent) -> bool {
let event = &event.subscription;
let event = &event.location;
if self.component != event.component {
return false;
}
Expand Down Expand Up @@ -829,13 +871,13 @@ impl From<ContractEvent> for execution_core::Event {
/// A RUES event
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RuesEvent {
pub subscription: RuesSubscription,
pub location: RuesLocation,
pub data: ResponseData,
}

impl RuesEvent {
pub fn apply_location(&mut self) {
let location = self.subscription.to_location();
let location = self.location.to_path();
self.data.add_header("Content-Location", location);
}

Expand All @@ -859,12 +901,16 @@ impl RuesEvent {

bytes
}

pub fn to_json(&self) -> String {
serde_json::to_string(&self.data).expect("Data to be json serializable")
}
}

impl From<ContractEvent> for RuesEvent {
fn from(event: ContractEvent) -> Self {
Self {
subscription: RuesSubscription {
location: RuesLocation {
component: "contracts".into(),
entity: Some(hex::encode(event.target.0.as_bytes())),
topic: event.topic,
Expand All @@ -884,7 +930,7 @@ impl From<execution_core::Event> for RuesEvent {
impl From<node_data::events::Event> for RuesEvent {
fn from(value: node_data::events::Event) -> Self {
Self {
subscription: RuesSubscription {
location: RuesLocation {
component: value.component.into(),
entity: Some(value.entity),
topic: value.topic.into(),
Expand Down

0 comments on commit f519cee

Please sign in to comment.