Skip to content

Commit

Permalink
feat(node/syncer)!: Implement backwards header sync (#279)
Browse files Browse the repository at this point in the history
Store
    Relaxed rules around header insertion. Previously Store enforced storing a single contiguous range from genesis to store head, with appends allowed in a single slot right after the head.
    Now Store holds possibly multiple non-overlapping header ranges starting and ending at arbitrary heights. Append is allowed as long as it extends an existing range, or creates a new head range (creating a new range at the front is needed for syncing from the front).
    The fundamental unit for inserting headers is now a Vec of headers, and a single-header insert is expressed in terms of that (previously it was the other way around).

Header sync
    Syncer starts up, queries store for headers already present and swarm for the network head
    Using the above, it calculates the header ranges which are already published but absent from the store
    Taking the 512 highest missing headers, it starts a new header session
    Session starts 8 jobs, each taking up to 64 contiguous heights from the missing list and requesting them from the network
    Each time a job finishes its work, the next up-to-64 headers are requested. If fewer than 64 headers are received, the remaining range is re-requested.
    After fetching all the requested ranges, they are sorted into contiguous ranges. Each range is verified internally.
    Each range is inserted into the store, verifying the range edges against headers in the store, if they exist. (If they don't exist yet, the syncer will fetch them eventually, at which point they'll be verified.)

Co-authored-by: Yiannis Marangos <[email protected]>
  • Loading branch information
fl0rek and oblique authored Jun 14, 2024
1 parent 163cefb commit 8701c6b
Show file tree
Hide file tree
Showing 28 changed files with 2,096 additions and 727 deletions.
22 changes: 17 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ members = ["cli", "node", "node-wasm", "proto", "rpc", "types"]
[workspace.dependencies]
blockstore = "0.5"
lumina-node = { version = "0.2.0", path = "node" }
lumina-node-wasm = { version = "0.1.1", path = "node-wasm" }
celestia-proto = { version = "0.2.0", path = "proto" }
celestia-rpc = { version = "0.2.0", path = "rpc", default-features = false }
celestia-types = { version = "0.2.0", path = "types", default-features = false }
Expand Down
8 changes: 5 additions & 3 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ pub(crate) async fn run(args: Params) -> Result<()> {
let store = RedbStore::new(db.clone()).await?;
let blockstore = RedbBlockstore::new(db);

match store.head_height().await {
Ok(height) => info!("Initialised store with head height: {height}"),
Err(_) => info!("Initialised new store"),
let stored_ranges = store.get_stored_header_ranges().await?;
if stored_ranges.is_empty() {
info!("Initialised new store");
} else {
info!("Initialised store, present headers: {stored_ranges}");
}

let node = Node::new(NodeConfig {
Expand Down
4 changes: 2 additions & 2 deletions cli/static/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ <h2 class="status">Status</h2>
</div>

<div class="status">
<b>Synchronizing headers:</b>
<span class="status-value" id="syncer"></span>
<b>Synchronised header ranges:</b>
<span class="status-value" id="stored-ranges"></span>
</div>

<div class="status">
Expand Down
4 changes: 3 additions & 1 deletion cli/static/run_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ async function show_stats(node) {
return;
}
const info = await node.syncer_info();
document.getElementById("syncer").innerText = `${info.local_head}/${info.subjective_head}`;
document.getElementById("stored-ranges").innerText = info.stored_headers.map((range) => {
return `${range.start}..${range.end}`;
}).join(", ");

let peers_ul = document.createElement('ul');
(await node.connected_peers()).forEach(peer => {
Expand Down
11 changes: 11 additions & 0 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,17 @@ impl WasmNode {
Ok(to_value(&local_head)?)
}

/// Get ranges of headers currently stored.
pub async fn get_stored_header_ranges(&self) -> Result<Array> {
let ranges = self.node.get_stored_header_ranges().await?;

Ok(ranges
.as_ref()
.iter()
.map(to_value)
.collect::<Result<_, _>>()?)
}

/// Get a synced header for the block with a given hash.
pub async fn get_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let hash: Hash = hash.parse().context("parsing hash failed")?;
Expand Down
3 changes: 2 additions & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ dashmap = "5.5.3"
futures = "0.3.28"
hex = "0.4.3"
instant = "0.1.12"
itertools = "0.13"
prost = "0.12.0"
rand = "0.8.5"
serde = { version = "1.0.164", features = ["derive"] }
smallvec = { version = "1.11.1", features = ["union", "const_generics"] }
smallvec = { version = "1.11.1", features = ["union", "const_generics", "serde"] }
thiserror = "1.0.48"
tokio = { version = "1.32.0", features = ["macros", "sync"] }
tokio-util = "0.7.9"
Expand Down
8 changes: 4 additions & 4 deletions node/src/daser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,10 @@ mod tests {
headers.push(header);
}

store.append(headers[0..10].to_vec()).await.unwrap();
store.insert(headers[0..10].to_vec()).await.unwrap();
// Wait a bit for sampling of block 10 start
sleep(Duration::from_millis(10)).await;
store.append(headers[10..].to_vec()).await.unwrap();
store.insert(headers[10..].to_vec()).await.unwrap();
// Wait a bit for the queue to be populated with higher blocks
sleep(Duration::from_millis(10)).await;

Expand Down Expand Up @@ -659,7 +659,7 @@ mod tests {
let eds = generate_eds(2);
let dah = DataAvailabilityHeader::from_eds(&eds);
let header = gen.next_with_dah(dah);
store.append_single(header).await.unwrap();
store.insert(header).await.unwrap();

// Sample block 21
handle_get_shwap_cid(&mut handle, &store, &mut event_sub, 21, &eds, false).await;
Expand All @@ -680,7 +680,7 @@ mod tests {
let header = gen.next_with_dah(dah);
let height = header.height().value();

store.append_single(header).await.unwrap();
store.insert(header).await.unwrap();

let cids = handle_get_shwap_cid(
handle,
Expand Down
8 changes: 7 additions & 1 deletion node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::events::{EventChannel, EventSubscriber};
use crate::executor::spawn;
use crate::p2p::{P2p, P2pArgs, P2pError};
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::header_ranges::HeaderRanges;
use crate::store::{SamplingMetadata, Store, StoreError};
use crate::syncer::{Syncer, SyncerArgs, SyncerError, SyncingInfo};

Expand Down Expand Up @@ -109,7 +110,6 @@ where
})?);

let syncer = Arc::new(Syncer::start(SyncerArgs {
genesis_hash: config.genesis_hash,
store: store.clone(),
p2p: p2p.clone(),
})?);
Expand Down Expand Up @@ -280,6 +280,12 @@ where
self.p2p.header_sub_watcher().borrow().clone()
}

/// Get ranges of headers currently stored.
#[doc(hidden)]
pub async fn get_stored_header_ranges(&self) -> Result<HeaderRanges> {
Ok(self.store.get_stored_header_ranges().await?)
}

/// Get the latest locally synced header.
pub async fn get_local_head_header(&self) -> Result<ExtendedHeader> {
Ok(self.store.get_head().await?)
Expand Down
33 changes: 31 additions & 2 deletions node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash
use crate::p2p::swarm::new_swarm;
use crate::peer_tracker::PeerTracker;
use crate::peer_tracker::PeerTrackerInfo;
use crate::store::header_ranges::{header_ranges, HeaderRanges};
use crate::store::Store;
use crate::utils::{
celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt,
Expand Down Expand Up @@ -401,15 +402,43 @@ impl P2p {

let height = from.height().value() + 1;

let mut session = HeaderSession::new(height, amount, self.cmd_tx.clone())?;
let headers = session.run().await?;
let range = height..=height + amount - 1;

let mut session = HeaderSession::new(header_ranges![range], self.cmd_tx.clone());
let headers = session
.run()
.await?
.pop()
.ok_or(HeaderExError::InvalidResponse)?;

from.verify_adjacent_range(&headers)
.map_err(|_| HeaderExError::InvalidResponse)?;

Ok(headers)
}

/// Request a list of ranges with the `header-ex` protocol
///
/// For each of the ranges, headers are verified against each other, but it's the caller
/// responsibility to verify range edges against headers existing in the store.
pub(crate) async fn get_unverified_header_ranges(
&self,
ranges: HeaderRanges,
) -> Result<Vec<Vec<ExtendedHeader>>> {
let mut session = HeaderSession::new(ranges, self.cmd_tx.clone());
let header_ranges = session.run().await?;

for range in &header_ranges {
let Some(head) = range.first() else {
continue;
};
head.verify_adjacent_range(&range[1..])
.map_err(|_| HeaderExError::InvalidResponse)?;
}

Ok(header_ranges)
}

/// Request a [`Cid`] on bitswap protocol.
pub(crate) async fn get_shwap_cid(
&self,
Expand Down
12 changes: 4 additions & 8 deletions node/src/p2p/header_ex/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::p2p::header_ex::utils::{HeaderRequestExt, HeaderResponseExt};
use crate::p2p::header_ex::{HeaderExError, ReqRespBehaviour};
use crate::p2p::P2pError;
use crate::peer_tracker::PeerTracker;
use crate::utils::{OneshotResultSender, OneshotResultSenderExt, VALIDATIONS_PER_YIELD};
use crate::store::utils::VALIDATIONS_PER_YIELD;
use crate::utils::{OneshotResultSender, OneshotResultSenderExt};

const MAX_PEERS: usize = 10;

Expand Down Expand Up @@ -612,8 +613,6 @@ mod tests {
}

#[async_test]
#[ignore] // TODO: Enable this test after sessions are implemented
#[cfg(not(target_arch = "wasm32"))] // wasm_bindgen_test doesn't seem to support #[ignore]
async fn request_range_responds_with_smaller_one() {
let peer_tracker = peer_tracker_with_n_peers(15);
let mut mock_req = MockReq::new();
Expand All @@ -627,11 +626,8 @@ mod tests {
let header5 = gen.next();

mock_req.send_n_responses(&mut handler, 1, vec![header5.to_header_response()]);

assert!(matches!(
rx.await,
Ok(Err(P2pError::HeaderEx(HeaderExError::InvalidResponse)))
));
let headers = rx.await.unwrap().unwrap();
assert_eq!(headers, vec![header5]);
}

#[async_test]
Expand Down
16 changes: 8 additions & 8 deletions node/src/p2p/header_ex/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ mod tests {

#[async_test]
async fn request_head_test() {
let (store, _) = gen_filled_store(4);
let (store, _) = gen_filled_store(4).await;
let expected_head = store.get_head().await.unwrap();
let mut handler = HeaderExServerHandler::new(Arc::new(store));

Expand All @@ -224,7 +224,7 @@ mod tests {

#[async_test]
async fn request_header_test() {
let (store, _) = gen_filled_store(3);
let (store, _) = gen_filled_store(3).await;
let expected_genesis = store.get_by_height(1).await.unwrap();
let mut handler = HeaderExServerHandler::new(Arc::new(store));

Expand All @@ -245,7 +245,7 @@ mod tests {

#[async_test]
async fn invalid_amount_request_test() {
let (store, _) = gen_filled_store(1);
let (store, _) = gen_filled_store(1).await;
let mut handler = HeaderExServerHandler::new(Arc::new(store));

handler.on_request_received(
Expand All @@ -263,7 +263,7 @@ mod tests {

#[async_test]
async fn none_data_request_test() {
let (store, _) = gen_filled_store(1);
let (store, _) = gen_filled_store(1).await;
let mut handler = HeaderExServerHandler::new(Arc::new(store));

let request = HeaderRequest {
Expand All @@ -280,7 +280,7 @@ mod tests {

#[async_test]
async fn request_hash_test() {
let (store, _) = gen_filled_store(1);
let (store, _) = gen_filled_store(1).await;
let stored_header = store.get_head().await.unwrap();
let mut handler = HeaderExServerHandler::new(Arc::new(store));

Expand All @@ -301,7 +301,7 @@ mod tests {

#[async_test]
async fn request_malformed_hash_test() {
let (store, _) = gen_filled_store(1);
let (store, _) = gen_filled_store(1).await;
let mut handler = HeaderExServerHandler::new(Arc::new(store));

let request = HeaderRequest {
Expand All @@ -319,7 +319,7 @@ mod tests {

#[async_test]
async fn request_range_test() {
let (store, _) = gen_filled_store(10);
let (store, _) = gen_filled_store(10).await;
let expected_headers = [
store.get_by_height(5).await.unwrap(),
store.get_by_height(6).await.unwrap(),
Expand All @@ -344,7 +344,7 @@ mod tests {

#[async_test]
async fn request_range_beyond_head_test() {
let (store, _) = gen_filled_store(5);
let (store, _) = gen_filled_store(5).await;
let expected_hashes = [store.get_by_height(5).await.ok()];
let expected_status_codes = [StatusCode::Ok];
assert_eq!(expected_hashes.len(), expected_status_codes.len());
Expand Down
Loading

0 comments on commit 8701c6b

Please sign in to comment.