Skip to content

Commit

Permalink
multi: [db] rewrite reorgs; [chain] add block check for filter
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jun 11, 2024
1 parent d90f468 commit a158d9e
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ With these few simple goals in mind, the tools are set out for developers to cre
- [x] Persist to storage
- [x] Determine if the block hash or height should be the primary key
- [x] Speed up writes with pointers
- [ ] Add "write volatile" to write over heights
- [x] Add "write volatile" to write over heights
- [x] Exponential backoff for locators

#### Filters
Expand Down
1 change: 0 additions & 1 deletion example/signet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ async fn main() {
let (mut node, mut client) = builder
// Add the peers
.add_peers(vec![(peer, 38333), (peer_2, 38333)])
// .add_peers(vec![(peer, 38333)])
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
Expand Down
30 changes: 25 additions & 5 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
extern crate alloc;
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};

use bitcoin::{
block::Header,
Expand Down Expand Up @@ -56,7 +59,7 @@ impl Chain {
quorum_required: usize,
) -> Result<Self, HeaderPersistenceError> {
let params = params_from_network(network);
let loaded_headers = db
let mut loaded_headers = db
.load()
.await
.map_err(|_| HeaderPersistenceError::SQLite)?;
Expand All @@ -74,7 +77,8 @@ impl Chain {
dialog
.send_warning("Checkpoint anchor mismatch".into())
.await;
return Err(HeaderPersistenceError::GenesisMismatch);
// The header chain did not align, so just start from the anchor
loaded_headers = BTreeMap::new();
} else if loaded_headers
.iter()
.zip(loaded_headers.iter().skip(1))
Expand Down Expand Up @@ -201,6 +205,21 @@ impl Chain {
}
}

// Persist the chain to disk, overwriting previously stored heights.
pub(crate) async fn flush_over_height(&mut self, height: u32) {
    let write_result = self
        .db
        .lock()
        .await
        .write_over(self.header_chain.headers(), height)
        .await;
    // Storage failures are surfaced as a warning rather than an error;
    // the in-memory chain remains authoritative.
    if let Err(e) = write_result {
        self.dialog
            .send_warning(format!("Error persisting to storage: {}", e))
            .await;
    }
}

// Sync the chain with headers from a peer, adjusting to reorgs if needed
pub(crate) async fn sync_chain(&mut self, message: Vec<Header>) -> Result<(), HeaderSyncError> {
let header_batch = HeadersBatch::new(message).map_err(|_| HeaderSyncError::EmptyMessage)?;
Expand Down Expand Up @@ -354,6 +373,7 @@ impl Chain {
self.dialog
.send_data(NodeMessage::BlocksDisconnected(reorged))
.await;
self.flush_over_height(stem).await;
Ok(())
} else {
self.dialog
Expand Down Expand Up @@ -450,7 +470,7 @@ impl Chain {
}
GetCFHeaders {
filter_type: 0x00,
start_height: (self.cf_header_chain.height() + 1),
start_height: self.cf_header_chain.height() + 1,
stop_hash,
}
}
Expand Down Expand Up @@ -642,7 +662,7 @@ mod tests {
let block_11: Header = deserialize(&hex::decode("00000020efcf8b12221fccc735b9b0b657ce15b31b9c50aff530ce96a5b4cfe02d8c0068496c1b8a89cf5dec22e46c35ea1035f80f5b666a1b3aa7f3d6f0880d0061adcc567e5e66ffff7f2001000000").unwrap()).unwrap();
let batch_2 = vec![new_block_10];
let batch_3 = vec![block_11];
let batch_4 = vec![new_block_10, block_11];
let batch_4 = vec![block_9, new_block_10, block_11];
let chain_sync = chain.sync_chain(batch_1).await;
assert!(chain_sync.is_ok());
// Forks of equal height to the chain should just get rejected
Expand Down
53 changes: 43 additions & 10 deletions src/db/sqlite/header_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,7 @@ impl HeaderStore for SqliteHeaderDb {
"db corruption. headers do not link."
);
}
None => {
assert_eq!(
next_header.prev_blockhash, self.anchor_hash,
"db corruption. headers do not link to anchor."
);
}
None => (),
}
headers.insert(height, next_header);
}
Expand All @@ -131,12 +126,11 @@ impl HeaderStore for SqliteHeaderDb {
let tx = write_lock
.transaction()
.map_err(|_| HeaderDatabaseError::WriteError)?;
let count: u32 = tx
.query_row("SELECT COUNT(*) FROM headers", [], |row| row.get(0))
let best_height: Option<u32> = tx
.query_row("SELECT MAX(height) FROM headers", [], |row| row.get(0))
.map_err(|_| HeaderDatabaseError::WriteError)?;
let adjusted_count = count.saturating_sub(1) + self.anchor_height;
for (height, header) in header_chain {
if height.ge(&(adjusted_count)) {
if height.ge(&(best_height.unwrap_or(0))) {
let hash: String = header.block_hash().to_string();
let version: i32 = header.version.to_consensus();
let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string();
Expand Down Expand Up @@ -169,4 +163,43 @@ impl HeaderStore for SqliteHeaderDb {
tx.commit().map_err(|_| HeaderDatabaseError::WriteError)?;
Ok(())
}

/// Write the chain to the database, overwriting any stored rows at or
/// above the given `height`.
///
/// Used after a reorganization: headers from `height` upward are replaced
/// with the entries in `header_chain`, each keyed by its own block height.
///
/// # Errors
///
/// Returns `HeaderDatabaseError::WriteError` if the transaction cannot be
/// opened, an insert fails, or the commit fails.
async fn write_over<'a>(
    &mut self,
    header_chain: &'a BTreeMap<u32, Header>,
    height: u32,
) -> Result<(), HeaderDatabaseError> {
    let mut write_lock = self.conn.lock().await;
    let tx = write_lock
        .transaction()
        .map_err(|_| HeaderDatabaseError::WriteError)?;
    // Invariant SQL text hoisted out of the loop.
    let stmt = "INSERT OR REPLACE INTO headers (height, block_hash, version, prev_hash, merkle_root, time, bits, nonce) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)";
    for (h, header) in header_chain {
        // Only rows at or above the reorg point are rewritten.
        if *h >= height {
            let hash: String = header.block_hash().to_string();
            let version: i32 = header.version.to_consensus();
            let prev_hash: String = header.prev_blockhash.as_raw_hash().to_string();
            let merkle_root: String = header.merkle_root.to_string();
            let time: u32 = header.time;
            let bits: u32 = header.bits.to_consensus();
            let nonce: u32 = header.nonce;
            tx.execute(
                stmt,
                // BUG FIX: bind each row's own height `h`, not the reorg
                // cutoff `height`. The previous code bound `height` for
                // every row, so INSERT OR REPLACE collapsed the entire
                // rewritten chain into a single row at the cutoff height.
                params![
                    h,
                    hash,
                    version,
                    prev_hash,
                    merkle_root,
                    time,
                    bits,
                    nonce
                ],
            )
            .map_err(|_| HeaderDatabaseError::WriteError)?;
        }
    }
    tx.commit().map_err(|_| HeaderDatabaseError::WriteError)?;
    Ok(())
}
}
16 changes: 16 additions & 0 deletions src/db/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ use super::error::HeaderDatabaseError;
#[async_trait]
pub(crate) trait HeaderStore {
    /// Load all stored headers, keyed by block height.
    async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError>;

    /// Persist the header chain to storage.
    async fn write<'a>(
        &mut self,
        header_chain: &'a BTreeMap<u32, Header>,
    ) -> Result<(), HeaderDatabaseError>;

    /// Persist the header chain, overwriting any stored entries at or
    /// above `height` — used when adjusting to a reorganization.
    async fn write_over<'a>(
        &mut self,
        header_chain: &'a BTreeMap<u32, Header>,
        height: u32,
    ) -> Result<(), HeaderDatabaseError>;
}

