Skip to content

Commit

Permalink
v0.3.1 remove futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Mon-ius committed Jun 28, 2024
1 parent 614ad73 commit fdcbdd3
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 29 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@

[package]
name = "hfd"
version = "0.3.0"
version = "0.3.1"
edition = "2021"

[dependencies]
clap = { version= "4.5.4", features=["derive"] }
reqwest = { version = "0.12.4", default-features = false, features = ["stream", "http2", "json", "rustls-tls"] }
tokio = { version = "1.37.0", default-features = false, features = ["rt-multi-thread", "fs"] }
tokio = { version = "1.38.0", default-features = false, features = ["rt-multi-thread", "fs", ] }
serde_json = { version = "1.0.117", default-features = false, features = ["alloc"] }
futures = "0.3.30"
tokio-stream = "0.1.15"

[[bin]]
name = "hfd"
Expand Down
44 changes: 18 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::PathBuf;
use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_RANGE, RANGE, USER_AGENT};
use tokio::time::Duration;
use tokio::io::{AsyncSeekExt, SeekFrom};
use futures::StreamExt;
use tokio_stream::StreamExt;

const CHUNK_SIZE: usize = 100_000_000;
const CHUNK_SIZE_XL: usize = 10_000_000_000;
Expand Down Expand Up @@ -161,25 +161,19 @@ async fn download(
.set_len(length as u64)
.await?;

let mut tasks = futures::stream::FuturesUnordered::new();
let mut tasks = Vec::new();

for s in (0..length).step_by(chunk_size) {
let e = std::cmp::min(s + chunk_size - 1, length);
tasks.push(download_chunk_with_retry(headers.clone(), url.clone(), path.clone(), s, e));
tasks.push(
tokio::spawn(download_chunk_with_retry(headers.clone(), url.clone(), path.clone(), s, e))
);
}

while let Some(handle) = tasks.next().await {
let res = match handle {
Ok(socket) => {
socket
},
Err(e) => {
println!("Chunk Error {:#?}", e);
std::thread::sleep(Duration::from_millis(10));
continue;
}
};
for task in tasks {
task.await;
}

Ok(())
}

Expand Down Expand Up @@ -265,28 +259,26 @@ impl HfClient {
let _ = self.create_dir_all(files.clone());

for chunks in files.chunks(5){
let mut tasks = futures::stream::FuturesUnordered::new();
let mut tasks = Vec::new();
for file in chunks{
let url = self.hf_url.path(&file);
let path = self.root.join(&file);
let headers = self.headers.clone();

if self.hf_url.endpoint.contains("face"){
tasks.push(download(headers, url, path, CHUNK_SIZE));
tasks.push(
download(headers, url, path, CHUNK_SIZE)
);
}
else {
tasks.push(download(headers, url, path, CHUNK_SIZE_XL));
tasks.push(
download(headers, url, path, CHUNK_SIZE_XL)
);
}
}

while let Some(handle) = tasks.next().await {
let res = match handle {
Ok(socket) => socket,
Err(e) => {
println!("File Error {:#?}", e);
std::thread::sleep(Duration::from_millis(10));
continue;
}
};
for task in tasks {
task.await;
}
}
Ok(())
Expand Down

0 comments on commit fdcbdd3

Please sign in to comment.