Skip to content

Commit

Permalink
feat: housekeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonPaulGithub committed Mar 11, 2024
1 parent 9b0dc52 commit 0d0f1eb
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 25 deletions.
2 changes: 1 addition & 1 deletion sn_cli/src/subcommands/acc_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ impl AccountPacket {
let files_api = FilesApi::build(self.client.clone(), self.wallet_dir.clone())?;
let mut chunk_manager = ChunkManager::new(&self.tracking_info_dir.clone());

let chunks_to_upload = files::ctu(
let chunks_to_upload = files::chunks_to_upload(
&files_api,
&mut chunk_manager,
&self.files_dir.clone(),
Expand Down
4 changes: 2 additions & 2 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub(crate) async fn files_cmds(
bail!("The provided file path is invalid. Please verify the path.");
}
} else {
let chunks_to_upload = ctu(
let chunks_to_upload = chunks_to_upload(
&files_api,
&mut chunk_manager,
&file_path,
Expand Down Expand Up @@ -272,7 +272,7 @@ pub(crate) async fn files_cmds(
Ok(())
}

pub async fn ctu(
pub async fn chunks_to_upload(
files_api: &FilesApi,
chunk_manager: &mut ChunkManager,
file_path: &Path,
Expand Down
34 changes: 12 additions & 22 deletions sn_cli/src/subcommands/files/iterative_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,30 @@ impl IterativeUploader {

msg_init(&files_path, &batch_size, &verify_store, make_data_public);

let chunk_amount_to_upload = chunks_to_upload.len();
let progress_bar = files::get_progress_bar(chunks_to_upload.len() as u64)?;
let total_existing_chunks = Arc::new(AtomicU64::new(0));
let mut files_upload = FilesUpload::new(self.files_api)
.set_batch_size(batch_size)
.set_verify_store(verify_store)
.set_retry_strategy(retry_strategy);

let upload_event_rx = files_upload.get_upload_events();
let progress_bar_clone = progress_bar.clone();
let total_existing_chunks_clone = total_existing_chunks.clone();

let progress_bar = files::get_progress_bar(chunks_to_upload.len() as u64)?;
let total_existing_chunks = Arc::new(AtomicU64::new(0));
let process_join_handle = spawn_progress_handler(
self.chunk_manager,
make_data_public,
progress_bar,
upload_event_rx,
progress_bar_clone,
total_existing_chunks_clone,
files_upload.get_upload_events(),
total_existing_chunks.clone(),
);

msg_uploading_chunks(chunk_amount_to_upload);

msg_uploading_chunks(&chunks_to_upload.len());
let current_instant = Instant::now();

IterativeUploader::upload_result(chunks_to_upload, &mut files_upload).await?;
IterativeUploader::upload_result(chunks_to_upload.clone(), &mut files_upload).await?;

process_join_handle
.await?
.map_err(|err| eyre!("Failed to write uploaded files with err: {err:?}"))?;

msg_final(
chunk_amount_to_upload,
chunks_to_upload.len(),
current_instant,
total_existing_chunks,
files_upload,
Expand Down Expand Up @@ -109,24 +100,23 @@ fn spawn_progress_handler(
make_data_public: bool,
progress_bar: ProgressBar,
mut upload_event_rx: Receiver<FileUploadEvent>,
progress_bar_clone: ProgressBar,
total_existing_chunks_clone: Arc<AtomicU64>,
total_existing_chunks: Arc<AtomicU64>,
) -> JoinHandle<Result<(), Error>> {
tokio::spawn(async move {
let mut upload_terminated_with_error = false;
// The loop is guaranteed to end, as the channel will be closed when the upload completes or errors out.
while let Some(event) = upload_event_rx.recv().await {
match event {
FileUploadEvent::Uploaded(addr) => {
progress_bar_clone.inc(1);
progress_bar.clone().inc(1);
if let Err(err) = chunk_manager.mark_completed(std::iter::once(*addr.xorname()))
{
error!("Failed to mark chunk {addr:?} as completed: {err:?}");
}
}
FileUploadEvent::AlreadyExistsInNetwork(addr) => {
let _ = total_existing_chunks_clone.fetch_add(1, Ordering::Relaxed);
progress_bar_clone.inc(1);
let _ = total_existing_chunks.fetch_add(1, Ordering::Relaxed);
progress_bar.clone().inc(1);
if let Err(err) = chunk_manager.mark_completed(std::iter::once(*addr.xorname()))
{
error!("Failed to mark chunk {addr:?} as completed: {err:?}");
Expand Down Expand Up @@ -283,7 +273,7 @@ fn msg_star_line() {
println!("**************************************");
}

fn msg_uploading_chunks(chunks_to_upload_len: usize) {
fn msg_uploading_chunks(chunks_to_upload_len: &usize) {
println!("Uploading {chunks_to_upload_len} chunks",);
}

Expand Down

0 comments on commit 0d0f1eb

Please sign in to comment.