From f74580cf6b8434972895dbfdce9f4d869fd33372 Mon Sep 17 00:00:00 2001 From: valued mammal Date: Sat, 14 Sep 2024 13:12:33 -0400 Subject: [PATCH] feat(rpc): introduce FilterIter --- crates/bitcoind_rpc/Cargo.toml | 3 + crates/bitcoind_rpc/examples/bip158.rs | 106 ++++++++++ crates/bitcoind_rpc/src/bip158.rs | 277 +++++++++++++++++++++++++ crates/bitcoind_rpc/src/lib.rs | 5 +- crates/bitcoind_rpc/tests/bip158.rs | 141 +++++++++++++ 5 files changed, 531 insertions(+), 1 deletion(-) create mode 100644 crates/bitcoind_rpc/examples/bip158.rs create mode 100644 crates/bitcoind_rpc/src/bip158.rs create mode 100644 crates/bitcoind_rpc/tests/bip158.rs diff --git a/crates/bitcoind_rpc/Cargo.toml b/crates/bitcoind_rpc/Cargo.toml index a34260074..db7ec6dcc 100644 --- a/crates/bitcoind_rpc/Cargo.toml +++ b/crates/bitcoind_rpc/Cargo.toml @@ -28,3 +28,6 @@ bdk_chain = { path = "../chain" } default = ["std"] std = ["bitcoin/std", "bdk_core/std"] serde = ["bitcoin/serde", "bdk_core/serde"] + +[[example]] +name = "bip158" diff --git a/crates/bitcoind_rpc/examples/bip158.rs b/crates/bitcoind_rpc/examples/bip158.rs new file mode 100644 index 000000000..85262c977 --- /dev/null +++ b/crates/bitcoind_rpc/examples/bip158.rs @@ -0,0 +1,106 @@ +#![allow(clippy::print_stdout)] +use std::time::Instant; + +use anyhow::Context; +use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter}; +use bdk_chain::bitcoin::{constants::genesis_block, secp256k1::Secp256k1, Network}; +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; +use bdk_chain::local_chain::LocalChain; +use bdk_chain::miniscript::Descriptor; +use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, SpkIterator}; +use bdk_testenv::anyhow; + +// This example shows how BDK chain and tx-graph structures are updated using compact +// filters syncing. Assumes a connection can be made to a bitcoin node via environment +// variables `RPC_URL` and `RPC_COOKIE`. + +// Usage: `cargo run -p bdk_bitcoind_rpc --example bip158` + +const EXTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/0/*)#uswl2jj7"; +const INTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/1/*)#dyt7h8zx"; +const SPK_COUNT: u32 = 25; +const NETWORK: Network = Network::Signet; + +const START_HEIGHT: u32 = 170_000; +const START_HASH: &str = "00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d"; + +fn main() -> anyhow::Result<()> { + // Setup receiving chain and graph structures. + let secp = Secp256k1::new(); + let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?; + let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?; + let (mut chain, _) = LocalChain::from_genesis_hash(genesis_block(NETWORK).block_hash()); + let mut graph = IndexedTxGraph::>::new({ + let mut index = KeychainTxOutIndex::default(); + index.insert_descriptor("external", descriptor.clone())?; + index.insert_descriptor("internal", change_descriptor.clone())?; + index + }); + + // Assume a minimum birthday height + let block = BlockId { + height: START_HEIGHT, + hash: START_HASH.parse()?, + }; + let _ = chain.insert_block(block)?; + + // Configure RPC client + let url = std::env::var("RPC_URL").context("must set RPC_URL")?; + let cookie = std::env::var("RPC_COOKIE").context("must set RPC_COOKIE")?; + let rpc_client = + bitcoincore_rpc::Client::new(&url, bitcoincore_rpc::Auth::CookieFile(cookie.into()))?; + + // Initialize block emitter + let cp = chain.tip(); + let start_height = cp.height(); + let mut emitter = FilterIter::new_with_checkpoint(&rpc_client, cp); + for (_, desc) in graph.index.keychains() { + let spks = SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, spk)| spk); + emitter.add_spks(spks); + } + + let start = Instant::now(); + + // Sync + if let Some(tip) = emitter.get_tip()? { + let blocks_to_scan = tip.height - start_height; + + for event in emitter.by_ref() { + let event = event?; + let curr = event.height(); + // apply relevant blocks + if let Event::Block(EventInner { height, ref block }) = event { + let _ = graph.apply_block_relevant(block, height); + println!("Matched block {}", curr); + } + if curr % 1000 == 0 { + let progress = (curr - start_height) as f32 / blocks_to_scan as f32; + println!("[{:.2}%]", progress * 100.0); + } + } + // update chain + if let Some(tip) = emitter.chain_update() { + let _ = chain.apply_update(tip)?; + } + } + + println!("\ntook: {}s", start.elapsed().as_secs()); + println!("Local tip: {}", chain.tip().height()); + let unspent: Vec<_> = graph + .graph() + .filter_chain_unspents( + &chain, + chain.tip().block_id(), + graph.index.outpoints().clone(), + ) + .collect(); + if !unspent.is_empty() { + println!("\nUnspent"); + for (index, utxo) in unspent { + // (k, index) | value | outpoint | + println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint,); + } + } + + Ok(()) +} diff --git a/crates/bitcoind_rpc/src/bip158.rs b/crates/bitcoind_rpc/src/bip158.rs new file mode 100644 index 000000000..6c0200ad9 --- /dev/null +++ b/crates/bitcoind_rpc/src/bip158.rs @@ -0,0 +1,277 @@ +//! Compact block filters sync over RPC, see also [BIP157][0]. +//! +//! This module is home to [`FilterIter`], a structure that returns bitcoin blocks by matching +//! a list of script pubkeys against a [BIP158][1] [`BlockFilter`]. +//! +//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki +//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki + +use bdk_core::collections::BTreeMap; +use core::fmt; + +use bdk_core::bitcoin; +use bdk_core::{BlockId, CheckPoint}; +use bitcoin::{ + bip158::{self, BlockFilter}, + Block, BlockHash, ScriptBuf, +}; +use bitcoincore_rpc; +use bitcoincore_rpc::RpcApi; + +/// Block height +type Height = u32; + +/// Type that generates block [`Event`]s by matching a list of script pubkeys against a +/// [`BlockFilter`]. +#[derive(Debug)] +pub struct FilterIter<'c, C> { + // RPC client + client: &'c C, + // SPK inventory + spks: Vec, + // local cp + cp: Option, + // blocks map + blocks: BTreeMap, + // next filter + next_filter: Option, + // best height counter + height: Height, + // stop height + stop: Height, +} + +impl<'c, C: RpcApi> FilterIter<'c, C> { + /// Construct [`FilterIter`] from a given `client` and start `height`. + pub fn new_with_height(client: &'c C, height: u32) -> Self { + Self { + client, + spks: vec![], + cp: None, + blocks: BTreeMap::new(), + next_filter: None, + height, + stop: 0, + } + } + + /// Construct [`FilterIter`] from a given `client` and [`CheckPoint`]. + pub fn new_with_checkpoint(client: &'c C, cp: CheckPoint) -> Self { + let mut filter_iter = Self::new_with_height(client, cp.height()); + filter_iter.cp = Some(cp); + filter_iter + } + + /// Extends `self` with an iterator of spks. + pub fn add_spks(&mut self, spks: impl IntoIterator) { + self.spks.extend(spks) + } + + /// Add spk to the list of spks to scan with. + pub fn add_spk(&mut self, spk: ScriptBuf) { + self.spks.push(spk); + } + + /// Get the next filter and increment the current best height. + /// + /// Returns `Ok(None)` when the stop height is exceeded. + fn next_filter(&mut self) -> Result, Error> { + if self.height > self.stop { + return Ok(None); + } + let height = self.height; + let hash = match self.blocks.get(&height) { + Some(h) => *h, + None => self.client.get_block_hash(height as u64)?, + }; + let filter_bytes = self.client.get_block_filter(&hash)?.filter; + let filter = BlockFilter::new(&filter_bytes); + self.height += 1; + Ok(Some((BlockId { height, hash }, filter))) + } + + /// Get the remote tip. + /// + /// Returns `None` if there's no difference between the height of this [`FilterIter`] and the + /// remote height. + pub fn get_tip(&mut self) -> Result, Error> { + let tip_hash = self.client.get_best_block_hash()?; + let mut header = self.client.get_block_header_info(&tip_hash)?; + let tip_height = header.height as u32; + if self.height == tip_height { + // nothing to do + return Ok(None); + } + self.blocks.insert(tip_height, tip_hash); + + // if we have a checkpoint we use a lookback of ten blocks + // to ensure consistency of the local chain + if let Some(cp) = self.cp.as_ref() { + // adjust start height to point of agreement + 1 + let base = self.find_base_with(cp.clone())?; + self.height = base.height + 1; + + for _ in 0..9 { + let hash = match header.previous_block_hash { + Some(hash) => hash, + None => break, + }; + header = self.client.get_block_header_info(&hash)?; + let height = header.height as u32; + if height < self.height { + break; + } + self.blocks.insert(height, hash); + } + } + + self.stop = tip_height; + + // get the first filter + self.next_filter = self.next_filter()?; + + Ok(Some(BlockId { + height: tip_height, + hash: self.blocks[&tip_height], + })) + } +} + +/// Alias for a compact filter and associated block id. +type NextFilter = (BlockId, BlockFilter); + +/// Event inner type +#[derive(Debug, Clone)] +pub struct EventInner { + /// Height + pub height: Height, + /// Block + pub block: Block, +} + +/// Kind of event produced by [`FilterIter`]. +#[derive(Debug, Clone)] +pub enum Event { + /// Block + Block(EventInner), + /// No match + NoMatch(Height), +} + +impl Event { + /// Whether this event contains a matching block. + pub fn is_match(&self) -> bool { + matches!(self, Event::Block(_)) + } + + /// Get the height of this event. + pub fn height(&self) -> Height { + match self { + Self::Block(EventInner { height, .. }) => *height, + Self::NoMatch(h) => *h, + } + } +} + +impl<'c, C: RpcApi> Iterator for FilterIter<'c, C> { + type Item = Result; + + fn next(&mut self) -> Option { + let (block, filter) = self.next_filter.clone()?; + + (|| -> Result<_, Error> { + // if the next filter matches any of our watched spks, get the block + // and return it, inserting relevant block ids along the way + let height = block.height; + let hash = block.hash; + + let result = if self.spks.is_empty() { + Err(Error::NoScripts) + } else if filter + .match_any(&hash, self.spks.iter().map(|script| script.as_bytes())) + .map_err(Error::Bip158)? + { + let block = self.client.get_block(&hash)?; + self.blocks.insert(height, hash); + let inner = EventInner { height, block }; + Ok(Some(Event::Block(inner))) + } else { + Ok(Some(Event::NoMatch(height))) + }; + + self.next_filter = self.next_filter()?; + + result + })() + .transpose() + } +} + +impl<'c, C: RpcApi> FilterIter<'c, C> { + /// Returns the point of agreement between `self` and the given `cp`. + fn find_base_with(&mut self, mut cp: CheckPoint) -> Result { + loop { + let height = cp.height(); + let fetched_hash = match self.blocks.get(&height) { + Some(hash) => *hash, + None if height == 0 => cp.hash(), + _ => self.client.get_block_hash(height as _)?, + }; + if cp.hash() == fetched_hash { + // ensure this block also exists in self + self.blocks.insert(height, cp.hash()); + return Ok(cp.block_id()); + } + // remember conflicts + self.blocks.insert(height, fetched_hash); + cp = cp.prev().expect("must break before genesis"); + } + } + + /// Returns a chain update from the newly scanned blocks. + /// + /// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or + /// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip). + pub fn chain_update(&mut self) -> Option { + if self.cp.is_none() || self.blocks.is_empty() { + return None; + } + + // note: to connect with the local chain we must guarantee that `self.blocks.first()` + // is also the point of agreement with `self.cp`. + Some( + CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from)) + .expect("blocks must be in order"), + ) + } +} + +/// Errors that may occur during a compact filters sync. +#[derive(Debug)] +pub enum Error { + /// bitcoin bip158 error + Bip158(bip158::Error), + /// attempted to scan blocks without any script pubkeys + NoScripts, + /// bitcoincore_rpc error + Rpc(bitcoincore_rpc::Error), +} + +impl From for Error { + fn from(e: bitcoincore_rpc::Error) -> Self { + Self::Rpc(e) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Bip158(e) => e.fmt(f), + Self::NoScripts => write!(f, "no script pubkeys were provided to match with"), + Self::Rpc(e) => e.fmt(f), + } + } +} + +#[cfg(feature = "std")] +impl std::error::Error for Error {} diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 49121cead..3fa17ef19 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -11,9 +11,12 @@ use bdk_core::{BlockId, CheckPoint}; use bitcoin::{block::Header, Block, BlockHash, Transaction}; -pub use bitcoincore_rpc; use bitcoincore_rpc::bitcoincore_rpc_json; +pub mod bip158; + +pub use bitcoincore_rpc; + /// The [`Emitter`] is used to emit data sourced from [`bitcoincore_rpc::Client`]. /// /// Refer to [module-level documentation] for more. diff --git a/crates/bitcoind_rpc/tests/bip158.rs b/crates/bitcoind_rpc/tests/bip158.rs new file mode 100644 index 000000000..0b681defc --- /dev/null +++ b/crates/bitcoind_rpc/tests/bip158.rs @@ -0,0 +1,141 @@ +use bitcoin::{constants, Network}; + +use bdk_bitcoind_rpc::bip158::FilterIter; +use bdk_core::{BlockId, CheckPoint}; +use bdk_testenv::{anyhow, bitcoind, block_id, TestEnv}; +use bitcoincore_rpc::RpcApi; + +fn testenv() -> anyhow::Result { + let mut conf = bitcoind::Conf::default(); + conf.args.push("-blockfilterindex=1"); + conf.args.push("-peerblockfilters=1"); + TestEnv::new_with_config(bdk_testenv::Config { + bitcoind: conf, + ..Default::default() + }) +} + +// Test the result of `chain_update` given a local checkpoint. +// +// new blocks +// 2--3--4--5--6--7--8--9--10--11 +// +// case 1: base below new blocks +// 0- +// case 2: base overlaps with new blocks +// 0--1--2--3--4 +// case 3: stale tip (with overlap) +// 0--1--2--3--4--x +// case 4: stale tip (no overlap) +// 0--x +#[test] +fn get_tip_and_chain_update() -> anyhow::Result<()> { + let env = testenv()?; + + let genesis_hash = constants::genesis_block(Network::Regtest).block_hash(); + let genesis = BlockId { + height: 0, + hash: genesis_hash, + }; + + let hash = env.rpc_client().get_best_block_hash()?; + let header = env.rpc_client().get_block_header_info(&hash)?; + assert_eq!(header.height, 1); + let block_1 = BlockId { + height: header.height as u32, + hash, + }; + + // `FilterIter` will try to return up to ten recent blocks + // so we keep them for reference + let new_blocks: Vec = (2..=11) + .zip(env.mine_blocks(10, None)?) + .map(BlockId::from) + .collect(); + + struct TestCase { + // name + name: &'static str, + // local blocks + chain: Vec, + // expected blocks + exp: Vec, + } + + // For each test we create a new `FilterIter` with the checkpoint given + // by the blocks in the test chain. Then we sync to the remote tip and + // check the blocks that are returned in the chain update. + [ + TestCase { + name: "point of agreement below new blocks, expect base + new", + chain: vec![genesis, block_1], + exp: [block_1].into_iter().chain(new_blocks.clone()).collect(), + }, + TestCase { + name: "point of agreement genesis, expect base + new", + chain: vec![genesis], + exp: [genesis].into_iter().chain(new_blocks.clone()).collect(), + }, + TestCase { + name: "point of agreement within new blocks, expect base + remaining", + chain: new_blocks[..=2].to_vec(), + exp: new_blocks[2..].to_vec(), + }, + TestCase { + name: "stale tip within new blocks, expect base + corrected + remaining", + // base height: 4, stale height: 5 + chain: vec![new_blocks[2], block_id!(5, "E")], + exp: new_blocks[2..].to_vec(), + }, + TestCase { + name: "stale tip below new blocks, expect base + corrected + new", + chain: vec![genesis, block_id!(1, "A")], + exp: [genesis, block_1].into_iter().chain(new_blocks).collect(), + }, + ] + .into_iter() + .for_each(|test| { + let cp = CheckPoint::from_block_ids(test.chain).unwrap(); + let mut iter = FilterIter::new_with_checkpoint(env.rpc_client(), cp); + let _ = iter.get_tip().unwrap(); + let update_cp = iter.chain_update().unwrap(); + let mut update_blocks: Vec<_> = update_cp.iter().map(|cp| cp.block_id()).collect(); + update_blocks.reverse(); + assert_eq!(update_blocks, test.exp, "{}", test.name); + }); + + Ok(()) +} + +#[test] +fn filter_iter_error_no_scripts() -> anyhow::Result<()> { + use bdk_bitcoind_rpc::bip158::Error; + let env = testenv()?; + + let addr = env + .rpc_client() + .get_new_address(None, None)? + .assume_checked(); + let spk = addr.script_pubkey(); + let _ = env.mine_blocks(9, Some(addr))?; + + let mut iter = FilterIter::new_with_height(env.rpc_client(), 1); + let tip = iter.get_tip()?.unwrap(); + assert_eq!(tip.height, 10); + + // iterator should return ten errors + for res in iter.by_ref().take(10) { + assert!(matches!(res, Err(Error::NoScripts))); + } + assert!(iter.next().is_none()); + + // now add scripts and rescan + iter = FilterIter::new_with_height(env.rpc_client(), 1); + iter.add_spk(spk); + let _ = iter.get_tip()?; + for res in iter { + assert!(res.is_ok()); + } + + Ok(()) +}