Skip to content

Commit

Permalink
node: Address issues and PR comments
Browse files Browse the repository at this point in the history
- Use request_block_by_height to initialize a presync procedure
- Update default hops_limit to 128
- Update DEFAULT_CERT_CACHE_EXPIRY to 1min
- Fix the condition for detecting sync target is reached
  • Loading branch information
goshawk-3 committed May 21, 2024
1 parent 447df74 commit fec2369
Showing 1 changed file with 47 additions and 10 deletions.
57 changes: 47 additions & 10 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{vm, Network};
use crate::database::{Candidate, Ledger};
use metrics::counter;
use node_data::ledger::{to_str, Block, Certificate, Label};
use node_data::message::payload::{GetBlocks, Inv, RatificationResult, Vote};
use node_data::message::payload::{
GetBlocks, GetResource, Inv, RatificationResult, Vote,
};

use node_data::message::{payload, Message, Metadata};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
Expand All @@ -26,8 +29,8 @@ use tracing::{debug, error, info, warn};

const MAX_BLOCKS_TO_REQUEST: i16 = 50;
const EXPIRY_TIMEOUT_MILLIS: i16 = 5000;
const DEFAULT_HOPS_LIMIT: u16 = 100;
const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(10);
const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(60);
const DEFAULT_HOPS_LIMIT: u16 = 128;
pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5;

type SharedHashSet = Arc<RwLock<HashSet<[u8; 32]>>>;
Expand Down Expand Up @@ -167,7 +170,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
return Ok(());
}

if let Some(blk) = self.get_block_with_cert(blk).as_ref() {
if let Some(blk) = self.attach_cert_if_needed(blk).as_ref() {
match &mut self.curr {
State::InSync(ref mut curr) => {
if let Some((b, peer_addr)) =
Expand Down Expand Up @@ -206,6 +209,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}
}

// Clean up certificate cache
let now = Instant::now();
self.certificates_cache
.retain(|_, (_, expiry)| *expiry > now);
Expand All @@ -215,6 +219,10 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}

async fn request_block(&mut self, hash: [u8; 32], cert: Certificate) {
if self.certificates_cache.contains_key(&hash) {
return;
}

// Save certificate in case only candidate block is received
let expiry = Instant::now()
.checked_add(DEFAULT_CERT_CACHE_EXPIRY)
Expand Down Expand Up @@ -345,7 +353,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}

/// Try to attach the certificate to a block that misses it
fn get_block_with_cert<'a>(
fn attach_cert_if_needed<'a>(
&self,
blk: &'a Block,
) -> Option<Cow<'a, Block>> {
Expand Down Expand Up @@ -615,14 +623,39 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network> InSyncImpl<DB, VM, N> {
));
}

let mut inv = Inv::new(1);
inv.add_block_from_height(local_header.height + 1);
flood_request(&self.network, &inv).await;
Self::request_block_by_height(
&self.network,
local_header.height + 1,
metadata.src_addr,
)
.await;
}

Ok(None)
}

/// Requests a block by height from a `peer_addr`
async fn request_block_by_height(
network: &Arc<RwLock<N>>,
height: u64,
peer_addr: SocketAddr,
) {
let mut inv = Inv::new(1);
inv.add_block_from_height(height);
let this_peer = *network.read().await.public_addr();
let req = GetResource::new(inv, this_peer, u64::MAX, 1);
debug!(event = "request block by height", ?req, ?peer_addr);

if let Err(err) = network
.read()
.await
.send_to_peer(&Message::new_get_resource(req), peer_addr)
.await
{
warn!("could not request block {err}")
}
}

async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
// TODO: Consider reporting metrics here

Expand Down Expand Up @@ -761,10 +794,12 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
}
}

let tip = acc.get_curr_height().await;
// Check target height is reached
if acc.get_curr_height().await == self.range.1 {
if tip >= self.range.1 {
debug!(event = "sync target reached", height = tip);

// Block sync-up procedure manages to download all requested
// blocks
acc.restart_consensus().await;

// Transit to InSync mode
Expand Down Expand Up @@ -792,6 +827,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
.unwrap()
<= SystemTime::now()
{
debug!(event = "out_of_sync timer expired");
// sync-up has timed out, recover consensus task
self.acc.write().await.restart_consensus().await;

Expand All @@ -807,6 +843,7 @@ impl<DB: database::DB, VM: vm::VMExecution, N: Network>
/// Flood-request approach.
async fn flood_request<N: Network>(network: &Arc<RwLock<N>>, inv: &Inv) {
debug!(event = "flood_request", ?inv);

if let Err(err) = network
.read()
.await
Expand Down

0 comments on commit fec2369

Please sign in to comment.