From 56e300d8fa713b391ae35211a2d5bcb2c9149655 Mon Sep 17 00:00:00 2001 From: rustaceanrob Date: Mon, 17 Jun 2024 14:21:07 -1000 Subject: [PATCH] multi: [chain] reverse block queue order, [db] load potential fork --- src/chain/block_queue.rs | 20 ++++++++------- src/chain/chain.rs | 48 +++++++++++++++++++++++++++++------ src/chain/checkpoints.rs | 4 +++ src/chain/error.rs | 2 ++ src/db/sqlite/header_db.rs | 30 +++++++++++++--------- src/db/traits.rs | 24 +++++++++++++++--- src/filters/cfheader_chain.rs | 4 +++ src/node/node.rs | 2 +- 8 files changed, 101 insertions(+), 33 deletions(-) diff --git a/src/chain/block_queue.rs b/src/chain/block_queue.rs index 144ff60..ee22a45 100644 --- a/src/chain/block_queue.rs +++ b/src/chain/block_queue.rs @@ -1,23 +1,25 @@ +use std::collections::VecDeque; + use bitcoin::BlockHash; #[derive(Debug)] pub(crate) struct BlockQueue { - queue: Vec, - received: usize, + queue: VecDeque, + want: usize, } impl BlockQueue { pub(crate) fn new() -> Self { Self { - queue: Vec::new(), - received: 0, + queue: VecDeque::new(), + want: 0, } } pub(crate) fn add(&mut self, block: BlockHash) { if !self.contains(&block) { - self.received += 1; - self.queue.push(block) + self.want += 1; + self.queue.push_front(block) } } @@ -26,14 +28,14 @@ impl BlockQueue { } pub(crate) fn pop(&mut self) -> Option { - self.queue.pop() + self.queue.pop_back() } pub(crate) fn receive_one(&mut self) { - self.received = self.received.saturating_sub(1); + self.want = self.want.saturating_sub(1); } pub(crate) fn complete(&self) -> bool { - self.received.eq(&0) && self.queue.is_empty() + self.want.eq(&0) && self.queue.is_empty() } } diff --git a/src/chain/chain.rs b/src/chain/chain.rs index fd66ee5..462cf81 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -34,6 +34,8 @@ use crate::{ IndexedBlock, IndexedTransaction, }; +const MAX_REORG_DEPTH: u32 = 5_000; + #[derive(Debug)] pub(crate) struct Chain { header_chain: HeaderChain, @@ -60,7 +62,7 @@ impl Chain { ) -> Result { let params = params_from_network(network); let mut loaded_headers = db - .load() + .load(anchor.height) .await .map_err(|_| HeaderPersistenceError::SQLite)?; if loaded_headers.len().gt(&0) { @@ -239,11 +241,11 @@ impl Chain { self.header_chain.extend(header_batch.inner()); return Ok(()); } - // We are not accepting floating chains from any peer - // the prev_hash of the last block in their chain does not - // point to any header we know of - if !self.contains_hash(header_batch.first().prev_blockhash) { - return Err(HeaderSyncError::FloatingHeaders); + // We see if we have this previous hash in the database, and reload our + // chain from that hash if so. + let fork_start_hash = header_batch.first().prev_blockhash; + if !self.contains_hash(fork_start_hash) { + self.load_fork(&header_batch).await?; } // self.evaluate_fork(&header_batch).await?; @@ -325,8 +327,6 @@ impl Chain { "Peer is sending us malicious headers, restarting header sync.".into(), ) .await; - // We assume that this would be so rare that we just clear the whole header chain - self.header_chain.clear_all(); return Err(HeaderSyncError::InvalidCheckpoint); } } @@ -388,6 +388,38 @@ impl Chain { } } + async fn load_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> { + let mut db_lock = self.db.lock().await; + let prev_hash = header_batch.first().prev_blockhash; + let maybe_height = db_lock + .height_of(&prev_hash) + .await + .map_err(|_| HeaderSyncError::DbError)?; + match maybe_height { + Some(height) => { + // This is a very generous check to ensure a peer cannot get us to load an + // absurd amount of headers into RAM. Because headers come in batches of 2,000, + // we wouldn't accept a fork of a depth more than around 2,000 anyway. + // The only reorgs that have ever been recorded are of depth 1. + if self.height() - height > MAX_REORG_DEPTH { + Err(HeaderSyncError::FloatingHeaders) + } else { + let older_anchor = HeaderCheckpoint::new(height, prev_hash); + let loaded_headers = db_lock + .load(older_anchor.height) + .await + .map_err(|_| HeaderSyncError::DbError)?; + self.header_chain = HeaderChain::new(older_anchor, loaded_headers); + self.cf_header_chain = + CFHeaderChain::new(older_anchor, self.cf_header_chain.quorum_required()); + self.filter_chain = FilterChain::new(older_anchor); + Ok(()) + } + } + None => Err(HeaderSyncError::FloatingHeaders), + } + } + // Sync the compact filter headers, possibly encountering conflicts pub(crate) async fn sync_cf_headers( &mut self, diff --git a/src/chain/checkpoints.rs b/src/chain/checkpoints.rs index f3c29d8..e4fc47f 100644 --- a/src/chain/checkpoints.rs +++ b/src/chain/checkpoints.rs @@ -96,6 +96,10 @@ pub const SIGNET_HEADER_CP: &[(u32, &str)] = &[ 190000, "0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2", ), + ( + 200000, + "0000007d60f5ffc47975418ac8331c0ea52cf551730ef7ead7ff9082a536f13c", + ), ]; /// A known block hash in the chain of most work. diff --git a/src/chain/error.rs b/src/chain/error.rs index 82c0935..48d6430 100644 --- a/src/chain/error.rs +++ b/src/chain/error.rs @@ -20,6 +20,8 @@ pub enum HeaderSyncError { FloatingHeaders, #[error("less work fork")] LessWorkFork, + #[error("the database could not load a fork")] + DbError, } #[derive(Error, Debug)] diff --git a/src/db/sqlite/header_db.rs b/src/db/sqlite/header_db.rs index 8f169ec..4cb9a17 100644 --- a/src/db/sqlite/header_db.rs +++ b/src/db/sqlite/header_db.rs @@ -10,7 +10,6 @@ use bitcoin::{BlockHash, CompactTarget, Network, TxMerkleNode}; use rusqlite::{params, Connection, Result}; use tokio::sync::Mutex; -use crate::chain::checkpoints::HeaderCheckpoint; use crate::db::error::HeaderDatabaseError; use crate::db::traits::HeaderStore; @@ -29,16 +28,10 @@ const SCHEMA: &str = "CREATE TABLE IF NOT EXISTS headers ( pub(crate) struct SqliteHeaderDb { network: Network, conn: Arc>, - anchor_height: u32, - anchor_hash: BlockHash, } impl SqliteHeaderDb { - pub fn new( - network: Network, - anchor_checkpoint: HeaderCheckpoint, - path: Option, - ) -> Result { + pub fn new(network: Network, path: Option) -> Result { let mut path = path.unwrap_or_else(|| PathBuf::from(".")); path.push("data"); path.push(network.to_string()); @@ -52,8 +45,6 @@ impl SqliteHeaderDb { Ok(Self { network, conn: Arc::new(Mutex::new(conn)), - anchor_height: anchor_checkpoint.height, - anchor_hash: anchor_checkpoint.hash, }) } } @@ -61,7 +52,10 @@ impl SqliteHeaderDb { #[async_trait] impl HeaderStore for SqliteHeaderDb { // load all the known headers from storage - async fn load(&mut self) -> Result, HeaderDatabaseError> { + async fn load( + &mut self, + anchor_height: u32, + ) -> Result, HeaderDatabaseError> { let mut headers = BTreeMap::::new(); let stmt = "SELECT * FROM headers ORDER BY height"; let write_lock = self.conn.lock().await; @@ -74,7 +68,7 @@ impl HeaderStore for SqliteHeaderDb { while let Some(row) = rows.next().map_err(|_| HeaderDatabaseError::LoadError)? { let height: u32 = row.get(0).map_err(|_| HeaderDatabaseError::LoadError)?; // The anchor height should not be included in the chain, as the anchor is non-inclusive - if height.le(&self.anchor_height) { + if height.le(&anchor_height) { continue; } let hash: String = row.get(1).map_err(|_| HeaderDatabaseError::LoadError)?; @@ -191,4 +185,16 @@ impl HeaderStore for SqliteHeaderDb { tx.commit().map_err(|_| HeaderDatabaseError::WriteError)?; Ok(()) } + + async fn height_of<'a>( + &mut self, + block_hash: &'a BlockHash, + ) -> Result, HeaderDatabaseError> { + let write_lock = self.conn.lock().await; + let stmt = "SELECT height FROM headers WHERE block_hash = ?1"; + let row: Option = write_lock + .query_row(stmt, params![block_hash.to_string()], |row| row.get(0)) + .map_err(|_| HeaderDatabaseError::LoadError)?; + Ok(row) + } } diff --git a/src/db/traits.rs b/src/db/traits.rs index 3e84e2c..21e5b4c 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -1,13 +1,16 @@ use std::collections::BTreeMap; use async_trait::async_trait; -use bitcoin::block::Header; +use bitcoin::{block::Header, BlockHash}; use super::error::HeaderDatabaseError; #[async_trait] pub(crate) trait HeaderStore { - async fn load(&mut self) -> Result, HeaderDatabaseError>; + async fn load( + &mut self, + anchor_height: u32, + ) -> Result, HeaderDatabaseError>; async fn write<'a>( &mut self, @@ -19,12 +22,20 @@ pub(crate) trait HeaderStore { header_chain: &'a BTreeMap, height: u32, ) -> Result<(), HeaderDatabaseError>; + + async fn height_of<'a>( + &mut self, + hash: &'a BlockHash, + ) -> Result, HeaderDatabaseError>; } // Do nothing #[async_trait] impl HeaderStore for () { - async fn load(&mut self) -> Result, HeaderDatabaseError> { + async fn load( + &mut self, + _anchor_height: u32, + ) -> Result, HeaderDatabaseError> { Ok(BTreeMap::new()) } @@ -42,6 +53,13 @@ impl HeaderStore for () { ) -> Result<(), HeaderDatabaseError> { Ok(()) } + + async fn height_of<'a>( + &mut self, + _block_hash: &'a BlockHash, + ) -> Result, HeaderDatabaseError> { + Ok(None) + } } impl std::fmt::Debug for dyn HeaderStore + Send + Sync + 'static { diff --git a/src/filters/cfheader_chain.rs b/src/filters/cfheader_chain.rs index 498679b..ca6154b 100644 --- a/src/filters/cfheader_chain.rs +++ b/src/filters/cfheader_chain.rs @@ -151,4 +151,8 @@ impl CFHeaderChain { pub(crate) fn hash_at(&self, block: &BlockHash) -> Option<&FilterHash> { self.block_to_hash.get(block) } + + pub(crate) fn quorum_required(&self) -> usize { + self.quorum_required + } } diff --git a/src/node/node.rs b/src/node/node.rs index 1ffb038..396fd02 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -103,7 +103,7 @@ impl Node { let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last()); checkpoints.prune_up_to(checkpoint); // Load the headers from storage - let db = SqliteHeaderDb::new(network, checkpoint, data_path) + let db = SqliteHeaderDb::new(network, data_path) .map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?; // A structured way to talk to the client let mut dialog = Dialog::new(ntx);