diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index 1b691c013..f8cba2f32 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -157,7 +157,7 @@ impl Config { .store(local_mode, std::sync::atomic::Ordering::SeqCst); } - fn node_mode() -> OperationMode { + pub fn node_mode() -> OperationMode { if Self::conf() .local_mode .load(std::sync::atomic::Ordering::SeqCst) diff --git a/crates/core/src/node/in_memory_impl.rs b/crates/core/src/node/in_memory_impl.rs index 745805d1b..607f6e3c1 100644 --- a/crates/core/src/node/in_memory_impl.rs +++ b/crates/core/src/node/in_memory_impl.rs @@ -32,7 +32,7 @@ pub(super) struct NodeInMemory { gateways: Vec, notification_channel: EventLoopNotifications, conn_manager: MemoryConnManager, - event_listener: Box, + event_register: Box, is_gateway: bool, _executor_listener: ExecutorToEventLoopChannel, /// Span to use for this node for tracing purposes @@ -41,13 +41,12 @@ pub(super) struct NodeInMemory { impl NodeInMemory { /// Buils an in-memory node. Does nothing upon construction, - pub async fn build( + pub async fn build( builder: NodeBuilder<1>, - event_listener: EL, + event_register: ER, ch_builder: String, add_noise: bool, ) -> Result { - let event_listener = Box::new(event_listener); let peer_key = PeerKey::from(builder.local_key.public()); let gateways = builder.get_gateways()?; let is_gateway = builder.local_ip.zip(builder.local_port).is_some(); @@ -55,11 +54,12 @@ impl NodeInMemory { let (notification_channel, notification_tx) = EventLoopNotifications::channel(); let (ops_ch_channel, ch_channel) = contract::contract_handler_channel(); - let op_storage = Arc::new(OpManager::new::<1, EL>( + let op_storage = Arc::new(OpManager::new( notification_tx, ops_ch_channel, &builder, &gateways, + event_register.clone(), )?); let (_executor_listener, executor_sender) = executor_channel(op_storage.clone()); let contract_handler = @@ -67,9 +67,10 @@ impl NodeInMemory { .await .map_err(|e| anyhow::anyhow!(e))?; + let event_register = Box::new(event_register); let conn_manager = MemoryConnManager::new( peer_key, - event_listener.trait_clone(), + event_register.trait_clone(), op_storage.clone(), add_noise, ); @@ -86,7 +87,7 @@ impl NodeInMemory { op_storage, gateways, notification_channel, - event_listener, + event_register, is_gateway, _executor_listener, parent_span, @@ -223,7 +224,7 @@ impl NodeInMemory { NodeEvent::ShutdownNode => break Ok(()), NodeEvent::DropConnection(peer) => { tracing::info!("Dropping connection to {peer}"); - self.event_listener + self.event_register .register_events(Either::Left(NetEventLog::disconnected(&peer))); self.op_storage.ring.prune_connection(peer); continue; @@ -239,7 +240,7 @@ impl NodeInMemory { &self.op_storage, None, None, - &mut *self.event_listener as &mut _, + &mut *self.event_register as &mut _, ) .await; continue; @@ -248,7 +249,7 @@ impl NodeInMemory { let op_storage = self.op_storage.clone(); let conn_manager = self.conn_manager.clone(); - let event_listener = self.event_listener.trait_clone(); + let event_listener = self.event_register.trait_clone(); let parent_span = tracing::Span::current(); let span = tracing::info_span!( diff --git a/crates/core/src/node/network_event_log.rs b/crates/core/src/node/network_event_log.rs index c9320c7a6..637d5b733 100644 --- a/crates/core/src/node/network_event_log.rs +++ b/crates/core/src/node/network_event_log.rs @@ -48,6 +48,7 @@ pub(crate) trait NetEventRegister: std::any::Any + Send + Sync + 'static { { self as _ } + fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()>; } #[cfg(feature = "trace-ot")] @@ -77,6 +78,15 @@ impl NetEventRegister for CombinedRegister { fn trait_clone(&self) -> Box { Box::new(self.clone()) } + + fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> { + async move { + for reg in &mut self.0 { + reg.notify_of_time_out(tx); + } + } + .boxed() + } } #[cfg(feature = "trace-ot")] @@ -626,18 +636,28 @@ impl NetEventRegister for EventRegister { fn trait_clone(&self) -> Box { Box::new(self.clone()) } + + fn notify_of_time_out(&mut self, _: Transaction) -> BoxFuture<()> { + async {}.boxed() + } } #[cfg(feature = "trace-ot")] mod opentelemetry_tracer { use std::{collections::HashMap, time::Duration}; - use opentelemetry::{global, trace}; + use opentelemetry::{ + global, + trace::{self, Span}, + }; + + use crate::local_node::OperationMode; use super::*; struct OTSpan { inner: global::BoxedSpan, + last_log: SystemTime, } impl OTSpan { @@ -663,17 +683,19 @@ mod opentelemetry_tracer { trace_id: Some(trace::TraceId::from_bytes(tx_bytes)), ..Default::default() }); - OTSpan { inner } + OTSpan { + inner, + last_log: SystemTime::now(), + } } fn add_log(&mut self, log: &NetLogMessage) { // NOTE: if we need to add some standard attributes in the future take a look at // https://docs.rs/opentelemetry-semantic-conventions/latest/opentelemetry_semantic_conventions/ - use trace::Span; + let ts = + SystemTime::UNIX_EPOCH + Duration::from_millis(log.datetime.timestamp() as u64); + self.last_log = ts; if let Some(log_vals) = >>::from(log) { - let ts = - SystemTime::UNIX_EPOCH + Duration::from_millis(log.datetime.timestamp() as u64); - self.inner.add_event_with_timestamp( log.tx.transaction_type().description(), ts, @@ -681,6 +703,9 @@ mod opentelemetry_tracer { ); } } + fn finish(&mut self) { + self.inner.end_with_timestamp(self.last_log); + } } impl trace::Span for OTSpan { @@ -716,20 +741,28 @@ mod opentelemetry_tracer { #[derive(Clone)] pub(in crate::node) struct OTEventRegister { log_sender: mpsc::Sender, + finished_tx_notifier: mpsc::Sender, } impl OTEventRegister { pub fn new() -> Self { + let (sender, finished_tx_notifier) = mpsc::channel(100); let (log_sender, log_recv) = mpsc::channel(1000); NEW_RECORDS_TS.get_or_init(SystemTime::now); - GlobalExecutor::spawn(Self::record_logs(log_recv)); - Self { log_sender } + GlobalExecutor::spawn(Self::record_logs(log_recv, finished_tx_notifier)); + Self { + log_sender, + finished_tx_notifier: sender, + } } - async fn record_logs(mut log_recv: mpsc::Receiver) { - use trace::Span; - let mut logs: HashMap<_, OTSpan> = HashMap::new(); - while let Some(log) = log_recv.recv().await { + async fn record_logs( + mut log_recv: mpsc::Receiver, + mut finished_tx_notifier: mpsc::Receiver, + ) { + let mut logs = HashMap::new(); + + fn process_log(logs: &mut HashMap, log: NetLogMessage) { let span_completed = log.span_completed(); match logs.entry(log.tx) { std::collections::hash_map::Entry::Occupied(mut val) => { @@ -739,7 +772,7 @@ mod opentelemetry_tracer { } if span_completed { let (_, mut completed_span) = val.remove_entry(); - completed_span.end_with_timestamp(SystemTime::now()) + completed_span.finish(); } } std::collections::hash_map::Entry::Vacant(empty) => { @@ -752,6 +785,31 @@ mod opentelemetry_tracer { } } } + + fn cleanup_timed_out(logs: &mut HashMap, tx: Transaction) { + if let Some(mut span) = logs.remove(&tx) { + span.finish(); + } + } + + loop { + tokio::select! { + log_msg = log_recv.recv() => { + if let Some(log) = log_msg { + process_log(&mut logs, log); + } else { + break; + } + } + finished_tx = finished_tx_notifier.recv() => { + if let Some(tx) = finished_tx { + cleanup_timed_out(&mut logs, tx); + } else { + break; + } + } + } + } } } @@ -771,6 +829,15 @@ mod opentelemetry_tracer { fn trait_clone(&self) -> Box { Box::new(self.clone()) } + + fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> { + async move { + if matches!(crate::config::Config::node_mode(), OperationMode::Network) { + let _ = self.finished_tx_notifier.send(tx).await; + } + } + .boxed() + } } } @@ -1097,6 +1164,10 @@ pub(super) mod test { fn trait_clone(&self) -> Box { Box::new(self.clone()) } + + fn notify_of_time_out(&mut self, _: Transaction) -> BoxFuture<()> { + async {}.boxed() + } } #[test] diff --git a/crates/core/src/node/op_state_manager.rs b/crates/core/src/node/op_state_manager.rs index e655d1e99..47f24358c 100644 --- a/crates/core/src/node/op_state_manager.rs +++ b/crates/core/src/node/op_state_manager.rs @@ -59,20 +59,26 @@ pub(crate) struct OpManager { } impl OpManager { - pub(super) fn new( + pub(super) fn new( notification_channel: EventLoopNotificationsSender, contract_handler: ContractHandlerChannel, builder: &NodeBuilder, gateways: &[PeerKeyLocation], + event_register: ER, ) -> Result { - let ring = Ring::new::(builder, gateways, notification_channel.clone())?; + let ring = Ring::new::(builder, gateways, notification_channel.clone())?; let ops = Arc::new(Ops::default()); let (new_transactions, rx) = tokio::sync::mpsc::channel(100); let parent_span = tracing::Span::current(); GlobalExecutor::spawn( - garbage_cleanup_task(rx, ops.clone(), ring.live_tx_tracker.clone()) - .instrument(tracing::info_span!(parent: parent_span, "garbage_cleanup_task")), + garbage_cleanup_task( + rx, + ops.clone(), + ring.live_tx_tracker.clone(), + event_register, + ) + .instrument(tracing::info_span!(parent: parent_span, "garbage_cleanup_task")), ); Ok(Self { @@ -214,10 +220,11 @@ impl OpManager { } } -async fn garbage_cleanup_task( +async fn garbage_cleanup_task( mut new_transactions: tokio::sync::mpsc::Receiver, ops: Arc, live_tx_tracker: LiveTransactionTracker, + mut event_register: ER, ) { const CLEANUP_INTERVAL: Duration = Duration::from_secs(5); let mut tick = tokio::time::interval(CLEANUP_INTERVAL); @@ -225,11 +232,16 @@ async fn garbage_cleanup_task( let mut ttl_set = BTreeSet::new(); - let remove_old = move |ttl_set: &mut BTreeSet>, - delayed: &mut Vec| { + let mut remove_old = move |ttl_set: &mut BTreeSet>, + delayed: &mut Vec| { let mut old_missing = std::mem::replace(delayed, Vec::with_capacity(200)); for tx in old_missing.drain(..) { - if ops.completed.remove(&tx).is_some() { + if let Some(tx) = ops.completed.remove(&tx) { + if cfg!(feature = "trace-ot") { + event_register.notify_of_time_out(tx); + } else { + _ = tx; + } continue; } let still_waiting = match tx.transaction_type() { @@ -258,7 +270,12 @@ async fn garbage_cleanup_task( delayed.push(tx); continue; } - if ops.completed.remove(&tx).is_some() { + if let Some(tx) = ops.completed.remove(&tx) { + if cfg!(feature = "trace-ot") { + event_register.notify_of_time_out(tx); + } else { + _ = tx; + } continue; } let removed = match tx.transaction_type() { diff --git a/crates/core/src/node/p2p_impl.rs b/crates/core/src/node/p2p_impl.rs index ce21d2b47..8f5d0fa06 100644 --- a/crates/core/src/node/p2p_impl.rs +++ b/crates/core/src/node/p2p_impl.rs @@ -72,14 +72,14 @@ impl NodeP2P { .await } - pub(crate) async fn build( + pub(crate) async fn build( builder: NodeBuilder, - event_listener: EL, + event_register: ER, ch_builder: CH::Builder, ) -> Result where CH: ContractHandler + Send + 'static, - EL: NetEventRegister + Clone, + ER: NetEventRegister + Clone, { let peer_key = PeerKey::from(builder.local_key.public()); let gateways = builder.get_gateways()?; @@ -88,11 +88,12 @@ impl NodeP2P { let (ch_outbound, ch_inbound) = contract::contract_handler_channel(); let (client_responses, cli_response_sender) = contract::ClientResponses::channel(); - let op_storage = Arc::new(OpManager::new::( + let op_storage = Arc::new(OpManager::new( notification_tx, ch_outbound, &builder, &gateways, + event_register.clone(), )?); let (executor_listener, executor_sender) = contract::executor_channel(op_storage.clone()); let contract_handler = CH::build(ch_inbound, executor_sender, ch_builder) @@ -101,7 +102,7 @@ impl NodeP2P { let conn_manager = { let transport = Self::config_transport(&builder.local_key)?; - P2pConnManager::build(transport, &builder, op_storage.clone(), event_listener)? + P2pConnManager::build(transport, &builder, op_storage.clone(), event_register)? }; let parent_span = tracing::Span::current(); diff --git a/crates/core/src/ring.rs b/crates/core/src/ring.rs index d877d2730..f91fff0e8 100644 --- a/crates/core/src/ring.rs +++ b/crates/core/src/ring.rs @@ -311,6 +311,7 @@ impl Ring { interval.tick().await; loop { interval.tick().await; + // fixme let history = if std::any::type_name::() == std::any::type_name::() { EventRegister::get_router_events(10_000) .await