Skip to content

Commit

Permalink
Merge branch 'main' into feat/grpc-wasm-support
Browse files Browse the repository at this point in the history
  • Loading branch information
zvolin committed Dec 19, 2024
2 parents 7079948 + 35fac5a commit b04e880
Show file tree
Hide file tree
Showing 42 changed files with 4,774 additions and 517 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release-plz.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
exit
fi
# Update the types definition for lumina-node
npm i
npm clean-install
npm run tsc
# Update the readme for lumina-node
wasm-pack build ..
Expand Down
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = ["cli", "grpc", "node", "node-wasm", "proto", "rpc", "types"]

[workspace.dependencies]
blockstore = "0.7.0"
blockstore = "0.7.1"
lumina-node = { version = "0.8.0", path = "node" }
lumina-node-wasm = { version = "0.7.0", path = "node-wasm" }
celestia-proto = { version = "0.6.0", path = "proto" }
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Rust implementation of Celestia's [data availability node](https://github.com/ce
Run Lumina now at [lumina.rs](https://lumina.rs/) and directly verify Celestia.

Supported features:
- Backward and forward synchronization of block headers within syncing window
- Backward and forward synchronization of block headers within sampling window
- Header exchange (`header-ex`) client and server
- Listening for, verifying and redistributing extended headers on gossip protocol (`header-sub`)
- Listening for, verifying and redistributing fraud proofs on gossip protocol (`fraud-sub`)
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ name = "lumina"
path = "src/main.rs"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
blockstore.workspace = true
celestia-rpc = { workspace = true, features = ["p2p"] }
celestia-types.workspace = true
libp2p.workspace = true
Expand Down
34 changes: 1 addition & 33 deletions cli/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
use std::env::current_exe;

use anyhow::Result;
use clap::{Parser, ValueEnum};
use lumina_node::network::Network;
use clap::Parser;

use crate::native;
#[cfg(feature = "browser-node")]
use crate::server;

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub(crate) enum ArgNetwork {
#[default]
Mainnet,
Arabica,
Mocha,
Private,
}

#[derive(Debug, Parser)]
pub(crate) enum CliArgs {
/// Run native node locally
Expand Down Expand Up @@ -64,25 +54,3 @@ fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {

guard
}

impl From<ArgNetwork> for Network {
fn from(network: ArgNetwork) -> Network {
match network {
ArgNetwork::Mainnet => Network::Mainnet,
ArgNetwork::Arabica => Network::Arabica,
ArgNetwork::Mocha => Network::Mocha,
ArgNetwork::Private => Network::Private,
}
}
}

impl From<Network> for ArgNetwork {
fn from(network: Network) -> ArgNetwork {
match network {
Network::Mainnet => ArgNetwork::Mainnet,
Network::Arabica => ArgNetwork::Arabica,
Network::Mocha => ArgNetwork::Mocha,
Network::Private => ArgNetwork::Private,
}
}
}
132 changes: 87 additions & 45 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,32 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use blockstore::EitherBlockstore;
use celestia_rpc::prelude::*;
use celestia_rpc::Client;
use clap::Parser;
use clap::{value_parser, Parser};
use directories::ProjectDirs;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use lumina_node::blockstore::RedbBlockstore;
use libp2p::multiaddr::{Multiaddr, Protocol};
use lumina_node::blockstore::{InMemoryBlockstore, RedbBlockstore};
use lumina_node::events::NodeEvent;
use lumina_node::network::{canonical_network_bootnodes, network_id, Network};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::store::{RedbStore, Store};
use lumina_node::network::Network;
use lumina_node::node::{Node, MIN_PRUNING_DELAY, MIN_SAMPLING_WINDOW};
use lumina_node::store::{EitherStore, InMemoryStore, RedbStore, Store as _};
use tokio::task::spawn_blocking;
use tracing::info;
use tracing::warn;

use crate::common::ArgNetwork;

const CELESTIA_LOCAL_BRIDGE_RPC_ADDR: &str = "ws://localhost:36658";

type Blockstore = EitherBlockstore<InMemoryBlockstore, RedbBlockstore>;
type Store = EitherStore<InMemoryStore, RedbStore>;

#[derive(Debug, Parser)]
pub(crate) struct Params {
/// Network to connect.
#[arg(short, long, value_enum, default_value_t)]
pub(crate) network: ArgNetwork,
#[arg(short, long)]
#[clap(value_parser = value_parser!(Network))]
pub(crate) network: Network,

/// Listening addresses. Can be used multiple times.
#[arg(short, long = "listen")]
Expand All @@ -37,55 +40,68 @@ pub(crate) struct Params {
pub(crate) bootnodes: Vec<Multiaddr>,

/// Persistent header store path.
#[arg(short, long = "store")]
#[arg(short, long)]
pub(crate) store: Option<PathBuf>,

/// Syncing window size, defines maximum age of headers considered for syncing and sampling.
/// Headers older than syncing window by more than an hour are eligible for pruning.
#[arg(long = "syncing-window", verbatim_doc_comment)]
/// Use in-memory store.
#[arg(long)]
pub(crate) in_memory_store: bool,

/// Sampling window defines maximum age of a block considered for syncing and sampling.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) sampling_window: Option<Duration>,

/// Pruning delay defines how much time the pruner should wait after sampling window in
/// order to prune the block.
#[arg(long)]
#[clap(value_parser = parse_duration::parse)]
pub(crate) custom_syncing_window: Option<Duration>,
pub(crate) pruning_delay: Option<Duration>,
}

pub(crate) async fn run(args: Params) -> Result<()> {
let network = args.network.into();
let p2p_local_keypair = identity::Keypair::generate_ed25519();

let p2p_bootnodes = if args.bootnodes.is_empty() {
match network {
Network::Private => fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?,
network => canonical_network_bootnodes(network).collect(),
}
let (blockstore, store) = if args.in_memory_store {
open_in_memory_stores()
} else {
args.bootnodes
open_db_stores(args.store, args.network.id()).await?
};

let network_id = network_id(network).to_owned();
let mut node_builder = Node::builder()
.store(store)
.blockstore(blockstore)
.network(args.network.clone());

info!("Initializing store");
let db = open_db(args.store, &network_id).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);
if let Some(sampling_window) = args.sampling_window {
node_builder = node_builder.sampling_window(sampling_window);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower sampling window.
node_builder = node_builder.sampling_window(MIN_SAMPLING_WINDOW);
}

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store");
if let Some(pruning_delay) = args.pruning_delay {
node_builder = node_builder.pruning_delay(pruning_delay);
} else if args.in_memory_store {
// In-memory stores are memory hungry, so we lower pruning window.
node_builder = node_builder.pruning_delay(MIN_PRUNING_DELAY);
}

if args.bootnodes.is_empty() {
if args.network.is_custom() {
let bootnodes = fetch_bridge_multiaddrs(CELESTIA_LOCAL_BRIDGE_RPC_ADDR).await?;
node_builder = node_builder.bootnodes(bootnodes);
}
} else {
info!("Initialised store, present headers: {stored_ranges}");
node_builder = node_builder.bootnodes(args.bootnodes);
}

let (_node, mut events) = Node::new_subscribed(NodeConfig {
network_id,
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: args.listen_addrs,
sync_batch_size: 512,
custom_syncing_window: args.custom_syncing_window,
blockstore,
store,
})
.await
.context("Failed to start node")?;
if !args.listen_addrs.is_empty() {
node_builder = node_builder.listen(args.listen_addrs);
}

let (_node, mut events) = node_builder
.start_subscribed()
.await
.context("Failed to start node")?;

while let Ok(ev) = events.recv().await {
match ev.event {
Expand All @@ -99,6 +115,32 @@ pub(crate) async fn run(args: Params) -> Result<()> {
Ok(())
}

fn open_in_memory_stores() -> (Blockstore, Store) {
info!("Initializing in-memory store");
let store = InMemoryStore::new();
let blockstore = InMemoryBlockstore::new();
(EitherBlockstore::Left(blockstore), EitherStore::Left(store))
}

async fn open_db_stores(path: Option<PathBuf>, network_id: &str) -> Result<(Blockstore, Store)> {
info!("Initializing store");
let db = open_db(path, network_id).await?;
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store",);
} else {
info!("Initialised store, present headers: {stored_ranges}");
}

Ok((
EitherBlockstore::Right(blockstore),
EitherStore::Right(store),
))
}

async fn open_db(path: Option<PathBuf>, network_id: &str) -> Result<Arc<redb::Database>> {
let network_id = network_id.to_owned();

Expand Down
2 changes: 2 additions & 0 deletions grpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ tendermint-proto.workspace = true
tendermint.workspace = true

bytes = "1.8"
futures = "0.3.30"
hex = "0.4.3"
http-body = "1"
k256 = "0.13.4"
Expand Down Expand Up @@ -56,4 +57,5 @@ dotenvy = "0.15.7"
tokio = { version = "1.38.0", features = ["rt", "macros"] }

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-futures = "0.4.43"
wasm-bindgen-test = "0.3.42"
3 changes: 2 additions & 1 deletion grpc/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ mod imp {

#[cfg(target_arch = "wasm32")]
mod imp {
use gloo_timers::future::{IntervalStream, TimeoutFuture};
use futures::StreamExt;
use gloo_timers::future::IntervalStream;
use send_wrapper::SendWrapper;
use std::time::Duration;

Expand Down
4 changes: 2 additions & 2 deletions grpc/tests/tonic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use utils::{load_account, TestAccount};

pub mod utils;

use crate::utils::{new_grpc_client, new_tx_client};
use crate::utils::{new_grpc_client, new_tx_client, spawn};

#[cfg(not(target_arch = "wasm32"))]
use tokio::test as async_test;
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn submit_blobs_parallel() {
let futs = (0..100)
.map(|n| {
let tx_client = tx_client.clone();
tokio::spawn(async move {
spawn(async move {
let namespace = Namespace::new_v0(&[1, 2, n]).unwrap();
let blobs =
vec![Blob::new(namespace, format!("bleb{n}").into(), AppVersion::V3).unwrap()];
Expand Down
Loading

0 comments on commit b04e880

Please sign in to comment.