Skip to content

Commit

Permalink
chore(node)!: Hide internal components (#342)
Browse files Browse the repository at this point in the history
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
oblique and fl0rek authored Jul 25, 2024
1 parent 4023855 commit 632e1db
Show file tree
Hide file tree
Showing 18 changed files with 107 additions and 119 deletions.
3 changes: 1 addition & 2 deletions node-wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::{BroadcastChannel, MessageEvent, SharedWorker};

use lumina_node::node::Node;
use lumina_node::node::{Node, SyncingInfo};
use lumina_node::store::{IndexedDbStore, SamplingMetadata, Store};
use lumina_node::syncer::SyncingInfo;

use crate::error::{Context, Error, Result};
use crate::node::WasmNodeConfig;
Expand Down
3 changes: 1 addition & 2 deletions node-wasm/src/worker/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ use tracing::error;
use wasm_bindgen::{JsError, JsValue};

use celestia_types::hash::Hash;
use lumina_node::peer_tracker::PeerTrackerInfo;
use lumina_node::node::{PeerTrackerInfo, SyncingInfo};
use lumina_node::store::SamplingMetadata;
use lumina_node::syncer::SyncingInfo;

use crate::error::Error;
use crate::error::Result;
Expand Down
6 changes: 4 additions & 2 deletions node/src/block_ranges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use std::ops::{Add, RangeInclusive, Sub};
use serde::Serialize;
use smallvec::SmallVec;

/// Type alias to `RangeInclusive<u64>`.
/// Type alias of [`RangeInclusive<u64>`].
///
/// [`RangeInclusive<u64>`]: std::ops::RangeInclusive
pub type BlockRange = RangeInclusive<u64>;

/// Errors that can be produced by `BlockRanges`.
/// Errors that can be produced by [`BlockRanges`].
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum BlockRangesError {
/// Block ranges must be sorted.
Expand Down
22 changes: 11 additions & 11 deletions node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,39 +56,39 @@ const SAMPLING_WINDOW: Duration = Duration::from_secs(30 * DAY);

type Result<T, E = DaserError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with the [`Daser`].
/// Representation of all the errors that can occur in `Daser` component.
#[derive(Debug, thiserror::Error)]
pub enum DaserError {
/// An error propagated from the [`P2p`] module.
/// An error propagated from the `P2p` component.
#[error("P2p: {0}")]
P2p(#[from] P2pError),

/// An error propagated from the [`Store`] module.
/// An error propagated from the [`Store`] component.
#[error("Store: {0}")]
Store(#[from] StoreError),
}

/// Component responsible for data availability sampling of blocks from the network.
pub struct Daser {
pub(crate) struct Daser {
cancellation_token: CancellationToken,
}

/// Arguments used to configure the [`Daser`].
pub struct DaserArgs<S>
pub(crate) struct DaserArgs<S>
where
S: Store,
{
/// Handler for the peer to peer messaging.
pub p2p: Arc<P2p>,
pub(crate) p2p: Arc<P2p>,
/// Headers storage.
pub store: Arc<S>,
pub(crate) store: Arc<S>,
/// Event publisher.
pub event_pub: EventPublisher,
pub(crate) event_pub: EventPublisher,
}

impl Daser {
/// Create and start the [`Daser`].
pub fn start<S>(args: DaserArgs<S>) -> Result<Self>
pub(crate) fn start<S>(args: DaserArgs<S>) -> Result<Self>
where
S: Store + 'static,
{
Expand All @@ -109,8 +109,8 @@ impl Daser {
Ok(Daser { cancellation_token })
}

/// Stop the [`Daser`].
pub fn stop(&self) {
/// Stop the worker.
pub(crate) fn stop(&self) {
// Singal the Worker to stop.
// TODO: Should we wait for the Worker to stop?
self.cancellation_token.cancel();
Expand Down
20 changes: 8 additions & 12 deletions node/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use tokio::sync::broadcast;

const EVENT_CHANNEL_CAPACITY: usize = 1024;

/// An error returned from the `EventReceiver::recv`.
/// An error returned from the [`EventSubscriber::recv`].
#[derive(Debug, thiserror::Error)]
pub enum RecvError {
/// Node and all its event senders are closed.
#[error("Event channel closed")]
Closed,
}

/// An error returned from the `EventReceiver::try_recv`.
/// An error returned from the [`EventSubscriber::try_recv`].
#[derive(Debug, thiserror::Error)]
pub enum TryRecvError {
/// The event channel is currently empty.
Expand All @@ -32,15 +32,15 @@ pub enum TryRecvError {

/// A channel which users can subscribe for events.
#[derive(Debug)]
pub struct EventChannel {
pub(crate) struct EventChannel {
tx: broadcast::Sender<NodeEventInfo>,
}

/// `EventPublisher` is used to broadcast events generated by [`Node`] to [`EventSubscriber`]s.
///
/// [`Node`]: crate::node::Node
#[derive(Debug, Clone)]
pub struct EventPublisher {
pub(crate) struct EventPublisher {
tx: broadcast::Sender<NodeEventInfo>,
}

Expand All @@ -54,29 +54,24 @@ pub struct EventSubscriber {

impl EventChannel {
/// Create a new `EventChannel`.
pub fn new() -> EventChannel {
pub(crate) fn new() -> EventChannel {
let (tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
EventChannel { tx }
}

/// Creates a new [`EventPublisher`].
pub fn publisher(&self) -> EventPublisher {
pub(crate) fn publisher(&self) -> EventPublisher {
EventPublisher {
tx: self.tx.clone(),
}
}

/// Creates a new [`EventSubscriber`].
pub fn subscribe(&self) -> EventSubscriber {
pub(crate) fn subscribe(&self) -> EventSubscriber {
EventSubscriber {
rx: self.tx.subscribe(),
}
}

/// Returns if there are any active subscribers or not.
pub fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
}

impl Default for EventChannel {
Expand All @@ -100,6 +95,7 @@ impl EventPublisher {
});
}

/// Returns if there are any active subscribers or not.
pub(crate) fn has_subscribers(&self) -> bool {
self.tx.receiver_count() > 0
}
Expand Down
11 changes: 7 additions & 4 deletions node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@

pub mod block_ranges;
pub mod blockstore;
pub mod daser;
mod daser;
pub mod events;
mod executor;
pub mod network;
pub mod node;
pub mod p2p;
pub mod peer_tracker;
mod p2p;
mod peer_tracker;
pub mod store;
pub mod syncer;
mod syncer;
#[cfg(any(test, feature = "test-utils"))]
#[cfg_attr(docsrs, doc(cfg(feature = "test-utils")))]
pub mod test_utils;
mod utils;

#[cfg(all(target_arch = "wasm32", test))]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

#[doc(inline)]
pub use crate::node::{Node, NodeConfig, NodeError, Result};
50 changes: 30 additions & 20 deletions node/src/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
//! High-level integration of [`P2p`], [`Store`], [`Syncer`].
//! Node that connects to Celestia's P2P network.
//!
//! [`P2p`]: crate::p2p::P2p
//! [`Store`]: crate::store::Store
//! [`Syncer`]: crate::syncer::Syncer
//! Upon creation, `Node` will try to connect to Celestia's P2P network
//! and then proceed with synchronization and data sampling of the blocks.
use std::ops::RangeBounds;
use std::sync::Arc;
Expand All @@ -18,36 +17,44 @@ use libp2p::identity::Keypair;
use libp2p::swarm::NetworkInfo;
use libp2p::{Multiaddr, PeerId};
use tokio::select;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::daser::{Daser, DaserArgs, DaserError};
use crate::daser::{Daser, DaserArgs};
use crate::events::{EventChannel, EventSubscriber, NodeEvent};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
use crate::p2p::{P2p, P2pArgs};
use crate::store::{SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs, SyncerError, SyncingInfo};
use crate::syncer::{Syncer, SyncerArgs};

type Result<T, E = NodeError> = std::result::Result<T, E>;
pub use crate::daser::DaserError;
pub use crate::p2p::{HeaderExError, P2pError};
pub use crate::peer_tracker::PeerTrackerInfo;
pub use crate::syncer::{SyncerError, SyncingInfo};

/// Alias of [`Result`] with [`NodeError`] error type
///
/// [`Result`]: std::result::Result
pub type Result<T, E = NodeError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with the [`Node`].
#[derive(Debug, thiserror::Error)]
pub enum NodeError {
/// An error propagated from the [`P2p`] module.
#[error(transparent)]
/// An error propagated from the `P2p` component.
#[error("P2p: {0}")]
P2p(#[from] P2pError),

/// An error propagated from the [`Syncer`] module.
#[error(transparent)]
/// An error propagated from the `Syncer` component.
#[error("Syncer: {0}")]
Syncer(#[from] SyncerError),

/// An error propagated from the [`Store`] module.
#[error(transparent)]
/// An error propagated from the [`Store`] component.
#[error("Store: {0}")]
Store(#[from] StoreError),

/// An error propagated from the [`Daser`] module.
#[error(transparent)]
/// An error propagated from the `Daser` component.
#[error("Daser: {0}")]
Daser(#[from] DaserError),
}

Expand Down Expand Up @@ -185,13 +192,16 @@ where
self.p2p.local_peer_id()
}

/// Get current [`PeerTracker`] info.
///
/// [`PeerTracker`]: crate::peer_tracker::PeerTracker
/// Get current [`PeerTrackerInfo`].
pub fn peer_tracker_info(&self) -> PeerTrackerInfo {
self.p2p.peer_tracker_info().clone()
}

/// Get [`PeerTrackerInfo`] watcher.
pub fn peer_tracker_info_watcher(&self) -> watch::Receiver<PeerTrackerInfo> {
self.p2p.peer_tracker_info_watcher()
}

/// Wait until the node is connected to at least 1 peer.
pub async fn wait_connected(&self) -> Result<()> {
Ok(self.p2p.wait_connected().await?)
Expand Down
12 changes: 3 additions & 9 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ const FRAUD_PROOF_HEAD_HEIGHT_THRESHOLD: u64 = 20;

type Result<T, E = P2pError> = std::result::Result<T, E>;

/// Representation of all the errors that can occur when interacting with [`P2p`].
/// Representation of all the errors that can occur in `P2p` component.
#[derive(Debug, thiserror::Error)]
pub enum P2pError {
/// Failed to initialize gossipsub behaviour.
Expand Down Expand Up @@ -163,7 +163,7 @@ impl From<oneshot::error::RecvError> for P2pError {

/// Component responsible for the peer to peer networking handling.
#[derive(Debug)]
pub struct P2p {
pub(crate) struct P2p {
cmd_tx: mpsc::Sender<P2pCmd>,
peer_tracker_info_watcher: watch::Receiver<PeerTrackerInfo>,
local_peer_id: PeerId,
Expand Down Expand Up @@ -256,7 +256,7 @@ impl P2p {
}

/// Creates and starts a new mocked p2p handler.
#[cfg(any(test, feature = "test-utils"))]
#[cfg(test)]
pub fn mocked() -> (Self, crate::test_utils::MockP2pHandle) {
let (cmd_tx, cmd_rx) = mpsc::channel(16);
let (peer_tracker_tx, peer_tracker_rx) = watch::channel(PeerTrackerInfo::default());
Expand All @@ -277,12 +277,6 @@ impl P2p {
(p2p, handle)
}

/// Stop the [`P2p`].
pub async fn stop(&self) -> Result<()> {
// TODO
Ok(())
}

/// Local peer ID on the p2p network.
pub fn local_peer_id(&self) -> &PeerId {
&self.local_peer_id
Expand Down
2 changes: 1 addition & 1 deletion node/src/p2p/header_ex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) struct HeaderExConfig<'a, S> {
pub header_store: Arc<S>,
}

/// Representation of all the errors that can occur when interacting with the header-ex.
/// Representation of all the errors that can occur in `HeaderEx` component.
#[derive(Debug, thiserror::Error)]
pub enum HeaderExError {
/// Header not found.
Expand Down
Loading

0 comments on commit 632e1db

Please sign in to comment.