Skip to content

Commit

Permalink
multi: [example] start rescan [chain] handle dup cf headers
Browse files Browse the repository at this point in the history
  • Loading branch information
rustaceanrob committed Jun 13, 2024
1 parent 3249894 commit c974121
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ tokio = { version = "1", default-features = false, features = [
name = "signet"
path = "example/signet.rs"

[[example]]
name = "rescan"
path = "example/rescan.rs"

[lib]
name = "kyoto"
path = "src/lib.rs"
93 changes: 93 additions & 0 deletions example/rescan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use bitcoin::BlockHash;
use kyoto::node::messages::NodeMessage;
use kyoto::{chain::checkpoints::HeaderCheckpoint, node::builder::NodeBuilder};
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
str::FromStr,
};

#[tokio::main]
async fn main() {
// Add third-party logging
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Add Bitcoin scripts to scan the blockchain for
let address = bitcoin::Address::from_str("tb1q9pvjqz5u5sdgpatg3wn0ce438u5cyv85lly0pc")
.unwrap()
.require_network(bitcoin::Network::Signet)
.unwrap();
let addresses = vec![address];
// Add preferred peers to connect to
let peer = IpAddr::V4(Ipv4Addr::new(95, 217, 198, 121));
let peer_2 = IpAddr::V4(Ipv4Addr::new(23, 137, 57, 100));
// Create a new node builder
let builder = NodeBuilder::new(bitcoin::Network::Signet);
// Add node preferences and build the node/client
let (mut node, mut client) = builder
// Add the peers
.add_peers(vec![(peer, 38333), (peer_2, 38333)])
// The Bitcoin scripts to monitor
.add_scripts(addresses)
// Only scan blocks strictly after an anchor checkpoint
.anchor_checkpoint(HeaderCheckpoint::new(
170_000,
BlockHash::from_str("00000041c812a89f084f633e4cf47e819a2f6b1c0a15162355a930410522c99d")
.unwrap(),
))
// The number of connections we would like to maintain
.num_required_peers(2)
// Create the node and client
.build_node()
.await;
// Run the node and wait for the sync message;
tokio::task::spawn(async move { node.run().await });
tracing::info!("Running the node and waiting for a sync message. Please wait a minute!");
// Split the client into components that send messages and listen to messages
let (mut sender, mut receiver) = client.split();
// Sync with the single script added
loop {
if let Ok(message) = receiver.recv().await {
match message {
NodeMessage::Dialog(d) => tracing::info!("{}", d),
NodeMessage::Warning(e) => tracing::warn!("{}", e),
NodeMessage::Synced(tip) => {
tracing::info!("Synced chain up to block {}", tip.height,);
tracing::info!("Chain tip: {}", tip.hash.to_string(),);
break;
}
_ => (),
}
}
}
// Add new scripts to the node.
let mut new_scripts = HashSet::new();
let new_script = bitcoin::Address::from_str(
"tb1par6ufhp0t448t908kyyvkp3a48r42qcjmg0z9p6a0zuakc44nn2seh63jr",
)
.unwrap()
.require_network(bitcoin::Network::Signet)
.unwrap()
.into();
new_scripts.insert(new_script);
sender.add_scripts(new_scripts).await.unwrap();
// // Tell the node to look for these new scripts
sender.rescan().await.unwrap();
// Continually listen for events until the node has rescaned the filters.
loop {
if let Ok(message) = receiver.recv().await {
match message {
NodeMessage::Dialog(d) => tracing::info!("{}", d),
NodeMessage::Warning(e) => tracing::warn!("{}", e),
NodeMessage::Synced(tip) => {
tracing::info!("Synced chain up to block {}", tip.height,);
tracing::info!("Chain tip: {}", tip.hash.to_string(),);
break;
}
_ => (),
}
}
}
let _ = sender.shutdown().await;
tracing::info!("Shutting down");
}
12 changes: 12 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,18 @@ impl Chain {
.unwrap(),
)
.await;
match batch.last_header() {
Some(batch_last) => {
if let Some(prev_header) = self.cf_header_chain.prev_header() {
// A new block was mined and we ended up asking for this batch twice,
// or the quorum required is less than our connected peers.
if batch_last.eq(&prev_header) {
return Ok(CFHeaderSyncResult::AddedToQueue);
}
}
}
None => return Err(CFHeaderSyncError::EmptyMessage),
}
self.audit_cf_headers(&batch).await?;
match self.cf_header_chain.append(peer_id, batch).await? {
AppendAttempt::AddedToQueue => Ok(CFHeaderSyncResult::AddedToQueue),
Expand Down
7 changes: 7 additions & 0 deletions src/chain/header_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,10 @@ mod tests {
chain.values()
);
assert_eq!(chain.header_at_height(12), None);
assert_eq!(chain.header_at_height(11), Some(&block_11));
assert_eq!(chain.header_at_height(10), Some(&new_block_10));
assert_eq!(chain.header_at_height(9), Some(&block_9));
assert_eq!(chain.header_at_height(8), Some(&block_8));
}

#[test]
Expand Down Expand Up @@ -369,6 +372,10 @@ mod tests {
vec![block_1, block_2, new_block_3, new_block_4],
chain.values()
);
assert_eq!(chain.header_at_height(4), Some(&new_block_4));
assert_eq!(chain.header_at_height(3), Some(&new_block_3));
assert_eq!(chain.header_at_height(2), Some(&block_2));
assert_eq!(chain.header_at_height(1), Some(&block_1));
}

#[tokio::test]
Expand Down
4 changes: 4 additions & 0 deletions src/filters/cfheader_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ impl CFHeaderBatch {
pub(crate) fn inner(&self) -> Vec<(FilterHeader, FilterHash)> {
self.inner.clone()
}

pub(crate) fn last_header(&self) -> Option<FilterHeader> {
self.inner.last().map(|(header, _)| *header)
}
}

impl From<CFHeaders> for CFHeaderBatch {
Expand Down
8 changes: 8 additions & 0 deletions src/node/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,12 @@ impl ClientSender {
.await
.map_err(|_| ClientError::SendError)
}

/// Starting at the configured anchor checkpoint, look for block inclusions with newly added scripts.
pub async fn rescan(&mut self) -> Result<(), ClientError> {
self.ntx
.send(ClientMessage::Rescan)
.await
.map_err(|_| ClientError::SendError)
}
}
2 changes: 2 additions & 0 deletions src/node/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ pub enum ClientMessage {
Broadcast(TxBroadcast),
/// Add more Bitcoin [`ScriptBuf`] to look for.
AddScripts(HashSet<ScriptBuf>),
/// Starting at the configured anchor checkpoint, look for block inclusions with newly added scripts.
Rescan,
}
1 change: 1 addition & 0 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ impl Node {
ClientMessage::Shutdown => return Ok(()),
ClientMessage::Broadcast(transaction) => tx_broadcaster.add(transaction),
ClientMessage::AddScripts(scripts) => self.add_scripts(scripts).await,
ClientMessage::Rescan => return Ok(()),
}
}
}
Expand Down

0 comments on commit c974121

Please sign in to comment.