diff --git a/README.md b/README.md index 458ad3f..454c850 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,7 @@ With these few simple goals in mind, the tools are set out for developers to cre - [x] Persist to storage - [x] Determine if the block hash or height should be the primary key - [x] Speed up writes with pointers - - [ ] Add "write volatile" to write over heights + - [x] Add "write volatile" to write over heights - [x] Exponential backoff for locators #### Filters diff --git a/example/signet.rs b/example/signet.rs index 51fa0f7..35f30d9 100644 --- a/example/signet.rs +++ b/example/signet.rs @@ -26,7 +26,6 @@ async fn main() { let (mut node, mut client) = builder // Add the peers .add_peers(vec![(peer, 38333), (peer_2, 38333)]) - // .add_peers(vec![(peer, 38333)]) // The Bitcoin scripts to monitor .add_scripts(addresses) // Only scan blocks strictly after an anchor checkpoint diff --git a/src/chain/chain.rs b/src/chain/chain.rs index f4bf4dc..5decfc6 100644 --- a/src/chain/chain.rs +++ b/src/chain/chain.rs @@ -1,5 +1,8 @@ extern crate alloc; -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; use bitcoin::{ block::Header, @@ -56,7 +59,7 @@ impl Chain { quorum_required: usize, ) -> Result { let params = params_from_network(network); - let loaded_headers = db + let mut loaded_headers = db .load() .await .map_err(|_| HeaderPersistenceError::SQLite)?; @@ -74,7 +77,8 @@ impl Chain { dialog .send_warning("Checkpoint anchor mismatch".into()) .await; - return Err(HeaderPersistenceError::GenesisMismatch); + // The header chain did not align, so just start from the anchor + loaded_headers = BTreeMap::new(); } else if loaded_headers .iter() .zip(loaded_headers.iter().skip(1)) @@ -201,6 +205,21 @@ impl Chain { } } + // Write the chain to disk, overriding previous heights + pub(crate) async fn flush_over_height(&mut self, height: u32) { + if let Err(e) = self + .db + .lock() + .await + 
.write_over(self.header_chain.headers(), height) + .await + { + self.dialog + .send_warning(format!("Error persisting to storage: {}", e)) + .await; + } + } + // Sync the chain with headers from a peer, adjusting to reorgs if needed pub(crate) async fn sync_chain(&mut self, message: Vec
) -> Result<(), HeaderSyncError> { let header_batch = HeadersBatch::new(message).map_err(|_| HeaderSyncError::EmptyMessage)?; @@ -354,6 +373,7 @@ impl Chain { self.dialog .send_data(NodeMessage::BlocksDisconnected(reorged)) .await; + self.flush_over_height(stem).await; Ok(()) } else { self.dialog @@ -450,7 +470,7 @@ impl Chain { } GetCFHeaders { filter_type: 0x00, - start_height: (self.cf_header_chain.height() + 1), + start_height: self.cf_header_chain.height() + 1, stop_hash, } } @@ -642,7 +662,7 @@ mod tests { let block_11: Header = deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap(); let batch_2 = vec![new_block_10]; let batch_3 = vec![block_11]; - let batch_4 = vec![new_block_10, block_11]; + let batch_4 = vec![block_9, new_block_10, block_11]; let chain_sync = chain.sync_chain(batch_1).await; assert!(chain_sync.is_ok()); // Forks of equal height to the chain should just get rejected diff --git a/src/db/sqlite/header_db.rs b/src/db/sqlite/header_db.rs index 2eaa25a..d4ca761 100644 --- a/src/db/sqlite/header_db.rs +++ b/src/db/sqlite/header_db.rs @@ -111,12 +111,7 @@ impl HeaderStore for SqliteHeaderDb { "db corruption. headers do not link." ); } - None => { - assert_eq!( - next_header.prev_blockhash, self.anchor_hash, - "db corruption. headers do not link to anchor." 
- ); - } + None => (), } headers.insert(height, next_header); } @@ -131,12 +126,11 @@ impl HeaderStore for SqliteHeaderDb { let tx = write_lock .transaction() .map_err(|_| HeaderDatabaseError::WriteError)?; - let count: u32 = tx - .query_row("SELECT COUNT(*) FROM headers", [], |row| row.get(0)) + let best_height: Option<u32> = tx + .query_row("SELECT MAX(height) FROM headers", [], |row| row.get(0)) .map_err(|_| HeaderDatabaseError::WriteError)?; - let adjusted_count = count.saturating_sub(1) + self.anchor_height; for (height, header) in header_chain { - if height.ge(&(adjusted_count)) { + if height.ge(&(best_height.unwrap_or(0))) { let hash: String = header.block_hash().to_string(); let version: i32 = header.version.to_consensus(); let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string(); @@ -169,4 +163,43 @@ impl HeaderStore for SqliteHeaderDb { tx.commit().map_err(|_| HeaderDatabaseError::WriteError)?; Ok(()) } + + async fn write_over<'a>( + &mut self, + header_chain: &'a BTreeMap<u32, Header>, + height: u32, + ) -> Result<(), HeaderDatabaseError> { + let mut write_lock = self.conn.lock().await; + let tx = write_lock + .transaction() + .map_err(|_| HeaderDatabaseError::WriteError)?; + for (h, header) in header_chain { + if h.ge(&height) { + let hash: String = header.block_hash().to_string(); + let version: i32 = header.version.to_consensus(); + let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string(); + let merkle_root: String = header.merkle_root.to_string(); + let time: u32 = header.time; + let bits: u32 = header.bits.to_consensus(); + let nonce: u32 = header.nonce; + let stmt = "INSERT OR REPLACE INTO headers (height, block_hash, version, prev_hash, merkle_root, time, bits, nonce) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"; + tx.execute( + stmt, + params![ + *h, + hash, + version, + prev_hash, + merkle_root, + time, + bits, + nonce + ], + ) + .map_err(|_| HeaderDatabaseError::WriteError)?; + } + } + tx.commit().map_err(|_| 
HeaderDatabaseError::WriteError)?; + Ok(()) + } } diff --git a/src/db/traits.rs b/src/db/traits.rs index 5a6fd21..3e84e2c 100644 --- a/src/db/traits.rs +++ b/src/db/traits.rs @@ -8,10 +8,17 @@ use super::error::HeaderDatabaseError; #[async_trait] pub(crate) trait HeaderStore { async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError>; + async fn write<'a>( &mut self, header_chain: &'a BTreeMap<u32, Header>, ) -> Result<(), HeaderDatabaseError>; + + async fn write_over<'a>( + &mut self, + header_chain: &'a BTreeMap<u32, Header>, + height: u32, + ) -> Result<(), HeaderDatabaseError>; } // Do nothing @@ -20,12 +27,21 @@ impl HeaderStore for () { async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> { Ok(BTreeMap::new()) } + async fn write<'a>( &mut self, _header_chain: &'a BTreeMap<u32, Header>, ) -> Result<(), HeaderDatabaseError> { Ok(()) } + + async fn write_over<'a>( + &mut self, + _header_chain: &'a BTreeMap<u32, Header>, + _height: u32, + ) -> Result<(), HeaderDatabaseError> { + Ok(()) + } } impl std::fmt::Debug for dyn HeaderStore + Send + Sync + 'static { diff --git a/src/filters/cfheader_chain.rs b/src/filters/cfheader_chain.rs index 2268e2c..498679b 100644 --- a/src/filters/cfheader_chain.rs +++ b/src/filters/cfheader_chain.rs @@ -52,9 +52,6 @@ impl CFHeaderChain { peer_id: u32, cf_headers: CFHeaderBatch, ) -> Result { - if self.merged_queue.contains_key(&peer_id) { - return Err(CFHeaderSyncError::UnexpectedCFHeaderMessage); - } self.merged_queue.insert(peer_id, cf_headers.inner()); self.try_merge().await } diff --git a/src/filters/filter.rs b/src/filters/filter.rs index 9901c0b..b301211 100644 --- a/src/filters/filter.rs +++ b/src/filters/filter.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use bitcoin::{bip158::BlockFilter, BlockHash, FilterHash, ScriptBuf}; +use bitcoin::{bip158::BlockFilter, Block, BlockHash, FilterHash, ScriptBuf}; use bitcoin_hashes::{sha256d, Hash}; use super::error::FilterError; @@ -27,6 +27,10 @@ impl Filter { FilterHash::from_raw_hash(hash) } + fn block_hash(&self) -> 
&BlockHash { + &self.block_hash + } + pub async fn contains_any( &mut self, scripts: &HashSet, @@ -38,4 +42,42 @@ impl Filter { ) .map_err(|_| FilterError::IORead) } + + pub async fn is_filter_for_block(&mut self, block: &Block) -> Result { + // Skip the coinbase transaction + for tx in block.txdata.iter().skip(1) { + let scripts = tx + .output + .iter() + .filter(|output| !output.script_pubkey.is_op_return()) + .map(|output| output.script_pubkey.clone()); + if !self + .block_filter + .match_all( + &self.block_hash, + &mut scripts.map(|script| script.to_bytes()), + ) + .map_err(|_| FilterError::IORead)? + { + return Ok(false); + } + // The filter should not contain OP_RETURN outputs + // let scripts = tx + // .output + // .iter() + // .filter(|output| output.script_pubkey.is_op_return()) + // .map(|output| output.script_pubkey.clone()); + // if self + // .block_filter + // .match_any( + // &self.block_hash, + // &mut scripts.map(|script| script.to_bytes()), + // ) + // .map_err(|_| FilterError::IORead)? + // { + // return Ok(false); + // } + } + Ok(true) + } } diff --git a/src/node/node.rs b/src/node/node.rs index 87bc68b..f9112db 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -549,7 +549,6 @@ impl Node { if chain.height().le(&new_height) { chain.set_best_known_height(new_height).await; } - chain.clear_filter_header_queue(); Some(MainThreadMessage::GetHeaders(next_headers)) } }