Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(node)!: Do not skip header-sub reports when store writes are slow #333

Merged
merged 7 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl NodeDriver {
let response = self.client.exec(command).await?;
let header = response.into_last_seen_network_head().check_variant()?;

Ok(header)
header.into()
}

/// Get the latest locally synced header.
Expand Down
10 changes: 6 additions & 4 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,11 @@ impl NodeWorker {
.context("could not serialise fetched headers")
}

async fn get_last_seen_network_head(&mut self) -> JsValue {
// JS interface returns `undefined`, if node haven't received any headers from HeaderSub yet
to_value(&self.node.get_network_head_header()).unwrap_or(JsValue::UNDEFINED)
async fn get_last_seen_network_head(&mut self) -> Result<JsValue> {
match self.node.get_network_head_header().await? {
Some(header) => to_value(&header).context("could not serialise head header"),
None => Ok(JsValue::UNDEFINED),
}
}

async fn get_sampling_metadata(&mut self, height: u64) -> Result<Option<SamplingMetadata>> {
Expand Down Expand Up @@ -222,7 +224,7 @@ impl NodeWorker {
.into(),
),
NodeCommand::LastSeenNetworkHead => {
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await)
WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await.into())
}
NodeCommand::GetSamplingMetadata { height } => {
WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)
Expand Down
3 changes: 1 addition & 2 deletions node-wasm/src/worker/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ pub(crate) enum WorkerResponse {
Listeners(Result<Vec<Multiaddr>>),
Header(JsResult<JsValue, Error>),
Headers(JsResult<Array, Error>),
#[serde(with = "serde_wasm_bindgen::preserve")]
LastSeenNetworkHead(JsValue),
LastSeenNetworkHead(JsResult<JsValue, Error>),
SamplingMetadata(Result<Option<SamplingMetadata>>),
WorkerClosed(()),
}
Expand Down
4 changes: 2 additions & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ where
}

