diff --git a/node/src/p2p/header_session.rs b/node/src/p2p/header_session.rs index 1591f653..9e1ccb6d 100644 --- a/node/src/p2p/header_session.rs +++ b/node/src/p2p/header_session.rs @@ -9,7 +9,7 @@ use crate::p2p::{P2pCmd, P2pError}; use crate::store::header_ranges::{HeaderRange, RangeLengthExt}; const MAX_AMOUNT_PER_REQ: u64 = 64; -const MAX_CONCURRENT_REQS: usize = 1; +const MAX_CONCURRENT_REQS: usize = 8; type Result = std::result::Result; @@ -139,16 +139,18 @@ impl HeaderSession { } } +/// take a next batch of up to `limit` headers from the front of the `range_to_fetch` fn take_next_batch(range_to_fetch: &mut Option, limit: u64) -> Option { - // calculate potential end before we modify range_to_fetch + // calculate potential end offset before we modify range_to_fetch let end_offset = limit.checked_sub(1)?; let to_fetch = range_to_fetch.take()?; if to_fetch.len() <= limit { Some(to_fetch) } else { - let _ = range_to_fetch.insert(*to_fetch.start() + limit..=*to_fetch.end()); - Some(*to_fetch.start()..=*to_fetch.start() + end_offset) + // to_fetch.len() > limit, we shouldn't underflow here + let _ = range_to_fetch.insert(*to_fetch.start()..=*to_fetch.end() - limit); + Some(*to_fetch.end() - end_offset..=*to_fetch.end()) } } @@ -201,10 +203,10 @@ mod tests { result_tx.send(res).unwrap(); }); - for i in 0..8 { + for i in (0..8).rev() { let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; - assert_eq!(height, 1 + 64 * i); + assert_eq!(height, 9 + 64 * i); assert_eq!(amount, 64); let start = (height - 1) as usize; let end = start + amount as usize; @@ -212,7 +214,7 @@ mod tests { } let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await; - assert_eq!(height, 513); + assert_eq!(height, 1); assert_eq!(amount, 8); let start = (height - 1) as usize; let end = start + amount as usize; @@ -299,8 +301,38 @@ mod tests { fn take_next_batch_truncated_batch() { let mut range_to_fetch = Some(1..=10); let batch = take_next_batch(&mut range_to_fetch, 5); - assert_eq!(batch, Some(1..=5)); - assert_eq!(range_to_fetch, Some(6..=10)); + assert_eq!(batch, Some(6..=10)); + assert_eq!(range_to_fetch, Some(1..=5)); + } + + #[test] + fn take_next_batch_truncated_calc() { + let mut range_to_fetch = Some(1..=512); + + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(449..=512)); + assert_eq!(range_to_fetch, Some(1..=448)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(385..=448)); + assert_eq!(range_to_fetch, Some(1..=384)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(321..=384)); + assert_eq!(range_to_fetch, Some(1..=320)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(257..=320)); + assert_eq!(range_to_fetch, Some(1..=256)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(193..=256)); + assert_eq!(range_to_fetch, Some(1..=192)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(129..=192)); + assert_eq!(range_to_fetch, Some(1..=128)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(65..=128)); + assert_eq!(range_to_fetch, Some(1..=64)); + let batch = take_next_batch(&mut range_to_fetch, 64); + assert_eq!(batch, Some(1..=64)); + assert_eq!(range_to_fetch, None); } #[test] diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index a3513b9a..3090c493 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use celestia_types::ExtendedHeader; use crate::executor::yield_now; -use crate::store::header_ranges::HeaderRange; +use crate::store::header_ranges::{HeaderRange, RangeLengthExt}; use crate::store::{Result, StoreError}; pub(crate) const VALIDATIONS_PER_YIELD: usize = 4; @@ -13,34 +13,48 @@ pub(crate) const VALIDATIONS_PER_YIELD: usize = 4; pub(crate) fn calculate_range_to_fetch( head_height: u64, store_headers: &[RangeInclusive], + syncing_window_edge: Option, limit: u64, +) -> HeaderRange { + let mut missing_range = get_most_recent_missing_range(head_height, store_headers); + + // truncate to syncing window, if height is known + if let Some(window_edge) = syncing_window_edge { + if missing_range.start() < &window_edge { + missing_range = window_edge + 1..=*missing_range.end(); + } + } + + // truncate number of headers to limit + if missing_range.len() > limit { + let end = missing_range.end(); + let start = end.saturating_sub(limit) + 1; + missing_range = start..=*end; + } + + missing_range +} + +fn get_most_recent_missing_range( + head_height: u64, + store_headers: &[RangeInclusive], ) -> HeaderRange { let mut store_headers_iter = store_headers.iter().rev(); let Some(store_head_range) = store_headers_iter.next() else { - // empty store, just fetch from head - return head_height.saturating_sub(limit) + 1..=head_height; + // empty store, we're missing everything + return 1..=head_height; }; - if store_head_range.end() != &head_height { + if store_head_range.end() < &head_height { // if we haven't caught up with network head, start from there - let fetch_start = u64::max( - store_head_range.end() + 1, - head_height.saturating_sub(limit) + 1, - ); - return fetch_start..=head_height; + return store_head_range.end() + 1..=head_height; } // there exists a range contiguous with network head. inspect previous range end let penultimate_range_end = store_headers_iter.next().map(|r| *r.end()).unwrap_or(0); - let fetch_end = store_head_range.start().saturating_sub(1); - let fetch_start = u64::max( - penultimate_range_end + 1, - fetch_end.saturating_sub(limit) + 1, - ); - - fetch_start..=fetch_end + penultimate_range_end + 1..=store_head_range.start().saturating_sub(1) } pub(crate) fn try_consolidate_ranges( @@ -129,35 +143,43 @@ mod tests { let head_height = 1024; let ranges = [256..=512]; - let fetch_range = calculate_range_to_fetch(head_height, &ranges, 16); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 16); assert_eq!(fetch_range, 1009..=1024); - let fetch_range = calculate_range_to_fetch(head_height, &ranges, 511); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 511); assert_eq!(fetch_range, 514..=1024); - let fetch_range = calculate_range_to_fetch(head_height, &ranges, 512); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 512); assert_eq!(fetch_range, 513..=1024); - let fetch_range = calculate_range_to_fetch(head_height, &ranges, 513); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 513); assert_eq!(fetch_range, 513..=1024); - let fetch_range = calculate_range_to_fetch(head_height, &ranges, 1024); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, None, 1024); assert_eq!(fetch_range, 513..=1024); + let fetch_range = calculate_range_to_fetch(head_height, &ranges, Some(900), 1024); + assert_eq!(fetch_range, 901..=1024); } #[test] fn calculate_range_to_fetch_empty_store() { - let fetch_range = calculate_range_to_fetch(1, &[], 100); + let fetch_range = calculate_range_to_fetch(1, &[], None, 100); assert_eq!(fetch_range, 1..=1); - let fetch_range = calculate_range_to_fetch(100, &[], 10); + let fetch_range = calculate_range_to_fetch(100, &[], None, 10); assert_eq!(fetch_range, 91..=100); + + let fetch_range = calculate_range_to_fetch(100, &[], Some(75), 50); + assert_eq!(fetch_range, 76..=100); } #[test] fn calculate_range_to_fetch_fully_synced() { - let fetch_range = calculate_range_to_fetch(1, &[1..=1], 100); + let fetch_range = calculate_range_to_fetch(1, &[1..=1], None, 100); assert!(fetch_range.is_empty()); - let fetch_range = calculate_range_to_fetch(100, &[1..=100], 10); + let fetch_range = calculate_range_to_fetch(100, &[1..=100], None, 10); + assert!(fetch_range.is_empty()); + + let fetch_range = calculate_range_to_fetch(100, &[1..=100], Some(100), 10); assert!(fetch_range.is_empty()); } @@ -165,24 +187,39 @@ mod tests { fn calculate_range_to_fetch_caught_up() { let head_height = 4000; - let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], 500); + let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], None, 500); assert_eq!(fetch_range, 2500..=2999); - let fetch_range = calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], 500); + let fetch_range = calculate_range_to_fetch(head_height, &[3000..=4000], Some(2600), 500); + assert_eq!(fetch_range, 2601..=2999); + let fetch_range = + calculate_range_to_fetch(head_height, &[500..=1000, 3000..=4000], None, 500); assert_eq!(fetch_range, 2500..=2999); - let fetch_range = calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], 500); + let fetch_range = + calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], None, 500); assert_eq!(fetch_range, 2801..=2999); - let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], 500); + let fetch_range = + calculate_range_to_fetch(head_height, &[2500..=2800, 3000..=4000], Some(2000), 500); + assert_eq!(fetch_range, 2801..=2999); + let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], None, 500); assert_eq!(fetch_range, 1..=299); + let fetch_range = calculate_range_to_fetch(head_height, &[300..=4000], Some(2000), 500); + assert!(fetch_range.is_empty()); } #[test] fn calculate_range_to_fetch_catching_up() { let head_height = 4000; - let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], 500); + let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], None, 500); assert_eq!(fetch_range, 3501..=4000); - let fetch_range = calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], 500); + let fetch_range = calculate_range_to_fetch(head_height, &[2000..=3000], Some(3600), 500); + assert_eq!(fetch_range, 3601..=4000); + let fetch_range = + calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], None, 500); assert_eq!(fetch_range, 3801..=4000); + let fetch_range = + calculate_range_to_fetch(head_height, &[1..=2998, 3000..=3800], Some(3900), 500); + assert_eq!(fetch_range, 3901..=4000); } #[test] diff --git a/node/src/syncer.rs b/node/src/syncer.rs index f9e8ce01..584f48d2 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -15,6 +15,7 @@ use std::time::Duration; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; +use celestia_tendermint::Time; use celestia_types::ExtendedHeader; use futures::FutureExt; use serde::{Deserialize, Serialize}; @@ -36,6 +37,7 @@ type Result = std::result::Result; const MAX_HEADERS_IN_BATCH: u64 = 512; const TRY_INIT_BACKOFF_MAX_INTERVAL: Duration = Duration::from_secs(60); +const SYNCING_WINDOW: Duration = Duration::from_secs(30 * 24 * 60 * 60); // 30 days /// Representation of all the errors that can occur when interacting with the [`Syncer`]. #[derive(Debug, thiserror::Error)] @@ -180,6 +182,7 @@ where headers_tx: mpsc::Sender<(Result, P2pError>, Duration)>, headers_rx: mpsc::Receiver<(Result, P2pError>, Duration)>, ongoing_batch: Option, + estimated_syncing_window_end: Option, } struct Ongoing { @@ -210,6 +213,7 @@ where headers_tx, headers_rx, ongoing_batch: None, + estimated_syncing_window_end: None, }) } @@ -466,6 +470,7 @@ where let next_batch = calculate_range_to_fetch( subjective_head_height, store_ranges.as_ref(), + self.estimated_syncing_window_end, MAX_HEADERS_IN_BATCH, ); @@ -474,6 +479,14 @@ where return; } + // make sure we're inside the syncing window before we start + if let Ok(known_header) = self.store.get_by_height(next_batch.end() + 1).await { + if !in_syncing_window(&known_header) { + self.estimated_syncing_window_end = Some(known_header.height().value()); + return; + } + } + self.event_pub.send(NodeEvent::FetchingHeadersStarted { from_height: *next_batch.start(), to_height: *next_batch.end(), @@ -540,6 +553,15 @@ where } } +fn in_syncing_window(header: &ExtendedHeader) -> bool { + let syncing_window_start = Time::now().checked_sub(SYNCING_WINDOW).unwrap_or_else(|| { + warn!("underflow when computing syncing window start, defaulting to unix epoch"); + Time::unix_epoch() + }); + + header.time().after(syncing_window_start) +} + async fn try_init(p2p: &P2p, store: &S) -> Result where S: Store, @@ -713,6 +735,58 @@ mod tests { p2p_mock.expect_no_cmd().await; } + #[async_test] + async fn syncing_window_edge() { + let month_and_day_ago = Duration::from_secs(31 * 24 * 60 * 60); + let mut gen = ExtendedHeaderGenerator::new(); + gen.set_time((Time::now() - month_and_day_ago).expect("to not underflow")); + let mut headers = gen.next_many(1200); + gen.set_time(Time::now()); + headers.append(&mut gen.next_many(2049 - 1200)); + + let (syncer, store, mut p2p_mock) = initialized_syncer(headers[2048].clone()).await; + assert_syncing(&syncer, &store, &[2049..=2049], 2049).await; + + // Syncer requested the first batch ([1537..=2048]) + handle_session_batch( + &mut p2p_mock, + &headers, + vec![ + (1537, 64), + (1601, 64), + (1665, 64), + (1729, 64), + (1793, 64), + (1857, 64), + (1921, 64), + (1985, 64), + ], + ) + .await; + assert_syncing(&syncer, &store, &[1537..=2049], 2049).await; + + // Syncer requested the second batch ([1025, 1536]) hitting the syncing window + handle_session_batch( + &mut p2p_mock, + &headers, + vec![ + (1025, 64), + (1089, 64), + (1153, 64), + (1217, 64), + (1281, 64), + (1345, 64), + (1409, 64), + (1473, 64), + ], + ) + .await; + assert_syncing(&syncer, &store, &[1025..=2049], 2049).await; + + // Syncer is fully synced and awaiting for events + p2p_mock.expect_no_cmd().await; + } + #[async_test] async fn start_with_filled_store() { let events = EventChannel::new(); diff --git a/types/src/test_utils.rs b/types/src/test_utils.rs index d65199e7..567023ce 100644 --- a/types/src/test_utils.rs +++ b/types/src/test_utils.rs @@ -1,4 +1,5 @@ //! Utilities for writing tests. +use std::time::Duration; use celestia_tendermint::block::header::{Header, Version}; use celestia_tendermint::block::{parts, Commit, CommitSig}; @@ -23,6 +24,7 @@ pub struct ExtendedHeaderGenerator { chain_id: chain::Id, key: SigningKey, current_header: Option, + spoofed_time: Option