// Do nothing
Expand All @@ -20,12 +27,21 @@ impl HeaderStore for () {
    /// The no-op store keeps nothing, so loading yields an empty map.
    async fn load(&mut self) -> Result<BTreeMap<u32, Header>, HeaderDatabaseError> {
        Ok(BTreeMap::new())
    }

    /// The no-op store discards writes and always succeeds.
    async fn write<'a>(
        &mut self,
        _header_chain: &'a BTreeMap<u32, Header>,
    ) -> Result<(), HeaderDatabaseError> {
        Ok(())
    }

    /// The no-op store discards overwrites and always succeeds.
    async fn write_over<'a>(
        &mut self,
        _header_chain: &'a BTreeMap<u32, Header>,
        _height: u32,
    ) -> Result<(), HeaderDatabaseError> {
        Ok(())
    }
}

impl std::fmt::Debug for dyn HeaderStore + Send + Sync + 'static {
Expand Down
3 changes: 0 additions & 3 deletions src/filters/cfheader_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ impl CFHeaderChain {
peer_id: u32,
cf_headers: CFHeaderBatch,
) -> Result<AppendAttempt, CFHeaderSyncError> {
if self.merged_queue.contains_key(&peer_id) {
return Err(CFHeaderSyncError::UnexpectedCFHeaderMessage);
}
self.merged_queue.insert(peer_id, cf_headers.inner());
self.try_merge().await
}
Expand Down
44 changes: 43 additions & 1 deletion src/filters/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use bitcoin::{bip158::BlockFilter, BlockHash, FilterHash, ScriptBuf};
use bitcoin::{bip158::BlockFilter, Block, BlockHash, FilterHash, ScriptBuf};
use bitcoin_hashes::{sha256d, Hash};

