Skip to content

Commit

Permalink
ok
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 30, 2024
1 parent 984f138 commit 69312a6
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 31 deletions.
9 changes: 2 additions & 7 deletions rusk/src/lib/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,6 @@ async fn handle_request_rues<H: HandleRequest>(
let event = RuesDispatchEvent::from_request(req).await?;
let is_binary = event.is_binary();
let mut resp_headers = event.x_headers();
unimplemented!("Handle rues dispatch here");
let (responder, mut receiver) = mpsc::unbounded_channel();
handle_execution_rues(handler, event, responder).await;

Expand Down Expand Up @@ -853,15 +852,11 @@ pub trait HandleRequest: Send + Sync + 'static {
request: &MessageRequest,
) -> anyhow::Result<ResponseData>;

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
true
}
fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool;
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
unimplemented!()
}
) -> anyhow::Result<ResponseData>;
}
// #[async_trait]
// pub trait HandleRequestRues: Send + Sync + 'static {
Expand Down
51 changes: 43 additions & 8 deletions rusk/src/lib/http/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ use graphql::{DBContext, Query};
use async_graphql::{
EmptyMutation, EmptySubscription, Name, Schema, Variables,
};
use serde_json::json;
use serde_json::{json, Map, Value};

use super::*;
use crate::node::RuskNode;
use crate::{VERSION, VERSION_BUILD};

const GQL_VAR_PREFIX: &str = "rusk-gqlvar-";

