From fdcbdd306be286cb460638a1c42bd422f7559ef1 Mon Sep 17 00:00:00 2001 From: Monius Date: Fri, 28 Jun 2024 09:28:37 +0800 Subject: [PATCH] v0.3.1 remove futures --- Cargo.toml | 6 +++--- src/lib.rs | 44 ++++++++++++++++++-------------------------- 2 files changed, 21 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7f5ab53..f144771 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,15 +1,15 @@ [package] name = "hfd" -version = "0.3.0" +version = "0.3.1" edition = "2021" [dependencies] clap = { version= "4.5.4", features=["derive"] } reqwest = { version = "0.12.4", default-features = false, features = ["stream", "http2", "json", "rustls-tls"] } -tokio = { version = "1.37.0", default-features = false, features = ["rt-multi-thread", "fs"] } +tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "fs", ] } serde_json = { version = "1.0.117", default-features = false, features = ["alloc"] } -futures = "0.3.30" +tokio-stream = "0.1.15" [[bin]] name = "hfd" diff --git a/src/lib.rs b/src/lib.rs index 28ef4bd..f41574e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_RANGE, RANGE, USER_AGENT}; use tokio::time::Duration; use tokio::io::{AsyncSeekExt, SeekFrom}; -use futures::StreamExt; +use tokio_stream::StreamExt; const CHUNK_SIZE: usize = 100_000_000; const CHUNK_SIZE_XL: usize = 10_000_000_000; @@ -161,25 +161,19 @@ async fn download( .set_len(length as u64) .await?; - let mut tasks = futures::stream::FuturesUnordered::new(); + let mut tasks = Vec::new(); for s in (0..length).step_by(chunk_size) { let e = std::cmp::min(s + chunk_size - 1, length); - tasks.push(download_chunk_with_retry(headers.clone(), url.clone(), path.clone(), s, e)); + tasks.push( + tokio::spawn(download_chunk_with_retry(headers.clone(), url.clone(), path.clone(), s, e)) + ); } - while let Some(handle) = tasks.next().await { - let res = match handle { - Ok(socket) => { - socket - }, - Err(e) => { - println!("Chunk Error {:#?}", e); - std::thread::sleep(Duration::from_millis(10)); - continue; - } - }; + for task in tasks { + task.await; } + Ok(()) } @@ -265,28 +259,26 @@ impl HfClient { let _ = self.create_dir_all(files.clone()); for chunks in files.chunks(5){ - let mut tasks = futures::stream::FuturesUnordered::new(); + let mut tasks = Vec::new(); for file in chunks{ let url = self.hf_url.path(&file); let path = self.root.join(&file); let headers = self.headers.clone(); + if self.hf_url.endpoint.contains("face"){ - tasks.push(download(headers, url, path, CHUNK_SIZE)); + tasks.push( + download(headers, url, path, CHUNK_SIZE) + ); } else { - tasks.push(download(headers, url, path, CHUNK_SIZE_XL)); + tasks.push( + download(headers, url, path, CHUNK_SIZE_XL) + ); } } - while let Some(handle) = tasks.next().await { - let res = match handle { - Ok(socket) => socket, - Err(e) => { - println!("File Error {:#?}", e); - std::thread::sleep(Duration::from_millis(10)); - continue; - } - }; + for task in tasks { + task.await; } } Ok(())