diff --git a/node-wasm/src/node.rs b/node-wasm/src/node.rs index cd8736aff..52f6a361d 100644 --- a/node-wasm/src/node.rs +++ b/node-wasm/src/node.rs @@ -266,7 +266,7 @@ impl NodeDriver { let response = self.client.exec(command).await?; let header = response.into_last_seen_network_head().check_variant()?; - Ok(header) + header.into() } /// Get the latest locally synced header. diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index f96c9fb58..3202a3538 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -164,9 +164,11 @@ impl NodeWorker { .context("could not serialise fetched headers") } - async fn get_last_seen_network_head(&mut self) -> JsValue { - // JS interface returns `undefined`, if node haven't received any headers from HeaderSub yet - to_value(&self.node.get_network_head_header()).unwrap_or(JsValue::UNDEFINED) + async fn get_last_seen_network_head(&mut self) -> Result { + match self.node.get_network_head_header().await? { + Some(header) => to_value(&header).context("could not serialise head header"), + None => Ok(JsValue::UNDEFINED), + } } async fn get_sampling_metadata(&mut self, height: u64) -> Result> { @@ -222,7 +224,7 @@ impl NodeWorker { .into(), ), NodeCommand::LastSeenNetworkHead => { - WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await) + WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await.into()) } NodeCommand::GetSamplingMetadata { height } => { WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await) diff --git a/node-wasm/src/worker/commands.rs b/node-wasm/src/worker/commands.rs index f35ef8d2c..bf662a3d0 100644 --- a/node-wasm/src/worker/commands.rs +++ b/node-wasm/src/worker/commands.rs @@ -77,8 +77,7 @@ pub(crate) enum WorkerResponse { Listeners(Result>), Header(JsResult), Headers(JsResult), - #[serde(with = "serde_wasm_bindgen::preserve")] - LastSeenNetworkHead(JsValue), + LastSeenNetworkHead(JsResult), SamplingMetadata(Result>), WorkerClosed(()), } diff --git a/node/src/node.rs b/node/src/node.rs index 0ce5a1be8..b28eaef6a 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -300,8 +300,8 @@ where } /// Get the latest header announced in the network. - pub fn get_network_head_header(&self) -> Option { - self.p2p.header_sub_watcher().borrow().clone() + pub async fn get_network_head_header(&self) -> Result> { + Ok(self.p2p.get_network_head().await?) } /// Get the latest locally synced header. diff --git a/node/src/p2p.rs b/node/src/p2p.rs index c9b1a2237..b59d7a98f 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -27,7 +27,7 @@ use celestia_types::nmt::Namespace; use celestia_types::row::Row; use celestia_types::sample::Sample; use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash}; -use celestia_types::{ExtendedHeader, FraudProof, Height}; +use celestia_types::{ExtendedHeader, FraudProof}; use cid::Cid; use futures::StreamExt; use libp2p::{ @@ -165,7 +165,6 @@ impl From for P2pError { #[derive(Debug)] pub struct P2p { cmd_tx: mpsc::Sender, - header_sub_watcher: watch::Receiver>, peer_tracker_info_watcher: watch::Receiver, local_peer_id: PeerId, } @@ -209,6 +208,8 @@ pub(crate) enum P2pCmd { }, InitHeaderSub { head: Box, + /// Any valid headers received by header-sub will be send to this channel. + channel: mpsc::Sender, }, SetPeerTrust { peer_id: PeerId, @@ -221,6 +222,9 @@ pub(crate) enum P2pCmd { GetNetworkCompromisedToken { respond_to: oneshot::Sender, }, + GetNetworkHead { + respond_to: oneshot::Sender>, + }, } impl P2p { @@ -234,13 +238,11 @@ impl P2p { let local_peer_id = PeerId::from(args.local_keypair.public()); - let (cmd_tx, cmd_rx) = mpsc::channel(16); - let (header_sub_tx, header_sub_rx) = watch::channel(None); - let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone())); let peer_tracker_info_watcher = peer_tracker.info_watcher(); - let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?; + let (cmd_tx, cmd_rx) = mpsc::channel(16); + let mut worker = Worker::new(args, cmd_rx, peer_tracker)?; spawn(async move { worker.run().await; @@ -248,7 +250,6 @@ impl P2p { Ok(P2p { cmd_tx, - header_sub_watcher: header_sub_rx, peer_tracker_info_watcher, local_peer_id, }) @@ -258,12 +259,10 @@ impl P2p { #[cfg(any(test, feature = "test-utils"))] pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) { let (cmd_tx, cmd_rx) = mpsc::channel(16); - let (header_sub_tx, header_sub_rx) = watch::channel(None); let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default()); let p2p = P2p { cmd_tx: cmd_tx.clone(), - header_sub_watcher: header_sub_rx, peer_tracker_info_watcher: peer_tracker_rx, local_peer_id: PeerId::random(), }; @@ -271,7 +270,7 @@ impl P2p { let handle = crate::test_utils::MockP2pHandle { cmd_tx, cmd_rx, - header_sub_tx, + header_sub_tx: None, peer_tracker_tx, }; @@ -296,11 +295,6 @@ impl P2p { .map_err(|_| P2pError::WorkerDied) } - /// Watcher for the latest verified network head headers announced on `header-sub`. - pub fn header_sub_watcher(&self) -> watch::Receiver> { - self.header_sub_watcher.clone() - } - /// Watcher for the current [`PeerTrackerInfo`]. pub fn peer_tracker_info_watcher(&self) -> watch::Receiver { self.peer_tracker_info_watcher.clone() @@ -312,9 +306,14 @@ impl P2p { } /// Initializes `header-sub` protocol with a given `subjective_head`. - pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> { + pub async fn init_header_sub( + &self, + head: ExtendedHeader, + channel: mpsc::Sender, + ) -> Result<()> { self.send_command(P2pCmd::InitHeaderSub { head: Box::new(head), + channel, }) .await } @@ -545,6 +544,15 @@ impl P2p { Ok(rx.await?) } + + pub async fn get_network_head(&self) -> Result> { + let (tx, rx) = oneshot::channel(); + + self.send_command(P2pCmd::GetNetworkHead { respond_to: tx }) + .await?; + + Ok(rx.await?) + } } /// Our network behaviour. @@ -573,12 +581,17 @@ where bad_encoding_fraud_sub_topic: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, - header_sub_watcher: watch::Sender>, + header_sub_state: Option, bitswap_queries: HashMap, P2pError>>, network_compromised_token: CancellationToken, store: Arc, } +struct HeaderSubState { + known_head: ExtendedHeader, + channel: mpsc::Sender, +} + impl Worker where B: Blockstore, @@ -587,7 +600,6 @@ where fn new( args: P2pArgs, cmd_rx: mpsc::Receiver, - header_sub_watcher: watch::Sender>, peer_tracker: Arc, ) -> Result { let local_peer_id = PeerId::from(args.local_keypair.public()); @@ -649,7 +661,7 @@ where bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(), header_sub_topic_hash: header_sub_topic.hash(), peer_tracker, - header_sub_watcher, + header_sub_state: None, bitswap_queries: HashMap::new(), network_compromised_token: CancellationToken::new(), store: args.store, @@ -771,8 +783,8 @@ where P2pCmd::ConnectedPeers { respond_to } => { respond_to.maybe_send(self.peer_tracker.connected_peers()); } - P2pCmd::InitHeaderSub { head } => { - self.on_init_header_sub(*head); + P2pCmd::InitHeaderSub { head, channel } => { + self.on_init_header_sub(*head, channel); } P2pCmd::SetPeerTrust { peer_id, @@ -786,7 +798,14 @@ where self.on_get_shwap_cid(cid, respond_to); } P2pCmd::GetNetworkCompromisedToken { respond_to } => { - respond_to.maybe_send(self.network_compromised_token.child_token()) + respond_to.maybe_send(self.network_compromised_token.child_token()); + } + P2pCmd::GetNetworkHead { respond_to } => { + let head = self + .header_sub_state + .as_ref() + .map(|state| state.known_head.clone()); + respond_to.maybe_send(head); } } @@ -961,8 +980,11 @@ where } #[instrument(skip_all, fields(header = %head))] - fn on_init_header_sub(&mut self, head: ExtendedHeader) { - self.header_sub_watcher.send_replace(Some(head)); + fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender) { + self.header_sub_state = Some(HeaderSubState { + known_head: head, + channel, + }); trace!("HeaderSub initialized"); } @@ -975,27 +997,22 @@ where trace!("Received header from header-sub ({header})"); - let updated = self.header_sub_watcher.send_if_modified(move |state| { - let Some(known_header) = state else { - debug!("HeaderSub not initialized yet"); - return false; - }; + let Some(ref mut state) = self.header_sub_state else { + debug!("header-sub not initialized yet"); + return gossipsub::MessageAcceptance::Ignore; + }; - if known_header.verify(&header).is_err() { - trace!("Failed to verify HeaderSub header. Ignoring {header}"); - return false; - } + if state.known_head.verify(&header).is_err() { + trace!("Failed to verify HeaderSub header. Ignoring {header}"); + return gossipsub::MessageAcceptance::Ignore; + } - debug!("New header from header-sub ({header})"); - *state = Some(header); - true - }); + trace!("New header from header-sub ({header})"); - if updated { - gossipsub::MessageAcceptance::Accept - } else { - gossipsub::MessageAcceptance::Ignore - } + state.known_head = header.clone(); + let _ = state.channel.try_send(header); + + gossipsub::MessageAcceptance::Accept } #[instrument(skip_all)] @@ -1011,15 +1028,15 @@ where }; let height = befp.height().value(); - let current_height = - if let Some(network_height) = network_head_height(&self.header_sub_watcher) { - network_height.value() - } else if let Ok(local_head) = self.store.get_head().await { - local_head.height().value() - } else { - // we aren't tracking the network and have uninitialized store - return gossipsub::MessageAcceptance::Ignore; - }; + + let current_height = if let Some(ref header_sub_state) = self.header_sub_state { + header_sub_state.known_head.height().value() + } else if let Ok(local_head) = self.store.get_head().await { + local_head.height().value() + } else { + // we aren't tracking the network and have uninitialized store + return gossipsub::MessageAcceptance::Ignore; + }; if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD { // does this threshold make any sense if we're gonna ignore it anyway @@ -1156,7 +1173,3 @@ where .client_set_send_dont_have(false) .build()) } - -fn network_head_height(watcher: &watch::Sender>) -> Option { - watcher.borrow().as_ref().map(|header| header.height()) -} diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 55a3e1778..fc12e5a53 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -9,8 +9,10 @@ //! headers announced on the `header-sub` p2p protocol to keep the `subjective_head` as close //! to the `network_head` as possible. +use std::future::poll_fn; use std::marker::PhantomData; use std::sync::Arc; +use std::task::Poll; use std::time::Duration; use backoff::backoff::Backoff; @@ -20,7 +22,7 @@ use celestia_types::ExtendedHeader; use futures::FutureExt; use serde::{Deserialize, Serialize}; use tokio::select; -use tokio::sync::{mpsc, oneshot, watch}; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, info_span, instrument, warn, Instrument}; use web_time::Instant; @@ -191,7 +193,7 @@ where event_pub: EventPublisher, p2p: Arc, store: Arc, - header_sub_watcher: watch::Receiver>, + header_sub_rx: Option>, subjective_head_height: Option, batch_size: u64, ongoing_batch: Option, @@ -212,15 +214,13 @@ where cancellation_token: CancellationToken, cmd_rx: mpsc::Receiver, ) -> Result { - let header_sub_watcher = args.p2p.header_sub_watcher(); - Ok(Worker { cancellation_token, cmd_rx, event_pub: args.event_pub, p2p: args.p2p, store: args.store, - header_sub_watcher, + header_sub_rx: None, subjective_head_height: None, batch_size: args.batch_size, ongoing_batch: None, @@ -274,7 +274,10 @@ where info!("Setting initial subjective head to {network_head_height}"); self.set_subjective_head_height(network_head_height); - self.p2p.init_header_sub(network_head).await?; + + let (header_sub_tx, header_sub_rx) = mpsc::channel(16); + self.p2p.init_header_sub(network_head, header_sub_tx).await?; + self.header_sub_rx = Some(header_sub_rx); self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished { height: network_head_height, @@ -326,8 +329,9 @@ where _ = report_interval.tick() => { self.report().await?; } - _ = self.header_sub_watcher.changed() => { - self.on_header_sub_message().await?; + res = header_sub_recv(&mut self.header_sub_rx) => { + let header = res?; + self.on_header_sub_message(header).await?; self.fetch_next_batch(&headers_tx).await?; } Some(cmd) = self.cmd_rx.recv() => { @@ -345,6 +349,8 @@ where ongoing.cancellation_token.cancel(); } + self.header_sub_rx.take(); + Ok(()) } @@ -428,18 +434,7 @@ where } #[instrument(skip_all)] - async fn on_header_sub_message(&mut self) -> Result<()> { - // If subjective head isn't set, do nothing. - // We do this to avoid some edge cases. - if self.subjective_head_height.is_none() { - return Ok(()); - } - - let Some(new_head) = self.header_sub_watcher.borrow().to_owned() else { - // Nothing to do - return Ok(()); - }; - + async fn on_header_sub_message(&mut self, new_head: ExtendedHeader) -> Result<()> { let new_head_height = new_head.height().value(); self.set_subjective_head_height(new_head_height); @@ -618,12 +613,28 @@ where let network_head = p2p.get_head_header().await?; - // Insert HEAD to the store and initialize header-sub + // Insert HEAD to the store and initialize header-sub. + // This will apply insertion restrictions. store.insert(network_head.clone()).await?; Ok(network_head) } +async fn header_sub_recv( + rx: &mut Option>, +) -> Result { + poll_fn(|cx| { + let rx = rx.as_mut().expect("header-sub not initialized"); + + match rx.poll_recv(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(header)) => Poll::Ready(Ok(header)), + Poll::Ready(None) => Poll::Ready(Err(SyncerError::P2p(P2pError::WorkerDied))), + } + }) + .await +} + #[cfg(test)] mod tests { use std::ops::RangeInclusive; diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 8c1122a1d..c02d71151 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -80,7 +80,7 @@ pub struct MockP2pHandle { #[allow(dead_code)] pub(crate) cmd_tx: mpsc::Sender, pub(crate) cmd_rx: mpsc::Receiver, - pub(crate) header_sub_tx: watch::Sender>, + pub(crate) header_sub_tx: Option>, pub(crate) peer_tracker_tx: watch::Sender, } @@ -110,7 +110,9 @@ impl MockP2pHandle { /// Simulate a new header announced in the network. pub fn announce_new_head(&self, header: ExtendedHeader) { - self.header_sub_tx.send_replace(Some(header)); + if let Some(ref tx) = self.header_sub_tx { + let _ = tx.try_send(header); + } } /// Assert that a command was sent to the [`P2p`] worker. @@ -194,7 +196,10 @@ impl MockP2pHandle { /// [`P2p`]: crate::p2p::P2p pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader { match self.expect_cmd().await { - P2pCmd::InitHeaderSub { head } => *head, + P2pCmd::InitHeaderSub { head, channel } => { + self.header_sub_tx = Some(channel); + *head + } cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"), } } diff --git a/node/tests/utils/mod.rs b/node/tests/utils/mod.rs index cf39a3b08..8de91af81 100644 --- a/node/tests/utils/mod.rs +++ b/node/tests/utils/mod.rs @@ -44,7 +44,7 @@ pub async fn new_connected_node() -> Node { // Wait until node reaches height 3 loop { - if let Some(head) = node.get_network_head_header() { + if let Some(head) = node.get_network_head_header().await.unwrap() { if head.height().value() >= 3 { break; }