
Merge pull request zingolabs#1588 from Oscar-Pepper/fix_minor_bugs
Fix minor bugs
Oscar-Pepper authored Dec 18, 2024
2 parents c0de3b8 + ca050ae commit 8ac0137
Showing 7 changed files with 144 additions and 100 deletions.
4 changes: 2 additions & 2 deletions libtonode-tests/tests/sync.rs
@@ -43,8 +43,8 @@ async fn sync_mainnet_test() {
.await
.unwrap();

-dbg!(lightclient.wallet.wallet_blocks);
-dbg!(lightclient.wallet.nullifier_map);
+// dbg!(lightclient.wallet.wallet_blocks);
+// dbg!(lightclient.wallet.nullifier_map);
dbg!(lightclient.wallet.sync_state);
}

50 changes: 42 additions & 8 deletions zingo-sync/src/scan.rs
@@ -3,6 +3,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
};

+use orchard::tree::MerkleHashOrchard;
use tokio::sync::mpsc;

use incrementalmerkletree::Position;
@@ -18,7 +19,7 @@ use crate::{
client::{self, FetchRequest},
keys::transparent::TransparentAddressId,
primitives::{Locator, NullifierMap, OutPointMap, OutputId, WalletBlock, WalletTransaction},
-witness::ShardTreeData,
+witness::{self, LocatedTreeData, WitnessData},
};

use self::{
@@ -39,7 +40,7 @@ struct InitialScanData {
impl InitialScanData {
async fn new<P>(
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
-parameters: &P,
+consensus_parameters: &P,
first_block: &CompactBlock,
previous_wallet_block: Option<WalletBlock>,
) -> Result<Self, ()>
@@ -83,7 +84,7 @@ impl InitialScanData {
.unwrap(),
)
} else {
-let sapling_activation_height = parameters
+let sapling_activation_height = consensus_parameters
.activation_height(NetworkUpgrade::Sapling)
.expect("should have some sapling activation height");

@@ -116,15 +117,16 @@ struct ScanData {
wallet_blocks: BTreeMap<BlockHeight, WalletBlock>,
relevant_txids: HashSet<TxId>,
decrypted_note_data: DecryptedNoteData,
-shard_tree_data: ShardTreeData,
+witness_data: WitnessData,
}

pub(crate) struct ScanResults {
pub(crate) nullifiers: NullifierMap,
pub(crate) outpoints: OutPointMap,
pub(crate) wallet_blocks: BTreeMap<BlockHeight, WalletBlock>,
pub(crate) wallet_transactions: HashMap<TxId, WalletTransaction>,
-pub(crate) shard_tree_data: ShardTreeData,
+pub(crate) sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
+pub(crate) orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
}

pub(crate) struct DecryptedNoteData {
@@ -173,14 +175,25 @@ where
.await
.unwrap();

-let scan_data = scan_compact_blocks(compact_blocks, parameters, ufvks, initial_scan_data)?;
+let consensus_parameters_clone = parameters.clone();
+let ufvks_clone = ufvks.clone();
+let scan_data = tokio::task::spawn_blocking(move || {
+scan_compact_blocks(
+compact_blocks,
+&consensus_parameters_clone,
+&ufvks_clone,
+initial_scan_data,
+)
+})
+.await
+.unwrap()?;

let ScanData {
nullifiers,
wallet_blocks,
mut relevant_txids,
decrypted_note_data,
-shard_tree_data,
+witness_data,
} = scan_data;

locators.into_iter().map(|(_, txid)| txid).for_each(|txid| {
@@ -201,11 +214,32 @@
.await
.unwrap();

+let WitnessData {
+sapling_initial_position,
+orchard_initial_position,
+sapling_leaves_and_retentions,
+orchard_leaves_and_retentions,
+} = witness_data;
+
+let sapling_located_trees = tokio::task::spawn_blocking(move || {
+witness::build_located_trees(sapling_initial_position, sapling_leaves_and_retentions)
+.unwrap()
+})
+.await
+.unwrap();
+let orchard_located_trees = tokio::task::spawn_blocking(move || {
+witness::build_located_trees(orchard_initial_position, orchard_leaves_and_retentions)
+.unwrap()
+})
+.await
+.unwrap();

Ok(ScanResults {
nullifiers,
outpoints,
wallet_blocks,
wallet_transactions,
-shard_tree_data,
+sapling_located_trees,
+orchard_located_trees,
})
}
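
The core scan.rs change moves the CPU-bound work (trial decryption of compact blocks and witness-subtree construction) onto tokio's blocking thread pool so the async executor is not stalled. Below is a minimal sketch of that pattern, assuming a hypothetical cpu_heavy_scan function rather than the crate's actual API; the 'static closure must own its inputs, which is why the diff clones consensus_parameters and ufvks before the call.

```rust
use tokio::task;

// Stand-in for trial decryption over a batch of compact blocks.
fn cpu_heavy_scan(blocks: &[u32]) -> u64 {
    blocks.iter().map(|b| u64::from(*b)).sum()
}

async fn scan_batch(blocks: Vec<u32>) -> u64 {
    // spawn_blocking requires a 'static closure, so owned (or cloned)
    // data is moved in rather than borrowed from the async context.
    task::spawn_blocking(move || cpu_heavy_scan(&blocks))
        .await
        .expect("blocking scan task panicked")
}

#[tokio::main]
async fn main() {
    println!("sum: {}", scan_batch(vec![1, 2, 3]).await);
}
```
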
11 changes: 6 additions & 5 deletions zingo-sync/src/scan/compact_blocks.rs
@@ -18,7 +18,7 @@ use zcash_primitives::{
use crate::{
keys::{KeyId, ScanningKeyOps, ScanningKeys},
primitives::{NullifierMap, OutputId, WalletBlock},
-witness::ShardTreeData,
+witness::WitnessData,
};

use self::runners::{BatchRunners, DecryptedOutput};
@@ -48,7 +48,7 @@ where
let mut nullifiers = NullifierMap::new();
let mut relevant_txids: HashSet<TxId> = HashSet::new();
let mut decrypted_note_data = DecryptedNoteData::new();
-let mut shard_tree_data = ShardTreeData::new(
+let mut witness_data = WitnessData::new(
Position::from(u64::from(initial_scan_data.sapling_initial_tree_size)),
Position::from(u64::from(initial_scan_data.orchard_initial_tree_size)),
);
@@ -77,7 +77,7 @@

collect_nullifiers(&mut nullifiers, block.height(), transaction).unwrap();

-shard_tree_data.sapling_leaves_and_retentions.extend(
+witness_data.sapling_leaves_and_retentions.extend(
calculate_sapling_leaves_and_retentions(
&transaction.outputs,
block.height(),
@@ -86,7 +86,7 @@
)
.unwrap(),
);
-shard_tree_data.orchard_leaves_and_retentions.extend(
+witness_data.orchard_leaves_and_retentions.extend(
calculate_orchard_leaves_and_retentions(
&transaction.actions,
block.height(),
@@ -135,7 +135,7 @@ where
wallet_blocks,
relevant_txids,
decrypted_note_data,
-shard_tree_data,
+witness_data,
})
}

@@ -158,6 +158,7 @@ where

// checks height and hash continuity of a batch of compact blocks.
// takes the last wallet compact block of the adjacent lower scan range, if available.
+// TODO: remove option and revisit scanner flow to use the last block of previously scanned batch to check continuity
fn check_continuity(
compact_blocks: &[CompactBlock],
previous_compact_block: Option<&WalletBlock>,
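
The comment above check_continuity describes verifying height and hash continuity across a batch of compact blocks, seeded by the last block of the adjacent lower scan range when available. A hedged sketch of that kind of check, using simplified stand-in types instead of CompactBlock and WalletBlock:

```rust
// Simplified stand-in for a compact block header.
struct Block {
    height: u64,
    hash: [u8; 32],
    prev_hash: [u8; 32],
}

// Returns an error at the first height or hash discontinuity.
fn check_continuity(blocks: &[Block], previous: Option<&Block>) -> Result<(), String> {
    let mut expected = previous.map(|b| (b.height + 1, b.hash));
    for block in blocks {
        if let Some((height, hash)) = expected {
            if block.height != height {
                return Err(format!("height discontinuity at {}", block.height));
            }
            if block.prev_hash != hash {
                return Err(format!("hash discontinuity at {}", block.height));
            }
        }
        expected = Some((block.height + 1, block.hash));
    }
    Ok(())
}

fn main() {
    let genesis = Block { height: 0, hash: [1; 32], prev_hash: [0; 32] };
    let next = Block { height: 1, hash: [2; 32], prev_hash: [1; 32] };
    assert!(check_continuity(&[next], Some(&genesis)).is_ok());
    println!("batch is continuous");
}
```
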
15 changes: 7 additions & 8 deletions zingo-sync/src/scan/task.rs
@@ -187,7 +187,7 @@ where
if let Some(worker) = self.idle_worker() {
if let Some(scan_task) = sync::state::create_scan_task(wallet).unwrap() {
worker.add_scan_task(scan_task).unwrap();
-} else {
+} else if wallet.get_sync_state().unwrap().scan_complete() {
self.state.shutdown();
}
}
@@ -213,7 +213,7 @@ struct ScanWorker<P> {
handle: Option<JoinHandle<()>>,
is_scanning: Arc<AtomicBool>,
consensus_parameters: P,
-scan_task_sender: Option<mpsc::UnboundedSender<ScanTask>>,
+scan_task_sender: Option<mpsc::Sender<ScanTask>>,
scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result<ScanResults, ScanError>)>,
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
ufvks: HashMap<AccountId, UnifiedFullViewingKey>,
@@ -226,7 +226,7 @@ where
fn new(
id: usize,
consensus_parameters: P,
-scan_task_sender: Option<mpsc::UnboundedSender<ScanTask>>,
+scan_task_sender: Option<mpsc::Sender<ScanTask>>,
scan_results_sender: mpsc::UnboundedSender<(ScanRange, Result<ScanResults, ScanError>)>,
fetch_request_sender: mpsc::UnboundedSender<FetchRequest>,
ufvks: HashMap<AccountId, UnifiedFullViewingKey>,
@@ -247,7 +247,7 @@ where
///
/// Waits for a scan task and then calls [`crate::scan::scan`] on the given range.
fn run(&mut self) -> Result<(), ()> {
-let (scan_task_sender, mut scan_task_receiver) = mpsc::unbounded_channel::<ScanTask>();
+let (scan_task_sender, mut scan_task_receiver) = mpsc::channel::<ScanTask>(1);

let is_scanning = self.is_scanning.clone();
let scan_results_sender = self.scan_results_sender.clone();
@@ -257,8 +257,6 @@ where

let handle = tokio::spawn(async move {
while let Some(scan_task) = scan_task_receiver.recv().await {
-is_scanning.store(true, atomic::Ordering::Release);
-
let scan_results = scan(
fetch_request_sender.clone(),
&consensus_parameters,
@@ -293,8 +291,9 @@ where
self.scan_task_sender
.clone()
.unwrap()
-.send(scan_task)
-.unwrap();
+.try_send(scan_task)
+.expect("worker should never be sent multiple tasks at one time");
+self.is_scanning.store(true, atomic::Ordering::Release);

Ok(())
}
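
The task.rs changes replace the worker's unbounded task channel with a bounded mpsc::channel of capacity 1 and set the is_scanning flag at dispatch time, so handing a task to an already-busy worker fails loudly through try_send instead of queueing silently. A sketch of that handoff pattern under illustrative names, not the crate's actual types:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 1: the worker holds at most one in-flight task.
    let (task_sender, mut task_receiver) = mpsc::channel::<u32>(1);
    let is_scanning = Arc::new(AtomicBool::new(false));

    let worker_flag = is_scanning.clone();
    let worker = tokio::spawn(async move {
        while let Some(range) = task_receiver.recv().await {
            println!("scanning range {range}");
            worker_flag.store(false, Ordering::Release);
        }
    });

    // Dispatch side: try_send fails immediately if the worker is busy,
    // and the busy flag is set as soon as the task is handed off.
    task_sender
        .try_send(42)
        .expect("worker should never be sent multiple tasks at one time");
    is_scanning.store(true, Ordering::Release);

    drop(task_sender); // close the channel so the worker loop exits
    worker.await.unwrap();
}
```
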
18 changes: 10 additions & 8 deletions zingo-sync/src/sync.rs
@@ -106,6 +106,7 @@ where

// TODO: consider what happens when there is no verification range i.e. all ranges already scanned
// TODO: invalidate any pending transactions after eviction height (40 below best chain height?)
+// TODO: implement an option for continuous scanning where it doesnt exit when complete

let mut interval = tokio::time::interval(Duration::from_millis(30));
loop {
Expand Down Expand Up @@ -133,10 +134,6 @@ where
mempool_stream_response,
&mut mempool_stream)
.await;

-// reset interval to ensure all mempool transactions have been scanned before sync completes.
-// if a full interval passes without receiving a transaction from the mempool we can safely finish sync.
-interval.reset();
}

_update_scanner = interval.tick() => {
@@ -257,7 +254,8 @@ async fn process_mempool_stream_response<W>(
) where
W: SyncWallet + SyncBlocks + SyncTransactions + SyncNullifiers + SyncOutPoints,
{
-match mempool_stream_response.unwrap() {
+// TODO: replace this unwrap_or_else with proper error handling
+match mempool_stream_response.unwrap_or(None) {
Some(raw_transaction) => {
let block_height =
BlockHeight::from_u32(u32::try_from(raw_transaction.height).unwrap());
@@ -351,6 +349,7 @@ async fn process_mempool_stream_response<W>(
*mempool_stream = client::get_mempool_transaction_stream(fetch_request_sender.clone())
.await
.unwrap();
+tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
}
@@ -377,10 +376,11 @@
{
let ScanResults {
nullifiers,
-outpoints,
wallet_blocks,
wallet_transactions,
-shard_tree_data,
+outpoints,
+sapling_located_trees,
+orchard_located_trees,
} = scan_results;

wallet.append_wallet_blocks(wallet_blocks).unwrap();
@@ -389,7 +389,9 @@
.unwrap();
wallet.append_nullifiers(nullifiers).unwrap();
wallet.append_outpoints(outpoints).unwrap();
-wallet.update_shard_trees(shard_tree_data).unwrap();
+wallet
+.update_shard_trees(sapling_located_trees, orchard_located_trees)
+.unwrap();
// TODO: add trait to save wallet data to persistance for in-memory wallets

Ok(())
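
Two behavioral fixes land in sync.rs: a mempool stream that ends or errors no longer panics, since unwrap_or(None) treats both as stream-closed, and the resubscribe path now sleeps for a second so a failing endpoint is not polled in a tight loop. A rough sketch of that reconnect-with-delay shape, where subscribe_mempool is a hypothetical stand-in for client::get_mempool_transaction_stream:

```rust
use std::time::Duration;

// Hypothetical stand-in: a real implementation would return a gRPC
// stream of raw mempool transactions.
async fn subscribe_mempool() -> Vec<String> {
    vec!["tx1".into(), "tx2".into()]
}

async fn mempool_loop(rounds: usize) {
    for _ in 0..rounds {
        // Drain the stream until it closes (e.g. when a new block arrives).
        for tx in subscribe_mempool().await {
            println!("mempool tx: {tx}");
        }
        // Pause before resubscribing so a failing endpoint is not
        // hammered in a tight loop.
        tokio::time::sleep(Duration::from_millis(1000)).await;
    }
}

#[tokio::main]
async fn main() {
    mempool_loop(2).await;
}
```
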
75 changes: 13 additions & 62 deletions zingo-sync/src/traits.rs
@@ -3,19 +3,15 @@
use std::collections::{BTreeMap, HashMap};
use std::fmt::Debug;

-use incrementalmerkletree::Level;
-use shardtree::LocatedPrunableTree;
-use zcash_client_backend::{
-data_api::{ORCHARD_SHARD_HEIGHT, SAPLING_SHARD_HEIGHT},
-keys::UnifiedFullViewingKey,
-};
+use orchard::tree::MerkleHashOrchard;
+use zcash_client_backend::keys::UnifiedFullViewingKey;
use zcash_primitives::consensus::BlockHeight;
use zcash_primitives::transaction::TxId;
use zcash_primitives::zip32::AccountId;

use crate::keys::transparent::TransparentAddressId;
use crate::primitives::{NullifierMap, OutPointMap, SyncState, WalletBlock, WalletTransaction};
-use crate::witness::{ShardTreeData, ShardTrees};
+use crate::witness::{LocatedTreeData, ShardTrees};

// TODO: clean up interface and move many default impls out of traits. consider merging to a simplified SyncWallet interface.

@@ -229,66 +225,21 @@ pub trait SyncShardTrees: SyncWallet {
fn get_shard_trees_mut(&mut self) -> Result<&mut ShardTrees, Self::Error>;

/// Update wallet shard trees with new shard tree data
-fn update_shard_trees(&mut self, shard_tree_data: ShardTreeData) -> Result<(), Self::Error> {
-let ShardTreeData {
-sapling_initial_position,
-orchard_initial_position,
-sapling_leaves_and_retentions,
-orchard_leaves_and_retentions,
-} = shard_tree_data;
-//TODO: Play with numbers. Is it more efficient to
-// build larger trees to allow for more pruning to
-// happen in parallel before insertion?
-// Is it better to build smaller trees so that more
-// trees can be built in parallel at the same time?
-// Is inserting trees more efficient if trees are
-// a power of 2 size? Is it more efficient if they
-// are 'aligned' so that the initial_position is
-// a multiple of tree size? All unanswered questions
-// that want to be benchmarked.
-
-let (sapling_sender, sapling_receiver) = crossbeam_channel::unbounded();
-let (orchard_sender, orchard_receiver) = crossbeam_channel::unbounded();
-rayon::scope_fifo(|scope| {
-for (i, sapling_chunk) in sapling_leaves_and_retentions.chunks(128).enumerate() {
-let sapling_sender = sapling_sender.clone();
-scope.spawn_fifo(move |_scope| {
-let start_position = sapling_initial_position + (i as u64 * 128);
-let tree = LocatedPrunableTree::from_iter(
-start_position..(start_position + sapling_chunk.len() as u64),
-Level::from(SAPLING_SHARD_HEIGHT),
-sapling_chunk.iter().copied(),
-);
-sapling_sender.send(tree).unwrap();
-})
-}
-
-for (i, orchard_chunk) in orchard_leaves_and_retentions.chunks(128).enumerate() {
-let orchard_sender = orchard_sender.clone();
-scope.spawn_fifo(move |_scope| {
-let start_position = orchard_initial_position + (i as u64 * 128);
-let tree = LocatedPrunableTree::from_iter(
-start_position..(start_position + orchard_chunk.len() as u64),
-Level::from(ORCHARD_SHARD_HEIGHT),
-orchard_chunk.iter().copied(),
-);
-orchard_sender.send(tree).unwrap();
-})
-}
-});
-drop((orchard_sender, sapling_sender));
+fn update_shard_trees(
+&mut self,
+sapling_located_trees: Vec<LocatedTreeData<sapling_crypto::Node>>,
+orchard_located_trees: Vec<LocatedTreeData<MerkleHashOrchard>>,
+) -> Result<(), Self::Error> {
+let shard_trees = self.get_shard_trees_mut()?;

-let trees = self.get_shard_trees_mut()?;
-for tree in sapling_receiver.iter() {
-let tree = tree.unwrap();
-trees
+for tree in sapling_located_trees.into_iter() {
+shard_trees
.sapling_mut()
.insert_tree(tree.subtree, tree.checkpoints)
.unwrap();
}
-for tree in orchard_receiver {
-let tree = tree.unwrap();
-trees
+for tree in orchard_located_trees.into_iter() {
+shard_trees
.orchard_mut()
.insert_tree(tree.subtree, tree.checkpoints)
.unwrap();
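
This traits.rs change is the other half of the scan.rs refactor: the chunked subtree construction that was previously inlined here with rayon and crossbeam channels moves into witness::build_located_trees, and update_shard_trees reduces to plain insertion. A simplified sketch of that division of labor; the types below are stand-ins for the real shardtree structures:

```rust
// Stand-in for a located subtree plus its checkpoint data.
struct LocatedTreeData<H> {
    start_position: u64,
    leaves: Vec<H>,
}

// Build one located subtree per fixed-size chunk, mirroring the
// removed code's chunks of 128 leaves and retentions.
fn build_located_trees<H: Clone>(
    initial_position: u64,
    leaves: Vec<H>,
) -> Vec<LocatedTreeData<H>> {
    leaves
        .chunks(128)
        .enumerate()
        .map(|(i, chunk)| LocatedTreeData {
            start_position: initial_position + (i as u64 * 128),
            leaves: chunk.to_vec(),
        })
        .collect()
}

// The trait method is now pure insertion: no channels or thread pools.
struct ShardTree<H> {
    subtrees: Vec<LocatedTreeData<H>>,
}

impl<H> ShardTree<H> {
    fn insert_tree(&mut self, tree: LocatedTreeData<H>) {
        self.subtrees.push(tree);
    }
}

fn update_shard_trees<H>(tree: &mut ShardTree<H>, located: Vec<LocatedTreeData<H>>) {
    for subtree in located {
        tree.insert_tree(subtree);
    }
}

fn main() {
    let mut tree = ShardTree { subtrees: Vec::new() };
    let located = build_located_trees(0u64, vec![0u8; 300]); // three chunks
    update_shard_trees(&mut tree, located);
    println!("inserted {} subtrees", tree.subtrees.len());
}
```
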
[Diff for the seventh changed file did not render.]