Skip to content

Commit

Permalink
Merge pull request #2224 from dusk-network/rues-dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia authored Sep 2, 2024
2 parents 20f16fd + 26d5f0b commit bfc2e15
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 64 deletions.
139 changes: 101 additions & 38 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ use rand::rngs::OsRng;
use crate::http::event::FullOrStreamBody;
use crate::VERSION;

pub use self::event::{ContractEvent, RuesEvent, RUES_LOCATION_PREFIX};
pub use self::event::{
ContractEvent, RuesDispatchEvent, RuesEvent, RUES_LOCATION_PREFIX,
};
use self::event::{MessageRequest, ResponseData, RuesEventUri, SessionId};
use self::stream::{Listener, Stream};

Expand Down Expand Up @@ -147,6 +149,24 @@ impl HandleRequest for DataSources {
}
Err(anyhow::anyhow!("unsupported target type"))
}

fn can_handle_rues(&self, event: &RuesDispatchEvent) -> bool {
self.sources.iter().any(|s| s.can_handle_rues(event))
}

async fn handle_rues(
&self,
event: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
info!("Received event at {}", event.uri);
event.check_rusk_version()?;
for h in &self.sources {
if h.can_handle_rues(event) {
return h.handle_rues(event).await;
}
}
Err(anyhow::anyhow!("unsupported location"))
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -426,7 +446,6 @@ where
enum SubscriptionAction {
Subscribe(RuesEventUri),
Unsubscribe(RuesEventUri),
Dispatch { uri: RuesEventUri, body: Incoming },
}

async fn handle_stream_rues<H: HandleRequest>(
Expand Down Expand Up @@ -459,17 +478,8 @@ async fn handle_stream_rues<H: HandleRequest>(
const DISPATCH_BUFFER_SIZE: usize = 16;

let mut subscription_set = HashSet::new();
let (dispatch_sender, dispatch_events) =
mpsc::channel(DISPATCH_BUFFER_SIZE);

// Join the two event receivers together, allowing for reusing the exact
// same code when handling them either of them.
let mut events = BroadcastStream::new(events);
let mut dispatch_events = ReceiverStream::new(dispatch_events);

let mut events = events
.map_err(Either::Left)
.merge(dispatch_events.map_err(Either::Right));

loop {
tokio::select! {
Expand Down Expand Up @@ -502,36 +512,23 @@ async fn handle_stream_rues<H: HandleRequest>(
SubscriptionAction::Unsubscribe(subscription) => {
subscription_set.remove(&subscription);
},
SubscriptionAction::Dispatch {
uri,
body
} => {
// TODO figure out if we should subscribe to the event we dispatch
task::spawn(handle_dispatch(uri, body, handler.clone(), dispatch_sender.clone()));
}
}
}

Some(event) = events.next() => {
let mut event = match event {
Ok(event) => event,
Err(err) => match err {
Either::Left(_berr) => {
// If the event channel is closed, it means the
// server has stopped producing events, so we
// should inform the client and stop.
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Away,
reason: Cow::from("Shutting down"),
})).await;
break;
Err(err) => {
// If the event channel is closed, it means the
// server has stopped producing events, so we
// should inform the client and stop.
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Away,
reason: Cow::from("Shutting down"),
})).await;
break;

}
Either::Right(_eerr) => {
// TODO handle execution error
continue;
},
},
}
};

// The event is subscribed to if it matches any of the subscriptions.
Expand Down Expand Up @@ -648,6 +645,31 @@ async fn handle_request_rues<H: HandleRequest>(
));

