diff --git a/sn_cli/src/acc_packet.rs b/sn_cli/src/acc_packet.rs index 809bf3994c..892d171fa6 100644 --- a/sn_cli/src/acc_packet.rs +++ b/sn_cli/src/acc_packet.rs @@ -21,7 +21,7 @@ use color_eyre::{ use sn_client::{ protocol::storage::{Chunk, RegisterAddress, RetryStrategy}, registers::EntryHash, - Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, UploadSummary, + Client, FilesApi, FolderEntry, FoldersApi, Metadata, UploadCfg, }; use std::{ collections::{ @@ -33,6 +33,7 @@ use std::{ io::Write, path::{Path, PathBuf}, }; +use tokio::task::JoinSet; use tracing::trace; use walkdir::{DirEntry, WalkDir}; use xor_name::XorName; @@ -221,10 +222,8 @@ impl AccountPacket { /// Sync local changes made to files and folder with their version on the network, /// both pushing and pulling changes to/form the network. pub async fn sync(&mut self, upload_cfg: UploadCfg, make_data_public: bool) -> Result<()> { - let ChangesToApply { - mut folders, - mutations, - } = self.scan_files_and_folders_for_changes(make_data_public)?; + let ChangesToApply { folders, mutations } = + self.scan_files_and_folders_for_changes(make_data_public)?; if mutations.is_empty() { println!("No local changes made to files/folders to be pushed to network."); @@ -234,8 +233,8 @@ impl AccountPacket { } println!("Paying for folders hierarchy and uploading..."); - let _synced_folders = self - .pay_and_sync(&mut folders, upload_cfg.clone(), make_data_public) + let synced_folders = self + .pay_and_sync_folders(folders, upload_cfg, make_data_public) .await?; // mark root folder as created if it wasn't already @@ -266,7 +265,7 @@ impl AccountPacket { } // download files/folders which are new in the synced folders - let folders_to_download: Vec<_> = folders + let folders_to_download: Vec<_> = synced_folders .iter() .map(|(path, (folders_api, _))| { let folder_name: OsString = path.file_name().unwrap_or_default().into(); @@ -633,28 +632,57 @@ impl AccountPacket { .filter(|e| e.file_type().is_file()) } - // Pay and upload all the files and their metadata along with the folders (registers). - // This also merged the folder with the one from the network. - async fn pay_and_sync( + // Pay and upload all the files and folder. + async fn pay_and_sync_folders( &self, - folders: &mut Folders, + folders: Folders, upload_cfg: UploadCfg, make_data_public: bool, - ) -> Result { + ) -> Result { let files_uploader = FilesUploader::new(self.client.clone(), self.wallet_dir.clone()) - .set_upload_cfg(upload_cfg.clone()) + .set_upload_cfg(upload_cfg) .set_make_data_public(make_data_public) .insert_entries(self.iter_only_files()); + let _summary = files_uploader.start_upload().await?; - let files_summary = files_uploader.start_upload().await?; + // Sync the folders. The payment is made inside sync() if required. + let mut tasks = JoinSet::new(); + for (path, (mut folder, folder_change)) in folders { + let op = if folder_change.is_new_folder() { + "Creation" + } else { + "Syncing" + }; - // batch sync the folders - let folder_summary = FoldersApi::sync_multiple( - folders.iter_mut().map(|(_, (folder, _))| folder), - upload_cfg, - ) - .await?; - Ok(files_summary.merge(folder_summary)?) + tasks.spawn(async move { + match folder.sync(upload_cfg).await { + Ok(()) => { + println!( + "{op} of Folder (for {path:?}) succeeded. Address: {}", + folder.address().to_hex() + ); + } + Err(err) => { + println!("{op} of Folder (for {path:?}) failed: {err}") + } + } + (path, folder, folder_change) + }); + } + + let mut synced_folders = Folders::new(); + while let Some(res) = tasks.join_next().await { + match res { + Ok((path, folder, c)) => { + synced_folders.insert(path, (folder, c)); + } + Err(err) => { + println!("Failed to sync/create a Folder with/on the network: {err:?}"); + } + } + } + + Ok(synced_folders) } // Download a Folders and their files from the network and generate tracking info diff --git a/sn_cli/src/files/files_uploader.rs b/sn_cli/src/files/files_uploader.rs index b95db2dc43..3242f8f711 100644 --- a/sn_cli/src/files/files_uploader.rs +++ b/sn_cli/src/files/files_uploader.rs @@ -87,7 +87,7 @@ impl FilesUploader { let now = Instant::now(); let mut uploader = Uploader::new(self.client, self.root_dir); - uploader.set_upload_cfg(self.upload_cfg.clone()); + uploader.set_upload_cfg(self.upload_cfg); uploader.insert_chunk_paths(chunks_to_upload); let events_handle = Self::spawn_upload_events_handler( diff --git a/sn_client/src/error.rs b/sn_client/src/error.rs index 9e471fd951..a5880bd420 100644 --- a/sn_client/src/error.rs +++ b/sn_client/src/error.rs @@ -109,13 +109,6 @@ pub enum Error { #[error("Task completion notification channel is done")] FailedToReadFromNotificationChannel, - // ------ Batch Sync ------ - #[error("Inconsistent wallet directory during batch sync")] - InconsistentBatchSyncState, - - #[error("Batch sync encountered an empty list")] - BatchSyncEmptyList, - #[error("Could not find register after batch sync: {0:?}")] RegisterNotFoundAfterUpload(XorName), diff --git a/sn_client/src/folders.rs b/sn_client/src/folders.rs index cf4f6e826f..a2fe92b11c 100644 --- a/sn_client/src/folders.rs +++ b/sn_client/src/folders.rs @@ -7,13 +7,13 @@ // permissions and limitations relating to use of the SAFE Network Software. use super::{error::Result, Client, ClientRegister, WalletClient}; -use crate::{uploader::UploadCfg, Error, FilesApi, UploadSummary, Uploader}; +use crate::{Error, UploadCfg, Uploader}; use bls::{Ciphertext, PublicKey}; use bytes::{BufMut, BytesMut}; use self_encryption::MAX_CHUNK_SIZE; use serde::{Deserialize, Serialize}; use sn_protocol::{ - storage::{Chunk, ChunkAddress, RegisterAddress, RetryStrategy}, + storage::{Chunk, ChunkAddress, RegisterAddress}, NetworkAddress, }; use sn_registers::{Entry, EntryHash}; @@ -48,7 +48,6 @@ pub struct FoldersApi { client: Client, wallet_dir: PathBuf, register: ClientRegister, - files_api: FilesApi, // Cache of metadata chunks. We keep the Chunk itself till we upload it to the network. metadata: BTreeMap)>, } @@ -180,70 +179,22 @@ impl FoldersApi { } /// Sync local Folder with the network. - pub async fn sync( - &mut self, - verify_store: bool, - retry_strategy: Option, - ) -> Result<()> { - let mut wallet_client = self.wallet()?; - - // First upload any newly created metadata chunk - for (_, meta_chunk) in self.metadata.values_mut() { - if let Some(chunk) = meta_chunk.take() { - self.files_api - .get_local_payment_and_upload_chunk(chunk.clone(), verify_store, retry_strategy) - .await?; - } - } - - let payment_info = wallet_client.get_non_expired_payment_for_addr(&self.as_net_addr())?; - - self.register - .sync(&mut wallet_client, verify_store, Some(payment_info)) - .await?; - - Ok(()) - } - - /// Sync multiple Folders with the network. - pub async fn sync_multiple( - folders: impl IntoIterator, - upload_cfg: UploadCfg, - ) -> Result { - let folders_vec = folders.into_iter().collect::>(); - // Ensure there's only one unique root_dir across all folders - let unique_root_dirs: BTreeSet<&Path> = folders_vec - .iter() - .map(|folder| folder.wallet_dir.as_path()) - .collect(); - let root_dir = match unique_root_dirs.iter().next() { - Some(&dir) if unique_root_dirs.len() == 1 => dir, - _ => return Err(Error::InconsistentBatchSyncState), - }; - let client = folders_vec - .first() - .ok_or(Error::BatchSyncEmptyList)? - .client - .clone(); - - let mut uploader = Uploader::new(client, root_dir.to_path_buf()); + /// This makes a payment and uploads the folder if the metadata chunks and registers have not yet been paid. + pub async fn sync(&mut self, upload_cfg: UploadCfg) -> Result<()> { + let mut uploader = Uploader::new(self.client.clone(), self.wallet_dir.to_path_buf()); uploader.set_upload_cfg(upload_cfg); - uploader.set_collect_registers(true); // override upload cfg to collect all registers - uploader.insert_chunks(folders_vec.iter().flat_map(|folder| folder.meta_chunks())); - uploader.insert_register(folders_vec.iter().map(|folder| folder.register())); - let mut upload_summary = uploader.start_upload().await?; - - // now update the registers - for folder in folders_vec { - let address = folder.address(); - let updated_register = upload_summary - .uploaded_registers - .remove(address) - .ok_or(Error::RegisterNotFoundAfterUpload(address.xorname()))?; - folder.register = updated_register; - } - - Ok(upload_summary) + uploader.set_collect_registers(true); // override upload cfg to collect the updated register. + uploader.insert_chunks(self.meta_chunks()); + uploader.insert_register(vec![self.register()]); + let upload_summary = uploader.start_upload().await?; + + let updated_register = upload_summary + .uploaded_registers + .get(self.address()) + .ok_or(Error::RegisterNotFoundAfterUpload(self.address().xorname()))? + .clone(); + self.register = updated_register; + Ok(()) } /// Download a copy of the Folder from the network. @@ -334,13 +285,10 @@ impl FoldersApi { // Create a new FoldersApi instance with given register. fn create(client: Client, wallet_dir: &Path, register: ClientRegister) -> Result { - let files_api = FilesApi::new(client.clone(), wallet_dir.to_path_buf()); - Ok(Self { client, wallet_dir: wallet_dir.to_path_buf(), register, - files_api, metadata: BTreeMap::new(), }) } diff --git a/sn_client/src/register.rs b/sn_client/src/register.rs index d556555793..e2106a5edc 100644 --- a/sn_client/src/register.rs +++ b/sn_client/src/register.rs @@ -6,12 +6,9 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{ - wallet::StoragePaymentResult, Client, Error, Result, UploadCfg, UploadSummary, Uploader, - WalletClient, -}; - +use crate::{wallet::StoragePaymentResult, Client, Error, Result, WalletClient}; use bls::PublicKey; +use crdts::merkle_reg::MerkleReg; use libp2p::{ kad::{Quorum, Record}, PeerId, @@ -23,14 +20,9 @@ use sn_protocol::{ storage::{try_serialize_record, RecordKind, RetryStrategy}, NetworkAddress, }; - -use crdts::merkle_reg::MerkleReg; use sn_registers::{Entry, EntryHash, Permissions, Register, RegisterAddress, SignedRegister}; use sn_transfers::{NanoTokens, Payment}; -use std::{ - collections::{BTreeSet, HashSet, LinkedList}, - path::PathBuf, -}; +use std::collections::{BTreeSet, HashSet, LinkedList}; use xor_name::XorName; /// Cached operations made to an offline Register instance are applied locally only, @@ -555,38 +547,6 @@ impl ClientRegister { Ok((storage_cost, royalties_fees)) } - /// Sync multiple Registers with the network. - pub async fn sync_multiple( - registers: impl IntoIterator, - root_dir: PathBuf, - upload_cfg: UploadCfg, - ) -> Result { - let registers_vec = registers.into_iter().collect::>(); - let client = registers_vec - .first() - .ok_or(Error::BatchSyncEmptyList)? - .client - .clone(); - - let mut uploader = Uploader::new(client, root_dir); - uploader.set_upload_cfg(upload_cfg); - uploader.set_collect_registers(true); // override cfg to collect all the registers - uploader.insert_register(registers_vec.iter().map(|reg| (*reg).clone())); - let mut upload_summary = uploader.start_upload().await?; - - // now update the registers - for current_register in registers_vec { - let address = current_register.address(); - let updated_register = upload_summary - .uploaded_registers - .remove(address) - .ok_or(Error::RegisterNotFoundAfterUpload(address.xorname()))?; - *current_register = updated_register; - } - - Ok(upload_summary) - } - /// Push all operations made locally to the replicas of this Register on the network. /// This optionally verifies that the stored Register is the same as our local register. /// diff --git a/sn_client/src/uploader/mod.rs b/sn_client/src/uploader/mod.rs index a689fb384b..f87ed1574b 100644 --- a/sn_client/src/uploader/mod.rs +++ b/sn_client/src/uploader/mod.rs @@ -29,7 +29,7 @@ use tokio::sync::mpsc; use xor_name::XorName; /// The set of options to pass into the `Uploader` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub struct UploadCfg { pub batch_size: usize, pub verify_store: bool, diff --git a/sn_client/tests/folders_api.rs b/sn_client/tests/folders_api.rs index 5350510e6a..609c4abb5c 100644 --- a/sn_client/tests/folders_api.rs +++ b/sn_client/tests/folders_api.rs @@ -8,16 +8,12 @@ // All tests require a network running so Clients can be instantiated. -use sn_client::test_utils::{ - get_funded_wallet, get_new_client, pay_for_storage, random_file_chunk, -}; - +use bls::SecretKey; +use eyre::Result; +use sn_client::test_utils::{get_funded_wallet, get_new_client, random_file_chunk}; use sn_client::{FolderEntry, FoldersApi, Metadata}; use sn_protocol::{storage::ChunkAddress, NetworkAddress}; use sn_registers::{EntryHash, RegisterAddress}; - -use bls::SecretKey; -use eyre::Result; use xor_name::XorName; #[tokio::test] @@ -212,14 +208,8 @@ async fn test_folder_retrieve() -> Result<()> { let (file2_entry_hash, file2_meta_xorname, file2_metadata) = subfolder.add_file("file2.txt".into(), file2_chunk.clone(), None)?; - // let's pay for storage - let mut addrs2pay = vec![folder.as_net_addr(), subfolder.as_net_addr()]; - addrs2pay.extend(folder.meta_addrs_to_pay()); - addrs2pay.extend(subfolder.meta_addrs_to_pay()); - pay_for_storage(&client, wallet_dir, addrs2pay).await?; - - folder.sync(false, None).await?; - subfolder.sync(false, None).await?; + folder.sync(Default::default()).await?; + subfolder.sync(Default::default()).await?; let mut retrieved_folder = FoldersApi::retrieve(client.clone(), wallet_dir, *folder.address()).await?; @@ -304,20 +294,12 @@ async fn test_folder_merge_changes() -> Result<()> { let (file_b2_entry_hash, file_b2_meta_xorname, file_b2_metadata) = subfolder_b.add_file("fileB2.txt".into(), file_b2_chunk.clone(), None)?; - // let's pay for storage - let mut addrs2pay = vec![folder_a.as_net_addr(), subfolder_a.as_net_addr()]; - addrs2pay.extend(folder_a.meta_addrs_to_pay()); - addrs2pay.extend(subfolder_a.meta_addrs_to_pay()); - addrs2pay.extend(folder_b.meta_addrs_to_pay()); - addrs2pay.extend(subfolder_b.meta_addrs_to_pay()); - pay_for_storage(&client, wallet_dir, addrs2pay).await?; - - folder_a.sync(false, None).await?; - subfolder_a.sync(false, None).await?; - folder_b.sync(false, None).await?; - subfolder_b.sync(false, None).await?; - folder_a.sync(false, None).await?; - subfolder_a.sync(false, None).await?; + folder_a.sync(Default::default()).await?; + subfolder_a.sync(Default::default()).await?; + folder_b.sync(Default::default()).await?; + subfolder_b.sync(Default::default()).await?; + folder_a.sync(Default::default()).await?; + subfolder_a.sync(Default::default()).await?; let folder_a_entries = folder_a.entries().await?; let folder_b_entries = folder_b.entries().await?;