Skip to content

Commit

Permalink
feat(node): Generate syncing related events (#312)
Browse files Browse the repository at this point in the history
Signed-off-by: Yiannis Marangos <[email protected]>
Co-authored-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
3 people authored Jul 1, 2024
1 parent 14b0c35 commit 74123f3
Show file tree
Hide file tree
Showing 7 changed files with 308 additions and 52 deletions.
7 changes: 2 additions & 5 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
info!("Initialised store, present headers: {stored_ranges}");
}

let node = Node::new(NodeConfig {
let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
genesis_hash,
p2p_local_keypair,
Expand All @@ -80,14 +80,11 @@ pub(crate) async fn run(args: Params) -> Result<()> {
.await
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event if event.is_error() => warn!("{event}"),
event => info!("{event}"),
}
}
Expand Down
10 changes: 10 additions & 0 deletions cli/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@
color: var(--fg2);
}

.event-logs {
background-color: var(--bg1);
color: var(--fg1);
border: 1px solid var(--border);
font-family: var(--fonts-mono);
}

h2.status {
margin: 2rem 0 1rem 0;
color: var(--fg1);
Expand Down Expand Up @@ -96,6 +103,9 @@ <h3>Bootnodes</h3>
<button id="start" class="config"><b>Start!</b></button>
</div>

<h2 class="event_logs">Event Logs</h2>
<textarea readonly id="event-logs" class="event-logs" cols=120 rows=8></textarea>

<h2 class="status">Status</h2>

<div class="status">
Expand Down
24 changes: 24 additions & 0 deletions cli/static/run_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,35 @@ function bind_config(data) {
});
}

function log_event(event) {
// Skip noisy events
if (event.data.get("event").type == "share_sampling_result") {
return;
}

const time = new Date(event.data.get("time"));

const log = time.getHours().toString().padStart(2, '0')
+ ":" + time.getMinutes().toString().padStart(2, '0')
+ ":" + time.getSeconds().toString().padStart(2, '0')
+ "." + time.getMilliseconds().toString().padStart(3, '0')
+ ": " + event.data.get("message");

var textarea = document.getElementById("event-logs");
textarea.value += log + "\n";
textarea.scrollTop = textarea.scrollHeight;
}

