From 24277a6ba4013ead9a385acceff6db3de7a0c198 Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Fri, 9 Feb 2024 16:57:13 +0000 Subject: [PATCH 1/2] zcash_client_backend: Implement async wallet synchronization function This implements the necessary state machine for taking a wallet in some arbitrary synchronization status, and fully scanning (the remainder of) the chain. Closes zcash/librustzcash#1169. --- Cargo.lock | 14 + zcash_client_backend/CHANGELOG.md | 9 +- zcash_client_backend/Cargo.toml | 18 +- zcash_client_backend/src/data_api/chain.rs | 25 +- zcash_client_backend/src/lib.rs | 3 + zcash_client_backend/src/sync.rs | 491 +++++++++++++++++++++ 6 files changed, 543 insertions(+), 17 deletions(-) create mode 100644 zcash_client_backend/src/sync.rs diff --git a/Cargo.lock b/Cargo.lock index e43ea00643..d74b4aaf87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -783,6 +783,17 @@ version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.53", +] + [[package]] name = "futures-sink" version = "0.3.29" @@ -802,9 +813,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -3021,6 +3034,7 @@ dependencies = [ "byteorder", "crossbeam-channel", "document-features", + "futures-util", "group", "gumdrop", "hdwallet", diff --git a/zcash_client_backend/CHANGELOG.md b/zcash_client_backend/CHANGELOG.md index ac4d35c3d7..b7cf53a9f5 100644 --- a/zcash_client_backend/CHANGELOG.md +++ b/zcash_client_backend/CHANGELOG.md @@ -7,6 +7,13 @@ and this library adheres to Rust's notion of ## [Unreleased] +### Added +- `zcash_client_backend::data_api`: + - `chain::BlockCache` trait, behind the `sync` feature flag. +- `zcash_client_backend::scanning`: + - `testing` module +- `zcash_client_backend::sync` module, behind the `sync` feature flag. + ## [0.12.1] - 2024-03-27 ### Fixed @@ -39,7 +46,6 @@ and this library adheres to Rust's notion of - `WalletSummary::next_orchard_subtree_index` - `chain::ChainState` - `chain::ScanSummary::{spent_orchard_note_count, received_orchard_note_count}` - - `chain::BlockCache` trait - `impl Debug for chain::CommitmentTreeRoot` - `zcash_client_backend::fees`: - `orchard` @@ -55,7 +61,6 @@ and this library adheres to Rust's notion of - `Nullifiers::{orchard, extend_orchard, retain_orchard}` - `TaggedOrchardBatch` - `TaggedOrchardBatchRunner` - - `testing` module - `zcash_client_backend::wallet`: - `Note::Orchard` - `WalletOrchardSpend` diff --git a/zcash_client_backend/Cargo.toml b/zcash_client_backend/Cargo.toml index d5347b56c6..98d8b8202e 100644 --- a/zcash_client_backend/Cargo.toml +++ b/zcash_client_backend/Cargo.toml @@ -66,7 +66,7 @@ tracing.workspace = true # - Protobuf interfaces and gRPC bindings hex.workspace = true prost.workspace = true -tonic = { workspace = true, optional = true, features = ["prost", "codegen"]} +tonic = { workspace = true, optional = true, features = ["prost", "codegen"] } # - Secret management secrecy.workspace = true @@ -78,6 +78,10 @@ group.workspace = true orchard = { workspace = true, optional = true } sapling.workspace = true +# - Sync engine +async-trait = { version = "0.1", optional = true } +futures-util = { version = "0.3", optional = true } + # - Note commitment trees incrementalmerkletree.workspace = true shardtree.workspace = true @@ -89,9 +93,6 @@ jubjub = { workspace = true, optional = true } # - ZIP 321 nom = "7" -# - Asychronous -async-trait = "0.1.78" - # Dependencies used internally: # (Breaking upgrades to these are usually backwards-compatible, but check MSRVs.) # - Documentation @@ -106,7 +107,7 @@ crossbeam-channel.workspace = true rayon.workspace = true [build-dependencies] -tonic-build = { workspace = true, features = ["prost"]} +tonic-build = { workspace = true, features = ["prost"] } which = "4" [dev-dependencies] @@ -141,6 +142,13 @@ transparent-inputs = [ ## Enables receiving and spending Orchard funds. orchard = ["dep:orchard", "zcash_keys/orchard"] +## Exposes a wallet synchronization function that implements the necessary state machine. +sync = [ + "lightwalletd-tonic", + "dep:async-trait", + "dep:futures-util", +] + ## Exposes APIs that are useful for testing, such as `proptest` strategies. test-dependencies = [ "dep:proptest", diff --git a/zcash_client_backend/src/data_api/chain.rs b/zcash_client_backend/src/data_api/chain.rs index a0da739cb7..f458bc8b2a 100644 --- a/zcash_client_backend/src/data_api/chain.rs +++ b/zcash_client_backend/src/data_api/chain.rs @@ -151,9 +151,8 @@ //! # } //! ``` -use std::ops::{Add, Range}; +use std::ops::Range; -use async_trait::async_trait; use incrementalmerkletree::frontier::Frontier; use subtle::ConditionallySelectable; use zcash_primitives::{ @@ -162,15 +161,20 @@ use zcash_primitives::{ }; use crate::{ - data_api::{scanning::ScanRange, NullifierQuery, WalletWrite}, + data_api::{NullifierQuery, WalletWrite}, proto::compact_formats::CompactBlock, scanning::{scan_block_with_runners, BatchRunners, Nullifiers, ScanningKeys}, }; +#[cfg(feature = "sync")] +use { + super::scanning::ScanPriority, crate::data_api::scanning::ScanRange, async_trait::async_trait, +}; + pub mod error; use error::Error; -use super::{scanning::ScanPriority, WalletRead}; +use super::WalletRead; /// A struct containing metadata about a subtree root of the note commitment tree. /// @@ -300,7 +304,7 @@ pub trait BlockSource { /// Ok(()) /// } /// -/// async fn delete(&self, range: &ScanRange) -> Result<(), Self::Error> { +/// async fn delete(&self, range: ScanRange) -> Result<(), Self::Error> { /// self.cached_blocks /// .lock() /// .unwrap() @@ -368,11 +372,12 @@ pub trait BlockSource { /// /// // Delete blocks from the block cache /// rt.block_on(async { -/// block_cache.delete(&range).await.unwrap(); +/// block_cache.delete(range).await.unwrap(); /// }); /// assert_eq!(block_cache.cached_blocks.lock().unwrap().len(), 0); /// assert_eq!(block_cache.get_tip_height(None).unwrap(), None); /// ``` +#[cfg(feature = "sync")] #[async_trait] pub trait BlockCache: BlockSource + Send + Sync where @@ -404,10 +409,10 @@ where /// Removes all cached blocks above a specified block height. async fn truncate(&self, block_height: BlockHeight) -> Result<(), Self::Error> { if let Some(latest) = self.get_tip_height(None)? { - self.delete(&ScanRange::from_parts( + self.delete(ScanRange::from_parts( Range { - start: block_height.add(1), - end: latest.add(1), + start: block_height + 1, + end: latest + 1, }, ScanPriority::Ignored, )) @@ -421,7 +426,7 @@ where /// # Errors /// /// In the case of an error, some blocks requested for deletion may remain in the block cache. - async fn delete(&self, range: &ScanRange) -> Result<(), Self::Error>; + async fn delete(&self, range: ScanRange) -> Result<(), Self::Error>; } /// Metadata about modifications to the wallet state made in the course of scanning a set of diff --git a/zcash_client_backend/src/lib.rs b/zcash_client_backend/src/lib.rs index a6928a1b36..a849684640 100644 --- a/zcash_client_backend/src/lib.rs +++ b/zcash_client_backend/src/lib.rs @@ -74,6 +74,9 @@ pub mod scanning; pub mod wallet; pub mod zip321; +#[cfg(feature = "sync")] +pub mod sync; + #[cfg(feature = "unstable-serialization")] pub mod serialization; diff --git a/zcash_client_backend/src/sync.rs b/zcash_client_backend/src/sync.rs new file mode 100644 index 0000000000..b7b0f88e52 --- /dev/null +++ b/zcash_client_backend/src/sync.rs @@ -0,0 +1,491 @@ +//! Implementation of the synchronization flow described in the crate root. +//! +//! This is currently a simple implementation that does not yet implement a few features: +//! +//! - Block batches are not downloaded in parallel with scanning. +//! - Transactions are not enhanced once detected (that is, after an output is detected in +//! a transaction, the full transaction is not downloaded and scanned). +//! - There is no mechanism for notifying the caller of progress updates. +//! - There is no mechanism for interrupting the synchronization flow, other than ending +//! the process. + +use std::fmt; + +use futures_util::TryStreamExt; +use shardtree::error::ShardTreeError; +use subtle::ConditionallySelectable; +use tonic::{ + body::BoxBody, + client::GrpcService, + codegen::{Body, Bytes, StdError}, +}; +use tracing::{debug, info}; +use zcash_primitives::{ + consensus::{BlockHeight, Parameters}, + merkle_tree::HashSer, +}; + +use crate::{ + data_api::{ + chain::{ + error::Error as ChainError, scan_cached_blocks, BlockCache, ChainState, + CommitmentTreeRoot, + }, + scanning::{ScanPriority, ScanRange}, + WalletCommitmentTrees, WalletRead, WalletWrite, + }, + proto::service::{self, compact_tx_streamer_client::CompactTxStreamerClient, BlockId}, + scanning::ScanError, +}; + +#[cfg(feature = "orchard")] +use orchard::tree::MerkleHashOrchard; + +/// Scans the chain until the wallet is up-to-date. +pub async fn run( + client: &mut CompactTxStreamerClient, + params: &P, + db_cache: &CaT, + db_data: &mut DbT, + batch_size: u32, +) -> Result<(), Error::Error, ::Error>> +where + P: Parameters + Send + 'static, + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + CaT: BlockCache, + CaT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite + WalletCommitmentTrees, + DbT::AccountId: ConditionallySelectable + Default + Send + 'static, + ::Error: std::error::Error + Send + Sync + 'static, + ::Error: std::error::Error + Send + Sync + 'static, +{ + // 1) Download note commitment tree data from lightwalletd + // 2) Pass the commitment tree data to the database. + update_subtree_roots(client, db_data).await?; + + while running(client, params, db_cache, db_data, batch_size).await? {} + + Ok(()) +} + +async fn running( + client: &mut CompactTxStreamerClient, + params: &P, + db_cache: &CaT, + db_data: &mut DbT, + batch_size: u32, +) -> Result::Error, TrErr>> +where + P: Parameters + Send + 'static, + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + CaT: BlockCache, + CaT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite, + DbT::AccountId: ConditionallySelectable + Default + Send + 'static, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + // 3) Download chain tip metadata from lightwalletd + // 4) Notify the wallet of the updated chain tip. + update_chain_tip(client, db_data).await?; + + // 5) Get the suggested scan ranges from the wallet database + let mut scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + + // Store the handles to cached block deletions (which we spawn into separate + // tasks to allow us to continue downloading and scanning other ranges). + let mut block_deletions = vec![]; + + // 6) Run the following loop until the wallet's view of the chain tip as of + // the previous wallet session is valid. + loop { + // If there is a range of blocks that needs to be verified, it will always + // be returned as the first element of the vector of suggested ranges. + match scan_ranges.first() { + Some(scan_range) if scan_range.priority() == ScanPriority::Verify => { + // Download the blocks in `scan_range` into the block source, + // overwriting any existing blocks in this range. + download_blocks(client, db_cache, scan_range).await?; + + let chain_state = + download_chain_state(client, scan_range.block_range().start - 1).await?; + + // Scan the downloaded blocks and check for scanning errors that + // indicate the wallet's chain tip is out of sync with blockchain + // history. + let scan_ranges_updated = + scan_blocks(params, db_cache, db_data, &chain_state, scan_range).await?; + + // Delete the now-scanned blocks, because keeping the entire chain + // in CompactBlock files on disk is horrendous for the filesystem. + block_deletions.push(db_cache.delete(scan_range.clone())); + + if scan_ranges_updated { + // The suggested scan ranges have been updated, so we re-request. + scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + } else { + // At this point, the cache and scanned data are locally + // consistent (though not necessarily consistent with the + // latest chain tip - this would be discovered the next time + // this codepath is executed after new blocks are received) so + // we can break out of the loop. + break; + } + } + _ => { + // Nothing to verify; break out of the loop + break; + } + } + } + + // 7) Loop over the remaining suggested scan ranges, retrieving the requested data + // and calling `scan_cached_blocks` on each range. + let scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + debug!("Suggested ranges: {:?}", scan_ranges); + for scan_range in scan_ranges.into_iter().flat_map(|r| { + // Limit the number of blocks we download and scan at any one time. + (0..).scan(r, |acc, _| { + if acc.is_empty() { + None + } else if let Some((cur, next)) = acc.split_at(acc.block_range().start + batch_size) { + *acc = next; + Some(cur) + } else { + let cur = acc.clone(); + let end = acc.block_range().end; + *acc = ScanRange::from_parts(end..end, acc.priority()); + Some(cur) + } + }) + }) { + // Download the blocks in `scan_range` into the block source. + download_blocks(client, db_cache, &scan_range).await?; + + let chain_state = download_chain_state(client, scan_range.block_range().start - 1).await?; + + // Scan the downloaded blocks. + let scan_ranges_updated = + scan_blocks(params, db_cache, db_data, &chain_state, &scan_range).await?; + + // Delete the now-scanned blocks. + block_deletions.push(db_cache.delete(scan_range)); + + if scan_ranges_updated { + // The suggested scan ranges have been updated (either due to a continuity + // error or because a higher priority range has been added). + info!("Waiting for cached blocks to be deleted..."); + for deletion in block_deletions { + deletion.await.map_err(Error::Cache)?; + } + return Ok(true); + } + } + + info!("Waiting for cached blocks to be deleted..."); + for deletion in block_deletions { + deletion.await.map_err(Error::Cache)?; + } + Ok(false) +} + +async fn update_subtree_roots( + client: &mut CompactTxStreamerClient, + db_data: &mut DbT, +) -> Result<(), Error::Error>> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + DbT: WalletCommitmentTrees, + ::Error: std::error::Error + Send + Sync + 'static, +{ + let mut request = service::GetSubtreeRootsArg::default(); + request.set_shielded_protocol(service::ShieldedProtocol::Sapling); + // Hack to work around a bug in the initial lightwalletd implementation. + request.max_entries = 65536; + + let sapling_roots: Vec> = client + .get_subtree_roots(request) + .await? + .into_inner() + .and_then(|root| async move { + let root_hash = sapling::Node::read(&root.root_hash[..])?; + Ok(CommitmentTreeRoot::from_parts( + BlockHeight::from_u32(root.completing_block_height as u32), + root_hash, + )) + }) + .try_collect() + .await?; + + info!("Sapling tree has {} subtrees", sapling_roots.len()); + db_data + .put_sapling_subtree_roots(0, &sapling_roots) + .map_err(Error::WalletTrees)?; + + #[cfg(feature = "orchard")] + { + let mut request = service::GetSubtreeRootsArg::default(); + request.set_shielded_protocol(service::ShieldedProtocol::Orchard); + // Hack to work around a bug in the initial lightwalletd implementation. + request.max_entries = 65536; + let orchard_roots: Vec> = client + .get_subtree_roots(request) + .await? + .into_inner() + .and_then(|root| async move { + let root_hash = MerkleHashOrchard::read(&root.root_hash[..])?; + Ok(CommitmentTreeRoot::from_parts( + BlockHeight::from_u32(root.completing_block_height as u32), + root_hash, + )) + }) + .try_collect() + .await?; + + info!("Orchard tree has {} subtrees", orchard_roots.len()); + db_data + .put_orchard_subtree_roots(0, &orchard_roots) + .map_err(Error::WalletTrees)?; + } + + Ok(()) +} + +async fn update_chain_tip( + client: &mut CompactTxStreamerClient, + db_data: &mut DbT, +) -> Result<(), Error::Error, TrErr>> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + DbT: WalletWrite, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + let tip_height: BlockHeight = client + .get_latest_block(service::ChainSpec::default()) + .await? + .get_ref() + .height + .try_into() + .map_err(|_| Error::MisbehavingServer)?; + + info!("Latest block height is {}", tip_height); + db_data + .update_chain_tip(tip_height) + .map_err(Error::Wallet)?; + + Ok(()) +} + +async fn download_blocks( + client: &mut CompactTxStreamerClient, + db_cache: &CaT, + scan_range: &ScanRange, +) -> Result<(), Error> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, + CaT: BlockCache, + CaT::Error: std::error::Error + Send + Sync + 'static, +{ + info!("Fetching {}", scan_range); + let mut start = service::BlockId::default(); + start.height = scan_range.block_range().start.into(); + let mut end = service::BlockId::default(); + end.height = (scan_range.block_range().end - 1).into(); + let range = service::BlockRange { + start: Some(start), + end: Some(end), + }; + let compact_blocks = client + .get_block_range(range) + .await? + .into_inner() + .try_collect::>() + .await?; + + db_cache + .insert(compact_blocks) + .await + .map_err(Error::Cache)?; + + Ok(()) +} + +async fn download_chain_state( + client: &mut CompactTxStreamerClient, + block_height: BlockHeight, +) -> Result> +where + ChT: GrpcService, + ChT::Error: Into, + ChT::ResponseBody: Body + Send + 'static, + ::Error: Into + Send, +{ + let tree_state = client + .get_tree_state(BlockId { + height: block_height.into(), + hash: vec![], + }) + .await?; + + tree_state + .into_inner() + .to_chain_state() + .map_err(|_| Error::MisbehavingServer) +} + +/// Scans the given block range and checks for scanning errors that indicate the wallet's +/// chain tip is out of sync with blockchain history. +/// +/// Returns `true` if scanning these blocks materially changed the suggested scan ranges. +async fn scan_blocks( + params: &P, + db_cache: &CaT, + db_data: &mut DbT, + initial_chain_state: &ChainState, + scan_range: &ScanRange, +) -> Result::Error, TrErr>> +where + P: Parameters + Send + 'static, + CaT: BlockCache, + CaT::Error: std::error::Error + Send + Sync + 'static, + DbT: WalletWrite, + DbT::AccountId: ConditionallySelectable + Default + Send + 'static, + DbT::Error: std::error::Error + Send + Sync + 'static, +{ + info!("Scanning {}", scan_range); + let scan_result = scan_cached_blocks( + params, + db_cache, + db_data, + scan_range.block_range().start, + initial_chain_state, + scan_range.len(), + ); + + match scan_result { + Err(ChainError::Scan(err)) if err.is_continuity_error() => { + // Pick a height to rewind to, which must be at least one block before the + // height at which the error occurred, but may be an earlier height determined + // based on heuristics such as the platform, available bandwidth, size of + // recent CompactBlocks, etc. + let rewind_height = err.at_height().saturating_sub(10); + info!( + "Chain reorg detected at {}, rewinding to {}", + err.at_height(), + rewind_height, + ); + + // Rewind to the chosen height. + db_data + .truncate_to_height(rewind_height) + .map_err(Error::Wallet)?; + + // Delete cached blocks from rewind_height onwards. + // + // This does imply that assumed-valid blocks will be re-downloaded, but it is + // also possible that in the intervening time, a chain reorg has occurred that + // orphaned some of those blocks. + db_cache + .truncate(rewind_height) + .await + .map_err(Error::Cache)?; + + // The database was truncated, invalidating prior suggested ranges. + Ok(true) + } + Ok(_) => { + // If scanning these blocks caused a suggested range to be added that has a + // higher priority than the current range, invalidate the current ranges. + let latest_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + + Ok(if let Some(range) = latest_ranges.first() { + range.priority() > scan_range.priority() + } else { + false + }) + } + Err(e) => Err(e.into()), + } +} + +/// Errors that can occur while syncing. +#[derive(Debug)] +pub enum Error { + /// An error while interacting with a [`BlockCache`]. + Cache(CaErr), + /// The lightwalletd server returned invalid information, and is misbehaving. + MisbehavingServer, + /// An error while scanning blocks. + Scan(ScanError), + /// An error while communicating with the lightwalletd server. + Server(tonic::Status), + /// An error while interacting with a wallet database via [`WalletRead`] or + /// [`WalletWrite`]. + Wallet(DbErr), + /// An error while interacting with a wallet database via [`WalletCommitmentTrees`]. + WalletTrees(ShardTreeError), +} + +impl fmt::Display for Error +where + CaErr: fmt::Display, + DbErr: fmt::Display, + TrErr: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Error::Cache(e) => write!(f, "Error while interacting with block cache: {}", e), + Error::MisbehavingServer => write!(f, "lightwalletd server is misbehaving"), + Error::Scan(e) => write!(f, "Error while scanning blocks: {}", e), + Error::Server(e) => write!( + f, + "Error while communicating with lightwalletd server: {}", + e + ), + Error::Wallet(e) => write!(f, "Error while interacting with wallet database: {}", e), + Error::WalletTrees(e) => write!( + f, + "Error while interacting with wallet commitment trees: {}", + e + ), + } + } +} + +impl std::error::Error for Error +where + CaErr: std::error::Error, + DbErr: std::error::Error, + TrErr: std::error::Error, +{ +} + +impl From> for Error { + fn from(e: ChainError) -> Self { + match e { + ChainError::Wallet(e) => Error::Wallet(e), + ChainError::BlockSource(e) => Error::Cache(e), + ChainError::Scan(e) => Error::Scan(e), + } + } +} + +impl From for Error { + fn from(status: tonic::Status) -> Self { + Error::Server(status) + } +} From 7f017bc126ce8998e3727320a3b1e6acc9ff2a9e Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Tue, 2 Apr 2024 00:37:16 +0000 Subject: [PATCH 2/2] CI: Test with `sync` feature flag --- .github/actions/prepare/action.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/actions/prepare/action.yml b/.github/actions/prepare/action.yml index a74f550712..9ac597d495 100644 --- a/.github/actions/prepare/action.yml +++ b/.github/actions/prepare/action.yml @@ -28,6 +28,7 @@ runs: bundled-prover download-params lightwalletd-tonic + sync temporary-zcashd transparent-inputs unstable