Skip to content

Commit

Permalink
Merge pull request #5 from rustaceanrob/load-fork
Browse files Browse the repository at this point in the history
multi: [chain] reverse block queue order, [db] load potential fork
  • Loading branch information
rustaceanrob authored Jun 18, 2024
2 parents 8209b81 + 56e300d commit 1f2f0b4
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 33 deletions.
20 changes: 11 additions & 9 deletions src/chain/block_queue.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::collections::VecDeque;

use bitcoin::BlockHash;

#[derive(Debug)]
pub(crate) struct BlockQueue {
queue: Vec<BlockHash>,
received: usize,
queue: VecDeque<BlockHash>,
want: usize,
}

impl BlockQueue {
pub(crate) fn new() -> Self {
Self {
queue: Vec::new(),
received: 0,
queue: VecDeque::new(),
want: 0,
}
}

pub(crate) fn add(&mut self, block: BlockHash) {
if !self.contains(&block) {
self.received += 1;
self.queue.push(block)
self.want += 1;
self.queue.push_front(block)
}
}

Expand All @@ -26,14 +28,14 @@ impl BlockQueue {
}

pub(crate) fn pop(&mut self) -> Option<BlockHash> {
self.queue.pop()
self.queue.pop_back()
}

pub(crate) fn receive_one(&mut self) {
self.received = self.received.saturating_sub(1);
self.want = self.want.saturating_sub(1);
}

pub(crate) fn complete(&self) -> bool {
self.received.eq(&0) && self.queue.is_empty()
self.want.eq(&0) && self.queue.is_empty()
}
}
48 changes: 40 additions & 8 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ use crate::{
IndexedBlock, IndexedTransaction,
};

const MAX_REORG_DEPTH: u32 = 5_000;

