From 53cf4721e216482b481dbeabe4685ff461a83b68 Mon Sep 17 00:00:00 2001 From: Ignacio Duart Date: Mon, 13 Nov 2023 10:03:35 +0100 Subject: [PATCH] Improve connect trace + fix ts compute in logs --- crates/core/src/config.rs | 2 +- crates/core/src/node/network_event_log.rs | 125 +++++++++++++++++----- crates/core/src/operations/connect.rs | 17 ++- 3 files changed, 116 insertions(+), 28 deletions(-) diff --git a/crates/core/src/config.rs b/crates/core/src/config.rs index f8cba2f32..1b691c013 100644 --- a/crates/core/src/config.rs +++ b/crates/core/src/config.rs @@ -157,7 +157,7 @@ impl Config { .store(local_mode, std::sync::atomic::Ordering::SeqCst); } - pub fn node_mode() -> OperationMode { + fn node_mode() -> OperationMode { if Self::conf() .local_mode .load(std::sync::atomic::Ordering::SeqCst) diff --git a/crates/core/src/node/network_event_log.rs b/crates/core/src/node/network_event_log.rs index 637d5b733..af3e09168 100644 --- a/crates/core/src/node/network_event_log.rs +++ b/crates/core/src/node/network_event_log.rs @@ -157,6 +157,27 @@ impl<'a> NetEventLog<'a> { EventKind::Ignored } } + Message::Connect(connect::ConnectMsg::Response { + msg: + connect::ConnectResponse::Proxy { + accepted_by, + joiner, + }, + .. + }) => { + let this_peer = op_storage.ring.own_location(); + if accepted_by.contains(&this_peer) { + EventKind::Connect(ConnectEvent::Connected { + this: this_peer, + connected: PeerKeyLocation { + peer: *joiner, + location: None, + }, + }) + } else { + EventKind::Ignored + } + } _ => EventKind::Ignored, }; Either::Left(NetEventLog { @@ -296,11 +317,11 @@ impl NetLogMessage { /// In case of isolated events where the span is not being tracked it should return true. fn span_completed(&self) -> bool { match &self.kind { - // EventKind::Connect(ConnectEvent::) + EventKind::Connect(ConnectEvent::Finished { .. }) => true, EventKind::Connect(_) => false, EventKind::Put(PutEvent::PutSuccess { .. }) => true, EventKind::Put(_) => false, - _ => true, + _ => false, } } } @@ -644,15 +665,16 @@ impl NetEventRegister for EventRegister { #[cfg(feature = "trace-ot")] mod opentelemetry_tracer { - use std::{collections::HashMap, time::Duration}; + #[cfg(not(test))] + use std::collections::HashMap; + use std::time::Duration; + use dashmap::DashMap; use opentelemetry::{ global, trace::{self, Span}, }; - use crate::local_node::OperationMode; - use super::*; struct OTSpan { @@ -676,9 +698,10 @@ mod opentelemetry_tracer { let tx_bytes = transaction.as_bytes(); let mut span_id = [0; 8]; span_id.copy_from_slice(&tx_bytes[8..]); + let start_time = transaction.started(); let inner = tracer.build(trace::SpanBuilder { name: transaction.transaction_type().description().into(), - start_time: Some(transaction.started()), + start_time: Some(start_time), span_id: Some(trace::SpanId::from_bytes(span_id)), trace_id: Some(trace::TraceId::from_bytes(tx_bytes)), ..Default::default() @@ -692,8 +715,11 @@ mod opentelemetry_tracer { fn add_log(&mut self, log: &NetLogMessage) { // NOTE: if we need to add some standard attributes in the future take a look at // https://docs.rs/opentelemetry-semantic-conventions/latest/opentelemetry_semantic_conventions/ - let ts = - SystemTime::UNIX_EPOCH + Duration::from_millis(log.datetime.timestamp() as u64); + let ts = SystemTime::UNIX_EPOCH + + Duration::from_nanos( + ((log.datetime.timestamp() * 1_000_000_000) + + log.datetime.timestamp_subsec_nanos() as i64) as u64, + ); self.last_log = ts; if let Some(log_vals) = >>::from(log) { self.inner.add_event_with_timestamp( @@ -703,7 +729,10 @@ mod opentelemetry_tracer { ); } } - fn finish(&mut self) { + } + + impl Drop for OTSpan { + fn drop(&mut self) { self.inner.end_with_timestamp(self.last_log); } } @@ -744,8 +773,15 @@ mod opentelemetry_tracer { finished_tx_notifier: mpsc::Sender, } + /// For tests running in a single process is importart that span tracking is global across threads and simulated peers. + static UNIQUE_REGISTER: std::sync::OnceLock> = + std::sync::OnceLock::new(); + impl OTEventRegister { pub fn new() -> Self { + if cfg!(test) { + UNIQUE_REGISTER.get_or_init(DashMap::new); + } let (sender, finished_tx_notifier) = mpsc::channel(100); let (log_sender, log_recv) = mpsc::channel(1000); NEW_RECORDS_TS.get_or_init(SystemTime::now); @@ -760,8 +796,10 @@ mod opentelemetry_tracer { mut log_recv: mpsc::Receiver, mut finished_tx_notifier: mpsc::Receiver, ) { + #[cfg(not(test))] let mut logs = HashMap::new(); + #[cfg(not(test))] fn process_log(logs: &mut HashMap, log: NetLogMessage) { let span_completed = log.span_completed(); match logs.entry(log.tx) { @@ -771,8 +809,7 @@ mod opentelemetry_tracer { span.add_log(&log); } if span_completed { - let (_, mut completed_span) = val.remove_entry(); - completed_span.finish(); + let (_, _span) = val.remove_entry(); } } std::collections::hash_map::Entry::Vacant(empty) => { @@ -786,24 +823,66 @@ mod opentelemetry_tracer { } } - fn cleanup_timed_out(logs: &mut HashMap, tx: Transaction) { - if let Some(mut span) = logs.remove(&tx) { - span.finish(); + #[cfg(test)] + fn process_log(logs: &DashMap, log: NetLogMessage) { + let span_completed = log.span_completed(); + match logs.entry(log.tx) { + dashmap::mapref::entry::Entry::Occupied(mut val) => { + { + let span = val.get_mut(); + span.add_log(&log); + } + if span_completed { + let (_, _span) = val.remove_entry(); + } + } + dashmap::mapref::entry::Entry::Vacant(empty) => { + let mut span = empty.insert(OTSpan::new(log.tx)); + // does not make much sense to treat a single isolated event as a span, + // so just ignore those in case they were to happen + if !span_completed { + span.add_log(&log); + } + } } } + #[cfg(not(test))] + fn cleanup_timed_out(logs: &mut HashMap, tx: Transaction) { + if let Some(_span) = logs.remove(&tx) {} + } + + #[cfg(test)] + fn cleanup_timed_out(logs: &DashMap, tx: Transaction) { + if let Some((_, _span)) = logs.remove(&tx) {} + } + loop { tokio::select! { log_msg = log_recv.recv() => { if let Some(log) = log_msg { - process_log(&mut logs, log); + #[cfg(not(test))] + { + process_log(&mut logs, log); + } + #[cfg(test)] + { + process_log(UNIQUE_REGISTER.get().expect("should be set"), log); + } } else { break; } } finished_tx = finished_tx_notifier.recv() => { if let Some(tx) = finished_tx { - cleanup_timed_out(&mut logs, tx); + #[cfg(not(test))] + { + cleanup_timed_out(&mut logs, tx); + } + #[cfg(test)] + { + cleanup_timed_out(UNIQUE_REGISTER.get().expect("should be set"), tx); + } } else { break; } @@ -832,7 +911,7 @@ mod opentelemetry_tracer { fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> { async move { - if matches!(crate::config::Config::node_mode(), OperationMode::Network) { + if cfg!(test) { let _ = self.finished_tx_notifier.send(tx).await; } } @@ -1107,14 +1186,10 @@ pub(super) mod test { .iter() .flat_map(|dcs| dcs.iter()) .any(|dc| dc > &l.datetime); - if this.peer == peer && !disconnected { - return Some(( - connected.peer, - connected - .location - .expect("set location") - .distance(this.location.unwrap()), - )); + if let Some((this_loc, conn_loc)) = this.location.zip(connected.location) { + if this.peer == peer && !disconnected { + return Some((connected.peer, conn_loc.distance(this_loc))); + } } } None diff --git a/crates/core/src/operations/connect.rs b/crates/core/src/operations/connect.rs index 3c24b41c2..85c3b4dd8 100644 --- a/crates/core/src/operations/connect.rs +++ b/crates/core/src/operations/connect.rs @@ -181,6 +181,7 @@ impl Operation for ConnectOp { target: *joiner, msg: ConnectResponse::Proxy { accepted_by: HashSet::new(), + joiner: joiner.peer, }, }); new_state = None; @@ -352,6 +353,7 @@ impl Operation for ConnectOp { sender, &own_loc, accepted_by, + joiner.peer, ); new_state = state; return_msg = msg; @@ -470,7 +472,11 @@ impl Operation for ConnectOp { id, target, sender, - msg: ConnectResponse::Proxy { accepted_by }, + msg: + ConnectResponse::Proxy { + accepted_by, + joiner, + }, } => { tracing::debug!(tx = %id, at = %target.peer, "Received proxy connect response"); match self.state { @@ -524,6 +530,7 @@ impl Operation for ConnectOp { sender: *target, msg: ConnectResponse::Proxy { accepted_by: previously_accepted, + joiner: *joiner, }, }); } @@ -535,6 +542,7 @@ impl Operation for ConnectOp { sender: *target, msg: ConnectResponse::Proxy { accepted_by: accepted_by.clone(), + joiner: joiner.peer, }, }); new_state = None; @@ -684,6 +692,7 @@ fn try_returning_proxy_connection( sender: &PeerKeyLocation, own_loc: &PeerKeyLocation, accepted_by: HashSet, + joiner: PeerKey, ) -> (Option, Option) { let new_state = if accepted_by.contains(own_loc) { tracing::debug!( @@ -698,7 +707,10 @@ fn try_returning_proxy_connection( None }; let return_msg = Some(ConnectMsg::Response { - msg: ConnectResponse::Proxy { accepted_by }, + msg: ConnectResponse::Proxy { + accepted_by, + joiner, + }, sender: *own_loc, id: *id, target: *sender, @@ -1121,6 +1133,7 @@ mod messages { }, Proxy { accepted_by: HashSet, + joiner: PeerKey, }, } }