Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 6, 2024
1 parent cf769b9 commit 80ed730
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
16 changes: 16 additions & 0 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,22 @@ impl From<execution_core::Event> for RuesEvent {
}
}

#[cfg(feature = "node")]
impl From<node_data::events::NodeEvent> for RuesEvent {
fn from(value: node_data::events::NodeEvent) -> Self {
let mut headers = serde_json::Map::new();
headers.insert(
"Content-Location".into(),
format!("/on/{}:{}/{}", value.target, value.id, value.topic).into(),
);

Self {
headers: serde_json::Map::new(),
data: RuesEventData::Other(value.data.into()),
}
}
}

/// Types of event data that RUES supports.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuesEventData {
Expand Down
18 changes: 5 additions & 13 deletions rusk/src/lib/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use node::{LongLivedService, Network};
use node_data::events::NodeEvent;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use tracing::info;
use tracing::error;

use crate::http::RuesEvent;

Expand All @@ -22,15 +22,6 @@ pub(crate) struct NodeEventStreamer {
pub rues_sender: broadcast::Sender<RuesEvent>,
}

// impl From<NodeEvent> for RuesEvent {
// fn from(value: NodeEvent) -> Self {
// Self {
// headers: serde_json::Map::new(),
// data: RuesEventData::Contract(event),
// }
// }
// }

#[async_trait]
impl<N: Network, DB: database::DB, VM: node::vm::VMExecution>
LongLivedService<N, DB, VM> for NodeEventStreamer
Expand All @@ -42,9 +33,10 @@ impl<N: Network, DB: database::DB, VM: node::vm::VMExecution>
_: Arc<tokio::sync::RwLock<VM>>,
) -> anyhow::Result<usize> {
loop {
if let Some(_msg) = self.node_receiver.recv().await {
// self.sender.send()
info!("event received");
if let Some(msg) = self.node_receiver.recv().await {
if let Err(e) = self.rues_sender.send(msg.into()) {
error!("Cannot send to rues {e:?}");
}
}
}
}
Expand Down

0 comments on commit 80ed730

Please sign in to comment.