From 74123f30329166658f3e427c50897a23e845f5c6 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Mon, 1 Jul 2024 14:26:26 +0300 Subject: [PATCH] feat(node): Generate syncing related events (#312) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Yiannis Marangos Co-authored-by: Maciej Zwoliński Co-authored-by: Mikołaj Florkiewicz --- cli/src/native.rs | 7 +- cli/static/index.html | 10 +++ cli/static/run_node.js | 24 +++++++ node-wasm/src/worker.rs | 25 ++++--- node/src/events.rs | 146 +++++++++++++++++++++++++++++++++++++++- node/src/node.rs | 36 ++++++++-- node/src/syncer.rs | 112 +++++++++++++++++++++--------- 7 files changed, 308 insertions(+), 52 deletions(-) diff --git a/cli/src/native.rs b/cli/src/native.rs index 7b2a6a45..81f29b54 100644 --- a/cli/src/native.rs +++ b/cli/src/native.rs @@ -68,7 +68,7 @@ pub(crate) async fn run(args: Params) -> Result<()> { info!("Initialised store, present headers: {stored_ranges}"); } - let node = Node::new(NodeConfig { + let (_node, mut events) = Node::new_subscribed(NodeConfig { network_id, genesis_hash, p2p_local_keypair, @@ -80,14 +80,11 @@ pub(crate) async fn run(args: Params) -> Result<()> { .await .context("Failed to start node")?; - node.wait_connected_trusted().await?; - let mut events = node.event_subscriber(); - - // We have nothing else to do, but we want to keep main alive while let Ok(ev) = events.recv().await { match ev.event { // Skip noisy events NodeEvent::ShareSamplingResult { .. 
} => continue, + event if event.is_error() => warn!("{event}"), event => info!("{event}"), } } diff --git a/cli/static/index.html b/cli/static/index.html index 55e7cf64..bb14cb97 100644 --- a/cli/static/index.html +++ b/cli/static/index.html @@ -59,6 +59,13 @@ color: var(--fg2); } + .event-logs { + background-color: var(--bg1); + color: var(--fg1); + border: 1px solid var(--border); + font-family: var(--fonts-mono); + } + h2.status { margin: 2rem 0 1rem 0; color: var(--fg1); @@ -96,6 +103,9 @@

Bootnodes

+

Event Logs

+ +

Status

diff --git a/cli/static/run_node.js b/cli/static/run_node.js index 15d6d2f6..67fa6f64 100644 --- a/cli/static/run_node.js +++ b/cli/static/run_node.js @@ -95,11 +95,35 @@ function bind_config(data) { }); } +function log_event(event) { + // Skip noisy events + if (event.data.get("event").type == "share_sampling_result") { + return; + } + + const time = new Date(event.data.get("time")); + + const log = time.getHours().toString().padStart(2, '0') + + ":" + time.getMinutes().toString().padStart(2, '0') + + ":" + time.getSeconds().toString().padStart(2, '0') + + "." + time.getMilliseconds().toString().padStart(3, '0') + + ": " + event.data.get("message"); + + var textarea = document.getElementById("event-logs"); + textarea.value += log + "\n"; + textarea.scrollTop = textarea.scrollHeight; +} + async function main(document, window) { await init(); window.node = await new NodeClient("/js/worker.js"); + window.events = await window.node.events_channel(); + window.events.onmessage = (event) => { + log_event(event); + }; + bind_config(await fetch_config()); if (await window.node.is_running() === true) { diff --git a/node-wasm/src/worker.rs b/node-wasm/src/worker.rs index f8be9e89..f96c9fb5 100644 --- a/node-wasm/src/worker.rs +++ b/node-wasm/src/worker.rs @@ -55,7 +55,7 @@ struct NodeWorker { } impl NodeWorker { - async fn new(config: WasmNodeConfig) -> Result { + async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result { let config = config.into_node_config().await?; if let Ok(store_height) = config.store.head_height().await { @@ -64,18 +64,16 @@ impl NodeWorker { info!("Initialised new empty store"); } - let node = Node::new(config).await?; + let (node, events_sub) = Node::new_subscribed(config).await?; - let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid()); - let events_channel = BroadcastChannel::new(&events_channel_name) + let events_channel = BroadcastChannel::new(events_channel_name) .context("Failed to allocate 
BroadcastChannel")?; - let events_sub = node.event_subscriber(); spawn_local(event_forwarder_task(events_sub, events_channel)); Ok(Self { node, - events_channel_name, + events_channel_name: events_channel_name.to_owned(), }) } @@ -238,9 +236,10 @@ impl NodeWorker { } #[wasm_bindgen] -pub async fn run_worker(queued_events: Vec) { +pub async fn run_worker(queued_events: Vec) -> Result<()> { info!("Entered run_worker"); let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH); + let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid()); let mut message_server: Box = if SharedWorker::is_worker_type() { Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events)) @@ -265,8 +264,14 @@ pub async fn run_worker(queued_events: Vec) { NodeCommand::IsRunning => { message_server.respond_to(client_id, WorkerResponse::IsRunning(false)); } + NodeCommand::GetEventsChannelName => { + message_server.respond_to( + client_id, + WorkerResponse::EventsChannelName(events_channel_name.clone()), + ); + } NodeCommand::StartNode(config) => { - match NodeWorker::new(config).await { + match NodeWorker::new(&events_channel_name, config).await { Ok(node) => { worker = Some(node); message_server @@ -293,12 +298,15 @@ pub async fn run_worker(queued_events: Vec) { } info!("Channel to WorkerMessageServer closed, exiting the SharedWorker"); + + Ok(()) } async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) { #[derive(Serialize)] struct Event { message: String, + is_error: bool, #[serde(flatten)] info: NodeEventInfo, } @@ -306,6 +314,7 @@ async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: B while let Ok(ev) = events_sub.recv().await { let ev = Event { message: ev.event.to_string(), + is_error: ev.event.is_error(), info: ev, }; diff --git a/node/src/events.rs b/node/src/events.rs index d173a92e..664b0eca 100644 --- a/node/src/events.rs +++ b/node/src/events.rs @@ -72,6 +72,11 @@ 
impl EventChannel { rx: self.tx.subscribe(), } } + + /// Returns if there are any active subscribers or not. + pub fn has_subscribers(&self) -> bool { + self.tx.receiver_count() > 0 + } } impl Default for EventChannel { @@ -94,6 +99,10 @@ impl EventPublisher { file_line: location.line(), }); } + + pub(crate) fn has_subscribers(&self) -> bool { + self.tx.receiver_count() > 0 + } } impl EventSubscriber { @@ -219,6 +228,82 @@ pub enum NodeEvent { /// A human readable error. error: String, }, + + /// A new header was added from HeaderSub. + AddedHeaderFromHeaderSub { + /// The height of the header. + height: u64, + }, + + /// Fetching header of network head just started. + FetchingHeadHeaderStarted, + + /// Fetching header of network head just finished. + FetchingHeadHeaderFinished { + /// The height of the network head. + height: u64, + /// How much time fetching took. + took: Duration, + }, + + /// Fetching headers of a specific block range just started. + FetchingHeadersStarted { + /// Start of the range. + from_height: u64, + /// End of the range (included). + to_height: u64, + }, + + /// Fetching headers of a specific block range just finished. + FetchingHeadersFinished { + /// Start of the range. + from_height: u64, + /// End of the range (included). + to_height: u64, + /// How much time fetching took. + took: Duration, + }, + + /// Fetching headers of a specific block range just failed. + FetchingHeadersFailed { + /// Start of the range. + from_height: u64, + /// End of the range (included). + to_height: u64, + /// A human readable error. + error: String, + /// How much time fetching took. + took: Duration, + }, + + /// Network was compromised. + /// + /// This happens when a valid bad encoding fraud proof is received. + /// Ideally it would never happen, but protection needs to exist. + /// In case of compromised network, syncing and data sampling will + /// stop immediately. 
+ NetworkCompromised, +} + +impl NodeEvent { + /// Returns `true` if the event indicates an error. + pub fn is_error(&self) -> bool { + match self { + NodeEvent::FatalDaserError { .. } + | NodeEvent::FetchingHeadersFailed { .. } + | NodeEvent::NetworkCompromised => true, + NodeEvent::PeerConnected { .. } + | NodeEvent::PeerDisconnected { .. } + | NodeEvent::SamplingStarted { .. } + | NodeEvent::ShareSamplingResult { .. } + | NodeEvent::SamplingFinished { .. } + | NodeEvent::AddedHeaderFromHeaderSub { .. } + | NodeEvent::FetchingHeadHeaderStarted + | NodeEvent::FetchingHeadHeaderFinished { .. } + | NodeEvent::FetchingHeadersStarted { .. } + | NodeEvent::FetchingHeadersFinished { .. } => false, + } + } } impl fmt::Display for NodeEvent { @@ -243,7 +328,7 @@ impl fmt::Display for NodeEvent { square_width, shares, } => { - write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}.") + write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}") } NodeEvent::ShareSamplingResult { height, @@ -255,7 +340,7 @@ impl fmt::Display for NodeEvent { let acc = if *accepted { "accepted" } else { "rejected" }; write!( f, - "Sampling for share [{row}, {column}] of {height} block was {acc}." + "Sampling for share [{row}, {column}] of {height} block was {acc}" ) } NodeEvent::SamplingFinished { @@ -266,12 +351,67 @@ impl fmt::Display for NodeEvent { let acc = if *accepted { "accepted" } else { "rejected" }; write!( f, - "Sampling for {height} block finished and {acc}. Took {took:?}." + "Sampling for {height} block finished and {acc}. 
Took: {took:?}" ) } NodeEvent::FatalDaserError { error } => { write!(f, "Daser stopped because of a fatal error: {error}") } + NodeEvent::AddedHeaderFromHeaderSub { height } => { + write!(f, "Added header {height} from header-sub") + } + NodeEvent::FetchingHeadHeaderStarted => { + write!(f, "Fetching header of network head block started") + } + NodeEvent::FetchingHeadHeaderFinished { height, took } => { + write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}") + } + NodeEvent::FetchingHeadersStarted { + from_height, + to_height, + } => { + if from_height == to_height { + write!(f, "Fetching header of {from_height} block started") + } else { + write!( + f, + "Fetching headers of {from_height}-{to_height} blocks started" + ) + } + } + NodeEvent::FetchingHeadersFinished { + from_height, + to_height, + took, + } => { + if from_height == to_height { + write!( + f, + "Fetching header of {from_height} block finished. Took: {took:?}" + ) + } else { + write!(f, "Fetching headers of {from_height}-{to_height} blocks finished. Took: {took:?}") + } + } + NodeEvent::FetchingHeadersFailed { + from_height, + to_height, + error, + took, + } => { + if from_height == to_height { + write!( + f, + "Fetching header of {from_height} block failed. Took: {took:?}, Error: {error}" + ) + } else { + write!(f, "Fetching headers of {from_height}-{to_height} blocks failed. Took: {took:?}, Error: {error}") + } + } + NodeEvent::NetworkCompromised => { + write!(f, "The network is compromised and should not be trusted. 
")?; + write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.") + } } } } diff --git a/node/src/node.rs b/node/src/node.rs index 428ef354..90a4fc54 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -22,7 +22,7 @@ use tokio_util::sync::CancellationToken; use tracing::warn; use crate::daser::{Daser, DaserArgs, DaserError}; -use crate::events::{EventChannel, EventSubscriber}; +use crate::events::{EventChannel, EventSubscriber, NodeEvent}; use crate::executor::spawn; use crate::p2p::{P2p, P2pArgs, P2pError}; use crate::peer_tracker::PeerTrackerInfo; @@ -92,10 +92,23 @@ where { /// Creates and starts a new celestia node with a given config. pub async fn new(config: NodeConfig) -> Result + where + B: Blockstore + 'static, + { + let (node, _) = Node::new_subscribed(config).await?; + Ok(node) + } + + /// Creates and starts a new celestia node with a given config. + /// + /// Returns `Node` along with `EventSubscriber`. Use this to avoid missing any + /// events that will be generated on the construction of the node. 
+ pub async fn new_subscribed(config: NodeConfig) -> Result<(Self, EventSubscriber)> where B: Blockstore + 'static, { let event_channel = EventChannel::new(); + let event_sub = event_channel.subscribe(); let store = Arc::new(config.store); let p2p = Arc::new(P2p::start(P2pArgs { @@ -111,6 +124,7 @@ where let syncer = Arc::new(Syncer::start(SyncerArgs { store: store.clone(), p2p: p2p.clone(), + event_pub: event_channel.publisher(), })?); let daser = Arc::new(Daser::start(DaserArgs { @@ -122,32 +136,42 @@ where // spawn the task that will stop the services when the fraud is detected let network_compromised_token = p2p.get_network_compromised_token().await?; let tasks_cancellation_token = CancellationToken::new(); + spawn({ let syncer = syncer.clone(); let daser = daser.clone(); let tasks_cancellation_token = tasks_cancellation_token.child_token(); + let event_pub = event_channel.publisher(); + async move { select! { _ = tasks_cancellation_token.cancelled() => (), _ = network_compromised_token.cancelled() => { - warn!("The network is compromised and should not be trusted."); - warn!("The node will stop synchronizing and sampling."); - warn!("You can still make some queries to the network."); syncer.stop(); daser.stop(); + + if event_pub.has_subscribers() { + event_pub.send(NodeEvent::NetworkCompromised); + } else { + // This is a very important message and we want to log it if user + // does not consume our events. + warn!("{}", NodeEvent::NetworkCompromised); + } } } } }); - Ok(Node { + let node = Node { event_channel, p2p, store, syncer, _daser: daser, tasks_cancellation_token, - }) + }; + + Ok((node, event_sub)) } /// Returns a new `EventSubscriber`. 
diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 397c85a9..f9e8ce01 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -22,7 +22,9 @@ use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::sync::CancellationToken; use tracing::{debug, info, info_span, instrument, warn, Instrument}; +use web_time::Instant; +use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; use crate::store::header_ranges::{HeaderRanges, PrintableHeaderRange}; @@ -85,6 +87,8 @@ where pub p2p: Arc, /// Headers storage. pub store: Arc, + /// Event publisher. + pub event_pub: EventPublisher, } #[derive(Debug)] @@ -168,12 +172,13 @@ where { cancellation_token: CancellationToken, cmd_rx: mpsc::Receiver, + event_pub: EventPublisher, p2p: Arc, store: Arc, header_sub_watcher: watch::Receiver>, subjective_head_height: Option, - headers_tx: mpsc::Sender, P2pError>>, - headers_rx: mpsc::Receiver, P2pError>>, + headers_tx: mpsc::Sender<(Result, P2pError>, Duration)>, + headers_rx: mpsc::Receiver<(Result, P2pError>, Duration)>, ongoing_batch: Option, } @@ -197,6 +202,7 @@ where Ok(Worker { cancellation_token, cmd_rx, + event_pub: args.event_pub, p2p: args.p2p, store: args.store, header_sub_watcher, @@ -243,9 +249,13 @@ where _ = report_interval.tick() => { self.report().await; } - Ok(network_head_height) = &mut try_init_result => { + Ok((network_head_height, took)) = &mut try_init_result => { info!("Setting initial subjective head to {network_head_height}"); - self.subjective_head_height = Some(network_head_height); + self.set_subjective_head_height(network_head_height); + self.event_pub.send(NodeEvent::FetchingHeadHeaderFinished { + height: network_head_height, + took, + }); break; } Some(cmd) = self.cmd_rx.recv() => { @@ -293,8 +303,8 @@ where Some(cmd) = self.cmd_rx.recv() => { self.on_cmd(cmd).await; } - Some(res) = self.headers_rx.recv() => { - 
self.on_fetch_next_batch_result(res).await; + Some((res, took)) = self.headers_rx.recv() => { + self.on_fetch_next_batch_result(res, took).await; self.fetch_next_batch().await; } } @@ -333,12 +343,13 @@ where info!("syncing: head: {subjective_head}, stored headers: {stored_headers}, ongoing batches: {ongoing_batch}"); } - fn spawn_try_init(&self) -> oneshot::Receiver { + fn spawn_try_init(&self) -> oneshot::Receiver<(u64, Duration)> { let p2p = self.p2p.clone(); let store = self.store.clone(); let (tx, rx) = oneshot::channel(); let fut = async move { + let now = Instant::now(); let mut backoff = ExponentialBackoffBuilder::default() .with_max_interval(TRY_INIT_BACKOFF_MAX_INTERVAL) .with_max_elapsed_time(None) @@ -347,7 +358,7 @@ where loop { match try_init(&p2p, &*store).await { Ok(network_height) => { - tx.maybe_send(network_height); + tx.maybe_send((network_height, now.elapsed())); break; } Err(e) => { @@ -362,6 +373,8 @@ where } }; + self.event_pub.send(NodeEvent::FetchingHeadHeaderStarted); + spawn_cancellable( self.cancellation_token.child_token(), fut.instrument(info_span!("try_init")), @@ -394,18 +407,30 @@ where let new_head_height = new_head.height().value(); + self.set_subjective_head_height(new_head_height); + if let Ok(store_head_height) = self.store.head_height().await { // If our new header is adjacent to the HEAD of the store if store_head_height + 1 == new_head_height { // Header is already verified by HeaderSub and will be validated against previous // head on insert if self.store.insert(new_head).await.is_ok() { - info!("Added header {new_head_height} from HeaderSub"); + self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub { + height: new_head_height, + }); } } } + } + + fn set_subjective_head_height(&mut self, height: u64) { + if let Some(old_height) = self.subjective_head_height { + if height <= old_height { + return; + } + } - self.subjective_head_height = Some(new_head_height); + self.subjective_head_height = Some(height); } 
#[instrument(skip_all)] @@ -419,6 +444,12 @@ where return; } + if self.p2p.peer_tracker_info().num_connected_peers == 0 { + // No connected peers. We can't do the request. + // We will recover from this in `run`. + return; + } + let Some(subjective_head_height) = self.subjective_head_height else { // Nothing to schedule return; @@ -443,18 +474,15 @@ where return; } - if self.p2p.peer_tracker_info().num_connected_peers == 0 { - // No connected peers. We can't do the request. - // This will be recovered by `run`. - return; - } + self.event_pub.send(NodeEvent::FetchingHeadersStarted { + from_height: *next_batch.start(), + to_height: *next_batch.end(), + }); let cancellation_token = self.cancellation_token.child_token(); - let batch = PrintableHeaderRange(next_batch.clone()); - info!("Fetching range {batch}"); self.ongoing_batch = Some(Ongoing { - batch, + batch: PrintableHeaderRange(next_batch.clone()), cancellation_token: cancellation_token.clone(), }); @@ -462,36 +490,53 @@ where let p2p = self.p2p.clone(); spawn_cancellable(cancellation_token, async move { - let result = p2p.get_unverified_header_range(next_batch).await; - match result { - Ok(headers) => { - let _ = tx.send(Ok(headers)).await; - } - Err(e) => { - let _ = tx.send(Err(e)).await; - } - } + let now = Instant::now(); + let res = p2p.get_unverified_header_range(next_batch).await; + let _ = tx.send((res, now.elapsed())).await; }); } #[instrument(skip_all)] - async fn on_fetch_next_batch_result(&mut self, res: Result, P2pError>) { + async fn on_fetch_next_batch_result( + &mut self, + res: Result, P2pError>, + took: Duration, + ) { let Some(ongoing) = self.ongoing_batch.take() else { warn!("No batch was scheduled, however result was received. 
Discarding it."); return; }; + let from_height = *ongoing.batch.0.start(); + let to_height = *ongoing.batch.0.end(); + let headers = match res { Ok(headers) => headers, Err(e) => { - warn!("Failed to receive batch for ranges {}: {e}", ongoing.batch); + self.event_pub.send(NodeEvent::FetchingHeadersFailed { + from_height, + to_height, + error: e.to_string(), + took, + }); return; } }; if let Err(e) = self.store.insert(headers).await { - warn!("Failed to store range {}: {e}", ongoing.batch); + self.event_pub.send(NodeEvent::FetchingHeadersFailed { + from_height, + to_height, + error: format!("Failed to store headers: {e}"), + took, + }); } + + self.event_pub.send(NodeEvent::FetchingHeadersFinished { + from_height, + to_height, + took, + }); } } @@ -519,12 +564,14 @@ mod tests { use std::ops::RangeInclusive; use super::*; + use crate::events::EventChannel; use crate::store::InMemoryStore; use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle}; use celestia_types::test_utils::ExtendedHeaderGenerator; #[async_test] async fn init_without_genesis_hash() { + let events = EventChannel::new(); let (mock, mut handle) = P2p::mocked(); let mut gen = ExtendedHeaderGenerator::new(); let header = gen.next(); @@ -532,6 +579,7 @@ mod tests { let _syncer = Syncer::start(SyncerArgs { p2p: Arc::new(mock), store: Arc::new(InMemoryStore::new()), + event_pub: events.publisher(), }) .unwrap(); @@ -667,6 +715,7 @@ mod tests { #[async_test] async fn start_with_filled_store() { + let events = EventChannel::new(); let (p2p, mut p2p_mock) = P2p::mocked(); let (store, mut gen) = gen_filled_store(25).await; let store = Arc::new(store); @@ -677,6 +726,7 @@ mod tests { let syncer = Syncer::start(SyncerArgs { p2p: Arc::new(p2p), store: store.clone(), + event_pub: events.publisher(), }) .unwrap(); @@ -888,12 +938,14 @@ mod tests { async fn initialized_syncer( head: ExtendedHeader, ) -> (Syncer, Arc, MockP2pHandle) { + let events = EventChannel::new(); let (mock, mut handle) = 
P2p::mocked(); let store = Arc::new(InMemoryStore::new()); let syncer = Syncer::start(SyncerArgs { p2p: Arc::new(mock), store: store.clone(), + event_pub: events.publisher(), }) .unwrap();