Skip to content

Commit

Permalink
Avoid potential deadlocks
Browse files Browse the repository at this point in the history
make child stdout being always processed
  • Loading branch information
iduartgomez committed Nov 21, 2023
1 parent 1e88a70 commit 4ba6954
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 110 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ pub(crate) mod test {
async {
loop {
if self.signal.changed().await.is_ok() {
let (ev_id, pk) = *self.signal.borrow_and_update();
let (ev_id, pk) = *self.signal.borrow();
if self.rng.is_some() && pk == self.id {
let res = OpenRequest {
client_id: ClientId::FIRST,
Expand Down
23 changes: 3 additions & 20 deletions crates/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,18 +171,6 @@ impl Config {
.store(local_mode, std::sync::atomic::Ordering::SeqCst);
}

#[cfg(feature = "trace-ot")]
fn node_mode() -> OperationMode {
if Self::conf()
.local_mode
.load(std::sync::atomic::Ordering::SeqCst)
{
OperationMode::Local
} else {
OperationMode::Network
}
}

pub fn db_dir(&self) -> PathBuf {
if self.local_mode.load(std::sync::atomic::Ordering::SeqCst) {
self.config_paths.db_dir.join("local")
Expand Down Expand Up @@ -421,16 +409,11 @@ mod tracer {

#[cfg(feature = "trace-ot")]
{
use super::*;
let disabled_ot_traces = std::env::var("FREENET_DISABLE_TRACES").is_ok();
// FIXME
let identifier = if matches!(Config::node_mode(), OperationMode::Local) {
"freenet-core".to_string()
let identifier = if let Ok(peer) = std::env::var("FREENET_PEER_ID") {
format!("freenet-core-{peer}")
} else {
format!(
"freenet-core-{peer}",
peer = Config::conf().local_peer_keypair.public().to_peer_id()
)
"freenet-core".to_string()
};
let tracing_ot_layer = {
// Connect the Jaeger OT tracer with the tracing middleware
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/contract/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,6 @@ impl Executor<crate::contract::MockRuntime> {
let db_path = data_dir.join("db");
std::fs::create_dir_all(&db_path).expect("directory created");
let log_file = data_dir.join("_EVENT_LOG_LOCAL");
eprintln!("{log_file:?}");
crate::config::Config::set_event_log(log_file);
let state_store =
StateStore::new(Storage::new(Some(&db_path)).await?, u16::MAX as u32).unwrap();
Expand Down
21 changes: 12 additions & 9 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,16 +406,20 @@ async fn process_open_request(request: OpenRequest<'static>, op_storage: Arc<OpM
key: _key,
data: _delta,
} => {
todo!()
// FIXME: DO THIS
tracing::debug!(
this_peer = %op_storage.ring.peer_key,
"Received update from user event",
);
}
ContractRequest::Get {
key,
fetch_contract: contract,
} => {
// Initialize a get op.
tracing::debug!(
"Received get from user event @ {}",
&op_storage.ring.peer_key
this_peer = %op_storage.ring.peer_key,
"Received get from user event",
);
let op = get::start_op(key, contract);
if let Err(err) = get::request_get(&op_storage, op, Some(client_id)).await {
Expand Down Expand Up @@ -479,19 +483,18 @@ async fn process_open_request(request: OpenRequest<'static>, op_storage: Arc<OpM
}
}
};
GlobalExecutor::spawn(fut.instrument(tracing::span!(
tracing::Level::INFO,
"process_client_request"
)));
GlobalExecutor::spawn(fut.instrument(
tracing::info_span!(parent: tracing::Span::current(), "process_client_request"),
));
}

#[allow(unused)]
macro_rules! log_handling_msg {
($op:expr, $id:expr, $op_storage:ident) => {
tracing::debug!(
tx = %$id,
concat!("Handling ", $op, " request @ {}"),
$op_storage.ring.peer_key,
this_peer = %$op_storage.ring.peer_key,
concat!("Handling ", $op, " request"),
);
};
}
Expand Down
26 changes: 18 additions & 8 deletions crates/core/src/node/network_bridge/inter_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{Arc, OnceLock};

use futures::{future::BoxFuture, FutureExt};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncReadExt, AsyncWriteExt, BufWriter, Stdout},
sync::{
watch::{Receiver, Sender},
Mutex,
Expand All @@ -23,7 +23,7 @@ static INCOMING_DATA: OnceLock<Sender<Data>> = OnceLock::new();
#[derive(Clone)]
pub struct InterProcessConnManager {
recv: Receiver<Data>,
output: Arc<Mutex<tokio::io::Stdout>>,
output: Arc<Mutex<BufWriter<Stdout>>>,
}

impl InterProcessConnManager {
Expand All @@ -32,7 +32,7 @@ impl InterProcessConnManager {
INCOMING_DATA.set(sender).expect("shouldn't be set");
Self {
recv,
output: Arc::new(Mutex::new(tokio::io::stdout())),
output: Arc::new(Mutex::new(BufWriter::new(tokio::io::stdout()))),
}
}

Expand All @@ -42,16 +42,23 @@ impl InterProcessConnManager {

pub async fn pull_msg(
stdout: &mut tokio::process::ChildStdout,
) -> std::io::Result<(PeerId, Data)> {
) -> std::io::Result<Option<(PeerId, Data)>> {
let mut msg_len = [0u8; 4];
stdout.read_exact(&mut msg_len).await?;
let Ok(read_res) = tokio::time::timeout(
std::time::Duration::from_millis(100),
stdout.read_exact(&mut msg_len),
)
.await
else {
return Ok(None);
};
read_res?;
let msg_len = u32::from_le_bytes(msg_len) as usize;
let buf = &mut vec![0u8; msg_len];
stdout.read_exact(buf).await?;
let (target, data) = bincode::deserialize(buf)
.map_err(|_| std::io::Error::from(std::io::ErrorKind::Other))?;
tracing::debug!(%target, "network message received");
Ok((target, data))
Ok(Some((target, data)))
}
}

Expand All @@ -62,7 +69,7 @@ impl NetworkBridgeExt for InterProcessConnManager {
.changed()
.await
.map_err(|_| ConnectionError::Timeout)?;
let data = &*self.recv.borrow_and_update();
let data = &*self.recv.borrow();
let deser = bincode::deserialize(data)?;
Ok(deser)
}
Expand All @@ -73,10 +80,13 @@ impl NetworkBridgeExt for InterProcessConnManager {
#[async_trait::async_trait]
impl NetworkBridge for InterProcessConnManager {
async fn send(&self, target: &PeerId, msg: NetMessage) -> super::ConnResult<()> {
tracing::debug!(%target, ?msg, "sending network message out");
let data = bincode::serialize(&(*target, msg))?;
let output = &mut *self.output.lock().await;
output.write_all(&(data.len() as u32).to_le_bytes()).await?;
output.write_all(&data).await?;
output.flush().await?;
tracing::debug!(%target, bytes = data.len(), "sent network message out");
Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/node/network_event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,9 +490,8 @@ impl EventRegister {
Ok(())
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await; // wait for the node to start
tokio::time::sleep(std::time::Duration::from_millis(200)).await; // wait for the node to start
let event_log_path = crate::config::Config::conf().event_log();
tracing::info!(?event_log_path);
let mut event_log = match OpenOptions::new()
.write(true)
.read(true)
Expand Down
9 changes: 7 additions & 2 deletions crates/core/src/node/op_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,20 @@ impl OpManager {
let ops = Arc::new(Ops::default());

let (new_transactions, rx) = tokio::sync::mpsc::channel(100);
let parent_span = tracing::Span::current();
let current_span = tracing::Span::current();
let garbage_span = if current_span.is_none() {
tracing::info_span!("garbage_cleanup_task")
} else {
tracing::info_span!(parent: current_span, "garbage_cleanup_task")
};
GlobalExecutor::spawn(
garbage_cleanup_task(
rx,
ops.clone(),
ring.live_tx_tracker.clone(),
event_register,
)
.instrument(tracing::info_span!(parent: parent_span, "garbage_cleanup_task")),
.instrument(garbage_span),
);

Ok(Self {
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/node/testing_impl/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ impl<ER> Builder<ER> {
let (notification_channel, notification_tx) = EventLoopNotifications::channel();
let (ops_ch_channel, ch_channel) = contract::contract_handler_channel();

let _guard = parent_span.enter();
let op_storage = Arc::new(OpManager::new(
notification_tx,
ops_ch_channel,
&self.config,
&gateways,
self.event_register.clone(),
)?);
std::mem::drop(_guard);
let (_executor_listener, executor_sender) = executor_channel(op_storage.clone());
let contract_handler =
MemoryContractHandler::build(ch_channel, executor_sender, self.contract_handler_name)
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/node/testing_impl/inter_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl SimPeer {

GlobalExecutor::spawn(
contract::contract_handling(contract_handler)
.instrument(tracing::info_span!("contract_handling")),
.instrument(tracing::info_span!("contract_handling", peer = %self.config.peer_id)),
);

let running_node = super::RunnerConfig {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl Operation for PutOp {

let key = contract.key();
tracing::debug!(
"Rquesting put for contract {} from {} to {}",
"Requesting put for contract {} from {} to {}",
key,
sender.peer,
target.peer
Expand Down
11 changes: 8 additions & 3 deletions crates/core/src/ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,16 @@ impl Ring {
}

let ring = Arc::new(ring);
let parent_span = tracing::Span::current();
let current_span = tracing::Span::current();
let span = if current_span.is_none() {
tracing::info_span!("connection_maintenance")
} else {
tracing::info_span!(parent: current_span, "connection_maintenance")
};
GlobalExecutor::spawn(
ring.clone()
.connection_maintenance(event_loop_notifier, live_tx_tracker, missing_candidate_rx)
.instrument(tracing::info_span!(parent: parent_span, "connection_maintenance")),
.instrument(span),
);
Ok(ring)
}
Expand Down Expand Up @@ -696,7 +701,7 @@ impl Ring {
match loc {
Ok(loc) => loc,
Err(_) => {
tracing::debug!(peer = %self.own_location(), "Insufficient data gathered by the topology manager");
tracing::trace!(peer = %self.own_location(), "Insufficient data gathered by the topology manager");
acquire_max_connections.tick().await;
continue;
}
Expand Down
Loading

0 comments on commit 4ba6954

Please sign in to comment.