diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 82ddaa5c6f..067cf893d8 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -790,7 +790,7 @@ pub mod payload { max_entries, } } - + pub fn add_tx_id(&mut self, id: [u8; 32]) { self.inv_list.push(InvVect { inv_type: InvType::MempoolTx, diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 28e53729bb..ae5c82f620 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -27,7 +27,7 @@ use tracing::{debug, error, info, warn}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; const DEFAULT_HOPS_LIMIT: u16 = 100; - +const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(10); pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; @@ -74,10 +74,7 @@ pub(crate) struct SimpleFSM { blacklisted_blocks: SharedHashSet, /// Certificates cached from received Quorum messages - /// TODO: Spamming attack will be addressed by adding Quorum/Certificate - /// sanity check - /// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed - certificates: HashMap<(u64, [u8; 32]), Certificate>, + certificates_cache: HashMap<[u8; 32], (Certificate, Instant)>, } impl SimpleFSM { @@ -96,18 +93,18 @@ impl SimpleFSM { acc, network, blacklisted_blocks, - certificates: Default::default(), + certificates_cache: Default::default(), } } pub async fn on_idle(&mut self, timeout: Duration) { let acc = self.acc.read().await; - let height = acc.get_curr_height().await; + let tip_height = acc.get_curr_height().await; let iter = acc.get_curr_iteration().await; if let Ok(last_finalized) = acc.get_latest_final_block().await { info!( event = "fsm::idle", - height, + tip_height, iter, timeout_sec = timeout.as_secs(), "finalized_height" = last_finalized.header().height, @@ -132,6 +129,10 @@ impl SimpleFSM { } else { error!("could not request blocks"); } + + let now = Instant::now(); + self.certificates_cache + .retain(|_, (_, expiry)| *expiry > now); } pub async fn on_failed_consensus(&mut self) { @@ -205,9 +206,28 @@ impl SimpleFSM { } } + let now = Instant::now(); + self.certificates_cache + .retain(|_, (_, expiry)| *expiry > now); + self.certificates_cache.remove(&blk.header().hash); + Ok(()) } + async fn request_block(&mut self, hash: [u8; 32], cert: Certificate) { + // Save certificate in case only candidate block is received + let expiry = Instant::now() + .checked_add(DEFAULT_CERT_CACHE_EXPIRY) + .unwrap(); + self.certificates_cache.insert(hash, (cert, expiry)); + + let mut inv = Inv::new(1); + inv.add_block_from_hash(hash); + inv.add_candidate_from_hash(hash); + + flood_request(&self.network, &inv).await; + } + /// Handles a Quorum message. /// /// Ideally, the winning block will be built from the quorum certificate @@ -220,8 +240,8 @@ impl SimpleFSM { ) -> anyhow::Result<()> { let res = match quorum.cert.result { RatificationResult::Success(Vote::Valid(hash)) => { - let acc = self.acc.read().await; - let local_header = acc.tip_header().await; + let local_header = self.acc.read().await.tip_header().await; + let db = self.acc.read().await.db.clone(); let remote_height = msg.header.round; // Quorum from future @@ -232,14 +252,7 @@ impl SimpleFSM { height = remote_height, ); - // Save certificate in case candidate block is received only - //self.certificates - // .insert((remote_height, hash), quorum.cert); - - let mut inv = Inv::new(1); - inv.add_candidate_from_hash(hash); - inv.add_block_from_hash(hash); - flood_request(&self.network, &inv).await; + self.request_block(hash, quorum.cert).await; Ok(None) } else { @@ -252,12 +265,12 @@ impl SimpleFSM { || (remote_height == local_header.height && local_header.hash != hash) { - match acc - .db + let res = db .read() .await - .view(|t| t.fetch_candidate_block(&hash)) - { + .view(|t| t.fetch_candidate_block(&hash)); + + match res { Ok(b) => Ok(b), Err(err) => { error!( @@ -270,18 +283,7 @@ impl SimpleFSM { // Candidate block is not found from local // storage. Cache the certificate and request // candidate block only. - - // Save certificate in case candidate block is - // received only - //self.certificates - // .insert((remote_height, hash), - // quorum.cert); - - let mut inv = Inv::new(1); - inv.add_candidate_from_hash(hash); - inv.add_block_from_hash(hash); - flood_request(&self.network, &inv).await; - + self.request_block(hash, quorum.cert).await; Err(err) } } @@ -351,9 +353,9 @@ impl SimpleFSM { // The default cert means the block was retrieved from Candidate // CF thus missing the certificate. If so, we try to set the valid // certificate from the cache certificates. - - let key = (blk.header().height, blk.header().hash); - if let Some(cert) = self.certificates.get(&key) { + if let Some((cert, _)) = + self.certificates_cache.get(&blk.header().hash) + { let mut blk = blk.clone(); blk.set_certificate(*cert); Some(Cow::Owned(blk))