fn variables_from_request(request: &MessageRequest) -> Variables {
fn variables_from_headers(headers: &Map<String, Value>) -> Variables {
let mut var = Variables::default();
request
.headers
headers
.iter()
.filter_map(|(h, v)| {
let h = h.to_lowercase();
Expand All @@ -47,17 +46,52 @@ fn variables_from_request(request: &MessageRequest) -> Variables {

var
}

#[async_trait]
impl HandleRequest for RuskNode {
fn can_handle(&self, request: &MessageRequest) -> bool {
matches!(request.event.to_route(), (Target::Host(_), "Chain", _))
}

fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
matches!(request.uri.inner(), ("Chain", ..))
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
match request.uri.inner() {
("Chain", _, "gql") => {
self.handle_gql(&request.data, &request.headers).await
}
("Chain", _, "propagate_tx") => {
self.propagate_tx(request.data.as_bytes()).await
}
("Chain", _, "alive_nodes") => {
let amount = request.data.as_string().trim().parse()?;
self.alive_nodes(amount).await
}
("Chain", _, "info") => self.get_info().await,
("Chain", _, "gas") => {
let max_transactions = request
.data
.as_string()
.trim()
.parse::<usize>()
.unwrap_or(usize::MAX);
self.get_gas_price(max_transactions).await
}
_ => anyhow::bail!("Unsupported"),
}
}
async fn handle(
&self,
request: &MessageRequest,
) -> anyhow::Result<ResponseData> {
match &request.event.to_route() {
(Target::Host(_), "Chain", "gql") => self.handle_gql(request).await,
(Target::Host(_), "Chain", "gql") => {
self.handle_gql(&request.event.data, &request.headers).await
}
(Target::Host(_), "Chain", "propagate_tx") => {
self.propagate_tx(request.event_data()).await
}
Expand All @@ -83,9 +117,10 @@ impl HandleRequest for RuskNode {
impl RuskNode {
async fn handle_gql(
&self,
request: &MessageRequest,
data: &RequestData,
headers: &serde_json::Map<String, Value>,
) -> anyhow::Result<ResponseData> {
let gql_query = request.event.data.as_string();
let gql_query = data.as_string();

let schema = Schema::build(Query, EmptyMutation, EmptySubscription)
.data(self.db())
Expand All @@ -95,7 +130,7 @@ impl RuskNode {
return Ok(ResponseData::new(schema.sdl()));
}

let variables = variables_from_request(request);
let variables = variables_from_headers(headers);
let gql_query =
async_graphql::Request::new(gql_query).variables(variables);

Expand Down
8 changes: 8 additions & 0 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,14 @@ impl Display for RuesEventUri {
}

impl RuesEventUri {
pub fn inner(&self) -> (&str, Option<&String>, &str) {
(
self.component.as_ref(),
self.entity.as_ref(),
self.topic.as_ref(),
)
}

pub fn parse_from_path(path: &str) -> Option<Self> {
if !path.starts_with(RUES_LOCATION_PREFIX) {
return None;
Expand Down
16 changes: 16 additions & 0 deletions rusk/src/lib/http/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,22 @@ impl HandleRequest for LocalProver {
fn can_handle(&self, request: &MessageRequest) -> bool {
matches!(request.event.to_route(), (_, "rusk", topic) | (_, "prover", topic) if topic.starts_with("prove_"))
}
fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
matches!(request.uri.inner(), ("prover", ..))
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
let data = request.data.as_bytes();
let response = match request.uri.inner() {
("prover", _, "prove_execute") => {
LocalProver::prove(data).map_err(|e| anyhow!(e))?
}
_ => anyhow::bail!("Unsupported"),
};
Ok(ResponseData::new(response))
}

async fn handle(
&self,
Expand Down
57 changes: 41 additions & 16 deletions rusk/src/lib/http/rusk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use serde::Serialize;
use std::sync::{mpsc, Arc};
use std::thread;
use tokio::task;
use tungstenite::http::request;

use execution_core::ContractId;

Expand All @@ -29,6 +30,27 @@ impl HandleRequest for Rusk {
(Target::Contract(_), ..) | (Target::Host(_), "rusk", _)
)
}
fn can_handle_rues(&self, request: &RuesDispatchEvent) -> bool {
matches!(request.uri.inner(), ("contracts", ..) | ("rusk", ..))
}
async fn handle_rues(
&self,
request: &RuesDispatchEvent,
) -> anyhow::Result<ResponseData> {
match request.uri.inner() {
("contracts", Some(contract_id), method) => {
let feeder = request.header(RUSK_FEEDER_HEADER).is_some();
let data = request.data.as_bytes();
self.handle_contract_query(contract_id, method, data, feeder)
}
("rusk", _, "preverify") => {
self.handle_preverify(request.data.as_bytes())
}
("rusk", _, "provisioners") => self.get_provisioners(),
("rusk", _, "crs") => self.get_crs(),
_ => Err(anyhow::anyhow!("Unsupported")),
}
}

async fn handle(
&self,
Expand All @@ -37,7 +59,7 @@ impl HandleRequest for Rusk {
match &request.event.to_route() {
(Target::Contract(_), ..) => {
let feeder = request.header(RUSK_FEEDER_HEADER).is_some();
self.handle_contract_query(&request.event, feeder)
self.handle_contract_query_legacy(&request.event, feeder)
}
(Target::Host(_), "rusk", "preverify") => {
self.handle_preverify(request.event_data())
Expand All @@ -52,41 +74,44 @@ impl HandleRequest for Rusk {
}

impl Rusk {
fn handle_contract_query(
fn handle_contract_query_legacy(
&self,
event: &Event,
feeder: bool,
) -> anyhow::Result<ResponseData> {
let contract = event.target.inner();
let topic = &event.topic;
let data = event.data.as_bytes();

self.handle_contract_query(contract, topic, data, feeder)
}
fn handle_contract_query(
&self,
contract: &str,
topic: &str,
data: &[u8],
feeder: bool,
) -> anyhow::Result<ResponseData> {
let contract_bytes = hex::decode(contract)?;

let contract_bytes = contract_bytes
.try_into()
.map_err(|_| anyhow::anyhow!("Invalid contract bytes"))?;

let contract_id = ContractId::from_bytes(contract_bytes);
let fn_name = topic.to_string();
let data = data.to_vec();
if feeder {
let (sender, receiver) = mpsc::channel();

let rusk = self.clone();
let topic = event.topic.clone();
let arg = event.data.as_bytes().to_vec();

thread::spawn(move || {
rusk.feeder_query_raw(
ContractId::from_bytes(contract_bytes),
topic,
arg,
sender,
);
rusk.feeder_query_raw(contract_id, fn_name, data, sender);
});
Ok(ResponseData::new(receiver))
} else {
let data = self
.query_raw(
ContractId::from_bytes(contract_bytes),
event.topic.clone(),
event.data.as_bytes(),
)
.query_raw(contract_id, fn_name, data)
.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(ResponseData::new(data))
}
Expand Down

0 comments on commit 69312a6

Please sign in to comment.