async function main(document, window) {
await init();

window.node = await new NodeClient("/js/worker.js");

window.events = await window.node.events_channel();
window.events.onmessage = (event) => {
log_event(event);
};

bind_config(await fetch_config());

if (await window.node.is_running() === true) {
Expand Down
25 changes: 17 additions & 8 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct NodeWorker {
}

impl NodeWorker {
async fn new(config: WasmNodeConfig) -> Result<Self> {
async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
let config = config.into_node_config().await?;

if let Ok(store_height) = config.store.head_height().await {
Expand All @@ -64,18 +64,16 @@ impl NodeWorker {
info!("Initialised new empty store");
}

let node = Node::new(config).await?;
let (node, events_sub) = Node::new_subscribed(config).await?;

let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
let events_channel = BroadcastChannel::new(events_channel_name)
.context("Failed to allocate BroadcastChannel")?;

let events_sub = node.event_subscriber();
spawn_local(event_forwarder_task(events_sub, events_channel));

Ok(Self {
node,
events_channel_name,
events_channel_name: events_channel_name.to_owned(),
})
}

Expand Down Expand Up @@ -238,9 +236,10 @@ impl NodeWorker {
}

#[wasm_bindgen]
pub async fn run_worker(queued_events: Vec<MessageEvent>) {
pub async fn run_worker(queued_events: Vec<MessageEvent>) -> Result<()> {
info!("Entered run_worker");
let (tx, mut rx) = mpsc::channel(WORKER_MESSAGE_SERVER_INCOMING_QUEUE_LENGTH);
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());

let mut message_server: Box<dyn MessageServer> = if SharedWorker::is_worker_type() {
Box::new(SharedWorkerMessageServer::new(tx.clone(), queued_events))
Expand All @@ -265,8 +264,14 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
NodeCommand::IsRunning => {
message_server.respond_to(client_id, WorkerResponse::IsRunning(false));
}
NodeCommand::GetEventsChannelName => {
message_server.respond_to(
client_id,
WorkerResponse::EventsChannelName(events_channel_name.clone()),
);
}
NodeCommand::StartNode(config) => {
match NodeWorker::new(config).await {
match NodeWorker::new(&events_channel_name, config).await {
Ok(node) => {
worker = Some(node);
message_server
Expand All @@ -293,19 +298,23 @@ pub async fn run_worker(queued_events: Vec<MessageEvent>) {
}

info!("Channel to WorkerMessageServer closed, exiting the SharedWorker");

Ok(())
}

async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
#[derive(Serialize)]
struct Event {
message: String,
is_error: bool,
#[serde(flatten)]
info: NodeEventInfo,
}

while let Ok(ev) = events_sub.recv().await {
let ev = Event {
message: ev.event.to_string(),
is_error: ev.event.is_error(),
info: ev,
};

Expand Down
146 changes: 143 additions & 3 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl EventChannel {
rx: self.tx.subscribe(),
}
}

/// Returns if there are any active subscribers or not.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
Expand All @@ -94,6 +99,10 @@ impl EventPublisher {
file_line: location.line(),
});
}

pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl EventSubscriber {
Expand Down Expand Up @@ -219,6 +228,82 @@ pub enum NodeEvent {
/// A human readable error.
error: String,
},

/// A new header was added from HeaderSub.
AddedHeaderFromHeaderSub {
/// The height of the header.
height: u64,
},

/// Fetching header of network head just started.
FetchingHeadHeaderStarted,

/// Fetching header of network head just finished.
FetchingHeadHeaderFinished {
/// The height of the network head.
height: u64,
/// How much time fetching took.
took: Duration,
},

/// Fetching headers of a specific block range just started.
FetchingHeadersStarted {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
},

/// Fetching headers of a specific block range just finished.
FetchingHeadersFinished {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
/// How much time fetching took.
took: Duration,
},

/// Fetching headers of a specific block range just failed.
FetchingHeadersFailed {
/// Start of the range.
from_height: u64,
/// End of the range (included).
to_height: u64,
/// A human readable error.
error: String,
/// How much time fetching took.
took: Duration,
},

/// Network was compromised.
///
/// This happens when a valid bad encoding fraud proof is received.
/// Ideally it would never happen, but protection needs to exist.
/// In case of compromised network, syncing and data sampling will
/// stop immediately.
NetworkCompromised,
}

impl NodeEvent {
/// Returns `true` if the event indicates an error.
pub fn is_error(&self) -> bool {
match self {
NodeEvent::FatalDaserError { .. }
| NodeEvent::FetchingHeadersFailed { .. }
| NodeEvent::NetworkCompromised => true,
NodeEvent::PeerConnected { .. }
| NodeEvent::PeerDisconnected { .. }
| NodeEvent::SamplingStarted { .. }
| NodeEvent::ShareSamplingResult { .. }
| NodeEvent::SamplingFinished { .. }
| NodeEvent::AddedHeaderFromHeaderSub { .. }
| NodeEvent::FetchingHeadHeaderStarted
| NodeEvent::FetchingHeadHeaderFinished { .. }
| NodeEvent::FetchingHeadersStarted { .. }
| NodeEvent::FetchingHeadersFinished { .. } => false,
}
}
}

impl fmt::Display for NodeEvent {
Expand All @@ -243,7 +328,7 @@ impl fmt::Display for NodeEvent {
square_width,
shares,
} => {
write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}.")
write!(f, "Sampling for {height} block started. Square: {square_width}x{square_width}, Shares: {shares:?}")
}
NodeEvent::ShareSamplingResult {
height,
Expand All @@ -255,7 +340,7 @@ impl fmt::Display for NodeEvent {
let acc = if *accepted { "accepted" } else { "rejected" };
write!(
f,
"Sampling for share [{row}, {column}] of {height} block was {acc}."
"Sampling for share [{row}, {column}] of {height} block was {acc}"
)
}
NodeEvent::SamplingFinished {
Expand All @@ -266,12 +351,67 @@ impl fmt::Display for NodeEvent {
let acc = if *accepted { "accepted" } else { "rejected" };
write!(
f,
"Sampling for {height} block finished and {acc}. Took {took:?}."
"Sampling for {height} block finished and {acc}. Took: {took:?}"
)
}
NodeEvent::FatalDaserError { error } => {
write!(f, "Daser stopped because of a fatal error: {error}")
}
NodeEvent::AddedHeaderFromHeaderSub { height } => {
write!(f, "Added header {height} from header-sub")
}
NodeEvent::FetchingHeadHeaderStarted => {
write!(f, "Fetching header of network head block started")
}
NodeEvent::FetchingHeadHeaderFinished { height, took } => {
write!(f, "Fetching header of network head block finished. Height: {height}, Took: {took:?}")
}
NodeEvent::FetchingHeadersStarted {
from_height,
to_height,
} => {
if from_height == to_height {
write!(f, "Fetching header of {from_height} block started")
} else {
write!(
f,
"Fetching headers of {from_height}-{to_height} blocks started"
)
}
}
NodeEvent::FetchingHeadersFinished {
from_height,
to_height,
took,
} => {
if from_height == to_height {
write!(
f,
"Fetching header of {from_height} block finished. Took: {took:?}"
)
} else {
write!(f, "Fetching headers of {from_height}-{to_height} blocks finished. Took: {took:?}")
}
}
NodeEvent::FetchingHeadersFailed {
from_height,
to_height,
error,
took,
} => {
if from_height == to_height {
write!(
f,
"Fetching header of {from_height} block failed. Took: {took:?}, Error: {error}"
)
} else {
write!(f, "Fetching headers of {from_height}-{to_height} blocks failed. Took: {took:?}, Error: {error}")
}
}
NodeEvent::NetworkCompromised => {
write!(f, "The network is compromised and should not be trusted. ")?;
write!(f, "Node stopped synchronizing and sampling, but you can still make some queries to the network.")
}
}
}
}
Expand Down
Loading

0 comments on commit 74123f3

Please sign in to comment.