Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): introduce FilterIter #1614

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/bitcoind_rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ bdk_chain = { path = "../chain" }
default = ["std"]
std = ["bitcoin/std", "bdk_core/std"]
serde = ["bitcoin/serde", "bdk_core/serde"]

[[example]]
name = "bip158"
101 changes: 101 additions & 0 deletions crates/bitcoind_rpc/examples/bip158.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#![allow(clippy::print_stdout)]
use std::time::Instant;

use bdk_bitcoind_rpc::bip158::{Event, EventInner, FilterIter};
use bdk_chain::bitcoin::{constants::genesis_block, secp256k1::Secp256k1, Network};
use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex;
use bdk_chain::local_chain::LocalChain;
use bdk_chain::miniscript::Descriptor;
use bdk_chain::{BlockId, ConfirmationBlockTime, IndexedTxGraph, SpkIterator};
use bdk_testenv::anyhow;

// This example shows how BDK chain and tx-graph structures are updated using compact filters syncing.
// Assumes a local Signet node, and "RPC_COOKIE" set in environment.

// Usage: `cargo run -p bdk_bitcoind_rpc --example bip158`

const EXTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/0/*)#uswl2jj7";
const INTERNAL: &str = "tr([7d94197e]tprv8ZgxMBicQKsPe1chHGzaa84k1inY2nAXUL8iPSyWESPrEst4E5oCFXhPATqj5fvw34LDknJz7rtXyEC4fKoXryUdc9q87pTTzfQyv61cKdE/86'/1'/0'/1/*)#dyt7h8zx";
const SPK_COUNT: u32 = 25;
const NETWORK: Network = Network::Signet;

fn main() -> anyhow::Result<()> {
// Setup receiving chain and graph structures.
let secp = Secp256k1::new();
let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?;
let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?;
let (mut chain, _) = LocalChain::from_genesis_hash(genesis_block(NETWORK).block_hash());
let mut graph = IndexedTxGraph::<ConfirmationBlockTime, KeychainTxOutIndex<&str>>::new({
let mut index = KeychainTxOutIndex::default();
index.insert_descriptor("external", descriptor.clone())?;
index.insert_descriptor("internal", change_descriptor.clone())?;
index
});

// Assume a minimum birthday height
let block = BlockId {
height: 170_000,
hash: "00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d".parse()?,
};
let _ = chain.insert_block(block)?;

// Configure RPC client
let rpc_client = bitcoincore_rpc::Client::new(
"127.0.0.1:38332",
bitcoincore_rpc::Auth::CookieFile(std::env::var("RPC_COOKIE")?.into()),
)?;

// Initialize block emitter
let cp = chain.tip();
let start_height = cp.height();
let mut emitter = FilterIter::new_with_checkpoint(&rpc_client, cp);
for (_, desc) in graph.index.keychains() {
let spks = SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, spk)| spk);
emitter.add_spks(spks);
}

let start = Instant::now();

// Sync
if let Some(tip) = emitter.get_tip()? {
let blocks_to_scan = tip.height - start_height;

for event in emitter.by_ref() {
let event = event?;
let curr = event.height();
// apply relevant blocks
if let Event::Block(EventInner { height, ref block }) = event {
let _ = graph.apply_block_relevant(block, height);
println!("Matched block {}", curr);
}
if curr % 1000 == 0 {
let progress = (curr - start_height) as f32 / blocks_to_scan as f32;
println!("[{:.2}%]", progress * 100.0);
}
}
// update chain
if let Some(tip) = emitter.chain_update() {
let _ = chain.apply_update(tip)?;
}
}

println!("\ntook: {}s", start.elapsed().as_secs());
println!("Local tip: {}", chain.tip().height());
let unspent: Vec<_> = graph
.graph()
.filter_chain_unspents(
&chain,
chain.tip().block_id(),
graph.index.outpoints().clone(),
)
.collect();
if !unspent.is_empty() {
println!("\nUnspent");
for (index, utxo) in unspent {
// (k, index) | value | outpoint |
println!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint,);
}
}

Ok(())
}
274 changes: 274 additions & 0 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
//! Compact block filters sync over RPC, see also [BIP157][0].
//!
//! This module is home to [`FilterIter`], a structure that returns bitcoin blocks by matching
//! a list of script pubkeys against a [BIP158][1] [`BlockFilter`].
//!
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki

use bdk_core::collections::BTreeMap;
use core::fmt;

use bdk_core::bitcoin;
use bdk_core::{BlockId, CheckPoint};
use bitcoin::{
bip158::{self, BlockFilter},
Block, BlockHash, ScriptBuf,
};
use bitcoincore_rpc;
use bitcoincore_rpc::RpcApi;

/// Block height
type Height = u32;

/// Type that generates block [`Event`]s by matching a list of script pubkeys against a
/// [`BlockFilter`].
#[derive(Debug)]
pub struct FilterIter<'c, C> {
// RPC client
client: &'c C,
// SPK inventory
spks: Vec<ScriptBuf>,
// local cp
cp: Option<CheckPoint>,
// blocks map
blocks: BTreeMap<Height, BlockHash>,
// next filter
next_filter: Option<NextFilter>,
// best height counter
height: Height,
// stop height
stop: Height,
}

impl<'c, C: RpcApi> FilterIter<'c, C> {
/// Construct [`FilterIter`] from a given `client` and start `height`.
pub fn new_with_height(client: &'c C, height: u32) -> Self {
Self {
client,
spks: vec![],
cp: None,
blocks: BTreeMap::new(),
next_filter: None,
height,
stop: 0,
}
}

/// Construct [`FilterIter`] from a given `client` and [`CheckPoint`].
pub fn new_with_checkpoint(client: &'c C, cp: CheckPoint) -> Self {
let mut filter_iter = Self::new_with_height(client, cp.height());
filter_iter.cp = Some(cp);
filter_iter
}

/// Extends `self` with an iterator of spks.
pub fn add_spks(&mut self, spks: impl IntoIterator<Item = ScriptBuf>) {
self.spks.extend(spks)
}

/// Add spk to the list of spks to scan with.
pub fn add_spk(&mut self, spk: ScriptBuf) {
self.spks.push(spk);
}

/// Get the next filter and increment the current best height.
///
/// Returns `Ok(None)` when the stop height is exceeded.
fn next_filter(&mut self) -> Result<Option<NextFilter>, Error> {
if self.height > self.stop {
return Ok(None);
}
let height = self.height;
let hash = match self.blocks.get(&height) {
Some(h) => *h,
None => self.client.get_block_hash(height as u64)?,
};
let filter_bytes = self.client.get_block_filter(&hash)?.filter;
let filter = BlockFilter::new(&filter_bytes);
self.height += 1;
Ok(Some((BlockId { height, hash }, filter)))
}

/// Get the remote tip.
///
/// Returns `None` if there's no difference between the height of this [`FilterIter`] and the
/// remote height.
pub fn get_tip(&mut self) -> Result<Option<BlockId>, Error> {
let tip_hash = self.client.get_best_block_hash()?;
let mut header = self.client.get_block_header_info(&tip_hash)?;
let tip_height = header.height as u32;
if self.height == tip_height {
// nothing to do
return Ok(None);
}
self.blocks.insert(tip_height, tip_hash);

// if we have a checkpoint we use a lookback of ten blocks
// to ensure consistency of the local chain
if let Some(cp) = self.cp.as_ref() {
// adjust start height to point of agreement + 1
let base = self.find_base_with(cp.clone())?;
self.height = base.height + 1;

for _ in 0..9 {
let hash = match header.previous_block_hash {
Some(hash) => hash,
None => break,
};
header = self.client.get_block_header_info(&hash)?;
let height = header.height as u32;
if height < self.height {
break;
}
self.blocks.insert(height, hash);
}
}

self.stop = tip_height;

// get the first filter
self.next_filter = self.next_filter()?;

Ok(Some(BlockId {
height: tip_height,
hash: self.blocks[&tip_height],
}))
}
}

/// Alias for a compact filter and associated block id.
type NextFilter = (BlockId, BlockFilter);

/// Event inner type
#[derive(Debug, Clone)]
pub struct EventInner {
/// Height
pub height: Height,
/// Block
pub block: Block,
}

/// Kind of event produced by [`FilterIter`].
#[derive(Debug, Clone)]
pub enum Event {
/// Block
Block(EventInner),
/// No match
NoMatch(Height),
}

impl Event {
/// Whether this event contains a matching block.
pub fn is_match(&self) -> bool {
matches!(self, Event::Block(_))
}

/// Get the height of this event.
pub fn height(&self) -> Height {
match self {
Self::Block(EventInner { height, .. }) => *height,
Self::NoMatch(h) => *h,
}
}
}

impl<'c, C: RpcApi> Iterator for FilterIter<'c, C> {
type Item = Result<Event, Error>;

fn next(&mut self) -> Option<Self::Item> {
if self.spks.is_empty() {
return None;
}

let (block, filter) = self.next_filter.clone()?;

(|| -> Result<_, Error> {
// if the next filter matches any of our watched spks, get the block
// and return it, inserting relevant block ids along the way
let height = block.height;
let hash = block.hash;
let event = if filter
.match_any(&hash, self.spks.iter().map(|script| script.as_bytes()))
.map_err(Error::Bip158)?
{
let block = self.client.get_block(&hash)?;
self.blocks.insert(height, hash);
let inner = EventInner { height, block };
Event::Block(inner)
} else {
Event::NoMatch(height)
};

self.next_filter = self.next_filter()?;
Ok(Some(event))
})()
.transpose()
}
}

impl<'c, C: RpcApi> FilterIter<'c, C> {
/// Returns the point of agreement between `self` and the given `cp`.
fn find_base_with(&mut self, mut cp: CheckPoint) -> Result<BlockId, Error> {
loop {
let height = cp.height();
let fetched_hash = match self.blocks.get(&height) {
Some(hash) => *hash,
None if height == 0 => cp.hash(),
_ => self.client.get_block_hash(height as _)?,
};
if cp.hash() == fetched_hash {
// ensure this block also exists in self
self.blocks.insert(height, cp.hash());
return Ok(cp.block_id());
}
// remember conflicts
self.blocks.insert(height, fetched_hash);
cp = cp.prev().expect("must break before genesis");
}
}

/// Returns a chain update from the newly scanned blocks.
///
/// Returns `None` if this [`FilterIter`] was not constructed using a [`CheckPoint`], or
/// if no blocks have been fetched for example by using [`get_tip`](Self::get_tip).
pub fn chain_update(&mut self) -> Option<CheckPoint> {
if self.cp.is_none() || self.blocks.is_empty() {
return None;
}

// note: to connect with the local chain we must guarantee that `self.blocks.first()`
// is also the point of agreement with `self.cp`.
Some(
CheckPoint::from_block_ids(self.blocks.iter().map(BlockId::from))
.expect("blocks must be in order"),
)
}
}

/// Errors that may occur during a compact filters sync.
#[derive(Debug)]
pub enum Error {
/// bitcoin bip158 error
Bip158(bip158::Error),
/// bitcoincore_rpc error
Rpc(bitcoincore_rpc::Error),
}

impl From<bitcoincore_rpc::Error> for Error {
fn from(e: bitcoincore_rpc::Error) -> Self {
Self::Rpc(e)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Bip158(e) => e.fmt(f),
Self::Rpc(e) => e.fmt(f),
}
}
}

#[cfg(feature = "std")]
impl std::error::Error for Error {}
5 changes: 4 additions & 1 deletion crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@

use bdk_core::{BlockId, CheckPoint};
use bitcoin::{block::Header, Block, BlockHash, Transaction};
pub use bitcoincore_rpc;
use bitcoincore_rpc::bitcoincore_rpc_json;

pub mod bip158;

pub use bitcoincore_rpc;

/// The [`Emitter`] is used to emit data sourced from [`bitcoincore_rpc::Client`].
///
/// Refer to [module-level documentation] for more.
Expand Down
Loading
Loading