Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
herr-seppia committed Aug 6, 2024
1 parent 80ed730 commit fab8b26
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 32 deletions.
18 changes: 9 additions & 9 deletions node-data/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@ pub use blocks::BlockEvent;
pub use transactions::TransactionEvent;

#[derive(Clone, Debug)]
pub struct NodeEvent {
pub target: String,
pub topic: String,
pub struct Event {
pub target: &'static str,
pub topic: &'static str,
pub id: String,
pub data: String,
pub data: Option<String>,
}

pub trait NodeEventSource {
pub trait EventSource {
fn target(&self) -> &'static str;
fn topic(&self) -> &'static str;
fn id(&self) -> String;
fn data(&self) -> String;
fn data(&self) -> Option<String>;
}

impl<B: NodeEventSource> From<B> for NodeEvent {
impl<B: EventSource> From<B> for Event {
fn from(value: B) -> Self {
Self {
data: value.data(),
topic: value.topic().into(),
topic: value.topic(),
id: value.id(),
target: value.target().into(),
target: value.target(),
}
}
}
10 changes: 5 additions & 5 deletions node-data/src/events/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use super::*;
use crate::ledger::{Block, Hash};

impl NodeEventSource for BlockEvent<'_> {
impl EventSource for BlockEvent<'_> {
fn target(&self) -> &'static str {
"blocks"
}
Expand All @@ -17,15 +17,15 @@ impl NodeEventSource for BlockEvent<'_> {
Self::StateChange { .. } => "state_change",
}
}
fn data(&self) -> String {
fn data(&self) -> Option<String> {
match self {
Self::Accepted(_) => String::new(),
Self::StateChange { state, height, .. } => format!(
Self::Accepted(_) => None,
Self::StateChange { state, height, .. } => Some(format!(
r#"{{
"state": "{state}",
"atHeight": {height}
}}"#,
),
)),
}
}
fn id(&self) -> String {
Expand Down
6 changes: 3 additions & 3 deletions node-data/src/events/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub enum TransactionEvent<'t> {
Executed(&'t SpentTransaction),
}

impl NodeEventSource for TransactionEvent<'_> {
impl EventSource for TransactionEvent<'_> {
fn target(&self) -> &'static str {
"transactions"
}
Expand All @@ -25,8 +25,8 @@ impl NodeEventSource for TransactionEvent<'_> {
Self::Included(_) => "included",
}
}
fn data(&self) -> String {
String::new()
fn data(&self) -> Option<String> {
None
}
fn id(&self) -> String {
let hash = match self {
Expand Down
10 changes: 7 additions & 3 deletions node/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::Result;
use async_trait::async_trait;
use dusk_consensus::commons::ConsensusError;
pub use header_validation::verify_att;
use node_data::events::NodeEvent;
use node_data::events::Event;
use node_data::ledger::{to_str, BlockWithLabel, Label};
use node_data::message::AsyncQueue;
use node_data::message::{Payload, Topics};
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
keys_path: String,
acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
max_consensus_queue_size: usize,
event_sender: Sender<NodeEvent>,
event_sender: Sender<Event>,
}

#[async_trait]
Expand Down Expand Up @@ -242,7 +242,11 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution>
}

impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
pub fn new(keys_path: String, max_inbound_size: usize, event_sender: Sender<NodeEvent>) -> Self {
pub fn new(
keys_path: String,
max_inbound_size: usize,
event_sender: Sender<Event>,
) -> Self {
info!(
"ChainSrv::new with keys_path: {}, max_inbound_size: {}",
keys_path, max_inbound_size
Expand Down
8 changes: 4 additions & 4 deletions node/src/chain/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use dusk_consensus::commons::{ConsensusError, TimeoutSet};
use dusk_consensus::config::{MAX_STEP_TIMEOUT, MIN_STEP_TIMEOUT};
use dusk_consensus::user::provisioners::{ContextProvisioners, Provisioners};
use node_data::bls::PublicKey;
use node_data::events::{BlockEvent, NodeEvent, TransactionEvent};
use node_data::events::{BlockEvent, Event, TransactionEvent};
use node_data::ledger::{
self, to_str, Block, BlockWithLabel, Label, Seed, Slash, SpentTransaction,
};
Expand Down Expand Up @@ -70,7 +70,7 @@ pub(crate) struct Acceptor<N: Network, DB: database::DB, VM: vm::VMExecution> {
pub(crate) vm: Arc<RwLock<VM>>,
network: Arc<RwLock<N>>,

event_sender: Sender<NodeEvent>,
event_sender: Sender<Event>,
}

impl<DB: database::DB, VM: vm::VMExecution, N: Network> Drop
Expand Down Expand Up @@ -131,7 +131,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
network: Arc<RwLock<N>>,
vm: Arc<RwLock<VM>>,
max_queue_size: usize,
event_sender: Sender<NodeEvent>,
event_sender: Sender<Event>,
) -> anyhow::Result<Self> {
let tip_height = tip.inner().header().height;
let tip_state_hash = tip.inner().header().state_hash;
Expand Down Expand Up @@ -646,7 +646,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> Acceptor<N, DB, VM> {
pni: u8,
blk: &Block,
db: &D::P<'_>,
events: &mut Vec<NodeEvent>,
events: &mut Vec<Event>,
) -> Result<(Label, Option<RollingFinalityResult>)> {
let confirmed_after = match pni {
0 => 1u64,
Expand Down
6 changes: 3 additions & 3 deletions node/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::database::{Ledger, Mempool};
use crate::mempool::conf::Params;
use crate::{database, vm, LongLivedService, Message, Network};
use async_trait::async_trait;
use node_data::events::{NodeEvent, TransactionEvent};
use node_data::events::{Event, TransactionEvent};
use node_data::ledger::Transaction;
use node_data::message::{AsyncQueue, Payload, Topics};
use std::sync::Arc;
Expand Down Expand Up @@ -43,11 +43,11 @@ impl From<anyhow::Error> for TxAcceptanceError {

pub struct MempoolSrv {
inbound: AsyncQueue<Message>,
event_sender: Sender<NodeEvent>,
event_sender: Sender<Event>,
}

impl MempoolSrv {
pub fn new(conf: Params, event_sender: Sender<NodeEvent>) -> Self {
pub fn new(conf: Params, event_sender: Sender<Event>) -> Self {
info!("MempoolSrv::new with conf {}", conf);
Self {
inbound: AsyncQueue::bounded(
Expand Down
6 changes: 3 additions & 3 deletions rusk/src/lib/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,8 +835,8 @@ impl From<execution_core::Event> for RuesEvent {
}

#[cfg(feature = "node")]
impl From<node_data::events::NodeEvent> for RuesEvent {
fn from(value: node_data::events::NodeEvent) -> Self {
impl From<node_data::events::Event> for RuesEvent {
fn from(value: node_data::events::Event) -> Self {
let mut headers = serde_json::Map::new();
headers.insert(
"Content-Location".into(),
Expand All @@ -845,7 +845,7 @@ impl From<node_data::events::NodeEvent> for RuesEvent {

Self {
headers: serde_json::Map::new(),
data: RuesEventData::Other(value.data.into()),
data: RuesEventData::Other(value.data.unwrap_or_default().into()),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions rusk/src/lib/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ use std::sync::Arc;
use async_trait::async_trait;
use node::database::{self};
use node::{LongLivedService, Network};
use node_data::events::NodeEvent;
use node_data::events::Event as ChainEvent;
use tokio::sync::broadcast;
use tokio::sync::mpsc::Receiver;
use tracing::error;

use crate::http::RuesEvent;

pub(crate) struct NodeEventStreamer {
pub node_receiver: Receiver<NodeEvent>,
pub node_receiver: Receiver<ChainEvent>,
#[allow(dead_code)]
pub rues_sender: broadcast::Sender<RuesEvent>,
}
Expand Down

0 comments on commit fab8b26

Please sign in to comment.