Skip to content

Commit

Permalink
node: Remove expired certificates
Browse files Browse the repository at this point in the history
  • Loading branch information
goshawk-3 committed May 15, 2024
1 parent 0ee3196 commit caed8ea
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
2 changes: 1 addition & 1 deletion node-data/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ pub mod payload {
max_entries,
}
}

pub fn add_tx_id(&mut self, id: [u8; 32]) {
self.inv_list.push(InvVect {
inv_type: InvType::MempoolTx,
Expand Down
76 changes: 39 additions & 37 deletions node/src/chain/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use tracing::{debug, error, info, warn};
const MAX_BLOCKS_TO_REQUEST: i16 = 50;
const EXPIRY_TIMEOUT_MILLIS: i16 = 5000;
const DEFAULT_HOPS_LIMIT: u16 = 100;

const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(10);
pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5;

type SharedHashSet = Arc<RwLock<HashSet<[u8; 32]>>>;
Expand Down Expand Up @@ -74,10 +74,7 @@ pub(crate) struct SimpleFSM<N: Network, DB: database::DB, VM: vm::VMExecution> {
blacklisted_blocks: SharedHashSet,

/// Certificates cached from received Quorum messages
/// TODO: Spamming attack will be addressed by adding Quorum/Certificate
/// sanity check
/// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed
certificates: HashMap<(u64, [u8; 32]), Certificate>,
certificates_cache: HashMap<[u8; 32], (Certificate, Instant)>,
}

impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
Expand All @@ -96,18 +93,18 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
acc,
network,
blacklisted_blocks,
certificates: Default::default(),
certificates_cache: Default::default(),
}
}

pub async fn on_idle(&mut self, timeout: Duration) {
let acc = self.acc.read().await;
let height = acc.get_curr_height().await;
let tip_height = acc.get_curr_height().await;
let iter = acc.get_curr_iteration().await;
if let Ok(last_finalized) = acc.get_latest_final_block().await {
info!(
event = "fsm::idle",
height,
tip_height,
iter,
timeout_sec = timeout.as_secs(),
"finalized_height" = last_finalized.header().height,
Expand All @@ -132,6 +129,10 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
} else {
error!("could not request blocks");
}

let now = Instant::now();
self.certificates_cache
.retain(|_, (_, expiry)| *expiry > now);
}

pub async fn on_failed_consensus(&mut self) {
Expand Down Expand Up @@ -205,9 +206,28 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
}
}

let now = Instant::now();
self.certificates_cache
.retain(|_, (_, expiry)| *expiry > now);
self.certificates_cache.remove(&blk.header().hash);

Ok(())
}

async fn request_block(&mut self, hash: [u8; 32], cert: Certificate) {
// Save certificate in case only candidate block is received
let expiry = Instant::now()
.checked_add(DEFAULT_CERT_CACHE_EXPIRY)
.unwrap();
self.certificates_cache.insert(hash, (cert, expiry));

let mut inv = Inv::new(1);
inv.add_block_from_hash(hash);
inv.add_candidate_from_hash(hash);

flood_request(&self.network, &inv).await;
}

/// Handles a Quorum message.
///
/// Ideally, the winning block will be built from the quorum certificate
Expand All @@ -220,8 +240,8 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
) -> anyhow::Result<()> {
let res = match quorum.cert.result {
RatificationResult::Success(Vote::Valid(hash)) => {
let acc = self.acc.read().await;
let local_header = acc.tip_header().await;
let local_header = self.acc.read().await.tip_header().await;
let db = self.acc.read().await.db.clone();
let remote_height = msg.header.round;

// Quorum from future
Expand All @@ -232,14 +252,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
height = remote_height,
);

// Save certificate in case candidate block is received only
//self.certificates
// .insert((remote_height, hash), quorum.cert);

let mut inv = Inv::new(1);
inv.add_candidate_from_hash(hash);
inv.add_block_from_hash(hash);
flood_request(&self.network, &inv).await;
self.request_block(hash, quorum.cert).await;

Ok(None)
} else {
Expand All @@ -252,12 +265,12 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
|| (remote_height == local_header.height
&& local_header.hash != hash)
{
match acc
.db
let res = db
.read()
.await
.view(|t| t.fetch_candidate_block(&hash))
{
.view(|t| t.fetch_candidate_block(&hash));

match res {
Ok(b) => Ok(b),
Err(err) => {
error!(
Expand All @@ -270,18 +283,7 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
// Candidate block is not found from local
// storage. Cache the certificate and request
// candidate block only.

// Save certificate in case candidate block is
// received only
//self.certificates
// .insert((remote_height, hash),
// quorum.cert);

let mut inv = Inv::new(1);
inv.add_candidate_from_hash(hash);
inv.add_block_from_hash(hash);
flood_request(&self.network, &inv).await;

self.request_block(hash, quorum.cert).await;
Err(err)
}
}
Expand Down Expand Up @@ -351,9 +353,9 @@ impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
// The default cert means the block was retrieved from Candidate
// CF thus missing the certificate. If so, we try to set the valid
// certificate from the cache certificates.

let key = (blk.header().height, blk.header().hash);
if let Some(cert) = self.certificates.get(&key) {
if let Some((cert, _)) =
self.certificates_cache.get(&blk.header().hash)
{
let mut blk = blk.clone();
blk.set_certificate(*cert);
Some(Cow::Owned(blk))
Expand Down

0 comments on commit caed8ea

Please sign in to comment.