use super::error::FilterError;
Expand All @@ -27,6 +27,10 @@ impl Filter {
FilterHash::from_raw_hash(hash)
}

    /// The hash of the block this filter commits to.
    fn block_hash(&self) -> &BlockHash {
        &self.block_hash
    }

pub async fn contains_any(
&mut self,
scripts: &HashSet<ScriptBuf>,
Expand All @@ -38,4 +42,42 @@ impl Filter {
)
.map_err(|_| FilterError::IORead)
}

/// Audit that this filter actually commits to the given block's outputs.
///
/// Skips the coinbase transaction and OP_RETURN outputs (BIP-158 filters
/// exclude OP_RETURN scripts), then checks that every remaining output
/// script of every transaction matches the filter. Returns `Ok(false)` if
/// any script is missing, indicating a peer served a filter that does not
/// correspond to this block.
///
/// # Errors
///
/// Returns `FilterError::IORead` if reading the underlying filter fails.
pub async fn is_filter_for_block(&mut self, block: &Block) -> Result<bool, FilterError> {
    // Skip the coinbase transaction; its outputs are not audited.
    for tx in block.txdata.iter().skip(1) {
        // BIP-158 filters do not include OP_RETURN outputs, so exclude
        // them from the query.
        // NOTE(review): a transaction whose outputs are all OP_RETURN
        // produces an empty query here — confirm `match_all` treats an
        // empty query as a match rather than an error.
        let mut queries = tx
            .output
            .iter()
            .filter(|output| !output.script_pubkey.is_op_return())
            .map(|output| output.script_pubkey.to_bytes());
        if !self
            .block_filter
            .match_all(&self.block_hash, &mut queries)
            .map_err(|_| FilterError::IORead)?
        {
            return Ok(false);
        }
    }
    Ok(true)
}
}
1 change: 0 additions & 1 deletion src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ impl Node {
if chain.height().le(&new_height) {
chain.set_best_known_height(new_height).await;
}
chain.clear_filter_header_queue();
Some(MainThreadMessage::GetHeaders(next_headers))
}
}
Expand Down

0 comments on commit a158d9e

Please sign in to comment.