Skip to content

Commit

Permalink
feat!(node): make syncer batch sizes configurable (#327)
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Zwoliński <[email protected]>
Co-authored-by: Mikołaj Florkiewicz <[email protected]>
  • Loading branch information
zvolin and fl0rek authored Jul 10, 2024
1 parent 1fa1bbf commit a5db92c
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 205 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
test-wasm:
runs-on: ubuntu-latest
env:
WASM_BINDGEN_TEST_TIMEOUT: 60
WASM_BINDGEN_TEST_TIMEOUT: 120
steps:
- uses: actions/checkout@v1

Expand Down
1 change: 1 addition & 0 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub(crate) async fn run(args: Params) -> Result<()> {
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: args.listen_addrs,
sync_batch_size: 512,
blockstore,
store,
})
Expand Down
1 change: 1 addition & 0 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ impl WasmNodeConfig {
p2p_bootnodes,
p2p_local_keypair,
p2p_listen_on: vec![],
sync_batch_size: 128,
blockstore,
store,
})
Expand Down
1 change: 1 addition & 0 deletions node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async fn main() {
p2p_local_keypair,
p2p_bootnodes,
p2p_listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
sync_batch_size: 512,
blockstore,
store,
})
Expand Down
3 changes: 3 additions & 0 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ where
pub p2p_bootnodes: Vec<Multiaddr>,
/// List of the addresses where [`Node`] will listen for incoming connections.
pub p2p_listen_on: Vec<Multiaddr>,
/// Maximum number of headers in batch while syncing.
pub sync_batch_size: u64,
/// The blockstore for bitswap.
pub blockstore: B,
/// The store for headers.
Expand Down Expand Up @@ -123,6 +125,7 @@ where
store: store.clone(),
p2p: p2p.clone(),
event_pub: event_channel.publisher(),
batch_size: config.sync_batch_size,
})?);