Ok(response.map(Into::into))
} else if req.method() == Method::POST {
let event = RuesDispatchEvent::from_request(req).await?;
let is_binary = event.is_binary();
let mut resp_headers = event.x_headers();
let (responder, mut receiver) = mpsc::unbounded_channel();
handle_execution_rues(handler, event, responder).await;

let execution_response = receiver
.recv()
.await
.expect("An execution should always return a response");
resp_headers.extend(execution_response.headers.clone());
let mut resp = execution_response.into_http(is_binary)?;

for (k, v) in resp_headers {
let k = HeaderName::from_str(&k)?;
let v = match v {
serde_json::Value::String(s) => HeaderValue::from_str(&s),
serde_json::Value::Null => HeaderValue::from_str(""),
_ => HeaderValue::from_str(&v.to_string()),
}?;
resp.headers_mut().append(k, v);
}

Ok(resp)
} else {
let headers = req.headers();

Expand Down Expand Up @@ -684,10 +706,6 @@ async fn handle_request_rues<H: HandleRequest>(
let action = match *req.method() {
Method::GET => SubscriptionAction::Subscribe(uri),
Method::DELETE => SubscriptionAction::Unsubscribe(uri),
Method::POST => SubscriptionAction::Dispatch {
uri,
body: req.into_body(),
},
_ => {
return response(
StatusCode::METHOD_NOT_ALLOWED,
Expand Down Expand Up @@ -797,13 +815,48 @@ async fn handle_execution<H>(
let _ = responder.send(rsp);
}

async fn handle_execution_rues<H>(
sources: Arc<H>,
event: RuesDispatchEvent,
responder: mpsc::UnboundedSender<EventResponse>,
) where
H: HandleRequest,
{
let mut rsp = sources
.handle_rues(&event)
.await
.map(|data| {
let (data, mut headers) = data.into_inner();
headers.append(&mut event.x_headers());
EventResponse {
data,
error: None,
headers,
}
})
.unwrap_or_else(|e| EventResponse {
headers: event.x_headers(),
data: DataType::None,
error: Some(e.to_string()),
});

rsp.set_header(RUSK_VERSION_HEADER, serde_json::json!(*VERSION));
let _ = responder.send(rsp);
}

#[async_trait]
pub trait HandleRequest: Send + Sync + 'static {
fn can_handle(&self, request: &MessageRequest) -> bool;
async fn handle(
&self,
request: &MessageRequest,
) -> anyhow::Result<ResponseData>;

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool;
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData>;
}

#[cfg(test)]
Expand Down Expand Up @@ -834,6 +887,16 @@ mod tests {
true
}

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
false
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
unimplemented!()
}

async fn handle(
&self,
request: &MessageRequest,
Expand Down
59 changes: 51 additions & 8 deletions rusk/src/lib/http/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ use graphql::{DBContext, Query};
use async_graphql::{
EmptyMutation, EmptySubscription, Name, Schema, Variables,
};
use serde_json::json;
use serde_json::{json, Map, Value};

use super::*;
use crate::node::RuskNode;
use crate::{VERSION, VERSION_BUILD};

const GQL_VAR_PREFIX: &str = "rusk-gqlvar-";

fn variables_from_request(request: &MessageRequest) -> Variables {
fn variables_from_headers(headers: &Map<String, Value>) -> Variables {
let mut var = Variables::default();
request
.headers
headers
.iter()
.filter_map(|(h, v)| {
let h = h.to_lowercase();
Expand All @@ -47,17 +46,60 @@ fn variables_from_request(request: &MessageRequest) -> Variables {

var
}

#[async_trait]
impl HandleRequest for RuskNode {
fn can_handle(&self, request: &MessageRequest) -> bool {
matches!(request.event.to_route(), (Target::Host(_), "Chain", _))
}

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
#[allow(clippy::match_like_matches_macro)]
match request.uri.inner() {
("graphql", _, "query") => true,
("transactions", _, "propagate") => true,
("network", _, "peers") => true,
("node", _, "info") => true,
("blocks", _, "gas-price") => true,
_ => false,
}
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
match request.uri.inner() {
("graphql", _, "query") => {
self.handle_gql(&request.data, &request.headers).await
}
("transactions", _, "propagate") => {
self.propagate_tx(request.data.as_bytes()).await
}
("network", _, "peers") => {
let amount = request.data.as_string().trim().parse()?;
self.alive_nodes(amount).await
}
("node", _, "info") => self.get_info().await,
("blocks", _, "gas-price") => {
let max_transactions = request
.data
.as_string()
.trim()
.parse::<usize>()
.unwrap_or(usize::MAX);
self.get_gas_price(max_transactions).await
}
_ => anyhow::bail!("Unsupported"),
}
}
async fn handle(
&self,
request: &MessageRequest,
) -> anyhow::Result<ResponseData> {
match &request.event.to_route() {
(Target::Host(_), "Chain", "gql") => self.handle_gql(request).await,
(Target::Host(_), "Chain", "gql") => {
self.handle_gql(&request.event.data, &request.headers).await
}
(Target::Host(_), "Chain", "propagate_tx") => {
self.propagate_tx(request.event_data()).await
}
Expand All @@ -83,9 +125,10 @@ impl HandleRequest for RuskNode {
impl RuskNode {
async fn handle_gql(
&self,
request: &MessageRequest,
data: &RequestData,
headers: &serde_json::Map<String, Value>,
) -> anyhow::Result<ResponseData> {
let gql_query = request.event.data.as_string();
let gql_query = data.as_string();

let schema = Schema::build(Query, EmptyMutation, EmptySubscription)
.data(self.db())
Expand All @@ -95,7 +138,7 @@ impl RuskNode {
return Ok(ResponseData::new(schema.sdl()));
}

let variables = variables_from_request(request);
let variables = variables_from_headers(headers);
let gql_query =
async_graphql::Request::new(gql_query).variables(variables);

Expand Down
Loading

0 comments on commit bfc2e15

Please sign in to comment.