Skip to content

Commit

Permalink
Improve connect trace + fix ts compute in logs
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 13, 2023
1 parent a50a779 commit 53cf472
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 28 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Config {
.store(local_mode, std::sync::atomic::Ordering::SeqCst);
}

pub fn node_mode() -> OperationMode {
fn node_mode() -> OperationMode {
if Self::conf()
.local_mode
.load(std::sync::atomic::Ordering::SeqCst)
Expand Down
125 changes: 100 additions & 25 deletions crates/core/src/node/network_event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,27 @@ impl<'a> NetEventLog<'a> {
EventKind::Ignored
}
}
Message::Connect(connect::ConnectMsg::Response {
msg:
connect::ConnectResponse::Proxy {
accepted_by,
joiner,
},
..
}) => {
let this_peer = op_storage.ring.own_location();
if accepted_by.contains(&this_peer) {
EventKind::Connect(ConnectEvent::Connected {
this: this_peer,
connected: PeerKeyLocation {
peer: *joiner,
location: None,
},
})
} else {
EventKind::Ignored
}
}
_ => EventKind::Ignored,
};
Either::Left(NetEventLog {
Expand Down Expand Up @@ -296,11 +317,11 @@ impl NetLogMessage {
/// In case of isolated events where the span is not being tracked it should return true.
fn span_completed(&self) -> bool {
match &self.kind {
// EventKind::Connect(ConnectEvent::)
EventKind::Connect(ConnectEvent::Finished { .. }) => true,
EventKind::Connect(_) => false,
EventKind::Put(PutEvent::PutSuccess { .. }) => true,
EventKind::Put(_) => false,
_ => true,
_ => false,
}
}
}
Expand Down Expand Up @@ -644,15 +665,16 @@ impl NetEventRegister for EventRegister {

#[cfg(feature = "trace-ot")]
mod opentelemetry_tracer {
use std::{collections::HashMap, time::Duration};
#[cfg(not(test))]
use std::collections::HashMap;
use std::time::Duration;

use dashmap::DashMap;
use opentelemetry::{
global,
trace::{self, Span},
};

use crate::local_node::OperationMode;

use super::*;

struct OTSpan {
Expand All @@ -676,9 +698,10 @@ mod opentelemetry_tracer {
let tx_bytes = transaction.as_bytes();
let mut span_id = [0; 8];
span_id.copy_from_slice(&tx_bytes[8..]);
let start_time = transaction.started();
let inner = tracer.build(trace::SpanBuilder {
name: transaction.transaction_type().description().into(),
start_time: Some(transaction.started()),
start_time: Some(start_time),
span_id: Some(trace::SpanId::from_bytes(span_id)),
trace_id: Some(trace::TraceId::from_bytes(tx_bytes)),
..Default::default()
Expand All @@ -692,8 +715,11 @@ mod opentelemetry_tracer {
fn add_log(&mut self, log: &NetLogMessage) {
// NOTE: if we need to add some standard attributes in the future take a look at
// https://docs.rs/opentelemetry-semantic-conventions/latest/opentelemetry_semantic_conventions/
let ts =
SystemTime::UNIX_EPOCH + Duration::from_millis(log.datetime.timestamp() as u64);
let ts = SystemTime::UNIX_EPOCH
+ Duration::from_nanos(
((log.datetime.timestamp() * 1_000_000_000)
+ log.datetime.timestamp_subsec_nanos() as i64) as u64,
);
self.last_log = ts;
if let Some(log_vals) = <Option<Vec<_>>>::from(log) {
self.inner.add_event_with_timestamp(
Expand All @@ -703,7 +729,10 @@ mod opentelemetry_tracer {
);
}
}
fn finish(&mut self) {
}

/// Ends the wrapped span when the `OTSpan` value is dropped.
///
/// Because the span is closed here, callers only need to let the value go out of
/// scope (for example by removing it from the tracking map) to finish it.
impl Drop for OTSpan {
fn drop(&mut self) {
// The end time is `last_log`, which `add_log` updates with the timestamp of each
// log it receives, rather than the moment the value is dropped.
self.inner.end_with_timestamp(self.last_log);
}
}
Expand Down Expand Up @@ -744,8 +773,15 @@ mod opentelemetry_tracer {
finished_tx_notifier: mpsc::Sender<Transaction>,
}

/// For tests running in a single process it is important that span tracking is global across threads and simulated peers.
static UNIQUE_REGISTER: std::sync::OnceLock<DashMap<Transaction, OTSpan>> =
std::sync::OnceLock::new();

impl OTEventRegister {
pub fn new() -> Self {
if cfg!(test) {
UNIQUE_REGISTER.get_or_init(DashMap::new);
}
let (sender, finished_tx_notifier) = mpsc::channel(100);
let (log_sender, log_recv) = mpsc::channel(1000);
NEW_RECORDS_TS.get_or_init(SystemTime::now);
Expand All @@ -760,8 +796,10 @@ mod opentelemetry_tracer {
mut log_recv: mpsc::Receiver<NetLogMessage>,
mut finished_tx_notifier: mpsc::Receiver<Transaction>,
) {
#[cfg(not(test))]
let mut logs = HashMap::new();

#[cfg(not(test))]
fn process_log(logs: &mut HashMap<Transaction, OTSpan>, log: NetLogMessage) {
let span_completed = log.span_completed();
match logs.entry(log.tx) {
Expand All @@ -771,8 +809,7 @@ mod opentelemetry_tracer {
span.add_log(&log);
}
if span_completed {
let (_, mut completed_span) = val.remove_entry();
completed_span.finish();
let (_, _span) = val.remove_entry();
}
}
std::collections::hash_map::Entry::Vacant(empty) => {
Expand All @@ -786,24 +823,66 @@ mod opentelemetry_tracer {
}
}

fn cleanup_timed_out(logs: &mut HashMap<Transaction, OTSpan>, tx: Transaction) {
if let Some(mut span) = logs.remove(&tx) {
span.finish();
#[cfg(test)]
/// Test-build variant: feeds `log` into the span tracked for its transaction in the
/// shared `DashMap`.
///
/// * Known transaction: the log is added to the existing span; if the log marks the
///   span as completed, the span is removed from the map (dropping it ends it).
/// * Unknown transaction: a new span is inserted for it, and the log is added only
///   when it does not already mark the span as completed.
fn process_log(logs: &DashMap<Transaction, OTSpan>, log: NetLogMessage) {
    use dashmap::mapref::entry::Entry;

    let is_final = log.span_completed();
    match logs.entry(log.tx) {
        Entry::Vacant(slot) => {
            let mut new_span = slot.insert(OTSpan::new(log.tx));
            // A lone terminal event is not worth recording as a span event, so
            // nothing is logged for it; the freshly inserted span stays in the map.
            if !is_final {
                new_span.add_log(&log);
            }
        }
        Entry::Occupied(mut existing) => {
            existing.get_mut().add_log(&log);
            if is_final {
                // Taking the entry out of the map and dropping it closes the span.
                drop(existing.remove_entry());
            }
        }
    }
}

#[cfg(not(test))]
/// Stops tracking the span of transaction `tx`, if there is one.
///
/// The removed `OTSpan` is dropped right away, and its `Drop` impl ends the span.
/// Nothing happens when `tx` is not present in `logs`.
fn cleanup_timed_out(logs: &mut HashMap<Transaction, OTSpan>, tx: Transaction) {
    drop(logs.remove(&tx));
}

#[cfg(test)]
/// Test-build variant: stops tracking the span of transaction `tx` in the shared map.
///
/// The removed `(Transaction, OTSpan)` pair is dropped right away, and the `Drop`
/// impl of `OTSpan` ends the span. Nothing happens when `tx` is not present.
fn cleanup_timed_out(logs: &DashMap<Transaction, OTSpan>, tx: Transaction) {
    drop(logs.remove(&tx));
}

loop {
tokio::select! {
log_msg = log_recv.recv() => {
if let Some(log) = log_msg {
process_log(&mut logs, log);
#[cfg(not(test))]
{
process_log(&mut logs, log);
}
#[cfg(test)]
{
process_log(UNIQUE_REGISTER.get().expect("should be set"), log);
}
} else {
break;
}
}
finished_tx = finished_tx_notifier.recv() => {
if let Some(tx) = finished_tx {
cleanup_timed_out(&mut logs, tx);
#[cfg(not(test))]
{
cleanup_timed_out(&mut logs, tx);
}
#[cfg(test)]
{
cleanup_timed_out(UNIQUE_REGISTER.get().expect("should be set"), tx);
}
} else {
break;
}
Expand Down Expand Up @@ -832,7 +911,7 @@ mod opentelemetry_tracer {

fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> {
async move {
if matches!(crate::config::Config::node_mode(), OperationMode::Network) {
if cfg!(test) {
let _ = self.finished_tx_notifier.send(tx).await;
}
}
Expand Down Expand Up @@ -1107,14 +1186,10 @@ pub(super) mod test {
.iter()
.flat_map(|dcs| dcs.iter())
.any(|dc| dc > &l.datetime);
if this.peer == peer && !disconnected {
return Some((
connected.peer,
connected
.location
.expect("set location")
.distance(this.location.unwrap()),
));
if let Some((this_loc, conn_loc)) = this.location.zip(connected.location) {
if this.peer == peer && !disconnected {
return Some((connected.peer, conn_loc.distance(this_loc)));
}
}
}
None
Expand Down
17 changes: 15 additions & 2 deletions crates/core/src/operations/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ impl Operation for ConnectOp {
target: *joiner,
msg: ConnectResponse::Proxy {
accepted_by: HashSet::new(),
joiner: joiner.peer,
},
});
new_state = None;
Expand Down Expand Up @@ -352,6 +353,7 @@ impl Operation for ConnectOp {
sender,
&own_loc,
accepted_by,
joiner.peer,
);
new_state = state;
return_msg = msg;
Expand Down Expand Up @@ -470,7 +472,11 @@ impl Operation for ConnectOp {
id,
target,
sender,
msg: ConnectResponse::Proxy { accepted_by },
msg:
ConnectResponse::Proxy {
accepted_by,
joiner,
},
} => {
tracing::debug!(tx = %id, at = %target.peer, "Received proxy connect response");
match self.state {
Expand Down Expand Up @@ -524,6 +530,7 @@ impl Operation for ConnectOp {
sender: *target,
msg: ConnectResponse::Proxy {
accepted_by: previously_accepted,
joiner: *joiner,
},
});
}
Expand All @@ -535,6 +542,7 @@ impl Operation for ConnectOp {
sender: *target,
msg: ConnectResponse::Proxy {
accepted_by: accepted_by.clone(),
joiner: joiner.peer,
},
});
new_state = None;
Expand Down Expand Up @@ -684,6 +692,7 @@ fn try_returning_proxy_connection(
sender: &PeerKeyLocation,
own_loc: &PeerKeyLocation,
accepted_by: HashSet<PeerKeyLocation>,
joiner: PeerKey,
) -> (Option<ConnectState>, Option<ConnectMsg>) {
let new_state = if accepted_by.contains(own_loc) {
tracing::debug!(
Expand All @@ -698,7 +707,10 @@ fn try_returning_proxy_connection(
None
};
let return_msg = Some(ConnectMsg::Response {
msg: ConnectResponse::Proxy { accepted_by },
msg: ConnectResponse::Proxy {
accepted_by,
joiner,
},
sender: *own_loc,
id: *id,
target: *sender,
Expand Down Expand Up @@ -1121,6 +1133,7 @@ mod messages {
},
Proxy {
accepted_by: HashSet<PeerKeyLocation>,
joiner: PeerKey,
},
}
}
Expand Down

0 comments on commit 53cf472

Please sign in to comment.