diff --git a/Cargo.lock b/Cargo.lock
index d7acb6a22..da293a914 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1616,10 +1616,13 @@ dependencies = [
  "shardtree",
  "tempfile",
  "tokio",
+ "tracing-subscriber",
  "zcash_address",
  "zcash_client_backend",
  "zcash_primitives",
+ "zingo-netutils",
  "zingo-status",
+ "zingo-sync",
  "zingo-testutils",
  "zingo-testvectors",
  "zingoconfig",
@@ -4216,11 +4219,18 @@ dependencies = [
 name = "zingo-sync"
 version = "0.1.0"
 dependencies = [
+ "crossbeam-channel",
  "futures",
+ "getset",
+ "memuse",
+ "orchard",
+ "rayon",
+ "sapling-crypto",
  "tokio",
  "tonic",
  "tracing",
  "zcash_client_backend",
+ "zcash_note_encryption",
  "zcash_primitives",
  "zingo-netutils",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index 7fa8ff919..8c4b3cb9f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,7 +21,7 @@ resolver = "2"
 zcash_address = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change" }
 zcash_client_backend = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change", features = ["lightwalletd-tonic", "orchard", "transparent-inputs"] }
 zcash_encoding = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change" }
-zcash_keys = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change", features = ["orchard"] }
+zcash_keys = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change", features = ["transparent-inputs", "sapling", "orchard" ] }
 zcash_note_encryption = "0.4"
 zcash_primitives = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change" }
 zcash_proofs = { git = "https://github.com/zingolabs/librustzcash.git", tag = "always_require_change" }
@@ -89,9 +89,15 @@ tempfile = "3.3.0"
 test-case = "3.3.1"
 tokio = "1.28.2"
 tonic-build = "0.10"
+tracing = "0.1.40"
 tracing-subscriber = "0.3.15"
+memuse = "0.2.1"
 enum_dispatch = "0.3.13"

+# Parallel processing
+crossbeam-channel = "0.5"
+rayon = "1.5"
+
 [profile.release]
 debug = false
diff --git a/libtonode-tests/Cargo.toml b/libtonode-tests/Cargo.toml
index 319ba752e..6acf202e6 100644
--- a/libtonode-tests/Cargo.toml
+++ b/libtonode-tests/Cargo.toml
@@ -8,11 +8,13 @@ chain_generic_tests = []

 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [dependencies]
-zingolib = { path = "../zingolib", features = ["deprecations", "test-elevation"] }
+zingolib = { path = "../zingolib", features = ["deprecations", "test-elevation", "sync"] }
 zingo-status = { path = "../zingo-status" }
 zingo-testutils = { path = "../zingo-testutils" }
 zingo-testvectors = { path = "../zingo-testvectors" }
 zingoconfig = { path = "../zingoconfig" }
+zingo-netutils = { path = "../zingo-netutils" }
+zingo-sync = { path = "../zingo-sync" }

 zcash_primitives = { workspace = true }
 orchard = { workspace = true }
@@ -29,3 +31,4 @@ itertools = { workspace = true }
 serde_json = { workspace = true }
 http.workspace = true
 tempfile.workspace = true
+tracing-subscriber.workspace = true
diff --git a/libtonode-tests/tests/sync.rs b/libtonode-tests/tests/sync.rs
index bda10c279..e655597c0 100644
--- a/libtonode-tests/tests/sync.rs
+++ b/libtonode-tests/tests/sync.rs
@@ -1,16 +1,26 @@
 use tempfile::TempDir;
+use zingo_netutils::GrpcConnector;
+use zingo_sync::sync::sync;
+use zingo_testutils::scenarios;
 use zingo_testvectors::seeds::HOSPITAL_MUSEUM_SEED;
 use zingoconfig::{construct_lightwalletd_uri, load_clientconfig, DEFAULT_LIGHTWALLETD_SERVER};
 use zingolib::{lightclient::LightClient, wallet::WalletBase};

 #[tokio::test]
-async fn sync_test() {
+async fn sync_mainnet_test() {
+    tracing_subscriber::fmt().init();
+
     let uri = construct_lightwalletd_uri(Some(DEFAULT_LIGHTWALLETD_SERVER.to_string()));
     let temp_dir = TempDir::new().unwrap();
     let temp_path = temp_dir.path().to_path_buf();
-    let config =
-        load_clientconfig(uri, Some(temp_path), zingoconfig::ChainType::Mainnet, true).unwrap();
-    let lightclient = LightClient::create_from_wallet_base_async(
+    let config = load_clientconfig(
+        uri.clone(),
+        Some(temp_path),
+        zingoconfig::ChainType::Mainnet,
+        true,
+    )
+    .unwrap();
+    let mut lightclient = LightClient::create_from_wallet_base_async(
         WalletBase::from_string(HOSPITAL_MUSEUM_SEED.to_string()),
         &config,
         2_590_000,
@@ -19,5 +29,27 @@
     .await
     .unwrap();

-    lightclient.do_sync(true).await.unwrap();
+    let client = GrpcConnector::new(uri).get_client().await.unwrap();
+
+    sync(client, &config.chain, &mut lightclient.wallet)
+        .await
+        .unwrap();
+}
+
+#[tokio::test]
+async fn sync_test() {
+    tracing_subscriber::fmt().init();
+
+    let (_regtest_manager, _cph, _faucet, mut recipient, _txid) =
+        scenarios::orchard_funded_recipient(5_000_000).await;
+    let uri = recipient.config().lightwalletd_uri.read().unwrap().clone();
+
+    let client = GrpcConnector::new(uri).get_client().await.unwrap();
+
+    sync(
+        client,
+        &recipient.config().chain.clone(),
+        &mut recipient.wallet,
+    )
+    .await
+    .unwrap();
 }
diff --git a/zingo-sync/Cargo.toml b/zingo-sync/Cargo.toml
index e217ae356..379b623fc 100644
--- a/zingo-sync/Cargo.toml
+++ b/zingo-sync/Cargo.toml
@@ -4,12 +4,32 @@ version = "0.1.0"
 edition = "2021"

 [dependencies]
+# Zingo
 zingo-netutils = { path = "../zingo-netutils" }

+# Zcash
 zcash_client_backend.workspace = true
 zcash_primitives.workspace = true
+zcash_note_encryption.workspace = true
+sapling-crypto.workspace = true
+orchard.workspace = true

+# Async
 futures.workspace = true
 tokio.workspace = true
+
+# Client
 tonic.workspace = true
-tracing = "0.1.40"
+
+# Logging
+tracing.workspace = true
+
+# Metrics
+memuse.workspace = true
+
+# Parallel processing
+crossbeam-channel.workspace = true
+rayon.workspace = true
+
+# Minimise boilerplate
+getset.workspace = true
diff --git a/zingo-sync/src/client.rs b/zingo-sync/src/client.rs
index 128d30084..04a3f6782 100644
--- a/zingo-sync/src/client.rs
+++ b/zingo-sync/src/client.rs
@@ -2,7 +2,13 @@

 use std::ops::Range;

-use zcash_client_backend::proto::{compact_formats::CompactBlock, service::BlockId};
+use zcash_client_backend::{
+    data_api::chain::ChainState,
+    proto::{
+        compact_formats::CompactBlock,
+        service::{BlockId, TreeState},
+    },
+};
 use zcash_primitives::consensus::BlockHeight;

 use tokio::sync::{mpsc::UnboundedSender, oneshot};
@@ -12,11 +18,14 @@ pub mod fetcher;

 /// Fetch requests are created and sent to the [`crate::client::fetcher::fetcher`] task when a connection to the server is required.
 ///
 /// Each variant includes a [`tokio::sync::oneshot::Sender`] for returning the fetched data to the requester.
+#[derive(Debug)]
 pub enum FetchRequest {
     /// Gets the height of the blockchain from the server.
     ChainTip(oneshot::Sender<BlockId>),
     /// Gets the specified range of compact blocks from the server (end exclusive).
     CompactBlockRange(oneshot::Sender<Vec<CompactBlock>>, Range<BlockHeight>),
+    /// Gets the tree state for a specified block height.
+    TreeState(oneshot::Sender<TreeState>, BlockHeight),
 }

 /// Gets the height of the blockchain from the server.
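The round trip behind each `FetchRequest` variant is the standard oneshot request/response pattern; a minimal caller-side sketch (the `request_chain_tip` helper is hypothetical — `get_chain_height` plays this role in the patch — and it assumes the `fetcher` task is already consuming the other end of the channel):

```rust
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use zcash_client_backend::proto::service::BlockId;

use crate::client::FetchRequest; // the enum introduced by this patch

// Hypothetical caller-side helper mirroring `get_chain_height`.
async fn request_chain_tip(fetch_request_sender: UnboundedSender<FetchRequest>) -> BlockId {
    // The sender half of a fresh one-shot channel travels inside the request.
    let (reply_to, response) = oneshot::channel::<BlockId>();
    fetch_request_sender
        .send(FetchRequest::ChainTip(reply_to))
        .unwrap();
    // The fetcher task serves the request and replies on the one-shot channel.
    response.await.unwrap()
}
```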
@@ -48,3 +57,19 @@ pub async fn get_compact_block_range(

     Ok(compact_blocks)
 }
+
+/// Gets the frontiers for a specified block height.
+///
+/// Requires [`crate::client::fetcher::fetcher`] to be running concurrently, connected via the `fetch_request` channel.
+pub async fn get_frontiers(
+    fetch_request_sender: UnboundedSender<FetchRequest>,
+    block_height: BlockHeight,
+) -> Result<ChainState, ()> {
+    let (sender, receiver) = oneshot::channel::<TreeState>();
+    fetch_request_sender
+        .send(FetchRequest::TreeState(sender, block_height))
+        .unwrap();
+    let tree_state = receiver.await.unwrap();
+    let frontiers = tree_state.to_chain_state().unwrap();
+
+    Ok(frontiers)
+}
diff --git a/zingo-sync/src/client/fetcher.rs b/zingo-sync/src/client/fetcher.rs
index dbb7d7fbe..e6aca377f 100644
--- a/zingo-sync/src/client/fetcher.rs
+++ b/zingo-sync/src/client/fetcher.rs
@@ -8,6 +8,7 @@ use zcash_client_backend::proto::{
     compact_formats::CompactBlock,
     service::{
         compact_tx_streamer_client::CompactTxStreamerClient, BlockId, BlockRange, ChainSpec,
+        TreeState,
     },
 };
 use zcash_primitives::consensus::BlockHeight;
@@ -52,7 +53,7 @@ async fn receive_fetch_requests(
     // if there are no fetch requests to process, sleep until the next fetch request is received
     // or channel is closed
     if fetch_request_queue.is_empty() {
-        while let Some(fetch_request) = receiver.recv().await {
+        if let Some(fetch_request) = receiver.recv().await {
             fetch_request_queue.push(fetch_request);
         }
     }
@@ -79,14 +80,12 @@
 // TODO: placeholder for algorithm that selects the next fetch request to be processed
 // return `None` if a fetch request could not be selected
 fn select_fetch_request(fetch_request_queue: &mut Vec<FetchRequest>) -> Option<FetchRequest> {
-    // TODO: add other fetch requests with priorities
-    let fetch_request_index = fetch_request_queue
-        .iter()
-        .enumerate()
-        .find(|(_, request)| matches!(request, FetchRequest::ChainTip(_)))
-        .map(|(index, _)| index);
-
-    fetch_request_index.map(|index| fetch_request_queue.remove(index))
+    // TODO: improve priority logic
+    if fetch_request_queue.first().is_some() {
+        Some(fetch_request_queue.remove(0))
+    } else {
+        None
+    }
 }

 //
@@ -96,13 +95,20 @@ async fn fetch_from_server(
 ) -> Result<(), ()> {
     match fetch_request {
         FetchRequest::ChainTip(sender) => {
+            tracing::info!("Fetching chain tip.");
             let block_id = get_latest_block(client).await;
             sender.send(block_id).unwrap();
         }
         FetchRequest::CompactBlockRange(sender, block_range) => {
+            tracing::info!("Fetching compact blocks. {:?}", &block_range);
             let compact_blocks = get_block_range(client, block_range).await;
             sender.send(compact_blocks).unwrap();
         }
+        FetchRequest::TreeState(sender, block_height) => {
+            tracing::info!("Fetching tree state. {:?}", &block_height);
+            let tree_state = get_tree_state(client, block_height).await;
+            sender.send(tree_state).unwrap();
+        }
     }

     Ok(())
 }
@@ -115,7 +121,6 @@
     client.get_latest_block(request).await.unwrap().into_inner()
 }

-
 async fn get_block_range(
     client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
     block_range: Range<BlockHeight>,
 ) -> Vec<CompactBlock> {
@@ -126,11 +131,11 @@
     let request = tonic::Request::new(BlockRange {
         start: Some(BlockId {
             height: u64::from(block_range.start),
-            hash: Vec::new(),
+            hash: vec![],
         }),
         end: Some(BlockId {
             height: u64::from(block_range.end) - 1,
-            hash: Vec::new(),
+            hash: vec![],
         }),
     });
     let mut block_stream = client.get_block_range(request).await.unwrap().into_inner();
@@ -141,3 +146,14 @@

     compact_blocks
 }
+
+async fn get_tree_state(
+    client: &mut CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
+    block_height: BlockHeight,
+) -> TreeState {
+    let request = tonic::Request::new(BlockId {
+        height: block_height.into(),
+        hash: vec![],
+    });
+
+    client.get_tree_state(request).await.unwrap().into_inner()
+}
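The queue discipline in `receive_fetch_requests` and `select_fetch_request` reduces to: drain ready requests, block only when idle, and serve the front of the queue. A stand-alone sketch of that loop, with requests simplified to `u32` and the real module's gRPC plumbing omitted:

```rust
use tokio::sync::mpsc;

// Simplified stand-in for the fetcher loop: buffer ready requests in a
// queue, then serve the selected one; block only when the queue is empty.
async fn fetch_loop(mut receiver: mpsc::UnboundedReceiver<u32>) {
    let mut queue: Vec<u32> = Vec::new();
    loop {
        // Opportunistically drain anything already buffered in the channel.
        while let Ok(request) = receiver.try_recv() {
            queue.push(request);
        }
        if queue.is_empty() {
            // Nothing queued: await the next request, or stop once all
            // senders have been dropped.
            match receiver.recv().await {
                Some(request) => queue.push(request),
                None => break,
            }
        }
        // Placeholder selection policy, mirroring `select_fetch_request`:
        // serve the oldest request first.
        let request = queue.remove(0);
        println!("serving request {request}");
    }
}
```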
diff --git a/zingo-sync/src/interface.rs b/zingo-sync/src/interface.rs
index 51038b3ee..3b31da4a7 100644
--- a/zingo-sync/src/interface.rs
+++ b/zingo-sync/src/interface.rs
@@ -1,12 +1,18 @@
 //! Traits for interfacing a wallet with the sync engine

-use crate::SyncState;
+use std::collections::HashMap;
+use std::fmt::Debug;
+
+use zcash_client_backend::keys::UnifiedFullViewingKey;
+use zcash_primitives::zip32::AccountId;

 /// Temporary dump for all necessary wallet functionality for PoC
 pub trait SyncWallet {
-    /// Errors associated with interacting with the wallet data
-    type Error: std::fmt::Debug;
+    /// Errors associated with interfacing the sync engine with wallet data
+    type Error: Debug;

-    /// Mutable reference to the wallet sync state
-    fn set_sync_state(&mut self) -> Result<&mut SyncState, Self::Error>;
+    /// Returns all unified full viewing keys known to this wallet.
+    fn get_unified_full_viewing_keys(
+        &self,
+    ) -> Result<HashMap<AccountId, UnifiedFullViewingKey>, Self::Error>;
 }
diff --git a/zingo-sync/src/lib.rs b/zingo-sync/src/lib.rs
index e2a928eb2..89673cfb7 100644
--- a/zingo-sync/src/lib.rs
+++ b/zingo-sync/src/lib.rs
@@ -3,33 +3,9 @@
 //!
 //! Entrypoint: [`crate::sync::sync`]

-use zcash_client_backend::data_api::scanning::ScanRange;
-
 pub mod client;
 pub mod interface;
+#[allow(missing_docs)]
+pub mod primitives;
+pub(crate) mod scanner;
 pub mod sync;
-
-/// Encapsulates the current state of sync
-pub struct SyncState {
-    scan_ranges: Vec<ScanRange>,
-}
-
-impl SyncState {
-    /// Create new SyncState
-    pub fn new() -> Self {
-        SyncState {
-            scan_ranges: Vec::new(),
-        }
-    }
-
-    /// TODO: doc comment
-    pub fn set_scan_ranges(&mut self) -> &mut Vec<ScanRange> {
-        &mut self.scan_ranges
-    }
-}
-
-impl Default for SyncState {
-    fn default() -> Self {
-        Self::new()
-    }
-}
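With the revised trait, a wallet only has to surface its viewing keys to participate in sync. A toy implementation for an in-memory wallet might look like this (illustrative only — the `MemoryWallet` type and its `ufvks` field are hypothetical; zingolib's real implementation appears at the end of this patch):

```rust
use std::collections::HashMap;

use zcash_client_backend::keys::UnifiedFullViewingKey;
use zcash_primitives::zip32::AccountId;
use zingo_sync::interface::SyncWallet;

// Hypothetical wallet type holding one UFVK per account.
struct MemoryWallet {
    ufvks: HashMap<AccountId, UnifiedFullViewingKey>,
}

impl SyncWallet for MemoryWallet {
    type Error = ();

    fn get_unified_full_viewing_keys(
        &self,
    ) -> Result<HashMap<AccountId, UnifiedFullViewingKey>, Self::Error> {
        // Hand the sync engine a copy of every known viewing key.
        Ok(self.ufvks.clone())
    }
}
```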
diff --git a/zingo-sync/src/primitives.rs b/zingo-sync/src/primitives.rs
new file mode 100644
index 000000000..d5d2ef725
--- /dev/null
+++ b/zingo-sync/src/primitives.rs
@@ -0,0 +1,65 @@
+//! Module for primitive structs associated with the sync engine
+
+use std::sync::{Arc, RwLock};
+
+use getset::{Getters, MutGetters};
+
+use zcash_client_backend::{data_api::scanning::ScanRange, PoolType};
+use zcash_primitives::transaction::TxId;
+
+/// Encapsulates the current state of sync
+#[derive(Getters, MutGetters)]
+#[getset(get = "pub")]
+pub struct SyncState {
+    scan_ranges: Arc<RwLock<Vec<ScanRange>>>,
+}
+
+impl SyncState {
+    /// Create new SyncState
+    pub fn new() -> Self {
+        SyncState {
+            scan_ranges: Arc::new(RwLock::new(Vec::new())),
+        }
+    }
+}
+
+impl Default for SyncState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Unified general ID for any output
+#[derive(Getters)]
+#[getset(get = "pub")]
+pub struct OutputId {
+    /// ID of associated transaction
+    txid: TxId,
+    /// Index of output within the transaction's bundle of the given pool type.
+    output_index: usize,
+    /// Pool type the output belongs to
+    pool: PoolType,
+}
+
+impl OutputId {
+    /// Creates new OutputId from parts
+    pub fn from_parts(txid: TxId, output_index: usize, pool: PoolType) -> Self {
+        OutputId {
+            txid,
+            output_index,
+            pool,
+        }
+    }
+}
+
+/// Wallet compact block data
+#[allow(dead_code)]
+pub struct WalletCompactBlock {
+    block_height: u64,
+    block_hash: Vec<u8>,
+    prev_hash: Vec<u8>,
+    time: Vec<u8>,
+    txids: Vec<TxId>,
+    sapling_commitment_tree_size: u32,
+    orchard_commitment_tree_size: u32,
+}
diff --git a/zingo-sync/src/scanner.rs b/zingo-sync/src/scanner.rs
new file mode 100644
index 000000000..edbd90d6e
--- /dev/null
+++ b/zingo-sync/src/scanner.rs
@@ -0,0 +1,121 @@
+use std::collections::HashSet;
+
+use orchard::keys::Scope;
+use tokio::sync::mpsc::UnboundedSender;
+use zcash_client_backend::{
+    data_api::scanning::ScanRange, proto::compact_formats::CompactBlock, scanning::ScanningKeys,
+    PoolType, ShieldedProtocol,
+};
+use zcash_primitives::{consensus::Parameters, transaction::TxId, zip32::AccountId};
+
+use crate::{
+    client::{get_compact_block_range, FetchRequest},
+    primitives::OutputId,
+};
+
+use self::runners::BatchRunners;
+
+pub(crate) mod runners;
+
+pub(crate) async fn scanner<P>(
+    fetch_request_sender: UnboundedSender<FetchRequest>,
+    parameters: &P,
+    scanning_keys: &ScanningKeys<AccountId, (AccountId, Scope)>,
+    scan_range: ScanRange,
+) -> Result<(), ()>
+where
+    P: Parameters + Send + 'static,
+{
+    let compact_blocks = get_compact_block_range(
+        fetch_request_sender.clone(),
+        scan_range.block_range().clone(),
+    )
+    .await
+    .unwrap();
+
+    // TODO: check continuity
+
+    let (incoming_output_ids, outgoing_output_ids) =
+        trial_decrypt(parameters, scanning_keys, &compact_blocks).unwrap();
+
+    // gather the IDs of all transactions relevant to the wallet
+    // the edge case of transactions that this capability created but did not receive change
+    // or create outgoing data is handled when the nullifiers are added and linked
+    let mut relevant_txids: HashSet<TxId> = HashSet::new();
+    incoming_output_ids
+        .iter()
+        .chain(outgoing_output_ids.iter())
+        .for_each(|output_id| {
+            relevant_txids.insert(*output_id.txid());
+        });
+
+    // TODO: build shard tree, calculate nullifiers and positions for incoming outputs, map nullifiers and write compact blocks
+    // for compact_block in compact_blocks {
+    //     let zip212_enforcement = zip212_enforcement(parameters, compact_block.height());
+    // }
+
+    // FIXME: panics when less than 0 for regtest or less than sapling epoch for mainnet
+    // let _frontiers = get_frontiers(fetch_request_sender, scan_range.block_range().start - 1)
+    //     .await
+    //     .unwrap();
+
+    // let mut sapling_nullifiers_and_positions: HashMap<
+    //     OutputId,
+    //     (sapling_crypto::Nullifier, Position),
+    // > = HashMap::new();
+    // let mut orchard_nullifiers_and_positions: HashMap<
+    //     OutputId,
+    //     (orchard::note::Nullifier, Position),
+    // > = HashMap::new();
+
+    Ok(())
+}
+
+fn trial_decrypt<P>(
+    parameters: &P,
+    scanning_keys: &ScanningKeys<AccountId, (AccountId, Scope)>,
+    compact_blocks: &[CompactBlock],
+) -> Result<(Vec<OutputId>, Vec<OutputId>), ()>
+where
+    P: Parameters + Send + 'static,
+{
+    let mut runners = BatchRunners::<_, (), ()>::for_keys(100, scanning_keys);
+    for block in compact_blocks {
+        runners.add_block(parameters, block.clone()).unwrap();
+    }
+    runners.flush();
+
+    let mut incoming_output_ids: Vec<OutputId> = Vec::new();
+    let outgoing_output_ids: Vec<OutputId> = Vec::new(); // TODO: add outgoing decryption
+    for block in compact_blocks {
+        for transaction in block.vtx.iter() {
+            let decrypted_sapling_outputs = runners
+                .sapling
+                .collect_results(block.hash(), transaction.txid());
+            decrypted_sapling_outputs
+                .into_keys()
+                .for_each(|(txid, output_index)| {
+                    incoming_output_ids.push(OutputId::from_parts(
+                        txid,
+                        output_index,
+                        PoolType::Shielded(ShieldedProtocol::Sapling),
+                    ));
+                });
+
+            let decrypted_orchard_outputs = runners
+                .orchard
+                .collect_results(block.hash(), transaction.txid());
+            decrypted_orchard_outputs
+                .into_keys()
+                .for_each(|(txid, output_index)| {
+                    incoming_output_ids.push(OutputId::from_parts(
+                        txid,
+                        output_index,
+                        PoolType::Shielded(ShieldedProtocol::Orchard),
+                    ));
+                });
+        }
+    }
+
+    Ok((incoming_output_ids, outgoing_output_ids))
+}
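Before the vendored runners themselves, it may help to see the lifecycle `trial_decrypt` drives: construct runners from the scanning keys, stream blocks in, flush the tail batch, then collect per-transaction hits. A condensed sketch from inside the crate (illustrative only; `decrypt_batched` is not part of the patch):

```rust
use orchard::keys::Scope;
use zcash_client_backend::{proto::compact_formats::CompactBlock, scanning::ScanningKeys};
use zcash_primitives::{consensus::Parameters, zip32::AccountId};

use crate::scanner::runners::BatchRunners;

// Illustrative walk through the runner lifecycle used by `trial_decrypt`.
fn decrypt_batched<P: Parameters + Send + 'static>(
    parameters: &P,
    keys: &ScanningKeys<AccountId, (AccountId, Scope)>,
    blocks: &[CompactBlock],
) {
    let mut runners = BatchRunners::<_, (), ()>::for_keys(100, keys);

    // Queue every output for batched trial decryption; full batches are
    // spawned onto the rayon threadpool as the threshold is crossed.
    for block in blocks {
        runners.add_block(parameters, block.clone()).unwrap();
    }
    // Spawn the final, partially filled batch.
    runners.flush();

    // Successful decryptions come back keyed by (txid, output_index).
    for block in blocks {
        for tx in block.vtx.iter() {
            for ((txid, index), _note) in runners.sapling.collect_results(block.hash(), tx.txid()) {
                tracing::info!("sapling note in {txid}, output {index}");
            }
        }
    }
}
```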
diff --git a/zingo-sync/src/scanner/runners.rs b/zingo-sync/src/scanner/runners.rs
new file mode 100644
index 000000000..801bbb4e3
--- /dev/null
+++ b/zingo-sync/src/scanner/runners.rs
@@ -0,0 +1,619 @@
+//! Temporary copy of the LRZ batch runners, until they are exposed publicly and we update LRZ
+
+use std::collections::HashMap;
+use std::fmt;
+use std::mem;
+use std::sync::atomic::AtomicUsize;
+
+use crossbeam_channel as channel;
+
+use orchard::note_encryption::CompactAction;
+use orchard::note_encryption::OrchardDomain;
+use sapling_crypto::note_encryption::CompactOutputDescription;
+use sapling_crypto::note_encryption::SaplingDomain;
+
+use zcash_client_backend::proto::compact_formats::CompactBlock;
+use zcash_client_backend::scanning::ScanError;
+use zcash_client_backend::scanning::ScanningKeys;
+use zcash_client_backend::ShieldedProtocol;
+use zcash_note_encryption::{batch, BatchDomain, Domain, ShieldedOutput, COMPACT_NOTE_SIZE};
+use zcash_primitives::consensus;
+use zcash_primitives::transaction::components::sapling::zip212_enforcement;
+use zcash_primitives::{block::BlockHash, transaction::TxId};
+
+use memuse::DynamicUsage;
+
+type TaggedSaplingBatch<IvkTag> = Batch<
+    IvkTag,
+    SaplingDomain,
+    sapling_crypto::note_encryption::CompactOutputDescription,
+    CompactDecryptor,
+>;
+type TaggedSaplingBatchRunner<IvkTag, Tasks> = BatchRunner<
+    IvkTag,
+    SaplingDomain,
+    sapling_crypto::note_encryption::CompactOutputDescription,
+    CompactDecryptor,
+    Tasks,
+>;
+
+type TaggedOrchardBatch<IvkTag> =
+    Batch<IvkTag, OrchardDomain, orchard::note_encryption::CompactAction, CompactDecryptor>;
+type TaggedOrchardBatchRunner<IvkTag, Tasks> = BatchRunner<
+    IvkTag,
+    OrchardDomain,
+    orchard::note_encryption::CompactAction,
+    CompactDecryptor,
+    Tasks,
+>;
+
+pub(crate) trait SaplingTasks<IvkTag>: Tasks<TaggedSaplingBatch<IvkTag>> {}
+impl<IvkTag, T: Tasks<TaggedSaplingBatch<IvkTag>>> SaplingTasks<IvkTag> for T {}
+
+pub(crate) trait OrchardTasks<IvkTag>: Tasks<TaggedOrchardBatch<IvkTag>> {}
+impl<IvkTag, T: Tasks<TaggedOrchardBatch<IvkTag>>> OrchardTasks<IvkTag> for T {}
+
+pub(crate) struct BatchRunners<IvkTag, TS: SaplingTasks<IvkTag>, TO: OrchardTasks<IvkTag>> {
+    pub(crate) sapling: TaggedSaplingBatchRunner<IvkTag, TS>,
+    pub(crate) orchard: TaggedOrchardBatchRunner<IvkTag, TO>,
+}
+
+impl<IvkTag, TS, TO> BatchRunners<IvkTag, TS, TO>
+where
+    IvkTag: Clone + Send + 'static,
+    TS: SaplingTasks<IvkTag>,
+    TO: OrchardTasks<IvkTag>,
+{
+    pub(crate) fn for_keys(
+        batch_size_threshold: usize,
+        scanning_keys: &ScanningKeys<zcash_primitives::zip32::AccountId, IvkTag>,
+    ) -> Self {
+        BatchRunners {
+            sapling: BatchRunner::new(
+                batch_size_threshold,
+                scanning_keys
+                    .sapling()
+                    .iter()
+                    .map(|(id, key)| (id.clone(), key.prepare())),
+            ),
+            orchard: BatchRunner::new(
+                batch_size_threshold,
+                scanning_keys
+                    .orchard()
+                    .iter()
+                    .map(|(id, key)| (id.clone(), key.prepare())),
+            ),
+        }
+    }
+
+    pub(crate) fn flush(&mut self) {
+        self.sapling.flush();
+        self.orchard.flush();
+    }
+
+    #[tracing::instrument(skip_all, fields(height = block.height))]
+    pub(crate) fn add_block<P>(&mut self, params: &P, block: CompactBlock) -> Result<(), ScanError>
+    where
+        P: consensus::Parameters + Send + 'static,
+        IvkTag: Copy + Send + 'static,
+    {
+        let block_hash = block.hash();
+        let block_height = block.height();
+        let zip212_enforcement = zip212_enforcement(params, block_height);
+
+        for tx in block.vtx.into_iter() {
+            let txid = tx.txid();
+
+            self.sapling.add_outputs(
+                block_hash,
+                txid,
+                |_| SaplingDomain::new(zip212_enforcement),
+                &tx.outputs
+                    .iter()
+                    .enumerate()
+                    .map(|(i, output)| {
+                        CompactOutputDescription::try_from(output).map_err(|_| {
+                            ScanError::EncodingInvalid {
+                                at_height: block_height,
+                                txid,
+                                pool_type: ShieldedProtocol::Sapling,
+                                index: i,
+                            }
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?,
+            );
+
+            self.orchard.add_outputs(
+                block_hash,
+                txid,
+                OrchardDomain::for_compact_action,
+                &tx.actions
+                    .iter()
+                    .enumerate()
+                    .map(|(i, action)| {
+                        CompactAction::try_from(action).map_err(|_| ScanError::EncodingInvalid {
+                            at_height: block_height,
+                            txid,
+                            pool_type: ShieldedProtocol::Orchard,
+                            index: i,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?,
+            );
+        }
+
+        Ok(())
+    }
+}
+
+/// A decrypted transaction output.
+pub(crate) struct DecryptedOutput<IvkTag, D: Domain, M> {
+    /// The tag corresponding to the incoming viewing key used to decrypt the note.
+    pub(crate) ivk_tag: IvkTag,
+    /// The recipient of the note.
+    pub(crate) recipient: D::Recipient,
+    /// The note!
+    pub(crate) note: D::Note,
+    /// The memo field, or `()` if this is a decrypted compact output.
+    pub(crate) memo: M,
+}
+
+impl<IvkTag, D: Domain, M> fmt::Debug for DecryptedOutput<IvkTag, D, M>
+where
+    IvkTag: fmt::Debug,
+    D::IncomingViewingKey: fmt::Debug,
+    D::Recipient: fmt::Debug,
+    D::Note: fmt::Debug,
+    M: fmt::Debug,
+{
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("DecryptedOutput")
+            .field("ivk_tag", &self.ivk_tag)
+            .field("recipient", &self.recipient)
+            .field("note", &self.note)
+            .field("memo", &self.memo)
+            .finish()
+    }
+}
+
+/// A decryptor of transaction outputs.
+pub(crate) trait Decryptor<D: BatchDomain, Output> {
+    type Memo;
+
+    // Once we reach MSRV 1.75.0, this can return `impl Iterator`.
+    fn batch_decrypt<IvkTag: Clone>(
+        tags: &[IvkTag],
+        ivks: &[D::IncomingViewingKey],
+        outputs: &[(D, Output)],
+    ) -> Vec<Option<DecryptedOutput<IvkTag, D, Self::Memo>>>;
+}
+
+/// A decryptor of outputs as encoded in compact blocks.
+pub(crate) struct CompactDecryptor;
+
+impl<D: BatchDomain, Output: ShieldedOutput<D, COMPACT_NOTE_SIZE>> Decryptor<D, Output>
+    for CompactDecryptor
+{
+    type Memo = ();
+
+    fn batch_decrypt<IvkTag: Clone>(
+        tags: &[IvkTag],
+        ivks: &[D::IncomingViewingKey],
+        outputs: &[(D, Output)],
+    ) -> Vec<Option<DecryptedOutput<IvkTag, D, Self::Memo>>> {
+        batch::try_compact_note_decryption(ivks, outputs)
+            .into_iter()
+            .map(|res| {
+                res.map(|((note, recipient), ivk_idx)| DecryptedOutput {
+                    ivk_tag: tags[ivk_idx].clone(),
+                    recipient,
+                    note,
+                    memo: (),
+                })
+            })
+            .collect()
+    }
+}
+
+/// A value correlated with an output index.
+struct OutputIndex<V> {
+    /// The index of the output within the corresponding shielded bundle.
+    output_index: usize,
+    /// The value for the output index.
+    value: V,
+}
+
+type OutputItem<IvkTag, D, M> = OutputIndex<DecryptedOutput<IvkTag, D, M>>;
+
+/// The sender for the result of batch scanning a specific transaction output.
+struct OutputReplier<IvkTag, D: Domain, M>(OutputIndex<channel::Sender<OutputItem<IvkTag, D, M>>>);
+
+impl<IvkTag, D: Domain, M> DynamicUsage for OutputReplier<IvkTag, D, M> {
+    #[inline(always)]
+    fn dynamic_usage(&self) -> usize {
+        // We count the memory usage of items in the channel on the receiver side.
+        0
+    }
+
+    #[inline(always)]
+    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+        (0, Some(0))
+    }
+}
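The replier/receiver pair implements a scatter-gather shape: every queued output carries a clone of a crossbeam `Sender`, only successful decryptions reply, and dropping the senders is what terminates collection. A stripped-down, runnable sketch of that shape, with `String` payloads standing in for decrypted notes:

```rust
use crossbeam_channel as channel;

// Scatter: one sender per queued output, tagged with its index.
fn scatter(outputs: &[&str]) -> channel::Receiver<(usize, String)> {
    let (tx, rx) = channel::unbounded();
    for (output_index, output) in outputs.iter().enumerate() {
        let tx = tx.clone();
        let output = output.to_string();
        // Stand-in for a batch task; only "decryptable" outputs reply.
        rayon::spawn_fifo(move || {
            if output.starts_with("note") {
                let _ = tx.send((output_index, output));
            }
            // The cloned sender is dropped here, replying or not.
        });
    }
    // Drop the original sender so the channel disconnects once all
    // spawned tasks have finished.
    drop(tx);
    rx
}

fn main() {
    let rx = scatter(&["note-a", "junk", "note-b"]);
    // Gather: iteration ends when the channel is empty and disconnected.
    for (index, note) in rx {
        println!("output {index}: {note}");
    }
}
```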
+/// The receiver for the result of batch scanning a specific transaction.
+struct BatchReceiver<IvkTag, D: Domain, M>(channel::Receiver<OutputItem<IvkTag, D, M>>);
+
+impl<IvkTag, D: Domain, M> DynamicUsage for BatchReceiver<IvkTag, D, M> {
+    fn dynamic_usage(&self) -> usize {
+        // We count the memory usage of items in the channel on the receiver side.
+        let num_items = self.0.len();
+
+        // We know we use unbounded channels, so the items in the channel are stored as a
+        // linked list. `crossbeam_channel` allocates memory for the linked list in blocks
+        // of 31 items.
+        const ITEMS_PER_BLOCK: usize = 31;
+        let num_blocks = (num_items + ITEMS_PER_BLOCK - 1) / ITEMS_PER_BLOCK;
+
+        // The structure of a block is:
+        // - A pointer to the next block.
+        // - For each slot in the block:
+        //   - Space for an item.
+        //   - The state of the slot, stored as an AtomicUsize.
+        const PTR_SIZE: usize = std::mem::size_of::<usize>();
+        let item_size = std::mem::size_of::<OutputItem<IvkTag, D, M>>();
+        const ATOMIC_USIZE_SIZE: usize = std::mem::size_of::<AtomicUsize>();
+        let block_size = PTR_SIZE + ITEMS_PER_BLOCK * (item_size + ATOMIC_USIZE_SIZE);
+
+        num_blocks * block_size
+    }
+
+    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+        let usage = self.dynamic_usage();
+        (usage, Some(usage))
+    }
+}
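Concretely, with the 31-slot blocks described above, 100 queued items occupy ceil(100 / 31) = 4 blocks; assuming a 64-bit target and a hypothetical 64-byte item, the estimate comes to 4 × (8 + 31 × (64 + 8)) = 8,960 bytes:

```rust
// Worked example of the block-based estimate, with an assumed 64-byte item.
fn main() {
    const ITEMS_PER_BLOCK: usize = 31;
    const PTR_SIZE: usize = 8; // 64-bit target
    const ATOMIC_USIZE_SIZE: usize = 8;
    let item_size = 64; // hypothetical OutputItem size

    let num_items = 100;
    let num_blocks = (num_items + ITEMS_PER_BLOCK - 1) / ITEMS_PER_BLOCK; // 4
    let block_size = PTR_SIZE + ITEMS_PER_BLOCK * (item_size + ATOMIC_USIZE_SIZE); // 2_240
    assert_eq!(num_blocks * block_size, 8_960);
}
```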
+/// A tracker for the batch scanning tasks that are currently running.
+///
+/// This enables a [`BatchRunner`] to be optionally configured to track heap memory usage.
+pub(crate) trait Tasks<Item: Task> {
+    type Task: Task;
+    fn new() -> Self;
+    fn add_task(&self, item: Item) -> Self::Task;
+    fn run_task(&self, item: Item) {
+        let task = self.add_task(item);
+        rayon::spawn_fifo(|| task.run());
+    }
+}
+
+/// A batch scanning task.
+pub(crate) trait Task: Send + 'static {
+    fn run(self);
+}
+
+impl<Item: Task> Tasks<Item> for () {
+    type Task = Item;
+    fn new() -> Self {}
+    fn add_task(&self, item: Item) -> Self::Task {
+        // Return the item itself as the task; we aren't tracking anything about it, so
+        // there is no need to wrap it in a newtype.
+        item
+    }
+}
+
+/// A batch of outputs to trial decrypt.
+pub(crate) struct Batch<IvkTag, D: BatchDomain, Output, Dec: Decryptor<D, Output>> {
+    tags: Vec<IvkTag>,
+    ivks: Vec<D::IncomingViewingKey>,
+    /// We currently store outputs and repliers as parallel vectors, because
+    /// [`batch::try_note_decryption`] accepts a slice of domain/output pairs
+    /// rather than a value that implements `IntoIterator`, and therefore we
+    /// can't just use `map` to select the parts we need in order to perform
+    /// batch decryption. Ideally the domain, output, and output replier would
+    /// all be part of the same struct, which would also track the output index
+    /// (that is captured in the outer `OutputIndex` of each `OutputReplier`).
+    outputs: Vec<(D, Output)>,
+    repliers: Vec<OutputReplier<IvkTag, D, Dec::Memo>>,
+}
+
+impl<IvkTag, D, Output, Dec> DynamicUsage for Batch<IvkTag, D, Output, Dec>
+where
+    IvkTag: DynamicUsage,
+    D: BatchDomain + DynamicUsage,
+    D::IncomingViewingKey: DynamicUsage,
+    Output: DynamicUsage,
+    Dec: Decryptor<D, Output>,
+{
+    fn dynamic_usage(&self) -> usize {
+        self.tags.dynamic_usage()
+            + self.ivks.dynamic_usage()
+            + self.outputs.dynamic_usage()
+            + self.repliers.dynamic_usage()
+    }
+
+    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+        let (tags_lower, tags_upper) = self.tags.dynamic_usage_bounds();
+        let (ivks_lower, ivks_upper) = self.ivks.dynamic_usage_bounds();
+        let (outputs_lower, outputs_upper) = self.outputs.dynamic_usage_bounds();
+        let (repliers_lower, repliers_upper) = self.repliers.dynamic_usage_bounds();
+
+        (
+            tags_lower + ivks_lower + outputs_lower + repliers_lower,
+            tags_upper
+                .zip(ivks_upper)
+                .zip(outputs_upper)
+                .zip(repliers_upper)
+                .map(|(((a, b), c), d)| a + b + c + d),
+        )
+    }
+}
+
+impl<IvkTag, D, Output, Dec> Batch<IvkTag, D, Output, Dec>
+where
+    IvkTag: Clone,
+    D: BatchDomain,
+    Dec: Decryptor<D, Output>,
+{
+    /// Constructs a new batch.
+    fn new(tags: Vec<IvkTag>, ivks: Vec<D::IncomingViewingKey>) -> Self {
+        assert_eq!(tags.len(), ivks.len());
+        Self {
+            tags,
+            ivks,
+            outputs: vec![],
+            repliers: vec![],
+        }
+    }
+
+    /// Returns `true` if the batch is currently empty.
+    fn is_empty(&self) -> bool {
+        self.outputs.is_empty()
+    }
+}
+
+impl<IvkTag, D, Output, Dec> Task for Batch<IvkTag, D, Output, Dec>
+where
+    IvkTag: Clone + Send + 'static,
+    D: BatchDomain + Send + 'static,
+    D::IncomingViewingKey: Send,
+    D::Memo: Send,
+    D::Note: Send,
+    D::Recipient: Send,
+    Output: Send + 'static,
+    Dec: Decryptor<D, Output> + 'static,
+    Dec::Memo: Send,
+{
+    /// Runs the batch of trial decryptions, and reports the results.
+    fn run(self) {
+        // Deconstruct self so we can consume the pieces individually.
+        let Self {
+            tags,
+            ivks,
+            outputs,
+            repliers,
+        } = self;
+
+        assert_eq!(outputs.len(), repliers.len());
+
+        let decryption_results = Dec::batch_decrypt(&tags, &ivks, &outputs);
+        for (decryption_result, OutputReplier(replier)) in
+            decryption_results.into_iter().zip(repliers.into_iter())
+        {
+            // If `decryption_result` is `None` then we will just drop `replier`,
+            // indicating to the parent `BatchRunner` that this output was not for us.
+            if let Some(value) = decryption_result {
+                let result = OutputIndex {
+                    output_index: replier.output_index,
+                    value,
+                };
+
+                if replier.value.send(result).is_err() {
+                    tracing::debug!("BatchRunner was dropped before batch finished");
+                    break;
+                }
+            }
+        }
+    }
+}
+
+impl<IvkTag, D, Output, Dec> Batch<IvkTag, D, Output, Dec>
+where
+    D: BatchDomain,
+    Output: Clone,
+    Dec: Decryptor<D, Output>,
+{
+    /// Adds the given outputs to this batch.
+    ///
+    /// `replier` will be called with the result of every output.
+    fn add_outputs(
+        &mut self,
+        domain: impl Fn(&Output) -> D,
+        outputs: &[Output],
+        replier: channel::Sender<OutputItem<IvkTag, D, Dec::Memo>>,
+    ) {
+        self.outputs.extend(
+            outputs
+                .iter()
+                .cloned()
+                .map(|output| (domain(&output), output)),
+        );
+        self.repliers.extend((0..outputs.len()).map(|output_index| {
+            OutputReplier(OutputIndex {
+                output_index,
+                value: replier.clone(),
+            })
+        }));
+    }
+}
+
+/// A `HashMap` key for looking up the result of a batch scanning a specific transaction.
+#[derive(PartialEq, Eq, Hash)]
+struct ResultKey(BlockHash, TxId);
+
+impl DynamicUsage for ResultKey {
+    #[inline(always)]
+    fn dynamic_usage(&self) -> usize {
+        0
+    }
+
+    #[inline(always)]
+    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+        (0, Some(0))
+    }
+}
+
+/// Logic to run batches of trial decryptions on the global threadpool.
+pub(crate) struct BatchRunner<IvkTag, D, Output, Dec, T>
+where
+    D: BatchDomain,
+    Dec: Decryptor<D, Output>,
+    T: Tasks<Batch<IvkTag, D, Output, Dec>>,
+{
+    batch_size_threshold: usize,
+    // The batch currently being accumulated.
+    acc: Batch<IvkTag, D, Output, Dec>,
+    // The running batches.
+    running_tasks: T,
+    // Receivers for the results of the running batches.
+    pending_results: HashMap<ResultKey, BatchReceiver<IvkTag, D, Dec::Memo>>,
+}
+
+impl<IvkTag, D, Output, Dec, T> DynamicUsage for BatchRunner<IvkTag, D, Output, Dec, T>
+where
+    IvkTag: DynamicUsage,
+    D: BatchDomain + DynamicUsage,
+    D::IncomingViewingKey: DynamicUsage,
+    Output: DynamicUsage,
+    Dec: Decryptor<D, Output>,
+    T: Tasks<Batch<IvkTag, D, Output, Dec>> + DynamicUsage,
+{
+    fn dynamic_usage(&self) -> usize {
+        self.acc.dynamic_usage()
+            + self.running_tasks.dynamic_usage()
+            + self.pending_results.dynamic_usage()
+    }
+
+    fn dynamic_usage_bounds(&self) -> (usize, Option<usize>) {
+        let running_usage = self.running_tasks.dynamic_usage();
+
+        let bounds = (
+            self.acc.dynamic_usage_bounds(),
+            self.pending_results.dynamic_usage_bounds(),
+        );
+        (
+            bounds.0 .0 + running_usage + bounds.1 .0,
+            bounds
+                .0
+                 .1
+                .zip(bounds.1 .1)
+                .map(|(a, b)| a + running_usage + b),
+        )
+    }
+}
+
+impl<IvkTag, D, Output, Dec, T> BatchRunner<IvkTag, D, Output, Dec, T>
+where
+    IvkTag: Clone,
+    D: BatchDomain,
+    Dec: Decryptor<D, Output>,
+    T: Tasks<Batch<IvkTag, D, Output, Dec>>,
+{
+    /// Constructs a new batch runner for the given incoming viewing keys.
+    pub(crate) fn new(
+        batch_size_threshold: usize,
+        ivks: impl Iterator<Item = (IvkTag, D::IncomingViewingKey)>,
+    ) -> Self {
+        let (tags, ivks) = ivks.unzip();
+        Self {
+            batch_size_threshold,
+            acc: Batch::new(tags, ivks),
+            running_tasks: T::new(),
+            pending_results: HashMap::default(),
+        }
+    }
+}
+
+impl<IvkTag, D, Output, Dec, T> BatchRunner<IvkTag, D, Output, Dec, T>
+where
+    IvkTag: Clone + Send + 'static,
+    D: BatchDomain + Send + 'static,
+    D::IncomingViewingKey: Clone + Send,
+    D::Memo: Send,
+    D::Note: Send,
+    D::Recipient: Send,
+    Output: Clone + Send + 'static,
+    Dec: Decryptor<D, Output>,
+    T: Tasks<Batch<IvkTag, D, Output, Dec>>,
+{
+    /// Batches the given outputs for trial decryption.
+    ///
+    /// `block_tag` is the hash of the block that triggered this txid being added to the
+    /// batch, or the all-zeros hash to indicate that no block triggered it (i.e. it was a
+    /// mempool change).
+    ///
+    /// If after adding the given outputs, the accumulated batch size is at least the size
+    /// threshold that was set via `Self::new`, `Self::flush` is called. Subsequent calls
+    /// to `Self::add_outputs` will be accumulated into a new batch.
+    pub(crate) fn add_outputs(
+        &mut self,
+        block_tag: BlockHash,
+        txid: TxId,
+        domain: impl Fn(&Output) -> D,
+        outputs: &[Output],
+    ) {
+        let (tx, rx) = channel::unbounded();
+        self.acc.add_outputs(domain, outputs, tx);
+        self.pending_results
+            .insert(ResultKey(block_tag, txid), BatchReceiver(rx));
+
+        if self.acc.outputs.len() >= self.batch_size_threshold {
+            self.flush();
+        }
+    }
+
+    /// Runs the currently accumulated batch on the global threadpool.
+    ///
+    /// Subsequent calls to `Self::add_outputs` will be accumulated into a new batch.
+    pub(crate) fn flush(&mut self) {
+        if !self.acc.is_empty() {
+            let mut batch = Batch::new(self.acc.tags.clone(), self.acc.ivks.clone());
+            mem::swap(&mut batch, &mut self.acc);
+            self.running_tasks.run_task(batch);
+        }
+    }
+
+    /// Collects the pending decryption results for the given transaction.
+    ///
+    /// `block_tag` is the hash of the block that triggered this txid being added to the
+    /// batch, or the all-zeros hash to indicate that no block triggered it (i.e. it was a
+    /// mempool change).
+    pub(crate) fn collect_results(
+        &mut self,
+        block_tag: BlockHash,
+        txid: TxId,
+    ) -> HashMap<(TxId, usize), DecryptedOutput<IvkTag, D, Dec::Memo>> {
+        self.pending_results
+            .remove(&ResultKey(block_tag, txid))
+            // We won't have a pending result if the transaction didn't have outputs of
+            // this runner's kind.
+            .map(|BatchReceiver(rx)| {
+                // This iterator will end once the channel becomes empty and disconnected.
+                // We created one sender per output, and each sender is dropped after the
+                // batch it is in completes (and in the case of successful decryptions,
+                // after the decrypted note has been sent to the channel). Completion of
+                // the iterator therefore corresponds to complete knowledge of the outputs
+                // of this transaction that could be decrypted.
+                rx.into_iter()
+                    .map(
+                        |OutputIndex {
+                             output_index,
+                             value,
+                         }| { ((txid, output_index), value) },
+                    )
+                    .collect()
+            })
+            .unwrap_or_default()
+    }
+}
diff --git a/zingo-sync/src/sync.rs b/zingo-sync/src/sync.rs
index 4b0d42b32..de25d767d 100644
--- a/zingo-sync/src/sync.rs
+++ b/zingo-sync/src/sync.rs
@@ -2,50 +2,80 @@

 use std::ops::Range;

+use crate::client::FetchRequest;
 use crate::client::{fetcher::fetcher, get_chain_height};
 use crate::interface::SyncWallet;
+use crate::primitives::SyncState;
+use crate::scanner::scanner;

+use zcash_client_backend::scanning::ScanningKeys;
 use zcash_client_backend::{
     data_api::scanning::{ScanPriority, ScanRange},
     proto::service::compact_tx_streamer_client::CompactTxStreamerClient,
 };

 use futures::future::try_join_all;
-use tokio::sync::mpsc::unbounded_channel;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

 use zcash_primitives::consensus::{BlockHeight, NetworkUpgrade, Parameters};

+const BATCH_SIZE: u32 = 1_000;
+
 /// Syncs a wallet to the latest state of the blockchain
 pub async fn sync<P, W>(
     client: CompactTxStreamerClient<zingo_netutils::UnderlyingService>,
     parameters: &P,
-    wallet_data: &mut W,
+    wallet: &mut W,
 ) -> Result<(), ()>
 where
-    P: Parameters,
+    P: Parameters + Send + 'static,
     W: SyncWallet,
 {
+    tracing::info!("Syncing wallet...");
+
+    // TODO: add trait methods to read/write wallet data to/from sync engine
+    // this is where sync state would be read from wallet data
+    let sync_state = SyncState::new(); // placeholder
+
     // create channel for sending fetch requests and launch fetcher task
     let (fetch_request_sender, fetch_request_receiver) = unbounded_channel();
     let fetcher_handle = tokio::spawn(fetcher(fetch_request_receiver, client));

-    let chain_height = get_chain_height(fetch_request_sender).await.unwrap();
-    update_scan_ranges(parameters, wallet_data, chain_height).unwrap();
+    update_scan_ranges(fetch_request_sender.clone(), parameters, &sync_state)
+        .await
+        .unwrap();
+
+    let account_ufvks = wallet.get_unified_full_viewing_keys().unwrap();
+    let scanning_keys = ScanningKeys::from_account_ufvks(account_ufvks);
+
+    let scan_range = prepare_next_scan_range(&sync_state);
+    if let Some(range) = scan_range {
+        scanner(
+            fetch_request_sender,
+            parameters,
+            &scanning_keys,
+            range.clone(),
+        )
+        .await
+        .unwrap();
+    }

     try_join_all(vec![fetcher_handle]).await.unwrap();

     Ok(())
 }

-fn update_scan_ranges<P, W>(
+// update scan_ranges to include blocks between the last known chain height (wallet height) and the chain height from the server
+async fn update_scan_ranges<P>(
+    fetch_request_sender: UnboundedSender<FetchRequest>,
     parameters: &P,
-    wallet_data: &mut W,
-    chain_height: BlockHeight,
+    sync_state: &SyncState,
 ) -> Result<(), ()>
 where
     P: Parameters,
-    W: SyncWallet,
 {
-    let scan_ranges = wallet_data.set_sync_state().unwrap().set_scan_ranges();
+    let chain_height = get_chain_height(fetch_request_sender).await.unwrap();
+
+    let mut scan_ranges = sync_state.scan_ranges().write().unwrap();

     let wallet_height = if scan_ranges.is_empty() {
         parameters
@@ -58,6 +88,11 @@
             .block_range()
             .end
     };
+
+    if wallet_height > chain_height {
+        panic!("wallet is ahead of server!")
+    }
+
     let chain_tip_scan_range = ScanRange::from_parts(
         Range {
             start: wallet_height,
@@ -67,8 +102,33 @@
     );
     scan_ranges.push(chain_tip_scan_range);

+    if scan_ranges.is_empty() {
+        panic!("scan ranges should never be empty after updating")
+    }
+
     // TODO: add logic to combine chain tip scan range with wallet tip scan range
     // TODO: add scan priority logic

     Ok(())
 }
+
+// returns `None` if there are no more ranges to scan
+fn prepare_next_scan_range(sync_state: &SyncState) -> Option<ScanRange> {
+    let mut scan_ranges = sync_state.scan_ranges().write().unwrap();
+
+    // placeholder for algorithm that determines highest priority range to scan
+    let (index, selected_scan_range) = scan_ranges.iter_mut().enumerate().find(|(_, range)| {
+        range.priority() != ScanPriority::Scanned && range.priority() != ScanPriority::Ignored
+    })?;
+
+    // if scan range is larger than BATCH_SIZE, split off and return a batch from the lower end and update scan ranges
+    if let Some((lower_range, higher_range)) = selected_scan_range
+        .split_at(selected_scan_range.block_range().start + BlockHeight::from_u32(BATCH_SIZE))
+    {
+        scan_ranges.splice(index..=index, vec![lower_range.clone(), higher_range]);
+
+        Some(lower_range)
+    } else {
+        Some(selected_scan_range.clone())
+    }
+}
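`prepare_next_scan_range` caps each pass at `BATCH_SIZE` blocks by splitting the selected range and re-splicing the remainder. The effect with plain numbers (a hypothetical 2,500-block range and the patch's 1,000-block batch):

```rust
use zcash_client_backend::data_api::scanning::{ScanPriority, ScanRange};
use zcash_primitives::consensus::BlockHeight;

fn main() {
    // A hypothetical 2,500-block range awaiting scanning.
    let range = ScanRange::from_parts(
        BlockHeight::from_u32(2_000_000)..BlockHeight::from_u32(2_002_500),
        ScanPriority::Historic,
    );

    // Split a 1,000-block batch off the lower end, as
    // `prepare_next_scan_range` does with BATCH_SIZE.
    let split_height = range.block_range().start + 1_000;
    if let Some((lower, higher)) = range.split_at(split_height) {
        assert_eq!(lower.len(), 1_000); // scanned now
        assert_eq!(higher.len(), 1_500); // remains queued
    }
}
```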
diff --git a/zingolib/src/wallet.rs b/zingolib/src/wallet.rs
index 9fc300694..48b4ce5af 100644
--- a/zingolib/src/wallet.rs
+++ b/zingolib/src/wallet.rs
@@ -206,7 +206,8 @@ pub struct LightWallet {
     pub transaction_context: TransactionContext,

     #[cfg(feature = "sync")]
-    sync_state: zingo_sync::SyncState,
+    #[allow(dead_code)]
+    sync_state: zingo_sync::primitives::SyncState,
 }

 impl LightWallet {
@@ -364,7 +365,7 @@ impl LightWallet {
             price: Arc::new(RwLock::new(WalletZecPriceInfo::default())),
             transaction_context,
             #[cfg(feature = "sync")]
-            sync_state: zingo_sync::SyncState::new(),
+            sync_state: zingo_sync::primitives::SyncState::new(),
         })
     }
diff --git a/zingolib/src/wallet/disk.rs b/zingolib/src/wallet/disk.rs
index 2ed31292d..337d167a8 100644
--- a/zingolib/src/wallet/disk.rs
+++ b/zingolib/src/wallet/disk.rs
@@ -237,7 +237,7 @@ impl LightWallet {
             price: Arc::new(RwLock::new(price)),
             transaction_context,
             #[cfg(feature = "sync")]
-            sync_state: zingo_sync::SyncState::new(),
+            sync_state: zingo_sync::primitives::SyncState::new(),
         };

         Ok(lw)
diff --git a/zingolib/src/wallet/sync.rs b/zingolib/src/wallet/sync.rs
index 89b6e03be..c189a2302 100644
--- a/zingolib/src/wallet/sync.rs
+++ b/zingolib/src/wallet/sync.rs
@@ -1,13 +1,35 @@
 //! Trait implementations for sync interface

+use std::collections::HashMap;
+
+use zcash_keys::keys::{UnifiedFullViewingKey, UnifiedSpendingKey};
 use zingo_sync::interface::SyncWallet;
+use zip32::AccountId;

 use crate::wallet::LightWallet;

 impl SyncWallet for LightWallet {
     type Error = ();

-    fn set_sync_state(&mut self) -> Result<&mut zingo_sync::SyncState, Self::Error> {
-        Ok(&mut self.sync_state)
+    fn get_unified_full_viewing_keys(
+        &self,
+    ) -> Result<HashMap<AccountId, UnifiedFullViewingKey>, Self::Error> {
+        let account_id = AccountId::try_from(0).unwrap();
+        let seed = self
+            .mnemonic()
+            .map(|(mnemonic, _)| mnemonic)
+            .unwrap()
+            .to_seed("");
+        let usk = UnifiedSpendingKey::from_seed(
+            &self.transaction_context.config.chain,
+            &seed,
+            account_id,
+        )
+        .unwrap();
+        let ufvk = usk.to_unified_full_viewing_key();
+        let mut ufvk_map = HashMap::new();
+        ufvk_map.insert(account_id, ufvk);
+
+        Ok(ufvk_map)
+    }
 }
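Since the PoC derives its account-0 viewing key from the wallet seed on demand, that derivation can be exercised on its own. A minimal sketch using the same test seed as the new tests (mainnet parameters assumed; crate APIs as pinned in this workspace):

```rust
use bip0039::Mnemonic;
use zcash_keys::keys::UnifiedSpendingKey;
use zcash_primitives::{consensus::MainNetwork, zip32::AccountId};
use zingo_testvectors::seeds::HOSPITAL_MUSEUM_SEED;

fn main() {
    // Recover the 64-byte seed from the mnemonic phrase, as `mnemonic().to_seed("")` does.
    let mnemonic = Mnemonic::from_phrase(HOSPITAL_MUSEUM_SEED).unwrap();
    let seed = mnemonic.to_seed("");

    // Same derivation the wallet impl performs for account 0.
    let account_id = AccountId::try_from(0).unwrap();
    let usk = UnifiedSpendingKey::from_seed(&MainNetwork, &seed, account_id).unwrap();
    let ufvk = usk.to_unified_full_viewing_key();

    // The UFVK can be handed to the sync engine without spend authority.
    println!("{}", ufvk.encode(&MainNetwork));
}
```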