Skip to content

Commit

Permalink
simulate download with semaphore locks
Browse files Browse the repository at this point in the history
  • Loading branch information
sameoldlab committed Jun 28, 2024
1 parent fd1c759 commit c1050ea
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 39 deletions.
42 changes: 41 additions & 1 deletion nft-folder/src/download.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::request::{NftImage, NftNode};
use crate::request::{NftImage, NftNode, NftToken};
use base64::decode;
use eyre::{eyre, Result};
use futures::{stream::StreamExt, Stream};
use reqwest::Client;
use std::fmt::format;
use std::sync::Arc;
use std::{fs, io::Cursor, path::PathBuf, time::Duration};
use std::{
fs::File,
Expand All @@ -11,6 +13,43 @@ use std::{
use tokio::{sync::mpsc, time::sleep};
use tokio_util::bytes::Bytes;

use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use tokio::sync::Semaphore;

pub async fn download_image(semaphore: Arc<Semaphore>, data: NftToken, mp: &MultiProgress) {
// println!("==============================================");
// let mp = mp.clone();
let pb = mp.add(ProgressBar::new(100));
tokio::spawn(async move {
let permit = semaphore.acquire().await.unwrap();
// println!("{:#?}", data);

pb.set_style(
ProgressStyle::with_template(
"{spinner:.magenta} {wide_msg} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos:>3}/{len} ({eta:>3})",
)
.unwrap()
.progress_chars("█▉▊▋▌▍▎▏ ")
.tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])
);
let name = data.name.unwrap();
pb.set_message(format!("downloading {}", &name));
// println!("Get image {} on t:{t}", url);
// println!("starting download for {}", &name);
for i in 0..100 {
let r: u64 = ethers::core::rand::random::<u64>() / 60009360303000000;
std::thread::sleep(Duration::from_millis(r));
pb.set_position(i);
// println!("downloading {} | progress {i}%", &name);
}
// println!("url: {url}");
// println!("Downloaded {} on t:{t}", url);
pb.finish();
// println!("==============================================");
drop(permit); //`semaphore` dropped here while still borrowed
});
}
/*
pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender<DownloadResult>) {
let file_path = match node.token.name {
Some(name) => name,
Expand Down Expand Up @@ -203,3 +242,4 @@ mod tests {
}
*/
}
*/
27 changes: 1 addition & 26 deletions nft-folder/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,9 @@
// mod download;
mod request;
mod download;

use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use request::handle_processing;
use reqwest::Client;

fn download_image(url: &String, mp: &MultiProgress) {
println!("spawning thread");

let pb = mp.insert_from_back(0, ProgressBar::new(100));
pb.set_style(
ProgressStyle::with_template(
"{spinner:.magenta} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos}/{len} ({eta})",
)
.unwrap()
.progress_chars("█▉▊▋▌▍▎▏ ")
.tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])
);
// println!("Get image {} on t:{t}", url);
// thread::sleep(Duration::from_millis(r));
// for i in 0..100 {
// let r: u64 = random::<u64>() / 600093603030000000;
// thread::sleep(Duration::from_millis(r));
// // pb.set_position(i);
// }
println!("url: {url}");
// println!("Downloaded {} on t:{t}", url);
pb.finish();
}

#[tokio::main]
async fn main() {
let client = Client::new();
Expand Down
26 changes: 14 additions & 12 deletions nft-folder/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use crate::download::download_image;
use eyre::{eyre, Result};
use futures::{stream, StreamExt};
use indicatif::{MultiProgress, ProgressBar};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::to_value;

const DEBUG: bool = false;
use std::sync::Arc;
use tokio::sync::Semaphore;

#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
Expand Down Expand Up @@ -93,7 +95,7 @@ pub async fn fetch_page(
r#"
query NFTsForAddress {{
tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}],
pagination: {{limit: 2 {} }},
pagination: {{limit: 20 {} }},
where: {{ownerAddresses: "{}"}}) {{
nodes {{
token {{
Expand Down Expand Up @@ -142,19 +144,18 @@ pub async fn fetch_page(
let response_str = String::from_utf8(response_data)
.map_err(|err| eyre!("Failed to convert response to string: {}", err))?;

let response: NftResponse = serde_json::from_str(&response_str).map_err(|err| {
// eprintln!("{}", &response_str);
eyre!("Failed to parse JSON response: {}", err)
})?;
let response: NftResponse = serde_json::from_str(&response_str)
.map_err(|err| eyre!("Failed to parse JSON response: {}", err))?;
// println!("{:?}", response);

let data = response.handle_errors().unwrap();
Ok(data.tokens)
}

pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> {
client: &'a Client,
address: &'a str,
) -> impl Stream<Item = eyre::Result<NftToken>> + 'a {
let semaphore = Arc::new(Semaphore::new(4));
let mp = MultiProgress::new();

let cursor = None;
let requests = stream::unfold(cursor, move |cursor| async move {
match fetch_page(&client, cursor, address).await {
Expand All @@ -178,8 +179,9 @@ pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<(
.flatten();
tokio::pin!(requests);

while let Some(data) = requests.next().await {
println!("{:#?}", data);
while let Some(token) = requests.next().await {
// let url = token.token_url.unwrap();
download_image(Arc::clone(&semaphore), token, &mp).await;
}

Ok(())
Expand Down

0 comments on commit c1050ea

Please sign in to comment.