diff --git a/Cargo.lock b/Cargo.lock index b02dbeecf3..4a4442d359 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5085,6 +5085,7 @@ dependencies = [ "tiny-keccak", "tokio", "tracing", + "uuid", "walkdir", "wasm-bindgen-futures", "wasmtimer", diff --git a/sn_cli/src/subcommands/acc_packet.rs b/sn_cli/src/subcommands/acc_packet.rs index 9cf14b8ac2..2bab7da7a5 100644 --- a/sn_cli/src/subcommands/acc_packet.rs +++ b/sn_cli/src/subcommands/acc_packet.rs @@ -15,7 +15,8 @@ use sn_client::transfers::HotWallet; use sn_client::{Client, FilesApi, FolderEntry, FoldersApi, Metadata, WalletClient}; use crate::subcommands::files::{ - download::download_file, iterative_uploader::IterativeUploader, upload::FilesUploadOptions, + self, download::download_file, iterative_uploader::IterativeUploader, + upload::FilesUploadOptions, }; use color_eyre::{ eyre::{bail, eyre}, @@ -661,14 +662,20 @@ impl AccountPacket { options: FilesUploadOptions, ) -> Result { let files_api = FilesApi::build(self.client.clone(), self.wallet_dir.clone())?; - let chunk_manager = ChunkManager::new(&self.tracking_info_dir.clone()); + let mut chunk_manager = ChunkManager::new(&self.tracking_info_dir.clone()); + + let chunks_to_upload = files::chunks_to_upload( + &files_api, + &mut chunk_manager, + &self.files_dir.clone(), + options.batch_size, + options.make_data_public, + true, + ) + .await?; IterativeUploader::new(chunk_manager, files_api) - .iterate_upload( - self.iter_only_files(), - self.files_dir.clone(), - options.clone(), - ) + .iterate_upload(chunks_to_upload, self.files_dir.clone(), options.clone()) .await?; // Let's make the storage payment for Folders diff --git a/sn_cli/src/subcommands/files.rs b/sn_cli/src/subcommands/files.rs index b76399b67f..cebcca22e8 100644 --- a/sn_cli/src/subcommands/files.rs +++ b/sn_cli/src/subcommands/files.rs @@ -22,6 +22,8 @@ use color_eyre::{ Help, Result, }; use indicatif::{ProgressBar, ProgressStyle}; +use rand::prelude::SliceRandom; +use rand::thread_rng; use sn_client::protocol::storage::{Chunk, ChunkAddress, RetryStrategy}; use sn_client::{Client, FilesApi, BATCH_SIZE}; use std::time::Duration; @@ -101,7 +103,7 @@ pub(crate) async fn files_cmds( verify_store: bool, ) -> Result<()> { let files_api = FilesApi::build(client.clone(), root_dir.to_path_buf())?; - let chunk_manager = ChunkManager::new(root_dir); + let mut chunk_manager = ChunkManager::new(root_dir); match cmds { FilesCmds::Estimate { @@ -118,24 +120,44 @@ pub(crate) async fn files_cmds( retry_strategy, make_data_public, } => { - let total_files = IterativeUploader::new(chunk_manager, files_api) - .iterate_upload( - WalkDir::new(&file_path).into_iter().flatten(), - file_path.clone(), - FilesUploadOptions { - make_data_public, - verify_store, - batch_size, - retry_strategy, - }, - ) - .await?; + let total_files = chunk_manager.chunk_with_iter( + WalkDir::new(&file_path).into_iter().flatten(), + true, + make_data_public, + )?; + if total_files == 0 { if file_path.is_dir() { - bail!("The directory specified for upload is empty. Please verify the provided path."); + bail!( + "The directory specified for upload is empty. \ + Please verify the provided path." + ); } else { bail!("The provided file path is invalid. Please verify the path."); } + } else { + let chunks_to_upload = chunks_to_upload( + &files_api, + &mut chunk_manager, + &file_path, + batch_size, + make_data_public, + false, + ) + .await?; + + IterativeUploader::new(chunk_manager, files_api) + .iterate_upload( + chunks_to_upload, + file_path.clone(), + FilesUploadOptions { + make_data_public, + verify_store, + batch_size, + retry_strategy, + }, + ) + .await?; } } FilesCmds::Download { @@ -251,6 +273,57 @@ pub(crate) async fn files_cmds( Ok(()) } +pub async fn chunks_to_upload( + files_api: &FilesApi, + chunk_manager: &mut ChunkManager, + file_path: &Path, + batch_size: usize, + make_data_public: bool, + acc_pac: bool, +) -> Result> { + let chunks_to_upload = if chunk_manager.is_chunks_empty() { + let chunks = chunk_manager.already_put_chunks(file_path, make_data_public)?; + + let failed_chunks = files_api + .client() + .verify_uploaded_chunks(&chunks, batch_size) + .await?; + + chunk_manager.mark_completed( + chunks + .into_iter() + .filter(|c| !failed_chunks.contains(c)) + .map(|(xor, _)| xor), + )?; + + if failed_chunks.is_empty() { + msg_files_already_uploaded_verified(); + if !make_data_public { + msg_not_public_by_default(); + } + msg_star_line(); + if chunk_manager.completed_files().is_empty() { + msg_chk_mgr_no_verified_file_nor_re_upload(); + } + iterative_uploader::msg_chunk_manager_upload_complete(chunk_manager.clone()); + + if acc_pac { + bail!("") + } else { + return Ok(vec![]); + } + } + msg_unverified_chunks_reattempted(&failed_chunks.len()); + failed_chunks + } else { + let mut chunks = chunk_manager.get_chunks(); + let mut rng = thread_rng(); + chunks.shuffle(&mut rng); + chunks + }; + Ok(chunks_to_upload) +} + pub fn get_progress_bar(length: u64) -> Result { let progress_bar = ProgressBar::new(length); progress_bar.set_style( @@ -261,3 +334,30 @@ pub fn get_progress_bar(length: u64) -> Result { progress_bar.enable_steady_tick(Duration::from_millis(100)); Ok(progress_bar) } + +fn msg_files_already_uploaded_verified() { + println!("All files were already uploaded and verified"); + println!("**************************************"); + println!("* Uploaded Files *"); +} + +fn msg_chk_mgr_no_verified_file_nor_re_upload() { + println!("chunk_manager doesn't have any verified_files, nor any failed_chunks to re-upload."); +} + +fn msg_not_public_by_default() { + println!("* *"); + println!("* These are not public by default. *"); + println!("* Reupload with `-p` option *"); + println!("* to publish the datamaps. *"); +} + +fn msg_star_line() { + println!("**************************************"); +} + +fn msg_unverified_chunks_reattempted(failed_amount: &usize) { + println!( + "{failed_amount} chunks were uploaded in the past but failed to verify. Will attempt to upload them again..." + ); +} diff --git a/sn_cli/src/subcommands/files/iterative_uploader.rs b/sn_cli/src/subcommands/files/iterative_uploader.rs index cb4bc3492a..a72b6308c1 100644 --- a/sn_cli/src/subcommands/files/iterative_uploader.rs +++ b/sn_cli/src/subcommands/files/iterative_uploader.rs @@ -2,8 +2,6 @@ use crate::subcommands::files; use crate::subcommands::files::{ChunkManager, FilesUploadOptions}; use color_eyre::{eyre::eyre, Result}; use indicatif::ProgressBar; -use rand::prelude::SliceRandom; -use rand::thread_rng; use sn_client::transfers::{NanoTokens, TransferError, WalletError}; use sn_client::{Error as ClientError, Error, FileUploadEvent, FilesApi, FilesUpload}; use std::path::PathBuf; @@ -12,7 +10,6 @@ use std::sync::Arc; use std::time::Instant; use tokio::sync::mpsc::Receiver; use tokio::task::JoinHandle; -use walkdir::DirEntry; use xor_name::XorName; pub(crate) struct IterativeUploader { @@ -32,11 +29,11 @@ impl IterativeUploader { impl IterativeUploader { /// Given an iterator over files, upload them. Optionally verify if the data was stored successfully. pub(crate) async fn iterate_upload( - mut self, - entries_iter: impl Iterator, + self, + chunks_to_upload: Vec<(XorName, PathBuf)>, files_path: PathBuf, options: FilesUploadOptions, - ) -> Result { + ) -> Result<()> { let FilesUploadOptions { make_data_public, verify_store, @@ -44,106 +41,38 @@ impl IterativeUploader { retry_strategy, } = options; - let mut rng = thread_rng(); - msg_init(&files_path, &batch_size, &verify_store, make_data_public); - let total_files = - self.chunk_manager - .chunk_with_iter(entries_iter, true, make_data_public)?; - if total_files == 0 { - return Ok(0); - } - - // Return early if we already uploaded them - let mut chunks_to_upload = if self.chunk_manager.is_chunks_empty() { - // make sure we don't have any failed chunks in those - - let chunks = self - .chunk_manager - .already_put_chunks(&files_path, make_data_public)?; - println!( - "Files upload attempted previously, verifying {} chunks", - chunks.len() - ); - - let failed_chunks = self - .files_api - .client() - .verify_uploaded_chunks(&chunks, batch_size) - .await?; - - // mark the non-failed ones as completed - self.chunk_manager.mark_completed( - chunks - .into_iter() - .filter(|c| !failed_chunks.contains(c)) - .map(|(xor, _)| xor), - )?; - - // if none are failed, we can return early - if failed_chunks.is_empty() { - msg_files_already_uploaded_verified(); - if !make_data_public { - msg_not_public_by_default(); - } - msg_star_line(); - if self.chunk_manager.completed_files().is_empty() { - msg_chk_mgr_no_verified_file_nor_re_upload(); - } - msg_chunk_manager_upload_complete(self.chunk_manager); - return Ok(total_files); - } - msg_unverified_chunks_reattempted(&failed_chunks.len()); - failed_chunks - } else { - self.chunk_manager.get_chunks() - }; - - // Random shuffle the chunks_to_upload, so that uploading of a large file can be speed up by - // having multiple client instances uploading the same target. - chunks_to_upload.shuffle(&mut rng); - - let chunk_amount_to_upload = chunks_to_upload.len(); - let progress_bar = files::get_progress_bar(chunks_to_upload.len() as u64)?; - let total_existing_chunks = Arc::new(AtomicU64::new(0)); let mut files_upload = FilesUpload::new(self.files_api) .set_batch_size(batch_size) .set_verify_store(verify_store) .set_retry_strategy(retry_strategy); - - let upload_event_rx = files_upload.get_upload_events(); - // keep track of the progress in a separate task - let progress_bar_clone = progress_bar.clone(); - let total_existing_chunks_clone = total_existing_chunks.clone(); - + let progress_bar = files::get_progress_bar(chunks_to_upload.len() as u64)?; + let total_existing_chunks = Arc::new(AtomicU64::new(0)); let process_join_handle = spawn_progress_handler( self.chunk_manager, make_data_public, progress_bar, - upload_event_rx, - progress_bar_clone, - total_existing_chunks_clone, + files_upload.get_upload_events(), + total_existing_chunks.clone(), ); - msg_uploading_chunks(chunk_amount_to_upload); - + msg_uploading_chunks(&chunks_to_upload.len()); let current_instant = Instant::now(); - - IterativeUploader::upload_result(chunks_to_upload, &mut files_upload).await?; + IterativeUploader::upload_result(chunks_to_upload.clone(), &mut files_upload).await?; process_join_handle .await? .map_err(|err| eyre!("Failed to write uploaded files with err: {err:?}"))?; msg_final( - chunk_amount_to_upload, + chunks_to_upload.len(), current_instant, total_existing_chunks, files_upload, ); - Ok(total_files) + Ok(()) } async fn upload_result( @@ -171,8 +100,7 @@ fn spawn_progress_handler( make_data_public: bool, progress_bar: ProgressBar, mut upload_event_rx: Receiver, - progress_bar_clone: ProgressBar, - total_existing_chunks_clone: Arc, + total_existing_chunks: Arc, ) -> JoinHandle> { tokio::spawn(async move { let mut upload_terminated_with_error = false; @@ -180,15 +108,15 @@ fn spawn_progress_handler( while let Some(event) = upload_event_rx.recv().await { match event { FileUploadEvent::Uploaded(addr) => { - progress_bar_clone.inc(1); + progress_bar.clone().inc(1); if let Err(err) = chunk_manager.mark_completed(std::iter::once(*addr.xorname())) { error!("Failed to mark chunk {addr:?} as completed: {err:?}"); } } FileUploadEvent::AlreadyExistsInNetwork(addr) => { - let _ = total_existing_chunks_clone.fetch_add(1, Ordering::Relaxed); - progress_bar_clone.inc(1); + let _ = total_existing_chunks.fetch_add(1, Ordering::Relaxed); + progress_bar.clone().inc(1); if let Err(err) = chunk_manager.mark_completed(std::iter::once(*addr.xorname())) { error!("Failed to mark chunk {addr:?} as completed: {err:?}"); @@ -291,7 +219,8 @@ fn msg_final( msg_made_payment_info(files_upload.get_upload_storage_cost(), uploaded_chunks); } -fn msg_chunk_manager_upload_complete(chunk_manager: ChunkManager) { + +pub fn msg_chunk_manager_upload_complete(chunk_manager: ChunkManager) { for (file_name, addr) in chunk_manager.completed_files() { let hex_addr = addr.to_hex(); if let Some(file_name) = file_name.to_str() { @@ -340,29 +269,11 @@ fn msg_payment_details( println!("Made payment of {total_royalty_fees} for royalties fees"); println!("New wallet balance: {final_balance}"); } - -fn msg_chk_mgr_no_verified_file_nor_re_upload() { - println!("chunk_manager doesn't have any verified_files, nor any failed_chunks to re-upload."); -} - fn msg_star_line() { println!("**************************************"); } -fn msg_not_public_by_default() { - println!("* *"); - println!("* These are not public by default. *"); - println!("* Reupload with `-p` option *"); - println!("* to publish the datamaps. *"); -} - -fn msg_files_already_uploaded_verified() { - println!("All files were already uploaded and verified"); - println!("**************************************"); - println!("* Uploaded Files *"); -} - -fn msg_uploading_chunks(chunks_to_upload_len: usize) { +fn msg_uploading_chunks(chunks_to_upload_len: &usize) { println!("Uploading {chunks_to_upload_len} chunks",); } @@ -377,8 +288,3 @@ fn msg_uploaded_files_banner() { println!("**************************************"); println!("* Uploaded Files *"); } -fn msg_unverified_chunks_reattempted(failed_amount: &usize) { - println!( - "{failed_amount} chunks were uploaded in the past but failed to verify. Will attempt to upload them again..." - ); -} diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index 96cc3cdbaa..5c6caa9092 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -1107,6 +1107,10 @@ impl Client { } } + println!( + "Files upload attempted previously, verifying {} chunks", + failed_chunks.len() + ); Ok(failed_chunks) } } diff --git a/sn_networking/Cargo.toml b/sn_networking/Cargo.toml index 02a0bd6d65..c5cc9a820d 100644 --- a/sn_networking/Cargo.toml +++ b/sn_networking/Cargo.toml @@ -75,6 +75,7 @@ bls = { package = "blsttc", version = "8.0.1" } libp2p-identity = { version = "0.2.7", features = ["rand"] } quickcheck = "1.0.3" eyre = "0.6.8" +uuid = { version = "1.5.0", features = ["v4"] } [lints] workspace = true diff --git a/sn_networking/src/record_store.rs b/sn_networking/src/record_store.rs index 562f5265f5..88f0ca82bb 100644 --- a/sn_networking/src/record_store.rs +++ b/sn_networking/src/record_store.rs @@ -462,7 +462,7 @@ impl NodeRecordStore { }) .count(); - debug!("Relevant records len is {:?}", relevant_records_len); + debug!("Relevant records len is {relevant_records_len:?}"); relevant_records_len } @@ -945,9 +945,14 @@ mod tests { async fn get_records_within_distance_range() -> eyre::Result<()> { let max_records = 50; + let temp_dir = std::env::temp_dir(); + let unique_dir_name = uuid::Uuid::new_v4().to_string(); + let storage_dir = temp_dir.join(unique_dir_name); + // setup the store let store_config = NodeRecordStoreConfig { max_records, + storage_dir, ..Default::default() }; let self_id = PeerId::random();