Skip to content

Commit

Permalink
Merge pull request #4 from rustaceanrob/rescan
Browse files Browse the repository at this point in the history
node: add rescan
  • Loading branch information
rustaceanrob authored Jun 14, 2024
2 parents 3162b3e + b75a4d6 commit a329cba
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 25 deletions.
3 changes: 2 additions & 1 deletion example/rescan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn main() {
.unwrap(),
))
// The number of connections we would like to maintain
.num_required_peers(2)
.num_required_peers(3)
// Create the node and client
.build_node()
.await;
Expand Down Expand Up @@ -75,6 +75,7 @@ async fn main() {
sender.add_scripts(new_scripts).await.unwrap();
// // Tell the node to look for these new scripts
sender.rescan().await.unwrap();
tracing::info!("Starting rescan");
loop {
if let Ok(message) = receiver.recv().await {
match message {
Expand Down
5 changes: 5 additions & 0 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,11 @@ impl Chain {
self.scripts.insert(script);
}
}

//
pub(crate) async fn clear_filters(&mut self) {
self.filter_chain.clear_cache().await;
}
}

#[cfg(test)]
Expand Down
14 changes: 0 additions & 14 deletions src/filters/filter_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,4 @@ impl FilterChain {
pub(crate) fn last_stop_hash_request(&mut self) -> &Option<BlockHash> {
&self.prev_stophash_request
}

// pub(crate) fn filter_at_height(&self, height: usize) -> Option<Filter> {
// let adjusted_height = self.adjusted_height(height);
// match adjusted_height {
// Some(height) => {
// if let Some(filter) = self.chain.get(height) {
// Some(filter.clone())
// } else {
// None
// }
// }
// None => None,
// }
// }
}
42 changes: 32 additions & 10 deletions src/node/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl Node {
node_map.send_random(block_request).await;
}
// If we have a transaction to broadcast and we are connected to peers, we should broadcast it
if node_map.live().ge(&1) && !tx_broadcaster.is_empty() {
if node_map.live().ge(&self.required_peers) && !tx_broadcaster.is_empty() {
let transaction = tx_broadcaster.next().unwrap();
match transaction.broadcast_policy {
TxBroadcastPolicy::AllPeers => {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Node {
PeerMessage::Filter(filter) => {
match self.handle_filter(peer_thread.nonce, filter).await {
Some(response) => {
node_map.broadcast(response).await;
node_map.send_message(peer_thread.nonce, response).await;
}
None => continue,
}
Expand Down Expand Up @@ -302,7 +302,11 @@ impl Node {
ClientMessage::Shutdown => return Ok(()),
ClientMessage::Broadcast(transaction) => tx_broadcaster.add(transaction),
ClientMessage::AddScripts(scripts) => self.add_scripts(scripts).await,
ClientMessage::Rescan => return Ok(()),
ClientMessage::Rescan => {
if let Some(response) = self.rescan().await {
node_map.broadcast(response).await;
}
},
}
}
}
Expand Down Expand Up @@ -483,13 +487,13 @@ impl Node {
}
}
CFHeaderSyncResult::Dispute(_) => {
// Request the block from the peer
// TODO: Request the filter and block from the peer
self.dialog
.send_warning(
"Found a conflict while peers are sending filter headers".into(),
)
.await;
None
Some(MainThreadMessage::Disconnect)
}
},
Err(e) => {
Expand Down Expand Up @@ -587,6 +591,29 @@ impl Node {
}
}

// Add more scripts to the chain to look for. Does not imply a rescan.
async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
let mut chain = self.chain.lock().await;
chain.put_scripts(scripts);
}

// Clear the filter hash cache and redownload the filters.
async fn rescan(&mut self) -> Option<MainThreadMessage> {
let mut state = self.state.write().await;
let mut chain = self.chain.lock().await;
match *state {
NodeState::Behind => None,
NodeState::HeadersSynced => None,
_ => {
chain.clear_filters().await;
*state = NodeState::FilterHeadersSynced;
Some(MainThreadMessage::GetFilters(
chain.next_filter_message().await,
))
}
}
}

// First we seach the whitelist for peers that we trust. Then, depending on the state
// we either need to catch up on block headers or we may start requesting filters and blocks.
// When requesting filters, we try to select peers that have signaled for CF support.
Expand Down Expand Up @@ -666,9 +693,4 @@ impl Node {
}
}
}

async fn add_scripts(&mut self, scripts: HashSet<ScriptBuf>) {
let mut chain = self.chain.lock().await;
chain.put_scripts(scripts);
}
}

0 comments on commit a329cba

Please sign in to comment.