Skip to content

Commit

Permalink
Guarantee spans are finished on cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Nov 12, 2023
1 parent 6445584 commit a50a779
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 38 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Config {
.store(local_mode, std::sync::atomic::Ordering::SeqCst);
}

fn node_mode() -> OperationMode {
pub fn node_mode() -> OperationMode {
if Self::conf()
.local_mode
.load(std::sync::atomic::Ordering::SeqCst)
Expand Down
21 changes: 11 additions & 10 deletions crates/core/src/node/in_memory_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub(super) struct NodeInMemory {
gateways: Vec<PeerKeyLocation>,
notification_channel: EventLoopNotifications,
conn_manager: MemoryConnManager,
event_listener: Box<dyn NetEventRegister>,
event_register: Box<dyn NetEventRegister>,
is_gateway: bool,
_executor_listener: ExecutorToEventLoopChannel<NetworkEventListenerHalve>,
/// Span to use for this node for tracing purposes
Expand All @@ -41,35 +41,36 @@ pub(super) struct NodeInMemory {

impl NodeInMemory {
/// Builds an in-memory node. Does nothing upon construction.
pub async fn build<EL: NetEventRegister>(
pub async fn build<ER: NetEventRegister + Clone>(
builder: NodeBuilder<1>,
event_listener: EL,
event_register: ER,
ch_builder: String,
add_noise: bool,
) -> Result<NodeInMemory, anyhow::Error> {
let event_listener = Box::new(event_listener);
let peer_key = PeerKey::from(builder.local_key.public());
let gateways = builder.get_gateways()?;
let is_gateway = builder.local_ip.zip(builder.local_port).is_some();

let (notification_channel, notification_tx) = EventLoopNotifications::channel();
let (ops_ch_channel, ch_channel) = contract::contract_handler_channel();

let op_storage = Arc::new(OpManager::new::<1, EL>(
let op_storage = Arc::new(OpManager::new(
notification_tx,
ops_ch_channel,
&builder,
&gateways,
event_register.clone(),
)?);
let (_executor_listener, executor_sender) = executor_channel(op_storage.clone());
let contract_handler =
MemoryContractHandler::build(ch_channel, executor_sender, ch_builder)
.await
.map_err(|e| anyhow::anyhow!(e))?;

let event_register = Box::new(event_register);
let conn_manager = MemoryConnManager::new(
peer_key,
event_listener.trait_clone(),
event_register.trait_clone(),
op_storage.clone(),
add_noise,
);
Expand All @@ -86,7 +87,7 @@ impl NodeInMemory {
op_storage,
gateways,
notification_channel,
event_listener,
event_register,
is_gateway,
_executor_listener,
parent_span,
Expand Down Expand Up @@ -223,7 +224,7 @@ impl NodeInMemory {
NodeEvent::ShutdownNode => break Ok(()),
NodeEvent::DropConnection(peer) => {
tracing::info!("Dropping connection to {peer}");
self.event_listener
self.event_register
.register_events(Either::Left(NetEventLog::disconnected(&peer)));
self.op_storage.ring.prune_connection(peer);
continue;
Expand All @@ -239,7 +240,7 @@ impl NodeInMemory {
&self.op_storage,
None,
None,
&mut *self.event_listener as &mut _,
&mut *self.event_register as &mut _,
)
.await;
continue;
Expand All @@ -248,7 +249,7 @@ impl NodeInMemory {

let op_storage = self.op_storage.clone();
let conn_manager = self.conn_manager.clone();
let event_listener = self.event_listener.trait_clone();
let event_listener = self.event_register.trait_clone();

let parent_span = tracing::Span::current();
let span = tracing::info_span!(
Expand Down
97 changes: 84 additions & 13 deletions crates/core/src/node/network_event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub(crate) trait NetEventRegister: std::any::Any + Send + Sync + 'static {
{
self as _
}
fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()>;
}

#[cfg(feature = "trace-ot")]
Expand Down Expand Up @@ -77,6 +78,15 @@ impl<const N: usize> NetEventRegister for CombinedRegister<N> {
/// Returns a boxed copy of this register as a `NetEventRegister` trait object.
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
    let duplicate = self.clone();
    Box::new(duplicate)
}

/// Forwards a transaction time-out notification to every wrapped register.
///
/// The registers are notified one after another, in the order they are stored.
/// The returned future completes once all of them have been notified.
fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> {
    async move {
        for reg in &mut self.0 {
            // Futures are lazy: without `.await` the inner notification is built
            // and immediately dropped, so the register would never be told.
            reg.notify_of_time_out(tx).await;
        }
    }
    .boxed()
}
}

#[cfg(feature = "trace-ot")]
Expand Down Expand Up @@ -626,18 +636,28 @@ impl NetEventRegister for EventRegister {
/// Returns a boxed copy of this register as a `NetEventRegister` trait object.
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
    let duplicate = self.clone();
    Box::new(duplicate)
}

/// Time-outs require no action from this register; the returned future does nothing.
fn notify_of_time_out(&mut self, _tx: Transaction) -> BoxFuture<()> {
    let nothing_to_do = async {};
    nothing_to_do.boxed()
}
}

#[cfg(feature = "trace-ot")]
mod opentelemetry_tracer {
use std::{collections::HashMap, time::Duration};

use opentelemetry::{global, trace};
use opentelemetry::{
global,
trace::{self, Span},
};

use crate::local_node::OperationMode;

use super::*;

struct OTSpan {
inner: global::BoxedSpan,
last_log: SystemTime,
}

impl OTSpan {
Expand All @@ -663,24 +683,29 @@ mod opentelemetry_tracer {
trace_id: Some(trace::TraceId::from_bytes(tx_bytes)),
..Default::default()
});
OTSpan { inner }
OTSpan {
inner,
last_log: SystemTime::now(),
}
}

/// Records `log` as an event on the inner span and remembers its timestamp in
/// `last_log`, which `finish` later uses as the span's end time.
///
/// `last_log` is updated for every message; an event is only added when the
/// message converts into a list of attribute values.
fn add_log(&mut self, log: &NetLogMessage) {
    // NOTE: if we need to add some standard attributes in the future take a look at
    // https://docs.rs/opentelemetry-semantic-conventions/latest/opentelemetry_semantic_conventions/
    use trace::Span;

    // NOTE(review): assumes `log.datetime` is a `chrono::DateTime` — confirm.
    // Its `timestamp()` is in whole seconds, so passing it to `from_millis`
    // would place every event in early 1970; use the millisecond accessor.
    // A pre-epoch value (not expected) is clamped to the epoch instead of wrapping.
    let millis_since_epoch = u64::try_from(log.datetime.timestamp_millis()).unwrap_or_default();
    let ts = SystemTime::UNIX_EPOCH + Duration::from_millis(millis_since_epoch);
    self.last_log = ts;

    if let Some(log_vals) = <Option<Vec<_>>>::from(log) {
        self.inner.add_event_with_timestamp(
            log.tx.transaction_type().description(),
            ts,
            log_vals,
        );
    }
}
/// Ends the inner span, using the timestamp stored in `last_log` as its end time.
fn finish(&mut self) {
    let end_ts = self.last_log;
    self.inner.end_with_timestamp(end_ts);
}
}

impl trace::Span for OTSpan {
Expand Down Expand Up @@ -716,20 +741,28 @@ mod opentelemetry_tracer {
#[derive(Clone)]
pub(in crate::node) struct OTEventRegister {
log_sender: mpsc::Sender<NetLogMessage>,
finished_tx_notifier: mpsc::Sender<Transaction>,
}

impl OTEventRegister {
pub fn new() -> Self {
let (sender, finished_tx_notifier) = mpsc::channel(100);
let (log_sender, log_recv) = mpsc::channel(1000);
NEW_RECORDS_TS.get_or_init(SystemTime::now);
GlobalExecutor::spawn(Self::record_logs(log_recv));
Self { log_sender }
GlobalExecutor::spawn(Self::record_logs(log_recv, finished_tx_notifier));
Self {
log_sender,
finished_tx_notifier: sender,
}
}

async fn record_logs(mut log_recv: mpsc::Receiver<NetLogMessage>) {
use trace::Span;
let mut logs: HashMap<_, OTSpan> = HashMap::new();
while let Some(log) = log_recv.recv().await {
async fn record_logs(
mut log_recv: mpsc::Receiver<NetLogMessage>,
mut finished_tx_notifier: mpsc::Receiver<Transaction>,
) {
let mut logs = HashMap::new();

fn process_log(logs: &mut HashMap<Transaction, OTSpan>, log: NetLogMessage) {
let span_completed = log.span_completed();
match logs.entry(log.tx) {
std::collections::hash_map::Entry::Occupied(mut val) => {
Expand All @@ -739,7 +772,7 @@ mod opentelemetry_tracer {
}
if span_completed {
let (_, mut completed_span) = val.remove_entry();
completed_span.end_with_timestamp(SystemTime::now())
completed_span.finish();
}
}
std::collections::hash_map::Entry::Vacant(empty) => {
Expand All @@ -752,6 +785,31 @@ mod opentelemetry_tracer {
}
}
}

/// Removes the span tracked for `tx`, if there is one, and finishes it.
/// Transactions without a tracked span are ignored.
fn cleanup_timed_out(logs: &mut HashMap<Transaction, OTSpan>, tx: Transaction) {
    match logs.remove(&tx) {
        Some(mut timed_out) => timed_out.finish(),
        None => {}
    }
}

loop {
tokio::select! {
log_msg = log_recv.recv() => {
if let Some(log) = log_msg {
process_log(&mut logs, log);
} else {
break;
}
}
finished_tx = finished_tx_notifier.recv() => {
if let Some(tx) = finished_tx {
cleanup_timed_out(&mut logs, tx);
} else {
break;
}
}
}
}
}
}

Expand All @@ -771,6 +829,15 @@ mod opentelemetry_tracer {
/// Returns a boxed copy of this register as a `NetEventRegister` trait object.
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
    let duplicate = self.clone();
    Box::new(duplicate)
}

/// Sends `tx` on the `finished_tx_notifier` channel, but only when the node
/// is running in `OperationMode::Network`.
///
/// The send is best effort: a failed send is ignored.
fn notify_of_time_out(&mut self, tx: Transaction) -> BoxFuture<()> {
    let notification = async move {
        let in_network_mode =
            matches!(crate::config::Config::node_mode(), OperationMode::Network);
        if !in_network_mode {
            return;
        }
        let _ = self.finished_tx_notifier.send(tx).await;
    };
    notification.boxed()
}
}
}

Expand Down Expand Up @@ -1097,6 +1164,10 @@ pub(super) mod test {
/// Returns a boxed copy of this register as a `NetEventRegister` trait object.
fn trait_clone(&self) -> Box<dyn NetEventRegister> {
    let duplicate = self.clone();
    Box::new(duplicate)
}

/// Time-outs require no action from this register; the returned future does nothing.
fn notify_of_time_out(&mut self, _tx: Transaction) -> BoxFuture<()> {
    let nothing_to_do = async {};
    nothing_to_do.boxed()
}
}

#[test]
Expand Down
35 changes: 26 additions & 9 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,26 @@ pub(crate) struct OpManager {
}

impl OpManager {
pub(super) fn new<const CLIENTS: usize, EL: NetEventRegister>(
pub(super) fn new<const CLIENTS: usize, ER: NetEventRegister>(
notification_channel: EventLoopNotificationsSender,
contract_handler: ContractHandlerChannel<SenderHalve>,
builder: &NodeBuilder<CLIENTS>,
gateways: &[PeerKeyLocation],
event_register: ER,
) -> Result<Self, anyhow::Error> {
let ring = Ring::new::<CLIENTS, EL>(builder, gateways, notification_channel.clone())?;
let ring = Ring::new::<CLIENTS, ER>(builder, gateways, notification_channel.clone())?;
let ops = Arc::new(Ops::default());

let (new_transactions, rx) = tokio::sync::mpsc::channel(100);
let parent_span = tracing::Span::current();
GlobalExecutor::spawn(
garbage_cleanup_task(rx, ops.clone(), ring.live_tx_tracker.clone())
.instrument(tracing::info_span!(parent: parent_span, "garbage_cleanup_task")),
garbage_cleanup_task(
rx,
ops.clone(),
ring.live_tx_tracker.clone(),
event_register,
)
.instrument(tracing::info_span!(parent: parent_span, "garbage_cleanup_task")),
);

Ok(Self {
Expand Down Expand Up @@ -214,22 +220,28 @@ impl OpManager {
}
}

async fn garbage_cleanup_task(
async fn garbage_cleanup_task<ER: NetEventRegister>(
mut new_transactions: tokio::sync::mpsc::Receiver<Transaction>,
ops: Arc<Ops>,
live_tx_tracker: LiveTransactionTracker,
mut event_register: ER,
) {
const CLEANUP_INTERVAL: Duration = Duration::from_secs(5);
let mut tick = tokio::time::interval(CLEANUP_INTERVAL);
tick.tick().await;

let mut ttl_set = BTreeSet::new();

let remove_old = move |ttl_set: &mut BTreeSet<Reverse<Transaction>>,
delayed: &mut Vec<Transaction>| {
let mut remove_old = move |ttl_set: &mut BTreeSet<Reverse<Transaction>>,
delayed: &mut Vec<Transaction>| {
let mut old_missing = std::mem::replace(delayed, Vec::with_capacity(200));
for tx in old_missing.drain(..) {
if ops.completed.remove(&tx).is_some() {
if let Some(tx) = ops.completed.remove(&tx) {
if cfg!(feature = "trace-ot") {
event_register.notify_of_time_out(tx);
} else {
_ = tx;
}
continue;
}
let still_waiting = match tx.transaction_type() {
Expand Down Expand Up @@ -258,7 +270,12 @@ async fn garbage_cleanup_task(
delayed.push(tx);
continue;
}
if ops.completed.remove(&tx).is_some() {
if let Some(tx) = ops.completed.remove(&tx) {
if cfg!(feature = "trace-ot") {
event_register.notify_of_time_out(tx);
} else {
_ = tx;
}
continue;
}
let removed = match tx.transaction_type() {
Expand Down
Loading

0 comments on commit a50a779

Please sign in to comment.