/// Get the latest header announced in the network.
pub fn get_network_head_header(&self) -> Option<ExtendedHeader> {
self.p2p.header_sub_watcher().borrow().clone()
pub async fn get_network_head_header(&self) -> Result<Option<ExtendedHeader>> {
Ok(self.p2p.get_network_head().await?)
}

/// Get the latest locally synced header.
Expand Down
130 changes: 73 additions & 57 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use celestia_types::nmt::Namespace;
use celestia_types::row::Row;
use celestia_types::sample::Sample;
use celestia_types::{fraud_proof::BadEncodingFraudProof, hash::Hash};
use celestia_types::{ExtendedHeader, FraudProof, Height};
use celestia_types::{ExtendedHeader, FraudProof};
use cid::Cid;
use futures::StreamExt;
use libp2p::{
Expand Down Expand Up @@ -165,7 +165,6 @@ impl From<oneshot::error::RecvError> for P2pError {
#[derive(Debug)]
pub struct P2p {
cmd_tx: mpsc::Sender<P2pCmd>,
header_sub_watcher: watch::Receiver<Option<ExtendedHeader>>,
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
local_peer_id: PeerId,
}
Expand Down Expand Up @@ -209,6 +208,8 @@ pub(crate) enum P2pCmd {
},
InitHeaderSub {
head: Box<ExtendedHeader>,
/// Any valid headers received by header-sub will be send to this channel.
channel: mpsc::Sender<ExtendedHeader>,
},
SetPeerTrust {
peer_id: PeerId,
Expand All @@ -221,6 +222,9 @@ pub(crate) enum P2pCmd {
GetNetworkCompromisedToken {
respond_to: oneshot::Sender<CancellationToken>,
},
GetNetworkHead {
respond_to: oneshot::Sender<Option<ExtendedHeader>>,
},
}

impl P2p {
Expand All @@ -234,21 +238,18 @@ impl P2p {

let local_peer_id = PeerId::from(args.local_keypair.public());

let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);

let peer_tracker = Arc::new(PeerTracker::new(args.event_pub.clone()));
let peer_tracker_info_watcher = peer_tracker.info_watcher();

let mut worker = Worker::new(args, cmd_rx, header_sub_tx, peer_tracker)?;
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let mut worker = Worker::new(args, cmd_rx, peer_tracker)?;

spawn(async move {
worker.run().await;
});

Ok(P2p {
cmd_tx,
header_sub_watcher: header_sub_rx,
peer_tracker_info_watcher,
local_peer_id,
})
Expand All @@ -258,20 +259,18 @@ impl P2p {
#[cfg(any(test, feature = "test-utils"))]
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (header_sub_tx, header_sub_rx) = watch::channel(None);
let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());

let p2p = P2p {
cmd_tx: cmd_tx.clone(),
header_sub_watcher: header_sub_rx,
peer_tracker_info_watcher: peer_tracker_rx,
local_peer_id: PeerId::random(),
};

let handle = crate::test_utils::MockP2pHandle {
cmd_tx,
cmd_rx,
header_sub_tx,
header_sub_tx: None,
peer_tracker_tx,
};

Expand All @@ -296,11 +295,6 @@ impl P2p {
.map_err(|_| P2pError::WorkerDied)
}

/// Watcher for the latest verified network head headers announced on `header-sub`.
pub fn header_sub_watcher(&self) -> watch::Receiver<Option<ExtendedHeader>> {
self.header_sub_watcher.clone()
}

/// Watcher for the current [`PeerTrackerInfo`].
pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.peer_tracker_info_watcher.clone()
Expand All @@ -312,9 +306,14 @@ impl P2p {
}

/// Initializes `header-sub` protocol with a given `subjective_head`.
pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> {
pub async fn init_header_sub(
&self,
head: ExtendedHeader,
channel: mpsc::Sender<ExtendedHeader>,
) -> Result<()> {
self.send_command(P2pCmd::InitHeaderSub {
head: Box::new(head),
channel,
})
.await
}
Expand Down Expand Up @@ -545,6 +544,16 @@ impl P2p {

Ok(rx.await?)
}

/// Get the latest header announced in the network.
oblique marked this conversation as resolved.
Show resolved Hide resolved
pub async fn get_network_head(&self) -> Result<Option<ExtendedHeader>> {
let (tx, rx) = oneshot::channel();

self.send_command(P2pCmd::GetNetworkHead { respond_to: tx })
.await?;

Ok(rx.await?)
}
}

/// Our network behaviour.
Expand Down Expand Up @@ -573,12 +582,17 @@ where
bad_encoding_fraud_sub_topic: TopicHash,
cmd_rx: mpsc::Receiver<P2pCmd>,
peer_tracker: Arc<PeerTracker>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
header_sub_state: Option<HeaderSubState>,
bitswap_queries: HashMap<beetswap::QueryId, OneshotResultSender<Vec<u8>, P2pError>>,
network_compromised_token: CancellationToken,
store: Arc<S>,
}

struct HeaderSubState {
known_head: ExtendedHeader,
channel: mpsc::Sender<ExtendedHeader>,
}

impl<B, S> Worker<B, S>
where
B: Blockstore,
Expand All @@ -587,7 +601,6 @@ where
fn new(
args: P2pArgs<B, S>,
cmd_rx: mpsc::Receiver<P2pCmd>,
header_sub_watcher: watch::Sender<Option<ExtendedHeader>>,
peer_tracker: Arc<PeerTracker>,
) -> Result<Self, P2pError> {
let local_peer_id = PeerId::from(args.local_keypair.public());
Expand Down Expand Up @@ -649,7 +662,7 @@ where
bad_encoding_fraud_sub_topic: bad_encoding_fraud_sub_topic.hash(),
header_sub_topic_hash: header_sub_topic.hash(),
peer_tracker,
header_sub_watcher,
header_sub_state: None,
bitswap_queries: HashMap::new(),
network_compromised_token: CancellationToken::new(),
store: args.store,
Expand Down Expand Up @@ -771,8 +784,8 @@ where
P2pCmd::ConnectedPeers { respond_to } => {
respond_to.maybe_send(self.peer_tracker.connected_peers());
}
P2pCmd::InitHeaderSub { head } => {
self.on_init_header_sub(*head);
P2pCmd::InitHeaderSub { head, channel } => {
self.on_init_header_sub(*head, channel);
}
P2pCmd::SetPeerTrust {
peer_id,
Expand All @@ -786,7 +799,14 @@ where
self.on_get_shwap_cid(cid, respond_to);
}
P2pCmd::GetNetworkCompromisedToken { respond_to } => {
respond_to.maybe_send(self.network_compromised_token.child_token())
respond_to.maybe_send(self.network_compromised_token.child_token());
}
P2pCmd::GetNetworkHead { respond_to } => {
let head = self
.header_sub_state
.as_ref()
.map(|state| state.known_head.clone());
respond_to.maybe_send(head);
}
}

Expand Down Expand Up @@ -836,7 +856,7 @@ where
};

let acceptance = if message.topic == self.header_sub_topic_hash {
self.on_header_sub_message(&message.data[..]).await
self.on_header_sub_message(&message.data[..])
} else if message.topic == self.bad_encoding_fraud_sub_topic {
self.on_bad_encoding_fraud_sub_message(&message.data[..], &peer)
.await
Expand Down Expand Up @@ -961,41 +981,41 @@ where
}

#[instrument(skip_all, fields(header = %head))]
fn on_init_header_sub(&mut self, head: ExtendedHeader) {
self.header_sub_watcher.send_replace(Some(head));
fn on_init_header_sub(&mut self, head: ExtendedHeader, channel: mpsc::Sender<ExtendedHeader>) {
self.header_sub_state = Some(HeaderSubState {
known_head: head,
channel,
});
trace!("HeaderSub initialized");
}

#[instrument(skip_all)]
async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance {
let Ok(header) = ExtendedHeader::decode_and_validate(data) else {
trace!("Malformed or invalid header from header-sub");
return gossipsub::MessageAcceptance::Reject;
};

trace!("Received header from header-sub ({header})");

let updated = self.header_sub_watcher.send_if_modified(move |state| {
let Some(known_header) = state else {
debug!("HeaderSub not initialized yet");
return false;
};
let Some(ref mut state) = self.header_sub_state else {
debug!("header-sub not initialized yet");
return gossipsub::MessageAcceptance::Ignore;
};

if known_header.verify(&header).is_err() {
trace!("Failed to verify HeaderSub header. Ignoring {header}");
return false;
}
if state.known_head.verify(&header).is_err() {
trace!("Failed to verify HeaderSub header. Ignoring {header}");
return gossipsub::MessageAcceptance::Ignore;
}

debug!("New header from header-sub ({header})");
*state = Some(header);
true
});
trace!("New header from header-sub ({header})");

if updated {
gossipsub::MessageAcceptance::Accept
} else {
gossipsub::MessageAcceptance::Ignore
}
state.known_head = header.clone();
// We intentionally do not `send().await` to avoid blocking `P2p`
// in case `Syncer` enters some weird state.
let _ = state.channel.try_send(header);

gossipsub::MessageAcceptance::Accept
}

#[instrument(skip_all)]
Expand All @@ -1011,15 +1031,15 @@ where
};

let height = befp.height().value();
let current_height =
if let Some(network_height) = network_head_height(&self.header_sub_watcher) {
network_height.value()
} else if let Ok(local_head) = self.store.get_head().await {
local_head.height().value()
} else {
// we aren't tracking the network and have uninitialized store
return gossipsub::MessageAcceptance::Ignore;
};

let current_height = if let Some(ref header_sub_state) = self.header_sub_state {
header_sub_state.known_head.height().value()
} else if let Ok(local_head) = self.store.get_head().await {
local_head.height().value()
} else {
// we aren't tracking the network and have uninitialized store
return gossipsub::MessageAcceptance::Ignore;
};

if height > current_height + FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD {
// does this threshold make any sense if we're gonna ignore it anyway
Expand Down Expand Up @@ -1156,7 +1176,3 @@ where
.client_set_send_dont_have(false)
.build())
}

fn network_head_height(watcher: &watch::Sender<Option<ExtendedHeader>>) -> Option<Height> {
watcher.borrow().as_ref().map(|header| header.height())
}
Loading
Loading