From f78b141d866caa0186e6b05982b38ae8476a56ea Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Fri, 13 Dec 2024 15:32:37 +0000 Subject: [PATCH 1/6] create located tree data fn --- libtonode-tests/tests/sync.rs | 4 +- zingo-sync/src/scan.rs | 14 ++++- zingo-sync/src/scan/compact_blocks.rs | 1 + zingo-sync/src/scan/task.rs | 18 +++++-- zingo-sync/src/sync.rs | 61 +++++++++++---------- zingo-sync/src/traits.rs | 68 +++++------------------- zingo-sync/src/witness.rs | 76 +++++++++++++++++++++++++-- 7 files changed, 145 insertions(+), 97 deletions(-) diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs index d5cc2ef43c..d314c6ab08 100644 --- a/libtonode-tests/tests/sync.rs +++ b/libtonode-tests/tests/sync.rs @@ -43,8 +43,8 @@ async fn sync_mainnet_test() { .await .unwrap(); - dbg!(lightclient.wallet.wallet_blocks); - dbg!(lightclient.wallet.nullifier_map); + // dbg!(lightclient.wallet.wallet_blocks); + // dbg!(lightclient.wallet.nullifier_map); dbg!(lightclient.wallet.sync_state); } diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 6efa835f2d..3ffab72759 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -173,7 +173,19 @@ where .await .unwrap(); - let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?; + let consensus_parameters_clone = parameters.clone(); + let ufvks_clone = ufvks.clone(); + let scan_data = tokio::task::spawn_blocking(move || { + scan_compact_blocks( + compact_blocks, + &consensus_parameters_clone, + &ufvks_clone, + initial_scan_data, + ) + }) + .await + .unwrap()?; + // let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?; let ScanData { nullifiers, diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index 2c6d820aa0..cbc9d98024 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -158,6 +158,7 @@ where // checks height and hash continuity of a batch of compact blocks. // takes the last wallet compact block of the adjacent lower scan range, if available. +// TODO: remove option and revisit scanner flow to use the last block of previously scanned batch to check continuity fn check_continuity( compact_blocks: &[CompactBlock], previous_compact_block: Option<&WalletBlock>, diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 6f24e2993b..778507009f 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -185,10 +185,13 @@ where ScannerState::Scan => { // create scan tasks until all ranges are scanned or currently scanning if let Some(worker) = self.idle_worker() { + tracing::info!("idle worker... 
creating scan task"); if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() { worker.add_scan_task(scan_task).unwrap(); } else { - self.state.shutdown(); + if wallet.get_sync_state().unwrap().scan_complete() { + self.state.shutdown(); + } } } } @@ -255,9 +258,14 @@ where let consensus_parameters = self.consensus_parameters.clone(); let ufvks = self.ufvks.clone(); + let id = self.id; + let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { - is_scanning.store(true, atomic::Ordering::Release); + tracing::info!("TASK RECEIVED: {:#?}", &scan_task); + is_scanning.store(true, atomic::Ordering::SeqCst); + tracing::info!("WORKER {} SCANNING", id); + // is_scanning.store(true, atomic::Ordering::Release); let scan_results = scan( fetch_request_sender.clone(), @@ -274,7 +282,8 @@ where .send((scan_task.scan_range, scan_results)) .expect("receiver should never be dropped before sender!"); - is_scanning.store(false, atomic::Ordering::Release); + is_scanning.store(false, atomic::Ordering::SeqCst); + // is_scanning.store(false, atomic::Ordering::Release); } }); @@ -285,7 +294,8 @@ where } fn is_scanning(&self) -> bool { - self.is_scanning.load(atomic::Ordering::Acquire) + self.is_scanning.load(atomic::Ordering::SeqCst) + // self.is_scanning.load(atomic::Ordering::Acquire) } fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 2b4e5b8009..b0d851b924 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -71,15 +71,15 @@ where } let ufvks = wallet.get_unified_full_viewing_keys().unwrap(); - transparent::update_addresses_and_locators( - consensus_parameters, - wallet, - fetch_request_sender.clone(), - &ufvks, - wallet_height, - chain_height, - ) - .await; + // transparent::update_addresses_and_locators( + // consensus_parameters, + // wallet, + // fetch_request_sender.clone(), + // &ufvks, + // wallet_height, + // chain_height, + // ) + // .await; state::update_scan_ranges( wallet_height, @@ -99,15 +99,16 @@ where ); scanner.spawn_workers(); - // setup the initial mempool stream - let mut mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) - .await - .unwrap(); + // // setup the initial mempool stream + // let mut mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) + // .await + // .unwrap(); // TODO: consider what happens when there is no verification range i.e. all ranges already scanned // TODO: invalidate any pending transactions after eviction height (40 below best chain height?) + // TODO: implement an option for continuous scanning where it doesnt exit when complete - let mut interval = tokio::time::interval(Duration::from_millis(30)); + let mut interval = tokio::time::interval(Duration::from_millis(100)); loop { tokio::select! { Some((scan_range, scan_results)) = scan_results_receiver.recv() => { @@ -124,20 +125,20 @@ where .unwrap(); } - mempool_stream_response = mempool_stream.message() => { - process_mempool_stream_response( - consensus_parameters, - fetch_request_sender.clone(), - &ufvks, - wallet, - mempool_stream_response, - &mut mempool_stream) - .await; - - // reset interval to ensure all mempool transactions have been scanned before sync completes. - // if a full interval passes without receiving a transaction from the mempool we can safely finish sync. 
- interval.reset(); - } + // mempool_stream_response = mempool_stream.message() => { + // process_mempool_stream_response( + // consensus_parameters, + // fetch_request_sender.clone(), + // &ufvks, + // wallet, + // mempool_stream_response, + // &mut mempool_stream) + // .await; + + // // reset interval to ensure all mempool transactions have been scanned before sync completes. + // // if a full interval passes without receiving a transaction from the mempool we can safely finish sync. + // interval.reset(); + // } _update_scanner = interval.tick() => { scanner.update(wallet).await; @@ -257,7 +258,8 @@ async fn process_mempool_stream_response( ) where W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints, { - match mempool_stream_response.unwrap() { + // TODO: replace this unwrap_or_else with proper error handling + match mempool_stream_response.unwrap_or_else(|_| None) { Some(raw_transaction) => { let block_height = BlockHeight::from_u32(u32::try_from(raw_transaction.height).unwrap()); @@ -351,6 +353,7 @@ async fn process_mempool_stream_response( *mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) .await .unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; } } } diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index c7fe533c85..0a901765ca 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -4,6 +4,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; use incrementalmerkletree::Level; +use orchard::tree::MerkleHashOrchard; use shardtree::LocatedPrunableTree; use zcash_client_backend::{ data_api::{ORCHARD_SHARD_HEIGHT, SAPLING_SHARD_HEIGHT}, @@ -15,7 +16,7 @@ use zcash_primitives::zip32::AccountId; use crate::keys::transparent::TransparentAddressId; use crate::primitives::{NullifierMap, OutPointMap, SyncState, WalletBlock, WalletTransaction}; -use crate::witness::{ShardTreeData, ShardTrees}; +use crate::witness::{LocatedTreeData, ShardTreeData, ShardTrees}; // TODO: clean up interface and move many default impls out of traits. consider merging to a simplified SyncWallet interface. @@ -229,66 +230,21 @@ pub trait SyncShardTrees: SyncWallet { fn get_shard_trees_mut(&mut self) -> Result<&mut ShardTrees, Self::Error>; /// Update wallet shard trees with new shard tree data - fn update_shard_trees(&mut self, shard_tree_data: ShardTreeData) -> Result<(), Self::Error> { - let ShardTreeData { - sapling_initial_position, - orchard_initial_position, - sapling_leaves_and_retentions, - orchard_leaves_and_retentions, - } = shard_tree_data; - //TODO: Play with numbers. Is it more efficient to - // build larger trees to allow for more pruning to - // happen in parallel before insertion? - // Is it better to build smaller trees so that more - // trees can be built in parallel at the same time? - // Is inserting trees more efficient if trees are - // a power of 2 size? Is it more efficient if they - // are 'aligned' so that the initial_position is - // a multiple of tree size? All unanswered questions - // that want to be benchmarked. 
-
-        let (sapling_sender, sapling_receiver) = crossbeam_channel::unbounded();
-        let (orchard_sender, orchard_receiver) = crossbeam_channel::unbounded();
-        rayon::scope_fifo(|scope| {
-            for (i, sapling_chunk) in sapling_leaves_and_retentions.chunks(128).enumerate() {
-                let sapling_sender = sapling_sender.clone();
-                scope.spawn_fifo(move |_scope| {
-                    let start_position = sapling_initial_position + (i as u64 * 128);
-                    let tree = LocatedPrunableTree::from_iter(
-                        start_position..(start_position + sapling_chunk.len() as u64),
-                        Level::from(SAPLING_SHARD_HEIGHT),
-                        sapling_chunk.iter().copied(),
-                    );
-                    sapling_sender.send(tree).unwrap();
-                })
-            }
-
-            for (i, orchard_chunk) in orchard_leaves_and_retentions.chunks(128).enumerate() {
-                let orchard_sender = orchard_sender.clone();
-                scope.spawn_fifo(move |_scope| {
-                    let start_position = orchard_initial_position + (i as u64 * 128);
-                    let tree = LocatedPrunableTree::from_iter(
-                        start_position..(start_position + orchard_chunk.len() as u64),
-                        Level::from(ORCHARD_SHARD_HEIGHT),
-                        orchard_chunk.iter().copied(),
-                    );
-                    orchard_sender.send(tree).unwrap();
-                })
-            }
-        });
-        drop((orchard_sender, sapling_sender));
+    fn update_shard_trees(
+        &mut self,
+        sapling_located_tree_data: Vec<LocatedTreeData<sapling_crypto::Node>>,
+        orchard_located_tree_data: Vec<LocatedTreeData<MerkleHashOrchard>>,
+    ) -> Result<(), Self::Error> {
+        let shard_trees = self.get_shard_trees_mut()?;
 
-        let trees = self.get_shard_trees_mut()?;
-        for tree in sapling_receiver.iter() {
-            let tree = tree.unwrap();
-            trees
+        for tree in sapling_located_tree_data.into_iter() {
+            shard_trees
                 .sapling_mut()
                 .insert_tree(tree.subtree, tree.checkpoints)
                 .unwrap();
             }
-        for tree in orchard_receiver {
-            let tree = tree.unwrap();
-            trees
+        for tree in orchard_located_tree_data.into_iter() {
+            shard_trees
                 .orchard_mut()
                 .insert_tree(tree.subtree, tree.checkpoints)
                 .unwrap();
diff --git a/zingo-sync/src/witness.rs b/zingo-sync/src/witness.rs
index 7f684bcc47..9e9ab2f14a 100644
--- a/zingo-sync/src/witness.rs
+++ b/zingo-sync/src/witness.rs
@@ -1,17 +1,20 @@
 //! Module for structs and types associated with witness construction
 
+use std::collections::BTreeMap;
+
 use getset::{Getters, MutGetters};
 use incrementalmerkletree::{Position, Retention};
 use orchard::tree::MerkleHashOrchard;
-use sapling_crypto::Node;
-use shardtree::{store::memory::MemoryShardStore, ShardTree};
+use shardtree::{store::memory::MemoryShardStore, LocatedPrunableTree, ShardTree};
+use zcash_client_backend::data_api::{ORCHARD_SHARD_HEIGHT, SAPLING_SHARD_HEIGHT};
 use zcash_primitives::consensus::BlockHeight;
 
 const NOTE_COMMITMENT_TREE_DEPTH: u8 = 32;
 const SHARD_HEIGHT: u8 = 16;
 const MAX_CHECKPOINTS: usize = 100;
+const LOCATED_TREE_SIZE: usize = 128;
 
-type SaplingShardStore = MemoryShardStore<Node, BlockHeight>;
+type SaplingShardStore = MemoryShardStore<sapling_crypto::Node, BlockHeight>;
 type OrchardShardStore = MemoryShardStore<MerkleHashOrchard, BlockHeight>;
 
 /// Shard tree wallet data struct
@@ -41,10 +44,10 @@ impl Default for ShardTrees {
 }
 
 /// Required data for updating [`shardtree::ShardTree`]
-pub struct ShardTreeData {
+pub(crate) struct ShardTreeData {
     pub(crate) sapling_initial_position: Position,
     pub(crate) orchard_initial_position: Position,
-    pub(crate) sapling_leaves_and_retentions: Vec<(Node, Retention<BlockHeight>)>,
+    pub(crate) sapling_leaves_and_retentions: Vec<(sapling_crypto::Node, Retention<BlockHeight>)>,
     pub(crate) orchard_leaves_and_retentions: Vec<(MerkleHashOrchard, Retention<BlockHeight>)>,
 }
 
@@ -59,3 +62,66 @@ impl ShardTreeData {
         }
     }
 }
+
+// TODO: add more batch insertion results as they become relevant
+/// TODO
+pub struct LocatedTreeData<H> {
+    /// TODO
+    pub subtree: LocatedPrunableTree<H>,
+    /// TODO
+    pub checkpoints: BTreeMap<BlockHeight, Position>,
+}
+
+fn create_located_trees<H>(
+    initial_position: Position,
+    leaves_and_retentions: Vec<(H, Retention<BlockHeight>)>,
+    located_tree_level: incrementalmerkletree::Level,
+) -> Result<Vec<LocatedTreeData<H>>, ()>
+where
+    H: Copy + PartialEq + incrementalmerkletree::Hashable + Sync + Send,
+{
+    // let ShardTreeData {
+    //     sapling_initial_position,
+    //     orchard_initial_position,
+    //     sapling_leaves_and_retentions,
+    //     orchard_leaves_and_retentions,
+    // } = shard_tree_data;
+    //TODO: Play with numbers. Is it more efficient to
+    // build larger trees to allow for more pruning to
+    // happen in parallel before insertion?
+    // Is it better to build smaller trees so that more
+    // trees can be built in parallel at the same time?
+    // Is inserting trees more efficient if trees are
+    // a power of 2 size? Is it more efficient if they
+    // are 'aligned' so that the initial_position is
+    // a multiple of tree size? All unanswered questions
+    // that want to be benchmarked.
+ + let (sender, receiver) = crossbeam_channel::unbounded(); + rayon::scope_fifo(|scope| { + for (i, chunk) in leaves_and_retentions.chunks(LOCATED_TREE_SIZE).enumerate() { + let sender = sender.clone(); + scope.spawn_fifo(move |_scope| { + let start_position = initial_position + ((i * LOCATED_TREE_SIZE) as u64); + let tree = LocatedPrunableTree::from_iter( + start_position..(start_position + chunk.len() as u64), + located_tree_level, + // incrementalmerkletree::Level::from(SAPLING_SHARD_HEIGHT), + chunk.iter().copied(), + ); + sender.send(tree).unwrap(); + }) + } + }); + + let mut located_tree_data = Vec::new(); + for tree in receiver.iter() { + let tree = tree.unwrap(); + located_tree_data.push(LocatedTreeData { + subtree: tree.subtree, + checkpoints: tree.checkpoints, + }); + } + + Ok(located_tree_data) +} From bf80b33dd5e53be8aa2a7109d835b1000a2fd15c Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Fri, 13 Dec 2024 16:27:12 +0000 Subject: [PATCH 2/6] implemented with hang --- zingo-sync/src/scan.rs | 49 +++++++++++++++++++++++++++++++++++---- zingo-sync/src/sync.rs | 9 ++++--- zingo-sync/src/witness.rs | 11 +++------ 3 files changed, 54 insertions(+), 15 deletions(-) diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index 3ffab72759..d9042683ce 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -3,10 +3,14 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, }; +use orchard::tree::MerkleHashOrchard; use tokio::sync::mpsc; use incrementalmerkletree::Position; -use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock}; +use zcash_client_backend::{ + data_api::{scanning::ScanRange, ORCHARD_SHARD_HEIGHT}, + proto::compact_formats::CompactBlock, +}; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ consensus::{BlockHeight, NetworkUpgrade, Parameters}, @@ -18,7 +22,7 @@ use crate::{ client::{self, FetchRequest}, keys::transparent::TransparentAddressId, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction}, - witness::ShardTreeData, + witness::{self, LocatedTreeData, ShardTreeData}, }; use self::{ @@ -124,7 +128,8 @@ pub(crate) struct ScanResults { pub(crate) outpoints: OutPointMap, pub(crate) wallet_blocks: BTreeMap, pub(crate) wallet_transactions: HashMap, - pub(crate) shard_tree_data: ShardTreeData, + pub(crate) sapling_located_tree_data: Vec>, + pub(crate) orchard_located_tree_data: Vec>, } pub(crate) struct DecryptedNoteData { @@ -213,11 +218,47 @@ where .await .unwrap(); + let ShardTreeData { + sapling_initial_position, + orchard_initial_position, + sapling_leaves_and_retentions, + orchard_leaves_and_retentions, + } = shard_tree_data; + + let sapling_located_tree_data = witness::build_located_trees( + sapling_initial_position, + sapling_leaves_and_retentions, + incrementalmerkletree::Level::from(sapling_crypto::NOTE_COMMITMENT_TREE_DEPTH / 2), + ) + .unwrap(); + // let sapling_located_tree_data = tokio::task::spawn_blocking(move || { + // witness::build_located_trees( + // sapling_initial_position, + // sapling_leaves_and_retentions, + // incrementalmerkletree::Level::from(sapling_crypto::NOTE_COMMITMENT_TREE_DEPTH / 2), + // ) + // .unwrap() + // }) + // .await + // .unwrap(); + // let orchard_located_tree_data = tokio::task::spawn_blocking(move || { + // witness::build_located_trees( + // orchard_initial_position, + // orchard_leaves_and_retentions, + // incrementalmerkletree::Level::from(orchard::NOTE_COMMITMENT_TREE_DEPTH as u8 / 2), + // ) + // .unwrap() + 
// })
+    // .await
+    // .unwrap();
+    let orchard_located_tree_data = Vec::new();
+
     Ok(ScanResults {
         nullifiers,
         outpoints,
         wallet_blocks,
         wallet_transactions,
-        shard_tree_data,
+        sapling_located_tree_data,
+        orchard_located_tree_data,
     })
 }
 
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index b0d851b924..7091d07103 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -380,10 +380,11 @@ where
 {
     let ScanResults {
         nullifiers,
+        outpoints,
         wallet_blocks,
         wallet_transactions,
-        shard_tree_data,
-        outpoints,
+        sapling_located_tree_data,
+        orchard_located_tree_data,
     } = scan_results;
 
     wallet.append_wallet_blocks(wallet_blocks).unwrap();
@@ -392,7 +393,9 @@ where
         .unwrap();
     wallet.append_nullifiers(nullifiers).unwrap();
     wallet.append_outpoints(outpoints).unwrap();
-    wallet.update_shard_trees(shard_tree_data).unwrap();
+    wallet
+        .update_shard_trees(sapling_located_tree_data, orchard_located_tree_data)
+        .unwrap();
 
     // TODO: add trait to save wallet data to persistence for in-memory wallets
     Ok(())
diff --git a/zingo-sync/src/witness.rs b/zingo-sync/src/witness.rs
index 9e9ab2f14a..c938383b46 100644
--- a/zingo-sync/src/witness.rs
+++ b/zingo-sync/src/witness.rs
@@ -72,7 +72,7 @@ pub struct LocatedTreeData<H> {
     pub checkpoints: BTreeMap<BlockHeight, Position>,
 }
 
-fn create_located_trees<H>(
+pub(crate) fn build_located_trees<H>(
     initial_position: Position,
     leaves_and_retentions: Vec<(H, Retention<BlockHeight>)>,
     located_tree_level: incrementalmerkletree::Level,
@@ -80,12 +80,6 @@
 where
     H: Copy + PartialEq + incrementalmerkletree::Hashable + Sync + Send,
 {
-    // let ShardTreeData {
-    //     sapling_initial_position,
-    //     orchard_initial_position,
-    //     sapling_leaves_and_retentions,
-    //     orchard_leaves_and_retentions,
-    // } = shard_tree_data;
     //TODO: Play with numbers. Is it more efficient to
    // build larger trees to allow for more pruning to
    // happen in parallel before insertion?
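
The fan-out/fan-in inside `build_located_trees` is easier to follow outside the diff. The sketch below is a minimal, self-contained rendering of the same pattern rather than code from this patch: it assumes the `shardtree`, `rayon`, and `crossbeam-channel` APIs behave exactly as the surrounding hunks call them (in particular that `LocatedPrunableTree::from_iter` returns an optional batch-insertion result carrying `subtree` and `checkpoints` fields, as the patch's own code implies), and `build_subtrees` and `CHUNK` are illustrative names.

use std::collections::BTreeMap;

use incrementalmerkletree::{Hashable, Level, Position, Retention};
use shardtree::LocatedPrunableTree;
use zcash_primitives::consensus::BlockHeight;

const CHUNK: usize = 128; // stands in for LOCATED_TREE_SIZE

fn build_subtrees<H>(
    start: Position,
    leaves: Vec<(H, Retention<BlockHeight>)>,
    level: Level,
) -> Vec<(LocatedPrunableTree<H>, BTreeMap<BlockHeight, Position>)>
where
    H: Copy + PartialEq + Hashable + Send + Sync,
{
    let (sender, receiver) = crossbeam_channel::unbounded();
    rayon::scope_fifo(|scope| {
        // fan out: every CHUNK leaves become one independently built subtree
        for (i, chunk) in leaves.chunks(CHUNK).enumerate() {
            let sender = sender.clone();
            scope.spawn_fifo(move |_| {
                let from = start + (i * CHUNK) as u64;
                let result = LocatedPrunableTree::from_iter(
                    from..(from + chunk.len() as u64),
                    level,
                    chunk.iter().copied(),
                );
                sender.send(result).unwrap();
            });
        }
    });
    // fan in: `receiver.iter()` only terminates once every sender is dropped.
    // The per-task clones die with the scope, but this original handle must
    // be dropped explicitly or the loop below blocks forever.
    drop(sender);
    receiver
        .iter()
        .map(|result| {
            let result = result.unwrap();
            (result.subtree, result.checkpoints)
        })
        .collect()
}

The explicit `drop(sender)` is the detail this revision misses, and it is why this commit is titled "implemented with hang": without the drop, the collection loop never observes the channel closing. The next commit fixes it by adding exactly that `drop(sender);` line ahead of the receive loop.
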
@@ -106,15 +100,16 @@ where let tree = LocatedPrunableTree::from_iter( start_position..(start_position + chunk.len() as u64), located_tree_level, - // incrementalmerkletree::Level::from(SAPLING_SHARD_HEIGHT), chunk.iter().copied(), ); + tracing::info!("SENDING"); sender.send(tree).unwrap(); }) } }); let mut located_tree_data = Vec::new(); + tracing::info!("RECEIVING"); for tree in receiver.iter() { let tree = tree.unwrap(); located_tree_data.push(LocatedTreeData { From 70a5606c1b02acda8d34f5f63e7dc3368a97b9e0 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Sat, 14 Dec 2024 03:36:51 +0000 Subject: [PATCH 3/6] finished moving located tree build to scan --- zingo-sync/src/scan.rs | 61 +++++++++------------------ zingo-sync/src/scan/compact_blocks.rs | 10 ++--- zingo-sync/src/scan/task.rs | 13 ++---- zingo-sync/src/sync.rs | 6 +-- zingo-sync/src/traits.rs | 10 ++--- zingo-sync/src/witness.rs | 20 ++++----- 6 files changed, 45 insertions(+), 75 deletions(-) diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index d9042683ce..eaa76663cd 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -7,10 +7,7 @@ use orchard::tree::MerkleHashOrchard; use tokio::sync::mpsc; use incrementalmerkletree::Position; -use zcash_client_backend::{ - data_api::{scanning::ScanRange, ORCHARD_SHARD_HEIGHT}, - proto::compact_formats::CompactBlock, -}; +use zcash_client_backend::{data_api::scanning::ScanRange, proto::compact_formats::CompactBlock}; use zcash_keys::keys::UnifiedFullViewingKey; use zcash_primitives::{ consensus::{BlockHeight, NetworkUpgrade, Parameters}, @@ -22,7 +19,7 @@ use crate::{ client::{self, FetchRequest}, keys::transparent::TransparentAddressId, primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction}, - witness::{self, LocatedTreeData, ShardTreeData}, + witness::{self, LocatedTreeData, WitnessData}, }; use self::{ @@ -120,7 +117,7 @@ struct ScanData { wallet_blocks: BTreeMap, relevant_txids: HashSet, decrypted_note_data: DecryptedNoteData, - shard_tree_data: ShardTreeData, + witness_data: WitnessData, } pub(crate) struct ScanResults { @@ -128,8 +125,8 @@ pub(crate) struct ScanResults { pub(crate) outpoints: OutPointMap, pub(crate) wallet_blocks: BTreeMap, pub(crate) wallet_transactions: HashMap, - pub(crate) sapling_located_tree_data: Vec>, - pub(crate) orchard_located_tree_data: Vec>, + pub(crate) sapling_located_trees: Vec>, + pub(crate) orchard_located_trees: Vec>, } pub(crate) struct DecryptedNoteData { @@ -190,14 +187,13 @@ where }) .await .unwrap()?; - // let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?; let ScanData { nullifiers, wallet_blocks, mut relevant_txids, decrypted_note_data, - shard_tree_data, + witness_data, } = scan_data; locators.into_iter().map(|(_, txid)| txid).for_each(|txid| { @@ -218,47 +214,32 @@ where .await .unwrap(); - let ShardTreeData { + let WitnessData { sapling_initial_position, orchard_initial_position, sapling_leaves_and_retentions, orchard_leaves_and_retentions, - } = shard_tree_data; + } = witness_data; - let sapling_located_tree_data = witness::build_located_trees( - sapling_initial_position, - sapling_leaves_and_retentions, - incrementalmerkletree::Level::from(sapling_crypto::NOTE_COMMITMENT_TREE_DEPTH / 2), - ) + let sapling_located_trees = tokio::task::spawn_blocking(move || { + witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions) + .unwrap() + }) + .await + .unwrap(); + let orchard_located_trees = 
tokio::task::spawn_blocking(move || { + witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions) + .unwrap() + }) + .await .unwrap(); - // let sapling_located_tree_data = tokio::task::spawn_blocking(move || { - // witness::build_located_trees( - // sapling_initial_position, - // sapling_leaves_and_retentions, - // incrementalmerkletree::Level::from(sapling_crypto::NOTE_COMMITMENT_TREE_DEPTH / 2), - // ) - // .unwrap() - // }) - // .await - // .unwrap(); - // let orchard_located_tree_data = tokio::task::spawn_blocking(move || { - // witness::build_located_trees( - // orchard_initial_position, - // orchard_leaves_and_retentions, - // incrementalmerkletree::Level::from(orchard::NOTE_COMMITMENT_TREE_DEPTH as u8 / 2), - // ) - // .unwrap() - // }) - // .await - // .unwrap(); - let orchard_located_tree_data = Vec::new(); Ok(ScanResults { nullifiers, outpoints, wallet_blocks, wallet_transactions, - sapling_located_tree_data, - orchard_located_tree_data, + sapling_located_trees, + orchard_located_trees, }) } diff --git a/zingo-sync/src/scan/compact_blocks.rs b/zingo-sync/src/scan/compact_blocks.rs index cbc9d98024..99890c362c 100644 --- a/zingo-sync/src/scan/compact_blocks.rs +++ b/zingo-sync/src/scan/compact_blocks.rs @@ -18,7 +18,7 @@ use zcash_primitives::{ use crate::{ keys::{KeyId, ScanningKeyOps, ScanningKeys}, primitives::{NullifierMap, OutputId, WalletBlock}, - witness::ShardTreeData, + witness::WitnessData, }; use self::runners::{BatchRunners, DecryptedOutput}; @@ -48,7 +48,7 @@ where let mut nullifiers = NullifierMap::new(); let mut relevant_txids: HashSet = HashSet::new(); let mut decrypted_note_data = DecryptedNoteData::new(); - let mut shard_tree_data = ShardTreeData::new( + let mut witness_data = WitnessData::new( Position::from(u64::from(initial_scan_data.sapling_initial_tree_size)), Position::from(u64::from(initial_scan_data.orchard_initial_tree_size)), ); @@ -77,7 +77,7 @@ where collect_nullifiers(&mut nullifiers, block.height(), transaction).unwrap(); - shard_tree_data.sapling_leaves_and_retentions.extend( + witness_data.sapling_leaves_and_retentions.extend( calculate_sapling_leaves_and_retentions( &transaction.outputs, block.height(), @@ -86,7 +86,7 @@ where ) .unwrap(), ); - shard_tree_data.orchard_leaves_and_retentions.extend( + witness_data.orchard_leaves_and_retentions.extend( calculate_orchard_leaves_and_retentions( &transaction.actions, block.height(), @@ -135,7 +135,7 @@ where wallet_blocks, relevant_txids, decrypted_note_data, - shard_tree_data, + witness_data, }) } diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 778507009f..32fc8fbc01 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -258,14 +258,9 @@ where let consensus_parameters = self.consensus_parameters.clone(); let ufvks = self.ufvks.clone(); - let id = self.id; - let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { - tracing::info!("TASK RECEIVED: {:#?}", &scan_task); - is_scanning.store(true, atomic::Ordering::SeqCst); - tracing::info!("WORKER {} SCANNING", id); - // is_scanning.store(true, atomic::Ordering::Release); + is_scanning.store(true, atomic::Ordering::Release); let scan_results = scan( fetch_request_sender.clone(), @@ -282,8 +277,7 @@ where .send((scan_task.scan_range, scan_results)) .expect("receiver should never be dropped before sender!"); - is_scanning.store(false, atomic::Ordering::SeqCst); - // is_scanning.store(false, atomic::Ordering::Release); + 
is_scanning.store(false, atomic::Ordering::Release); } }); @@ -294,8 +288,7 @@ where } fn is_scanning(&self) -> bool { - self.is_scanning.load(atomic::Ordering::SeqCst) - // self.is_scanning.load(atomic::Ordering::Acquire) + self.is_scanning.load(atomic::Ordering::Acquire) } fn add_scan_task(&self, scan_task: ScanTask) -> Result<(), ()> { diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 7091d07103..029eaf2231 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -383,8 +383,8 @@ where outpoints, wallet_blocks, wallet_transactions, - sapling_located_tree_data, - orchard_located_tree_data, + sapling_located_trees, + orchard_located_trees, } = scan_results; wallet.append_wallet_blocks(wallet_blocks).unwrap(); @@ -394,7 +394,7 @@ where wallet.append_nullifiers(nullifiers).unwrap(); wallet.append_outpoints(outpoints).unwrap(); wallet - .update_shard_trees(sapling_located_tree_data, orchard_located_tree_data) + .update_shard_trees(sapling_located_trees, orchard_located_trees) .unwrap(); // TODO: add trait to save wallet data to persistance for in-memory wallets diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index 0a901765ca..b81e0e1c3c 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -16,7 +16,7 @@ use zcash_primitives::zip32::AccountId; use crate::keys::transparent::TransparentAddressId; use crate::primitives::{NullifierMap, OutPointMap, SyncState, WalletBlock, WalletTransaction}; -use crate::witness::{LocatedTreeData, ShardTreeData, ShardTrees}; +use crate::witness::{LocatedTreeData, ShardTrees, WitnessData}; // TODO: clean up interface and move many default impls out of traits. consider merging to a simplified SyncWallet interface. @@ -232,18 +232,18 @@ pub trait SyncShardTrees: SyncWallet { /// Update wallet shard trees with new shard tree data fn update_shard_trees( &mut self, - sapling_located_tree_data: Vec>, - orchard_located_tree_data: Vec>, + sapling_located_trees: Vec>, + orchard_located_trees: Vec>, ) -> Result<(), Self::Error> { let shard_trees = self.get_shard_trees_mut()?; - for tree in sapling_located_tree_data.into_iter() { + for tree in sapling_located_trees.into_iter() { shard_trees .sapling_mut() .insert_tree(tree.subtree, tree.checkpoints) .unwrap(); } - for tree in orchard_located_tree_data.into_iter() { + for tree in orchard_located_trees.into_iter() { shard_trees .orchard_mut() .insert_tree(tree.subtree, tree.checkpoints) diff --git a/zingo-sync/src/witness.rs b/zingo-sync/src/witness.rs index c938383b46..fd5a10185f 100644 --- a/zingo-sync/src/witness.rs +++ b/zingo-sync/src/witness.rs @@ -6,7 +6,6 @@ use getset::{Getters, MutGetters}; use incrementalmerkletree::{Position, Retention}; use orchard::tree::MerkleHashOrchard; use shardtree::{store::memory::MemoryShardStore, LocatedPrunableTree, ShardTree}; -use zcash_client_backend::data_api::{ORCHARD_SHARD_HEIGHT, SAPLING_SHARD_HEIGHT}; use zcash_primitives::consensus::BlockHeight; const NOTE_COMMITMENT_TREE_DEPTH: u8 = 32; @@ -44,17 +43,17 @@ impl Default for ShardTrees { } /// Required data for updating [`shardtree::ShardTree`] -pub(crate) struct ShardTreeData { +pub(crate) struct WitnessData { pub(crate) sapling_initial_position: Position, pub(crate) orchard_initial_position: Position, pub(crate) sapling_leaves_and_retentions: Vec<(sapling_crypto::Node, Retention)>, pub(crate) orchard_leaves_and_retentions: Vec<(MerkleHashOrchard, Retention)>, } -impl ShardTreeData { +impl WitnessData { /// Creates new ShardTreeData pub fn 
new(sapling_initial_position: Position, orchard_initial_position: Position) -> Self { - ShardTreeData { + WitnessData { sapling_initial_position, orchard_initial_position, sapling_leaves_and_retentions: Vec::new(), @@ -63,19 +62,17 @@ impl ShardTreeData { } } -// TODO: add more batch insertion results as they become relavent -/// TODO +/// Located prunable tree data built from nodes and retentions during scanning for insertion into the shard store. pub struct LocatedTreeData { - /// TODO + /// Located prunable tree pub subtree: LocatedPrunableTree, - /// TODO + /// Checkpoints pub checkpoints: BTreeMap, } pub(crate) fn build_located_trees( initial_position: Position, leaves_and_retentions: Vec<(H, Retention)>, - located_tree_level: incrementalmerkletree::Level, ) -> Result>, ()> where H: Copy + PartialEq + incrementalmerkletree::Hashable + Sync + Send, @@ -99,17 +96,16 @@ where let start_position = initial_position + ((i * LOCATED_TREE_SIZE) as u64); let tree = LocatedPrunableTree::from_iter( start_position..(start_position + chunk.len() as u64), - located_tree_level, + incrementalmerkletree::Level::from(SHARD_HEIGHT), chunk.iter().copied(), ); - tracing::info!("SENDING"); sender.send(tree).unwrap(); }) } }); + drop(sender); let mut located_tree_data = Vec::new(); - tracing::info!("RECEIVING"); for tree in receiver.iter() { let tree = tree.unwrap(); located_tree_data.push(LocatedTreeData { From a6c4582dd86c944245d02bee3ec9fd018df228c0 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Sat, 14 Dec 2024 04:33:13 +0000 Subject: [PATCH 4/6] fixed mempool --- zingo-sync/src/scan/task.rs | 14 +++++----- zingo-sync/src/sync.rs | 52 +++++++++++++++++-------------------- zingo-sync/src/traits.rs | 9 ++----- 3 files changed, 32 insertions(+), 43 deletions(-) diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs index 32fc8fbc01..9f9204eb4c 100644 --- a/zingo-sync/src/scan/task.rs +++ b/zingo-sync/src/scan/task.rs @@ -185,7 +185,6 @@ where ScannerState::Scan => { // create scan tasks until all ranges are scanned or currently scanning if let Some(worker) = self.idle_worker() { - tracing::info!("idle worker... creating scan task"); if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() { worker.add_scan_task(scan_task).unwrap(); } else { @@ -216,7 +215,7 @@ struct ScanWorker
<P>
{ handle: Option>, is_scanning: Arc, consensus_parameters: P, - scan_task_sender: Option>, + scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, ufvks: HashMap, @@ -229,7 +228,7 @@ where fn new( id: usize, consensus_parameters: P, - scan_task_sender: Option>, + scan_task_sender: Option>, scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result)>, fetch_request_sender: mpsc::UnboundedSender, ufvks: HashMap, @@ -250,7 +249,7 @@ where /// /// Waits for a scan task and then calls [`crate::scan::scan`] on the given range. fn run(&mut self) -> Result<(), ()> { - let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::(); + let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::(1); let is_scanning = self.is_scanning.clone(); let scan_results_sender = self.scan_results_sender.clone(); @@ -260,8 +259,6 @@ where let handle = tokio::spawn(async move { while let Some(scan_task) = scan_task_receiver.recv().await { - is_scanning.store(true, atomic::Ordering::Release); - let scan_results = scan( fetch_request_sender.clone(), &consensus_parameters, @@ -296,8 +293,9 @@ where self.scan_task_sender .clone() .unwrap() - .send(scan_task) - .unwrap(); + .try_send(scan_task) + .expect("worker should never be sent multiple tasks at one time"); + self.is_scanning.store(true, atomic::Ordering::Release); Ok(()) } diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs index 029eaf2231..f1f8bce220 100644 --- a/zingo-sync/src/sync.rs +++ b/zingo-sync/src/sync.rs @@ -71,15 +71,15 @@ where } let ufvks = wallet.get_unified_full_viewing_keys().unwrap(); - // transparent::update_addresses_and_locators( - // consensus_parameters, - // wallet, - // fetch_request_sender.clone(), - // &ufvks, - // wallet_height, - // chain_height, - // ) - // .await; + transparent::update_addresses_and_locators( + consensus_parameters, + wallet, + fetch_request_sender.clone(), + &ufvks, + wallet_height, + chain_height, + ) + .await; state::update_scan_ranges( wallet_height, @@ -99,10 +99,10 @@ where ); scanner.spawn_workers(); - // // setup the initial mempool stream - // let mut mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) - // .await - // .unwrap(); + // setup the initial mempool stream + let mut mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) + .await + .unwrap(); // TODO: consider what happens when there is no verification range i.e. all ranges already scanned // TODO: invalidate any pending transactions after eviction height (40 below best chain height?) @@ -125,20 +125,16 @@ where .unwrap(); } - // mempool_stream_response = mempool_stream.message() => { - // process_mempool_stream_response( - // consensus_parameters, - // fetch_request_sender.clone(), - // &ufvks, - // wallet, - // mempool_stream_response, - // &mut mempool_stream) - // .await; - - // // reset interval to ensure all mempool transactions have been scanned before sync completes. - // // if a full interval passes without receiving a transaction from the mempool we can safely finish sync. 
- // interval.reset(); - // } + mempool_stream_response = mempool_stream.message() => { + process_mempool_stream_response( + consensus_parameters, + fetch_request_sender.clone(), + &ufvks, + wallet, + mempool_stream_response, + &mut mempool_stream) + .await; + } _update_scanner = interval.tick() => { scanner.update(wallet).await; @@ -353,7 +349,7 @@ async fn process_mempool_stream_response( *mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone()) .await .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(1000)).await; } } } diff --git a/zingo-sync/src/traits.rs b/zingo-sync/src/traits.rs index b81e0e1c3c..d34dea9336 100644 --- a/zingo-sync/src/traits.rs +++ b/zingo-sync/src/traits.rs @@ -3,20 +3,15 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; -use incrementalmerkletree::Level; use orchard::tree::MerkleHashOrchard; -use shardtree::LocatedPrunableTree; -use zcash_client_backend::{ - data_api::{ORCHARD_SHARD_HEIGHT, SAPLING_SHARD_HEIGHT}, - keys::UnifiedFullViewingKey, -}; +use zcash_client_backend::keys::UnifiedFullViewingKey; use zcash_primitives::consensus::BlockHeight; use zcash_primitives::transaction::TxId; use zcash_primitives::zip32::AccountId; use crate::keys::transparent::TransparentAddressId; use crate::primitives::{NullifierMap, OutPointMap, SyncState, WalletBlock, WalletTransaction}; -use crate::witness::{LocatedTreeData, ShardTrees, WitnessData}; +use crate::witness::{LocatedTreeData, ShardTrees}; // TODO: clean up interface and move many default impls out of traits. consider merging to a simplified SyncWallet interface. From 1b5329699994b59e686d20994425a8778046e305 Mon Sep 17 00:00:00 2001 From: Oscar Pepper Date: Sat, 14 Dec 2024 05:37:53 +0000 Subject: [PATCH 5/6] cleanup and rename to consensus parameters for consistency --- zingo-sync/src/scan.rs | 4 ++-- zingo-sync/src/sync.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/zingo-sync/src/scan.rs b/zingo-sync/src/scan.rs index eaa76663cd..1a3c8e4b49 100644 --- a/zingo-sync/src/scan.rs +++ b/zingo-sync/src/scan.rs @@ -40,7 +40,7 @@ struct InitialScanData { impl InitialScanData { async fn new
<P>(
         fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
-        parameters: &P,
+        consensus_parameters: &P,
         first_block: &CompactBlock,
         previous_wallet_block: Option<WalletBlock>,
     ) -> Result<Self, ()>
@@ -84,7 +84,7 @@ impl InitialScanData {
                 .unwrap(),
             )
         } else {
-            let sapling_activation_height = parameters
+            let sapling_activation_height = consensus_parameters
                 .activation_height(NetworkUpgrade::Sapling)
                 .expect("should have some sapling activation height");
 
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index f1f8bce220..f23feece2c 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -108,7 +108,7 @@ where
     // TODO: invalidate any pending transactions after eviction height (40 below best chain height?)
     // TODO: implement an option for continuous scanning where it doesn't exit when complete
 
-    let mut interval = tokio::time::interval(Duration::from_millis(100));
+    let mut interval = tokio::time::interval(Duration::from_millis(30));
     loop {
         tokio::select! {
             Some((scan_range, scan_results)) = scan_results_receiver.recv() => {

From ca050ae595c70bb445ca0d6ad3f224d9a67a5191 Mon Sep 17 00:00:00 2001
From: Oscar Pepper
Date: Sat, 14 Dec 2024 08:15:30 +0000
Subject: [PATCH 6/6] fix clippy

---
 zingo-sync/src/scan/task.rs | 6 ++----
 zingo-sync/src/sync.rs      | 2 +-
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/zingo-sync/src/scan/task.rs b/zingo-sync/src/scan/task.rs
index 9f9204eb4c..9fc33be98c 100644
--- a/zingo-sync/src/scan/task.rs
+++ b/zingo-sync/src/scan/task.rs
@@ -187,10 +187,8 @@ where
                 if let Some(worker) = self.idle_worker() {
                     if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() {
                         worker.add_scan_task(scan_task).unwrap();
-                    } else {
-                        if wallet.get_sync_state().unwrap().scan_complete() {
-                            self.state.shutdown();
-                        }
+                    } else if wallet.get_sync_state().unwrap().scan_complete() {
+                        self.state.shutdown();
                     }
                 }
             }
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index f23feece2c..8fadb1124c 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -255,7 +255,7 @@ async fn process_mempool_stream_response(
     W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
 {
     // TODO: replace this unwrap_or_else with proper error handling
-    match mempool_stream_response.unwrap_or_else(|_| None) {
+    match mempool_stream_response.unwrap_or(None) {
         Some(raw_transaction) => {
             let block_height =
                 BlockHeight::from_u32(u32::try_from(raw_transaction.height).unwrap());
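
Taken together, patches 1, 3, 4, and 6 converge on one dispatch invariant per scan worker: a capacity-1 task channel, plus an `is_scanning` flag that the dispatcher flips synchronously when it queues a task. The sketch below is a minimal rendering of that shape under stated assumptions (tokio's mpsc and std atomics, as the series uses them); `Task`, `Worker`, and `spawn_worker` are illustrative stand-ins for `ScanTask` and `ScanWorker`, not zingo-sync items.

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

use tokio::sync::mpsc;

type Task = u64; // stands in for ScanTask

struct Worker {
    is_scanning: Arc<AtomicBool>,
    task_sender: mpsc::Sender<Task>,
}

impl Worker {
    fn add_task(&self, task: Task) {
        // Capacity-1 channel: an idle worker's queue is always empty, so a
        // failed try_send means the idle bookkeeping is broken.
        self.task_sender
            .try_send(task)
            .expect("worker should never be sent multiple tasks at one time");
        // The dispatcher marks the worker busy before making any further
        // scheduling decision, so a queued-but-unreceived task can no longer
        // coexist with a worker that reports idle.
        self.is_scanning.store(true, Ordering::Release);
    }

    fn is_idle(&self) -> bool {
        !self.is_scanning.load(Ordering::Acquire)
    }
}

fn spawn_worker() -> Worker {
    let (task_sender, mut task_receiver) = mpsc::channel::<Task>(1);
    let is_scanning = Arc::new(AtomicBool::new(false));
    let flag = is_scanning.clone();
    tokio::spawn(async move {
        while let Some(_task) = task_receiver.recv().await {
            // ... scan the range and send the results here ...
            // Only the worker clears the flag, after the results are sent.
            flag.store(false, Ordering::Release);
        }
    });
    Worker {
        is_scanning,
        task_sender,
    }
}

With the flag now flipped on the dispatcher's side rather than inside the worker loop after `recv`, plain `Release`/`Acquire` ordering is sufficient, which is why the `SeqCst` orderings and per-worker logging introduced while debugging in patch 1 could be reverted in patch 3 and the pattern above finalized in patch 4.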