#[derive(Debug)]
pub(crate) struct Chain {
header_chain: HeaderChain,
Expand All @@ -60,7 +62,7 @@ impl Chain {
) -> Result<Self, HeaderPersistenceError> {
let params = params_from_network(network);
let mut loaded_headers = db
.load()
.load(anchor.height)
.await
.map_err(|_| HeaderPersistenceError::SQLite)?;
if loaded_headers.len().gt(&0) {
Expand Down Expand Up @@ -239,11 +241,11 @@ impl Chain {
self.header_chain.extend(header_batch.inner());
return Ok(());
}
// We are not accepting floating chains from any peer
// the prev_hash of the last block in their chain does not
// point to any header we know of
if !self.contains_hash(header_batch.first().prev_blockhash) {
return Err(HeaderSyncError::FloatingHeaders);
// We see if we have this previous hash in the database, and reload our
// chain from that hash if so.
let fork_start_hash = header_batch.first().prev_blockhash;
if !self.contains_hash(fork_start_hash) {
self.load_fork(&header_batch).await?;
}
//
self.evaluate_fork(&header_batch).await?;
Expand Down Expand Up @@ -325,8 +327,6 @@ impl Chain {
"Peer is sending us malicious headers, restarting header sync.".into(),
)
.await;
// We assume that this would be so rare that we just clear the whole header chain
self.header_chain.clear_all();
return Err(HeaderSyncError::InvalidCheckpoint);
}
}
Expand Down Expand Up @@ -388,6 +388,38 @@ impl Chain {
}
}

/// Attempt to re-anchor the in-memory chains at a common ancestor found in the
/// database when a peer sends headers that do not connect to our current chain.
///
/// The first header's `prev_blockhash` is looked up in the header database. If
/// it is found at an acceptable depth, the header chain, compact filter header
/// chain, and filter chain are all rebuilt from that older anchor so the
/// potential fork can be evaluated. Otherwise the batch is rejected as
/// floating headers.
///
/// # Errors
/// - [`HeaderSyncError::DbError`] if the database lookup or reload fails.
/// - [`HeaderSyncError::FloatingHeaders`] if the hash is unknown or the
///   implied reorg is deeper than `MAX_REORG_DEPTH`.
async fn load_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> {
    let mut db_lock = self.db.lock().await;
    let prev_hash = header_batch.first().prev_blockhash;
    let maybe_height = db_lock
        .height_of(&prev_hash)
        .await
        .map_err(|_| HeaderSyncError::DbError)?;
    match maybe_height {
        Some(height) => {
            // This is a very generous check to ensure a peer cannot get us to load an
            // absurd amount of headers into RAM. Because headers come in batches of 2,000,
            // we wouldn't accept a fork of a depth more than around 2,000 anyway.
            // The only reorgs that have ever been recorded are of depth 1.
            // `saturating_sub` guards against a u32 underflow panic in the
            // unexpected case that the database holds a header above our tip.
            if self.height().saturating_sub(height) > MAX_REORG_DEPTH {
                Err(HeaderSyncError::FloatingHeaders)
            } else {
                let older_anchor = HeaderCheckpoint::new(height, prev_hash);
                let loaded_headers = db_lock
                    .load(older_anchor.height)
                    .await
                    .map_err(|_| HeaderSyncError::DbError)?;
                self.header_chain = HeaderChain::new(older_anchor, loaded_headers);
                self.cf_header_chain =
                    CFHeaderChain::new(older_anchor, self.cf_header_chain.quorum_required());
                self.filter_chain = FilterChain::new(older_anchor);
                Ok(())
            }
        }
        None => Err(HeaderSyncError::FloatingHeaders),
    }
}

// Sync the compact filter headers, possibly encountering conflicts
pub(crate) async fn sync_cf_headers(
&mut self,
Expand Down
4 changes: 4 additions & 0 deletions src/chain/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ pub const SIGNET_HEADER_CP: &[(u32, &str)] = &[
190000,
"0000013a6143b7360b7ba3834316b3265ee9072dde440bd45f99c01c42abaef2",
),
(
200000,
"0000007d60f5ffc47975418ac8331c0ea52cf551730ef7ead7ff9082a536f13c",
),
];

/// A known block hash in the chain of most work.
Expand Down
2 changes: 2 additions & 0 deletions src/chain/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum HeaderSyncError {
FloatingHeaders,
#[error("less work fork")]
LessWorkFork,
#[error("the database could not load a fork")]
DbError,
}

#[derive(Error, Debug)]
Expand Down
30 changes: 18 additions & 12 deletions src/db/sqlite/header_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use bitcoin::{BlockHash, CompactTarget, Network, TxMerkleNode};
use rusqlite::{params, Connection, Result};
use tokio::sync::Mutex;

use crate::chain::checkpoints::HeaderCheckpoint;
use crate::db::error::HeaderDatabaseError;
use crate::db::traits::HeaderStore;

Expand All @@ -29,16 +28,10 @@ const SCHEMA: &str = "CREATE TABLE IF NOT EXISTS headers (
pub(crate) struct SqliteHeaderDb {
network: Network,
conn: Arc<Mutex<Connection>>,
anchor_height: u32,
anchor_hash: BlockHash,
}

impl SqliteHeaderDb {
pub fn new(
network: Network,
anchor_checkpoint: HeaderCheckpoint,
path: Option<PathBuf>,
) -> Result<Self, HeaderDatabaseError> {
pub fn new(network: Network, path: Option<PathBuf>) -> Result<Self, HeaderDatabaseError> {
let mut path = path.unwrap_or_else(|| PathBuf::from("."));
path.push("data");
path.push(network.to_string());
Expand All @@ -52,16 +45,17 @@ impl SqliteHeaderDb {
Ok(Self {
network,
conn: Arc::new(Mutex::new(conn)),
anchor_height: anchor_checkpoint.height,
anchor_hash: anchor_checkpoint.hash,
})
}
}

#[async_trait]
impl HeaderStore for SqliteHeaderDb {
// load all the known headers from storage
async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> {
async fn load(
&mut self,
anchor_height: u32,
) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> {
let mut headers = BTreeMap::<u32, Header>::new();
let stmt = "SELECT * FROM headers ORDER BY height";
let write_lock = self.conn.lock().await;
Expand All @@ -74,7 +68,7 @@ impl HeaderStore for SqliteHeaderDb {
while let Some(row) = rows.next().map_err(|_| HeaderDatabaseError::LoadError)? {
let height: u32 = row.get(0).map_err(|_| HeaderDatabaseError::LoadError)?;
// The anchor height should not be included in the chain, as the anchor is non-inclusive
if height.le(&self.anchor_height) {
if height.le(&anchor_height) {
continue;
}
let hash: String = row.get(1).map_err(|_| HeaderDatabaseError::LoadError)?;
Expand Down Expand Up @@ -191,4 +185,16 @@ impl HeaderStore for SqliteHeaderDb {
tx.commit().map_err(|_| HeaderDatabaseError::WriteError)?;
Ok(())
}

async fn height_of<'a>(
&mut self,
block_hash: &'a BlockHash,
) -> Result<Option<u32>, HeaderDatabaseError> {
let write_lock = self.conn.lock().await;
let stmt = "SELECT height FROM headers WHERE block_hash = ?1";
let row: Option<u32> = write_lock
.query_row(stmt, params![block_hash.to_string()], |row| row.get(0))
.map_err(|_| HeaderDatabaseError::LoadError)?;
Ok(row)
}
}
24 changes: 21 additions & 3 deletions src/db/traits.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::collections::BTreeMap;

use async_trait::async_trait;
use bitcoin::block::Header;
use bitcoin::{block::Header, BlockHash};

use super::error::HeaderDatabaseError;

#[async_trait]
pub(crate) trait HeaderStore {
async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError>;
async fn load(
&mut self,
anchor_height: u32,
) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError>;

async fn write<'a>(
&mut self,
Expand All @@ -19,12 +22,20 @@ pub(crate) trait HeaderStore {
header_chain: &'a BTreeMap<u32, Header>,
height: u32,
) -> Result<(), HeaderDatabaseError>;

async fn height_of<'a>(
&mut self,
hash: &'a BlockHash,
) -> Result<Option<u32>, HeaderDatabaseError>;
}

// Do nothing
#[async_trait]
impl HeaderStore for () {
async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> {
async fn load(
&mut self,
_anchor_height: u32,
) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> {
Ok(BTreeMap::new())
}

Expand All @@ -42,6 +53,13 @@ impl HeaderStore for () {
) -> Result<(), HeaderDatabaseError> {
Ok(())
}

// No-op store: there is no backing database, so no hash can ever be found.
// Always reports the hash as unknown.
async fn height_of<'a>(
&mut self,
_block_hash: &'a BlockHash,
) -> Result<Option<u32>, HeaderDatabaseError> {
Ok(None)
}
}

impl std::fmt::Debug for dyn HeaderStore + Send + Sync + 'static {
Expand Down
4 changes: 4 additions & 0 deletions src/filters/cfheader_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ impl CFHeaderChain {
pub(crate) fn hash_at(&self, block: &BlockHash) -> Option<&FilterHash> {
self.block_to_hash.get(block)
}

/// The number of agreeing peers required before a compact filter header is
/// accepted; used to rebuild this chain with the same policy after a reorg.
pub(crate) fn quorum_required(&self) -> usize {
self.quorum_required
}
}
2 changes: 1 addition & 1 deletion src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl Node {
let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last());
checkpoints.prune_up_to(checkpoint);
// Load the headers from storage
let db = SqliteHeaderDb::new(network, checkpoint, data_path)
let db = SqliteHeaderDb::new(network, data_path)
.map_err(|_| NodeError::LoadError(PersistenceError::HeaderLoadError))?;
// A structured way to talk to the client
let mut dialog = Dialog::new(ntx);
Expand Down

0 comments on commit 1f2f0b4

Please sign in to comment.