let daser = Arc::new(Daser::start(DaserArgs {
Expand Down
2 changes: 1 addition & 1 deletion node/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument, trace, warn};

mod header_ex;
mod header_session;
pub(crate) mod header_session;
pub(crate) mod shwap;
mod swarm;

Expand Down
126 changes: 89 additions & 37 deletions node/src/p2p/header_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::executor::spawn;
use crate::p2p::header_ex::utils::HeaderRequestExt;
use crate::p2p::{P2pCmd, P2pError};

const MAX_AMOUNT_PER_REQ: u64 = 64;
const MAX_CONCURRENT_REQS: usize = 8;
pub(crate) const MIN_AMOUNT_PER_REQ: u64 = 8;
pub(crate) const MAX_AMOUNT_PER_REQ: u64 = 64;
pub(crate) const MAX_CONCURRENT_REQS: usize = 8;

type Result<T, E = P2pError> = std::result::Result<T, E>;

Expand All @@ -19,6 +20,7 @@ pub(crate) struct HeaderSession {
response_tx: mpsc::Sender<(u64, u64, Result<Vec<ExtendedHeader>>)>,
response_rx: mpsc::Receiver<(u64, u64, Result<Vec<ExtendedHeader>>)>,
ongoing: usize,
batch_size: u64,
}

impl HeaderSession {
Expand All @@ -32,13 +34,18 @@ impl HeaderSession {
/// [`Store::get_stored_header_ranges`]: crate::store::Store::get_stored_header_ranges
pub(crate) fn new(range: BlockRange, cmd_tx: mpsc::Sender<P2pCmd>) -> Self {
let (response_tx, response_rx) = mpsc::channel(MAX_CONCURRENT_REQS);
let batch_size = range
.len()
.div_ceil(MAX_CONCURRENT_REQS as u64)
.clamp(MIN_AMOUNT_PER_REQ, MAX_AMOUNT_PER_REQ);

HeaderSession {
to_fetch: Some(range),
cmd_tx,
response_tx,
response_rx,
ongoing: 0,
batch_size,
}
}

Expand Down Expand Up @@ -100,7 +107,7 @@ impl HeaderSession {
}

pub(crate) async fn send_next_request(&mut self) -> Result<()> {
let Some(range) = take_next_batch(&mut self.to_fetch, MAX_AMOUNT_PER_REQ) else {
let Some(range) = take_next_batch(&mut self.to_fetch, self.batch_size) else {
return Ok(());
};

Expand Down Expand Up @@ -161,28 +168,50 @@ mod tests {
use crate::test_utils::async_test;
use celestia_types::test_utils::ExtendedHeaderGenerator;

#[async_test]
async fn retry_on_missing_range() {
async fn test_batching(to_fetch: u64, batches: usize, batch_size: u64) {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(64);
let headers = gen.next_many(to_fetch);

let mut session = HeaderSession::new(1..=to_fetch, p2p_mock.cmd_tx.clone());
assert_eq!(session.batch_size, batch_size);

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
respond_to.send(Ok(headers[..60].to_vec())).unwrap();
// all batches but last should have batch_size
let full_batches = batches - 1;
for i in 1..=full_batches {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 61);
assert_eq!(amount, 4);
respond_to.send(Ok(headers[60..64].to_vec())).unwrap();
// batches will be taken backward
let end_offset = i as u64 * batch_size;
let expected_start = 1 + to_fetch - end_offset;
assert_eq!(height, expected_start);
assert_eq!(amount, batch_size);

let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

// last batch should be what's left
let leftover = to_fetch - batch_size * full_batches as u64;
if leftover > 0 {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;

assert_eq!(height, 1);
assert_eq!(amount, leftover);

let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

p2p_mock.expect_no_cmd().await;

Expand All @@ -191,34 +220,57 @@ mod tests {
}

#[async_test]
async fn nine_batches() {
async fn split_range_to_batches() {
// single batch smaller or equal MIN_AMOUNT_PER_REQ
test_batching(1, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(5, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(7, 1, MIN_AMOUNT_PER_REQ).await;
test_batching(8, 1, MIN_AMOUNT_PER_REQ).await;

// many batches of MIN_AMOUNT_PER_REQ
test_batching(10, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(16, 2, MIN_AMOUNT_PER_REQ).await;
test_batching(30, 4, MIN_AMOUNT_PER_REQ).await;
test_batching(50, 7, MIN_AMOUNT_PER_REQ).await;
test_batching(63, 8, MIN_AMOUNT_PER_REQ).await;
test_batching(64, 8, MIN_AMOUNT_PER_REQ).await;

// bigger batches which are fetched in a single go (with MAX_CONCURRENT_REQS requests)
test_batching(65, MAX_CONCURRENT_REQS, 9).await;
test_batching(128, MAX_CONCURRENT_REQS, 16).await;
test_batching(129, MAX_CONCURRENT_REQS, 17).await;
test_batching(256, MAX_CONCURRENT_REQS, 32).await;
test_batching(500, MAX_CONCURRENT_REQS, 63).await;
test_batching(512, MAX_CONCURRENT_REQS, 64).await;

// more than MAX_AMOUNT_PER_REQ batches
test_batching(520, 9, MAX_AMOUNT_PER_REQ).await;
test_batching(600, 10, MAX_AMOUNT_PER_REQ).await;
test_batching(1024, 16, MAX_AMOUNT_PER_REQ).await;
}

#[async_test]
async fn retry_on_missing_range() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(520);
let headers = gen.next_many(8);

let mut session = HeaderSession::new(1..=520, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
result_tx.send(res).unwrap();
});

for i in (0..8).rev() {
let (height, amount, respond_to) =
p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 9 + 64 * i);
assert_eq!(amount, 64);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
}

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 8);
let start = (height - 1) as usize;
let end = start + amount as usize;
respond_to.send(Ok(headers[start..end].to_vec())).unwrap();
respond_to.send(Ok(headers[..6].to_vec())).unwrap();

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 7);
assert_eq!(amount, 2);
respond_to.send(Ok(headers[6..8].to_vec())).unwrap();

p2p_mock.expect_no_cmd().await;

Expand All @@ -230,9 +282,9 @@ mod tests {
async fn not_found_is_not_fatal() {
let (_p2p, mut p2p_mock) = P2p::mocked();
let mut gen = ExtendedHeaderGenerator::new();
let headers = gen.next_many(64);
let headers = gen.next_many(8);

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
Expand All @@ -241,14 +293,14 @@ mod tests {

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to
.send(Err(P2pError::HeaderEx(HeaderExError::HeaderNotFound)))
.unwrap();

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to.send(Ok(headers.clone())).unwrap();

p2p_mock.expect_no_cmd().await;
Expand All @@ -261,7 +313,7 @@ mod tests {
async fn no_peers_is_fatal() {
let (_p2p, mut p2p_mock) = P2p::mocked();

let mut session = HeaderSession::new(1..=64, p2p_mock.cmd_tx.clone());
let mut session = HeaderSession::new(1..=8, p2p_mock.cmd_tx.clone());
let (result_tx, result_rx) = oneshot::channel();
spawn(async move {
let res = session.run().await;
Expand All @@ -270,7 +322,7 @@ mod tests {

let (height, amount, respond_to) = p2p_mock.expect_header_request_for_height_cmd().await;
assert_eq!(height, 1);
assert_eq!(amount, 64);
assert_eq!(amount, 8);
respond_to.send(Err(P2pError::NoConnectedPeers)).unwrap();

p2p_mock.expect_no_cmd().await;
Expand Down
Loading

0 comments on commit a5db92c

Please sign in to comment.