Skip to content

Commit

Permalink
rusk: change RUES to allow Text or Binary subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 8, 2024
1 parent 34a790c commit 18d1215
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 25 deletions.
46 changes: 23 additions & 23 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod stream;

pub(crate) use event::{
BinaryWrapper, DataType, ExecutionError, MessageResponse as EventResponse,
RequestData, Target,
RequestData, RuesSubscription, Target,
};

use execution_core::Event;
Expand Down Expand Up @@ -424,8 +424,8 @@ where
}

enum SubscriptionAction {
Subscribe(RuesEventUri),
Unsubscribe(RuesEventUri),
Subscribe(RuesSubscription),
Unsubscribe(RuesSubscription),
Dispatch { uri: RuesEventUri, body: Incoming },
}

Expand Down Expand Up @@ -534,23 +534,19 @@ async fn handle_stream_rues<H: HandleRequest>(
},
};

// The event is subscribed to if it matches any of the subscriptions.
let mut is_subscribed = false;
for sub in &subscription_set {
if sub.matches(&event) {
is_subscribed = true;
break;
}
}
let subscription = subscription_set.iter().find(|s|s.uri.matches(&event));

// If the event is subscribed, we send it to the client.
if is_subscribed {
if let Some(subscription) = subscription {
event.add_header("Content-Location", event.uri.to_string());
let event = event.to_bytes();
let event = match subscription.binary {
true => Message::Binary(event.to_bytes()),
false => Message::Text(event.to_json()),
};

// If the event fails sending we close the socket on the client
// and stop processing further.
if stream.send(Message::Binary(event)).await.is_err() {
if stream.send(event).await.is_err() {
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Error,
reason: Cow::from("Failed sending event"),
Expand Down Expand Up @@ -661,7 +657,7 @@ async fn handle_request_rues<H: HandleRequest>(
Some(sid) => sid,
};

let uri = match RuesEventUri::parse_from_path(req.uri().path()) {
let subscription = match RuesSubscription::parse_from_req(&req) {
None => {
return response(
StatusCode::NOT_FOUND,
Expand All @@ -682,10 +678,10 @@ async fn handle_request_rues<H: HandleRequest>(
};

let action = match *req.method() {
Method::GET => SubscriptionAction::Subscribe(uri),
Method::DELETE => SubscriptionAction::Unsubscribe(uri),
Method::GET => SubscriptionAction::Subscribe(subscription),
Method::DELETE => SubscriptionAction::Unsubscribe(subscription),
Method::POST => SubscriptionAction::Dispatch {
uri,
uri: subscription.uri,
body: req.into_body(),
},
_ => {
Expand Down Expand Up @@ -811,7 +807,7 @@ mod tests {
use std::{fs, thread};

use super::*;
use event::Event as EventRequest;
use event::{Event as EventRequest, RuesEventUri};

use crate::http::event::WrappedContractId;
use execution_core::ContractId;
Expand Down Expand Up @@ -1101,6 +1097,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand All @@ -1113,6 +1110,7 @@ mod tests {
server.local_addr
))
.header("Rusk-Session-Id", sid.to_string())
.header("Accept", "application/octet-stream".to_string())
.send()
.await
.expect("Requesting should succeed");
Expand Down Expand Up @@ -1167,11 +1165,13 @@ mod tests {

assert_eq!(received_event, event, "Event should be the same");

let unsubscribe_location = format!(
"http://{}{}",
server.local_addr, at_first_received_event.uri
);

let response = client
.delete(format!(
"http://{}/on/contracts:{maybe_sub_contract_id_hex}/{TOPIC}",
server.local_addr
))
.delete(unsubscribe_location)
.header("Rusk-Session-Id", sid.to_string())
.send()
.await
Expand Down
44 changes: 42 additions & 2 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl From<Vec<u8>> for RequestData {
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct ResponseData {
data: DataType,
header: serde_json::Map<String, serde_json::Value>,
Expand Down Expand Up @@ -414,11 +414,11 @@ impl Eq for DataType {}
impl PartialEq for DataType {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Channel(_), Self::Channel(_)) => true,
(Self::Text(a), Self::Text(b)) => a == b,
(Self::Json(a), Self::Json(b)) => a == b,
(Self::Binary(a), Self::Binary(b)) => a == b,
(Self::None, Self::None) => true,
// Two mpsc::Receiver are always different
_ => false,
}
}
Expand Down Expand Up @@ -742,6 +742,42 @@ pub struct RuesEventUri {

pub const RUES_LOCATION_PREFIX: &str = "/on";

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RuesSubscription {
pub binary: bool,
pub uri: RuesEventUri,
}

// We won't to take into consideration the "binary" flag while inserting into
// subscriptions_set
impl core::hash::Hash for RuesSubscription {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.uri.hash(state);
}
}

impl PartialEq for RuesSubscription {
fn eq(&self, other: &Self) -> bool {
self.uri == other.uri
}
}

impl Eq for RuesSubscription {}

impl RuesSubscription {
pub fn parse_from_req<B>(req: &Request<B>) -> Option<Self> {
let uri = RuesEventUri::parse_from_path(req.uri().path())?;
let binary = req
.headers()
.get(ACCEPT)
.and_then(|h| h.to_str().ok())
.map(|v| v.eq_ignore_ascii_case(CONTENT_TYPE_BINARY))
.unwrap_or_default();

Some(Self { binary, uri })
}
}

impl Display for RuesEventUri {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let component = &self.component;
Expand Down Expand Up @@ -876,6 +912,10 @@ impl RuesEvent {

bytes
}

pub fn to_json(&self) -> String {
serde_json::to_string(&self.data).expect("Data to be json serializable")
}
}

impl From<ContractEvent> for RuesEvent {
Expand Down

0 comments on commit 18d1215

Please sign in to comment.