From c1927bf9b1c9ef7bd84cc2fe9aa0c03a1c44e827 Mon Sep 17 00:00:00 2001 From: ibro Date: Tue, 25 Jun 2024 15:56:56 +0100 Subject: [PATCH 01/18] bmrk --- nft-folder/src/download.rs | 118 +++++++++++++------------------------ nft-folder/src/main.rs | 72 ++++++++++++++++++---- 2 files changed, 99 insertions(+), 91 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index af2ef29..e743342 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -12,16 +12,23 @@ use tokio::{sync::mpsc, time::sleep}; use tokio_util::bytes::Bytes; pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender) { - let file_path = match node.token.name { - Some(name) => name, - None => "d".to_string() - }; - - for n in 0..100 { - sleep(Duration::from_millis(500)).await; - let file_path = file_path.clone(); - progress_tx.send(DownloadResult {file_path, progress: n, total: 100}).await.unwrap(); - } + let file_path = match node.token.name { + Some(name) => name, + None => "d".to_string(), + }; + + for n in 0..100 { + sleep(Duration::from_millis(500)).await; + let file_path = file_path.clone(); + progress_tx + .send(DownloadResult { + file_path, + progress: n, + total: 100, + }) + .await + .unwrap(); + } } const DEBUG: bool = false; @@ -52,7 +59,7 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R }; let file_path = dir.join(format!("{name}.{extension}")); - + if file_path.is_file() { if DEBUG { println!("Skipping {name}"); @@ -78,7 +85,8 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R let parts: Vec<&str> = url.split('/').collect(); if let Some(hash) = parts.iter().find(|&&part| part.starts_with("Qm")) { let ipfs_url = format!("https://ipfs.io/ipfs/{hash}"); - if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await { + if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await + { return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); } } @@ -111,53 +119,6 @@ pub struct DownloadResult { total: u64, } -struct ProgressTracker { - progress: u64, -} -impl ProgressTracker { - fn new() -> Self { - ProgressTracker { progress: 0 } - } - - // async fn track_progress( - async fn track_progress> + Unpin>( - &mut self, - index: usize, - mut reader_stream: R, - mut file: File, - progress_tx: &mpsc::Sender<(usize, u64)>, - ) -> Result<()> { - let mut buffer = [0; 8192]; - while let Some(chunk_result) = reader_stream.next().await { - let chunk = match chunk_result { - Ok(chunk) => chunk, - Err(e) => return Err(e.into()), - }; - - let mut cursor = Cursor::new(chunk); - let bytes_read = cursor.read(&mut buffer)?; - file.write_all(&buffer[..bytes_read])?; - self.progress += bytes_read as u64; - - match progress_tx.try_send((index, self.progress)) { - Ok(_) => { - // The progress update was sent successfully. - } - Err(mpsc::error::TrySendError::Full(_)) => { - // The receiver's buffer is full, you can either: - // 1. Drop the progress update and continue downloading - // 2. Wait for the receiver to process some messages before sending more updates - } - Err(mpsc::error::TrySendError::Closed(_)) => { - // The receiver was dropped, so we stop sending progress updates. - break; - } - } - } - Ok(()) - } -} - async fn download_image( client: &Client, image_url: &str, @@ -168,20 +129,20 @@ async fn download_image( let content_length = response.content_length().unwrap_or(0); let mut byte_stream = response.bytes_stream(); - let mut progress: u64 = 0; + let mut progress: u64 = 0; let mut file = File::create(file_path)?; while let Some(chunk) = byte_stream.next().await { - let chunk = chunk?; - let chunk_len = chunk.len(); + let chunk = chunk?; + let chunk_len = chunk.len(); - progress += chunk_len as u64; - file.write_all(&chunk) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + progress += chunk_len as u64; + file.write_all(&chunk) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - // Send progress update through the channel - let _ = progress_tx.send((progress, content_length)).await; - } + // Send progress update through the channel + let _ = progress_tx.send((progress, content_length)).await; + } if content_length != progress { return Err(eyre::eyre!( @@ -192,9 +153,8 @@ async fn download_image( Ok(()) } -pub async fn create_directory(dir_path: &PathBuf) -> Result - { - let res = match fs::metadata(dir_path) { +pub async fn create_directory(dir_path: &PathBuf) -> Result { + let res = match fs::metadata(dir_path) { Ok(metadata) => { if !metadata.is_dir() { return Err(io::Error::new( @@ -203,12 +163,14 @@ pub async fn create_directory(dir_path: &PathBuf) -> Result ) .into()); } - dir_path.to_path_buf() + dir_path.to_path_buf() } Err(e) if e.kind() == ErrorKind::NotFound => { fs::create_dir_all(dir_path)?; - if DEBUG { println!("created directory: {:?}", dir_path);} - dir_path.to_path_buf() + if DEBUG { + println!("created directory: {:?}", dir_path); + } + dir_path.to_path_buf() } Err(e) => { return Err(e.into()); @@ -226,8 +188,8 @@ fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> { #[cfg(test)] mod tests { - /* - use super::*; + /* + use super::*; #[test] async fn resolve() { @@ -239,5 +201,5 @@ mod tests { assert_eq!(result, "0x"); } - */ - } + */ +} diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index ef0e6ce..a4351a9 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,5 +1,8 @@ mod download; mod request; + +use crate::download::{create_directory, handle_download, DownloadResult}; +use crate::request::NftResponse; use ::core::time; use clap::{Args, Parser, Subcommand}; use console::style; @@ -8,8 +11,6 @@ use ethers_providers::{Http, Middleware, Provider}; use eyre::Result; use futures::stream::{self, StreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use crate::download::{create_directory, handle_download, DownloadResult}; -use crate::request::NftResponse; use reqwest::Client; use std::borrow::Borrow; use tokio::sync::mpsc; @@ -84,8 +85,7 @@ async fn main() -> Result<()> { }; // Request let spinner = pending(&multi_pb, "Requesting collection data...".to_string()); - let nodes = NftResponse::request(&account.address) - .await?.tokens.nodes; + let nodes = NftResponse::request(&account.address).await?.tokens.nodes; spinner .finish_with_message(format!("Found {} NFTs. Starting download...", nodes.len())); @@ -283,17 +283,63 @@ async fn resolve_ens_name(ens_name: &str, provider: &Provider) -> Result ProgressBar { - let spinner = multi_pb.add( - ProgressBar::new_spinner().with_style( - ProgressStyle::default_spinner() - .template("{spinner} {prefix} {msg}") - .unwrap(), // .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"), - ), - ); - - spinner.set_prefix(format!("{}", style("INFO").bold().on_blue())); + // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json + let style = ProgressStyle::default_spinner() + .template("{spinner} {prefix:.bold.blue} {msg}") + .unwrap() + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]); + let spinner = multi_pb.add(ProgressBar::new_spinner().with_style(style)); + spinner.set_prefix("INFO"); spinner.set_message(msg); spinner.enable_steady_tick(time::Duration::from_millis(100)); spinner } + +/* +#[tokio::main] +async fn main() -> eyre::Result<()>{ + let nodes = vec![ + "first".to_string(), + "second".to_string(), + "third".to_string(), + "fourth node".to_string(), + ]; + + let (tx, rx) = std::sync::mpsc::channel::(); + let tasks = stream::iter(nodes.into_iter().map(|node| { + let tx = tx.clone(); + async move { worker2(node, tx).await } + })) + .buffer_unordered(2); + + // indicatif Multiprogress + let multi_pb = MultiProgress::new(); + // Tracks total progress of nodes + let total_pb = multi_pb.add(ProgressBar::new(nodes.len().try_into()?)); + + tasks + .for_each(|result| async { + let pb = multi_pb.insert_before(&total_pb, ProgressBar::new(100)); + pb.set_style( + ProgressStyle::with_template( + "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", + ) + .unwrap() + .progress_chars("██ "), + ); + match result { + Ok(res) => { + pb.set_message(res); + while let Ok(recv) = rx.recv() { + let pos = recv.progress * 100 / recv.total; + pb.set_position(pos); + } + total_pb.inc(1); + } + Err(_err) => pb.abandon_with_message("Error during download"), + } + }) + .await; + Ok(()) +} */ From 5b7e257b0925da2944788762ca4ffaf2969589a2 Mon Sep 17 00:00:00 2001 From: ibro Date: Tue, 25 Jun 2024 15:58:49 +0100 Subject: [PATCH 02/18] add crossbeam --- Cargo.lock | 10 ++++++++++ nft-folder/Cargo.toml | 2 ++ 2 files changed, 12 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index b737328..159aa1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -1881,6 +1890,7 @@ dependencies = [ "base64 0.13.1", "clap", "console", + "crossbeam-channel", "ethers", "ethers-providers", "eyre", diff --git a/nft-folder/Cargo.toml b/nft-folder/Cargo.toml index 38525d5..9591727 100644 --- a/nft-folder/Cargo.toml +++ b/nft-folder/Cargo.toml @@ -25,3 +25,5 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread", "fs", "sync"] } tokio-util = { version = "0.7.10", features = ["io-util"] } +crossbeam-channel = "0.5.13" +crossbeam-utils = "0.8.20" From 1e8290ee9f6e63f3a680287424abf39b227be128 Mon Sep 17 00:00:00 2001 From: ibro Date: Tue, 25 Jun 2024 15:59:23 +0100 Subject: [PATCH 03/18] mv main --- nft-folder/src/{main.rs => main-bk.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename nft-folder/src/{main.rs => main-bk.rs} (100%) diff --git a/nft-folder/src/main.rs b/nft-folder/src/main-bk.rs similarity index 100% rename from nft-folder/src/main.rs rename to nft-folder/src/main-bk.rs From 84d533dda7e809b55b642454754d18819a090a5a Mon Sep 17 00:00:00 2001 From: ibro Date: Tue, 25 Jun 2024 18:03:30 +0100 Subject: [PATCH 04/18] channel comms --- nft-folder/src/main.rs | 81 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 nft-folder/src/main.rs diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs new file mode 100644 index 0000000..229e830 --- /dev/null +++ b/nft-folder/src/main.rs @@ -0,0 +1,81 @@ +use crossbeam_channel::{bounded, unbounded, Sender}; +use crossbeam_utils::thread::scope; +use ethers::core::rand::random; +use std::{thread, time::Duration}; + +enum QueryResult { + Urls(Vec), + Finished, +} + +fn query_api(sender: Sender, mut cursor: i32) { + if cursor == 0 { + let _ = sender.send(QueryResult::Finished); + drop(sender); + return; + } + + let urls = vec![ + "https://example.com/image1.jpg".to_owned(), + "https://example.com/image2.jpg".to_owned(), + "https://example.com/image3.jpg".to_owned(), + "https://example.com/image4.jpg".to_owned(), + "https://example.com/image5.jpg".to_owned(), + "https://example.com/image6.jpg".to_owned(), + ]; + let r: u64 = random::() / 50093603030000000; + thread::sleep(Duration::from_millis(r)); + println!("sending result"); + let _ = sender.send(QueryResult::Urls(urls)); + + cursor -= 1; + query_api(sender, cursor); +} + +fn download_image(url: String, t: i32) { + let r: u64 = random::() / 1009360303000000; + + println!("Get image {} on t:{t}", url); + thread::sleep(Duration::from_millis(r)); + println!("Downloaded {} on t:{t}", url); +} + +fn main() -> Result<(), Box> { + let (sender, receiver) = unbounded(); + let (task_sender, task_receiver) = unbounded(); + + let query_thread = thread::spawn(move || { + query_api(sender, 4); + + }); + + scope(|s| { + for t in 0..5 { + let task_receiver = task_receiver.clone(); + s.spawn(move |_| { + for url in task_receiver { + download_image(url, t); + } + }); + } + + for task in receiver { + match task { + QueryResult::Urls(task) => { + for url in task { + let _ = task_sender.send(url); + } + } + QueryResult::Finished => { + drop(task_sender); + break; + } + } + } + }) + .unwrap(); + + query_thread.join().unwrap(); + + Ok(()) +} From 6ccf237d09e467be3631cb17575140f6a8ea74d4 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 04:35:39 +0100 Subject: [PATCH 05/18] rewrite query_api -> stream::unfold --- nft-folder/src/main.rs | 117 ++++++++++++-------------- nft-folder/src/request.rs | 170 +++++++++++++++++++++++--------------- 2 files changed, 155 insertions(+), 132 deletions(-) diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index 229e830..a1b4925 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,81 +1,66 @@ -use crossbeam_channel::{bounded, unbounded, Sender}; -use crossbeam_utils::thread::scope; -use ethers::core::rand::random; -use std::{thread, time::Duration}; +// mod download; +mod request; -enum QueryResult { - Urls(Vec), - Finished, -} - -fn query_api(sender: Sender, mut cursor: i32) { - if cursor == 0 { - let _ = sender.send(QueryResult::Finished); - drop(sender); - return; - } +use futures::StreamExt; +use tokio::sync::{Semaphore, SemaphorePermit}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use request::request; - let urls = vec![ - "https://example.com/image1.jpg".to_owned(), - "https://example.com/image2.jpg".to_owned(), - "https://example.com/image3.jpg".to_owned(), - "https://example.com/image4.jpg".to_owned(), - "https://example.com/image5.jpg".to_owned(), - "https://example.com/image6.jpg".to_owned(), - ]; - let r: u64 = random::() / 50093603030000000; - thread::sleep(Duration::from_millis(r)); - println!("sending result"); - let _ = sender.send(QueryResult::Urls(urls)); +fn download_image(url: &String, mp: &MultiProgress) { + println!("spawning thread"); - cursor -= 1; - query_api(sender, cursor); + let pb = mp.insert_from_back(0, ProgressBar::new(100)); + pb.set_style( + ProgressStyle::with_template( + "{spinner:.magenta} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos}/{len} ({eta})", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ ") + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]) + ); + // println!("Get image {} on t:{t}", url); + // thread::sleep(Duration::from_millis(r)); + // for i in 0..100 { + // let r: u64 = random::() / 600093603030000000; + // thread::sleep(Duration::from_millis(r)); + // // pb.set_position(i); + // } + println!("url: {url}"); + // println!("Downloaded {} on t:{t}", url); + pb.finish(); } -fn download_image(url: String, t: i32) { - let r: u64 = random::() / 1009360303000000; +#[tokio::main] +async fn main() -> Result<(), Box> { + let (s, r) = futures::channel::mpsc::unbounded(); - println!("Get image {} on t:{t}", url); - thread::sleep(Duration::from_millis(r)); - println!("Downloaded {} on t:{t}", url); -} + let mp = MultiProgress::new(); -fn main() -> Result<(), Box> { - let (sender, receiver) = unbounded(); - let (task_sender, task_receiver) = unbounded(); + let receiver_stream = r.for_each(move |url| { + let mp = mp.clone(); + tokio::spawn(async move { + while let Some(url) = &r.next().await { + download_image(url, &mp); + } + }); + }); - let query_thread = thread::spawn(move || { - query_api(sender, 4); - - }); + let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e"; + let client = reqwest::Client::new(); - scope(|s| { - for t in 0..5 { - let task_receiver = task_receiver.clone(); - s.spawn(move |_| { - for url in task_receiver { - download_image(url, t); - } - }); - } + let stream = request(&client, &address).await; + tokio::pin!(stream); - for task in receiver { - match task { - QueryResult::Urls(task) => { - for url in task { - let _ = task_sender.send(url); - } - } - QueryResult::Finished => { - drop(task_sender); - break; - } - } + while let Some(result) = stream.next().await { + let token = result?; + println!("received token: {:?}", token); + let url = token.token_url.unwrap(); + if let Err(e) = s.unbounded_send(url) { + eprintln!("Error sending url to download task: {}", e); } - }) - .unwrap(); + } - query_thread.join().unwrap(); + drop(s); Ok(()) } diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 86681c2..fe318df 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,9 +1,11 @@ use eyre::{eyre, Result}; +use futures::{stream, Stream, StreamExt}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; const DEBUG: bool = false; + #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] #[serde(rename_all = "camelCase")] @@ -33,20 +35,19 @@ pub struct NftNode { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct PageInfo { - end_cursor: String, + end_cursor: Option, has_next_page: bool, limit: i32, } - #[derive(Serialize, Deserialize, Debug)] -pub struct NftTokens { +pub struct NftNodes { pub nodes: Vec, #[serde(rename = "camelCase")] pub page_info: PageInfo, } #[derive(Serialize, Deserialize, Debug)] pub struct NftData { - pub tokens: NftTokens, + pub tokens: NftNodes, } #[derive(Deserialize, Serialize, Debug)] pub struct FailedRequest { @@ -72,76 +73,113 @@ impl NftResponse { match self { NftResponse::Success { data } => Some(data), NftResponse::Error { errors } => { - println!("Errors: {:?}", errors); + eprintln!("Errors: {:?}", errors); None } } } +} + +async fn fetch_page(client: &Client, cursor: Option, address: &str) -> Result { + let cursor = match cursor { + Some(c) => format!(r#"after: "{}"""#, c), + None => "".to_owned(), + }; + + let query = format!( + r#" + query NFTsForAddress {{ + tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], + pagination: {{limit: 20, {} }}, + where: {{ownerAddresses: "{}"}}) {{ + nodes {{ + token {{ + tokenId + tokenUrl + collectionName + name + image {{ + url + size + mimeType + }} + metadata + }} + }} + pageInfo {{ + endCursor + hasNextPage + limit + }} + }} + }} + "#, + cursor, address + ); - pub async fn request(address: &str) -> Result { - let query = format!( - r#" - query NFTsForAddress {{ - tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], - pagination: {{limit: 32}}, - where: {{ownerAddresses: "{}"}}) {{ - nodes {{ - token {{ - tokenId - tokenUrl - collectionName - name - image {{ - url - size - mimeType - }} - metadata - }} - pageInfo {{ - endCursor - hasNextPage - limit - }} - }} - }} - }} - "#, - address - ); - - let request_body = to_value(serde_json::json!({ + let request_body = to_value(serde_json::json!({ "query": query, "variables": null, - }))?; - - let response = Client::new() - .post("https://api.zora.co/graphql") - .json(&request_body) - .send() - .await - .map_err(|err| eyre!("Failed to send request: {}", err))?; - let mut response_body = response.bytes_stream(); - - let mut response_data = Vec::new(); - while let Some(item) = futures::StreamExt::next(&mut response_body).await { - let chunk = item.map_err(|err| eyre!("Failed to read response: {}", err))?; - response_data.extend_from_slice(&chunk); - } + }))?; - let response_str = String::from_utf8(response_data) - .map_err(|err| eyre!("Failed to convert response to string: {}", err))?; - if DEBUG { - println!("{}", &response_str); - } - let response: NftResponse = serde_json::from_str(&response_str) - .map_err(|err| eyre!("Failed to parse JSON response: {}", err))?; + let response = client + .post("https://api.zora.co/graphql") + .json(&request_body) + .send() + .await + .map_err(|err| eyre!("Failed to send request: {}", err))?; + let mut response_body = response.bytes_stream(); - let data = response.handle_errors().unwrap(); - if DEBUG { - println!("{:#?}", &data); - } - - Ok(data) + let mut response_data = Vec::new(); + while let Some(item) = futures::StreamExt::next(&mut response_body).await { + let chunk = item.map_err(|err| eyre!("Failed to read response: {}", err))?; + response_data.extend_from_slice(&chunk); } + + let response_str = String::from_utf8(response_data) + .map_err(|err| eyre!("Failed to convert response to string: {}", err))?; + + let response: NftResponse = serde_json::from_str(&response_str).map_err(|err| { + // eprintln!("{}", &response_str); + eyre!("Failed to parse JSON response: {}", err) + })?; + + let data = response.handle_errors().unwrap(); + + + /* if data.tokens.page_info.has_next_page == false { + let _ = sender.send(QueryResult::Finished); + drop(sender); + // return; + } else { + let _ = sender.send(QueryResult::Data(data.tokens.nodes)); + } */ + + Ok(data.tokens) +} + +pub async fn request<'a>( + client: &'a Client, + address: &'a str, +) -> impl Stream> + 'a { + let cursor = None; + + stream::unfold(cursor, move |cursor| async move { + match fetch_page(&client, cursor, address).await { + Ok(response) => { + println!("SUCCESS"); + println!("SUCCESS"); + println!("SUCCESS"); + let items = stream::iter(response.nodes.into_iter().map(|node| Ok(node.token))); + let next_cursor = if response.page_info.has_next_page { + response.page_info.end_cursor + } else { + None + }; + Some((items, next_cursor)) + } + Err(_) => None, + } + }) + .flatten() } From 005c5e72139bb48496c6462ee72ddc85ebfc52ca Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 05:23:44 +0100 Subject: [PATCH 06/18] no more lifetimes, moved request logic handle_processing --- nft-folder/src/main.rs | 38 ++++----------------- nft-folder/src/request.rs | 69 ++++++++++++++++++++------------------- 2 files changed, 41 insertions(+), 66 deletions(-) diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index a1b4925..1450ebb 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,10 +1,9 @@ // mod download; mod request; -use futures::StreamExt; -use tokio::sync::{Semaphore, SemaphorePermit}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use request::request; +use request::handle_processing; +use reqwest::Client; fn download_image(url: &String, mp: &MultiProgress) { println!("spawning thread"); @@ -31,36 +30,11 @@ fn download_image(url: &String, mp: &MultiProgress) { } #[tokio::main] -async fn main() -> Result<(), Box> { - let (s, r) = futures::channel::mpsc::unbounded(); - - let mp = MultiProgress::new(); - - let receiver_stream = r.for_each(move |url| { - let mp = mp.clone(); - tokio::spawn(async move { - while let Some(url) = &r.next().await { - download_image(url, &mp); - } - }); - }); - +async fn main() { + let client = Client::new(); let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e"; - let client = reqwest::Client::new(); - let stream = request(&client, &address).await; - tokio::pin!(stream); - - while let Some(result) = stream.next().await { - let token = result?; - println!("received token: {:?}", token); - let url = token.token_url.unwrap(); - if let Err(e) = s.unbounded_send(url) { - eprintln!("Error sending url to download task: {}", e); - } + if let Err(e) = handle_processing(&client, address).await { + println!("Error: {}", e); } - - drop(s); - - Ok(()) } diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index fe318df..4b243b7 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,5 +1,5 @@ use eyre::{eyre, Result}; -use futures::{stream, Stream, StreamExt}; +use futures::{stream, StreamExt}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; @@ -35,14 +35,14 @@ pub struct NftNode { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct PageInfo { - end_cursor: Option, - has_next_page: bool, + pub end_cursor: Option, + pub has_next_page: bool, limit: i32, } #[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] pub struct NftNodes { pub nodes: Vec, - #[serde(rename = "camelCase")] pub page_info: PageInfo, } #[derive(Serialize, Deserialize, Debug)] @@ -62,27 +62,30 @@ struct ErrorLocation { } #[derive(Serialize, Deserialize, Debug)] -#[serde(untagged)] -pub enum NftResponse { - Success { data: NftData }, - Error { errors: FailedRequest }, +pub struct NftResponse { + data: Option, + error: Option, } impl NftResponse { fn handle_errors(self) -> Option { - match self { - NftResponse::Success { data } => Some(data), - NftResponse::Error { errors } => { - eprintln!("Errors: {:?}", errors); + match self.data { + Some(data) => Some(data), + None => { + eprintln!("Errors: {:?}", self.error); None } } } } -async fn fetch_page(client: &Client, cursor: Option, address: &str) -> Result { +pub async fn fetch_page( + client: &Client, + cursor: Option, + address: &str, +) -> Result { let cursor = match cursor { - Some(c) => format!(r#"after: "{}"""#, c), + Some(c) => format!(r#", after: "{}""#, c), None => "".to_owned(), }; @@ -90,7 +93,7 @@ async fn fetch_page(client: &Client, cursor: Option, address: &str) -> R r#" query NFTsForAddress {{ tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], - pagination: {{limit: 20, {} }}, + pagination: {{limit: 2 {} }}, where: {{ownerAddresses: "{}"}}) {{ nodes {{ token {{ @@ -145,41 +148,39 @@ async fn fetch_page(client: &Client, cursor: Option, address: &str) -> R })?; let data = response.handle_errors().unwrap(); - - - /* if data.tokens.page_info.has_next_page == false { - let _ = sender.send(QueryResult::Finished); - drop(sender); - // return; - } else { - let _ = sender.send(QueryResult::Data(data.tokens.nodes)); - } */ - Ok(data.tokens) } -pub async fn request<'a>( +pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> { client: &'a Client, address: &'a str, ) -> impl Stream> + 'a { let cursor = None; - - stream::unfold(cursor, move |cursor| async move { + let requests = stream::unfold(cursor, move |cursor| async move { match fetch_page(&client, cursor, address).await { Ok(response) => { - println!("SUCCESS"); - println!("SUCCESS"); - println!("SUCCESS"); - let items = stream::iter(response.nodes.into_iter().map(|node| Ok(node.token))); + let items = stream::iter(response.nodes.into_iter().map(|node| node.token)); let next_cursor = if response.page_info.has_next_page { response.page_info.end_cursor } else { None }; + // Max 30 requests per min to public Zora API + std::thread::sleep(std::time::Duration::from_millis(2000)); Some((items, next_cursor)) } - Err(_) => None, + Err(err) => { + eprintln!("Error fetching data: {}", err); + None + } } }) - .flatten() + .flatten(); + tokio::pin!(requests); + + while let Some(data) = requests.next().await { + println!("{:#?}", data); + } + + Ok(()) } From fd1c759f2335e8ed0705e8603a31d123f64aec8b Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 05:24:03 +0100 Subject: [PATCH 07/18] upadte Cargo.lock --- Cargo.lock | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 159aa1d..eed65fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.19" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crunchy" @@ -1891,6 +1891,7 @@ dependencies = [ "clap", "console", "crossbeam-channel", + "crossbeam-utils", "ethers", "ethers-providers", "eyre", From c1050eaf71ce9c8f463d7e7e959c6f439f28a033 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 10:11:06 +0100 Subject: [PATCH 08/18] simulate download with semaphore locks --- nft-folder/src/download.rs | 42 +++++++++++++++++++++++++++++++++++++- nft-folder/src/main.rs | 27 +----------------------- nft-folder/src/request.rs | 26 ++++++++++++----------- 3 files changed, 56 insertions(+), 39 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index e743342..3c0bea3 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -1,8 +1,10 @@ -use crate::request::{NftImage, NftNode}; +use crate::request::{NftImage, NftNode, NftToken}; use base64::decode; use eyre::{eyre, Result}; use futures::{stream::StreamExt, Stream}; use reqwest::Client; +use std::fmt::format; +use std::sync::Arc; use std::{fs, io::Cursor, path::PathBuf, time::Duration}; use std::{ fs::File, @@ -11,6 +13,43 @@ use std::{ use tokio::{sync::mpsc, time::sleep}; use tokio_util::bytes::Bytes; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use tokio::sync::Semaphore; + +pub async fn download_image(semaphore: Arc, data: NftToken, mp: &MultiProgress) { + // println!("=============================================="); + // let mp = mp.clone(); + let pb = mp.add(ProgressBar::new(100)); + tokio::spawn(async move { + let permit = semaphore.acquire().await.unwrap(); + // println!("{:#?}", data); + + pb.set_style( + ProgressStyle::with_template( + "{spinner:.magenta} {wide_msg} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos:>3}/{len} ({eta:>3})", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ ") + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]) + ); + let name = data.name.unwrap(); + pb.set_message(format!("downloading {}", &name)); + // println!("Get image {} on t:{t}", url); + // println!("starting download for {}", &name); + for i in 0..100 { + let r: u64 = ethers::core::rand::random::() / 60009360303000000; + std::thread::sleep(Duration::from_millis(r)); + pb.set_position(i); + // println!("downloading {} | progress {i}%", &name); + } + // println!("url: {url}"); + // println!("Downloaded {} on t:{t}", url); + pb.finish(); + // println!("=============================================="); + drop(permit); //`semaphore` dropped here while still borrowed + }); +} +/* pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender) { let file_path = match node.token.name { Some(name) => name, @@ -203,3 +242,4 @@ mod tests { } */ } + */ diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index 1450ebb..7add2e1 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,34 +1,9 @@ -// mod download; mod request; +mod download; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use request::handle_processing; use reqwest::Client; -fn download_image(url: &String, mp: &MultiProgress) { - println!("spawning thread"); - - let pb = mp.insert_from_back(0, ProgressBar::new(100)); - pb.set_style( - ProgressStyle::with_template( - "{spinner:.magenta} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos}/{len} ({eta})", - ) - .unwrap() - .progress_chars("█▉▊▋▌▍▎▏ ") - .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]) - ); - // println!("Get image {} on t:{t}", url); - // thread::sleep(Duration::from_millis(r)); - // for i in 0..100 { - // let r: u64 = random::() / 600093603030000000; - // thread::sleep(Duration::from_millis(r)); - // // pb.set_position(i); - // } - println!("url: {url}"); - // println!("Downloaded {} on t:{t}", url); - pb.finish(); -} - #[tokio::main] async fn main() { let client = Client::new(); diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 4b243b7..6551a6c 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,10 +1,12 @@ +use crate::download::download_image; use eyre::{eyre, Result}; use futures::{stream, StreamExt}; +use indicatif::{MultiProgress, ProgressBar}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; - -const DEBUG: bool = false; +use std::sync::Arc; +use tokio::sync::Semaphore; #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] @@ -93,7 +95,7 @@ pub async fn fetch_page( r#" query NFTsForAddress {{ tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], - pagination: {{limit: 2 {} }}, + pagination: {{limit: 20 {} }}, where: {{ownerAddresses: "{}"}}) {{ nodes {{ token {{ @@ -142,19 +144,18 @@ pub async fn fetch_page( let response_str = String::from_utf8(response_data) .map_err(|err| eyre!("Failed to convert response to string: {}", err))?; - let response: NftResponse = serde_json::from_str(&response_str).map_err(|err| { - // eprintln!("{}", &response_str); - eyre!("Failed to parse JSON response: {}", err) - })?; + let response: NftResponse = serde_json::from_str(&response_str) + .map_err(|err| eyre!("Failed to parse JSON response: {}", err))?; + // println!("{:?}", response); let data = response.handle_errors().unwrap(); Ok(data.tokens) } pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> { - client: &'a Client, - address: &'a str, -) -> impl Stream> + 'a { + let semaphore = Arc::new(Semaphore::new(4)); + let mp = MultiProgress::new(); + let cursor = None; let requests = stream::unfold(cursor, move |cursor| async move { match fetch_page(&client, cursor, address).await { @@ -178,8 +179,9 @@ pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<( .flatten(); tokio::pin!(requests); - while let Some(data) = requests.next().await { - println!("{:#?}", data); + while let Some(token) = requests.next().await { + // let url = token.token_url.unwrap(); + download_image(Arc::clone(&semaphore), token, &mp).await; } Ok(()) From f72e4049dd7b2b0cce1abfa7bc7ff5f4c7f938d0 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 14:08:10 +0100 Subject: [PATCH 09/18] DOWNLOADS!!* *mostly --- nft-folder/src/download.rs | 175 +++++++++++++++---------------------- nft-folder/src/main.rs | 24 +++-- nft-folder/src/request.rs | 10 +-- 3 files changed, 93 insertions(+), 116 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index 3c0bea3..619efb8 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -1,83 +1,41 @@ -use crate::request::{NftImage, NftNode, NftToken}; +use crate::request::{NftImage, NftToken}; + use base64::decode; use eyre::{eyre, Result}; -use futures::{stream::StreamExt, Stream}; +use futures::stream::StreamExt; use reqwest::Client; -use std::fmt::format; use std::sync::Arc; -use std::{fs, io::Cursor, path::PathBuf, time::Duration}; +use std::{fs, path::PathBuf}; use std::{ fs::File, - io::{self, ErrorKind, Read, Write}, + io::{self, ErrorKind, Write}, }; -use tokio::{sync::mpsc, time::sleep}; -use tokio_util::bytes::Bytes; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use tokio::sync::Semaphore; -pub async fn download_image(semaphore: Arc, data: NftToken, mp: &MultiProgress) { - // println!("=============================================="); - // let mp = mp.clone(); - let pb = mp.add(ProgressBar::new(100)); - tokio::spawn(async move { - let permit = semaphore.acquire().await.unwrap(); - // println!("{:#?}", data); +const DEBUG: bool = false; - pb.set_style( - ProgressStyle::with_template( +pub async fn handle_token( + semaphore: Arc, + token: NftToken, + client: &Client, + mp: &MultiProgress, + dir: &PathBuf, +) -> Result<()> { + let pb_style = ProgressStyle::with_template( "{spinner:.magenta} {wide_msg} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos:>3}/{len} ({eta:>3})", ) .unwrap() - .progress_chars("█▉▊▋▌▍▎▏ ") - .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]) - ); - let name = data.name.unwrap(); - pb.set_message(format!("downloading {}", &name)); - // println!("Get image {} on t:{t}", url); - // println!("starting download for {}", &name); - for i in 0..100 { - let r: u64 = ethers::core::rand::random::() / 60009360303000000; - std::thread::sleep(Duration::from_millis(r)); - pb.set_position(i); - // println!("downloading {} | progress {i}%", &name); - } - // println!("url: {url}"); - // println!("Downloaded {} on t:{t}", url); - pb.finish(); - // println!("=============================================="); - drop(permit); //`semaphore` dropped here while still borrowed - }); -} -/* -pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender) { - let file_path = match node.token.name { - Some(name) => name, - None => "d".to_string(), - }; + .progress_chars("█▉▊▋▌▍▎▏ ") + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]); - for n in 0..100 { - sleep(Duration::from_millis(500)).await; - let file_path = file_path.clone(); - progress_tx - .send(DownloadResult { - file_path, - progress: n, - total: 100, - }) - .await - .unwrap(); - } -} - -const DEBUG: bool = false; -pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> Result<()> { - /* Pin>>> */ - let image = &node.token.image; - let name = match &node.token.name { + let image = token.image; + let name = match token.name { Some(name) => name, - None => return Err(eyre!("Image data not found for {:#?}", node)), + None => return Err(eyre!("Image data not found for {:#?}", token.token_id)), }; + let msg = format!("downloading {}", &name); let (url, mime) = match image { NftImage::Object { @@ -85,7 +43,7 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R mime_type, size: _, } => (url, mime_type), - NftImage::Url(url) => (url, &None), //meant here + NftImage::Url(url) => (url, None), _ => return Err(eyre!("No image URL found for {name}")), }; @@ -100,10 +58,8 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R let file_path = dir.join(format!("{name}.{extension}")); if file_path.is_file() { - if DEBUG { - println!("Skipping {name}"); - } - // progress.inc(1); + let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); + pb.finish_with_message(format!("Already downloaded {name}")); return Ok(()); } @@ -111,62 +67,71 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R println!("Downloading {name} to {:?}", file_path); } - let (progress_tx, mut _progress_rx) = mpsc::channel(10); // Adjust the buffer size as needed - match url { - // Decode and save svg - url if url.starts_with("data:image/svg") => save_base64_image( + if url.starts_with("data:image/svg") { + let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); + decode_and_save( &url.strip_prefix("data:image/svg+xml;base64,") .unwrap_or(&url), file_path, - )?, - // append IPFS gateway - url if url.starts_with("ipfs") => { + )?; + pb.finish(); + } else { + let permit = semaphore.acquire_owned().await.unwrap(); + let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); + + let url = if url.starts_with("ipfs") { + // append IPFS gateway let parts: Vec<&str> = url.split('/').collect(); - if let Some(hash) = parts.iter().find(|&&part| part.starts_with("Qm")) { - let ipfs_url = format!("https://ipfs.io/ipfs/{hash}"); - if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await - { - return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); - } + let hash = parts.iter().find(|&&part| part.starts_with("Qm")); + + // Handle the case where the hash is not found + match hash { + Some(hash) => format!("https://ipfs.io/ipfs/{}", hash), + None => { + // pb.finish_with_message(format!("IPFS hash not found in URL for {name}")); + return Err(eyre::eyre!("IPFS hash not found in URL")); + } //if a single image fails I want to finish it immediately without disrupting other ongoing processess } - } - url => { - if let Err(error) = download_image(&client, &url, file_path, progress_tx).await { - return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); + } else { + url.to_owned() + }; + + let client = client.clone(); + let name_cp = name.clone(); + + tokio::spawn(async move { + // pb.set_position(i); + match download_image(&client, &url, file_path, &pb).await { + Ok(()) => pb.finish(), + Err(error) => { + pb.finish_with_message(format!( + "Error downloading image {}: {}", + name_cp, error + )); + // return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); + } }; - } + + drop(permit); + }); } if DEBUG { println!("{name} saved successfully"); } - Ok(()) } -// async fn get_address() - -pub struct DownloadProgress { - pub name: String, - pub progress: u64, - pub total: u64, -} - -#[derive(Debug)] -pub struct DownloadResult { - file_path: String, - progress: u64, - total: u64, -} async fn download_image( client: &Client, image_url: &str, file_path: PathBuf, - progress_tx: mpsc::Sender<(u64, u64)>, + pb: &ProgressBar ) -> Result<()> { let response = client.get(image_url).send().await?; - let content_length = response.content_length().unwrap_or(0); + let content_length = response.content_length().unwrap_or(100); let mut byte_stream = response.bytes_stream(); + pb.set_length(content_length); let mut progress: u64 = 0; let mut file = File::create(file_path)?; @@ -179,8 +144,7 @@ async fn download_image( file.write_all(&chunk) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - // Send progress update through the channel - let _ = progress_tx.send((progress, content_length)).await; + pb.set_position(progress); } if content_length != progress { @@ -218,7 +182,7 @@ pub async fn create_directory(dir_path: &PathBuf) -> Result { Ok(res) } -fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> { +fn decode_and_save(base64_data: &str, file_path: PathBuf) -> Result<()> { let decoded_data = decode(base64_data)?; let mut file = File::create(file_path)?; file.write_all(&decoded_data)?; @@ -242,4 +206,3 @@ mod tests { } */ } - */ diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index 7add2e1..c2bd216 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,15 +1,29 @@ mod request; mod download; +use std::path::PathBuf; + +use download::create_directory; +use eyre::Result; use request::handle_processing; use reqwest::Client; #[tokio::main] -async fn main() { +async fn main() -> Result<()>{ let client = Client::new(); - let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e"; - - if let Err(e) = handle_processing(&client, address).await { - println!("Error: {}", e); + let address = "0xa3a4548b39da96eb065ff91811ca30da40431c0d"; + let mut path = PathBuf::from("test"); + path.push(&address); + println!("{:#?}", &path); + + match create_directory(&path).await { + Ok(path) => { + if let Err(e) = handle_processing(&client, address, path).await { + println!("Error: {}", e); + }; + } + Err(err) => return Err(err) } + + Ok(()) } diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 6551a6c..0b0f81a 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,11 +1,11 @@ -use crate::download::download_image; +use crate::download::handle_token; use eyre::{eyre, Result}; use futures::{stream, StreamExt}; -use indicatif::{MultiProgress, ProgressBar}; +use indicatif::MultiProgress; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use tokio::sync::Semaphore; #[derive(Serialize, Deserialize, Debug)] @@ -152,7 +152,7 @@ pub async fn fetch_page( Ok(data.tokens) } -pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> { +pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> eyre::Result<()> { let semaphore = Arc::new(Semaphore::new(4)); let mp = MultiProgress::new(); @@ -181,7 +181,7 @@ pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<( while let Some(token) = requests.next().await { // let url = token.token_url.unwrap(); - download_image(Arc::clone(&semaphore), token, &mp).await; + let _ = handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await; } Ok(()) From 4a4437dd761fa01e529992ac55ca95e4aaa0f791 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 17:50:54 +0100 Subject: [PATCH 10/18] break when completed --- nft-folder/src/request.rs | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 0b0f81a..29a4361 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -108,7 +108,6 @@ pub async fn fetch_page( size mimeType }} - metadata }} }} pageInfo {{ @@ -152,6 +151,10 @@ pub async fn fetch_page( Ok(data.tokens) } +enum PageResult { + Data(NftToken), + Completed, +} pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> eyre::Result<()> { let semaphore = Arc::new(Semaphore::new(4)); let mp = MultiProgress::new(); @@ -160,7 +163,13 @@ pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> let requests = stream::unfold(cursor, move |cursor| async move { match fetch_page(&client, cursor, address).await { Ok(response) => { - let items = stream::iter(response.nodes.into_iter().map(|node| node.token)); + let items = stream::iter(response.nodes.into_iter().map(move |node| { + if response.page_info.has_next_page { + PageResult::Data(node.token) + } else { + PageResult::Completed + } + })); let next_cursor = if response.page_info.has_next_page { response.page_info.end_cursor } else { @@ -181,7 +190,13 @@ pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> while let Some(token) = requests.next().await { // let url = token.token_url.unwrap(); - let _ = handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await; + match token { + PageResult::Data(token) => { + // println!("Sending {:?}", token.name); + let _ = handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await; + } + PageResult::Completed => break, + } } Ok(()) From 1fce91f05e62062c7fa693b0fee5ea61a474b73d Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 21:06:10 +0100 Subject: [PATCH 11/18] global progress --- nft-folder/src/download.rs | 37 +++++++++++++++++++++++++------------ nft-folder/src/main.rs | 5 ++--- nft-folder/src/request.rs | 30 ++++++++++++++++++++++++++---- 3 files changed, 53 insertions(+), 19 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index 619efb8..03e6882 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -58,8 +58,11 @@ pub async fn handle_token( let file_path = dir.join(format!("{name}.{extension}")); if file_path.is_file() { - let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); - pb.finish_with_message(format!("Already downloaded {name}")); + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); + pb.finish_with_message(format!("Already saved {name}")); return Ok(()); } @@ -68,7 +71,10 @@ pub async fn handle_token( } if url.starts_with("data:image/svg") { - let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); decode_and_save( &url.strip_prefix("data:image/svg+xml;base64,") .unwrap_or(&url), @@ -77,7 +83,10 @@ pub async fn handle_token( pb.finish(); } else { let permit = semaphore.acquire_owned().await.unwrap(); - let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style)); + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); let url = if url.starts_with("ipfs") { // append IPFS gateway @@ -88,25 +97,29 @@ pub async fn handle_token( match hash { Some(hash) => format!("https://ipfs.io/ipfs/{}", hash), None => { - // pb.finish_with_message(format!("IPFS hash not found in URL for {name}")); + // Handle the case where the hash is not found + pb.abandon_with_message(format!("IPFS hash not found in URL for {name}")); return Err(eyre::eyre!("IPFS hash not found in URL")); - } //if a single image fails I want to finish it immediately without disrupting other ongoing processess + } } } else { url.to_owned() }; let client = client.clone(); - let name_cp = name.clone(); tokio::spawn(async move { // pb.set_position(i); - match download_image(&client, &url, file_path, &pb).await { - Ok(()) => pb.finish(), + match download_image(&client, &url, &file_path, &pb).await { + Ok(()) => { + pb.finish_with_message(format!("Completed {name}")); + + // Ok(()) + } Err(error) => { - pb.finish_with_message(format!( - "Error downloading image {}: {}", - name_cp, error + pb.abandon_with_message(format!( + "Download failed: {} to {:?}. {}", + name, file_path, error )); // return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); } diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index c2bd216..73acedc 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -14,16 +14,15 @@ async fn main() -> Result<()>{ let address = "0xa3a4548b39da96eb065ff91811ca30da40431c0d"; let mut path = PathBuf::from("test"); path.push(&address); - println!("{:#?}", &path); + // println!("{:#?}", &path); match create_directory(&path).await { Ok(path) => { if let Err(e) = handle_processing(&client, address, path).await { println!("Error: {}", e); }; + return Ok(()) } Err(err) => return Err(err) } - - Ok(()) } diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 29a4361..9341b48 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,7 +1,7 @@ use crate::download::handle_token; use eyre::{eyre, Result}; use futures::{stream, StreamExt}; -use indicatif::MultiProgress; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; @@ -188,16 +188,38 @@ pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> .flatten(); tokio::pin!(requests); + + mp.set_alignment(indicatif::MultiProgressAlignment::Bottom); + let total_pb = mp.add(ProgressBar::new(0)); + total_pb.set_style(ProgressStyle::with_template( + "{spinner:.magenta} {pos:>3.bold.blue} of {len:>3.bold.blue} {msg}", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ ")); + // .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])); + total_pb.set_message("Complete"); + + while let Some(token) = requests.next().await { + total_pb.inc_length(1); // let url = token.token_url.unwrap(); match token { PageResult::Data(token) => { // println!("Sending {:?}", token.name); - let _ = handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await; + match handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await { + Ok(()) => { + total_pb.inc(1); + } + Err(err) => { + total_pb.println(format!("{}", err)); + } + } } - PageResult::Completed => break, + PageResult::Completed => { + total_pb.abandon_with_message("Completed Sucessfully"); + return Ok(()) + }, } } - Ok(()) } From e83a8a684303c84604a70ca1a96c6b97ba0c99f7 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 21:13:29 +0100 Subject: [PATCH 12/18] improve error handling and escape slashes in name --- nft-folder/src/download.rs | 52 +++++++++++++++++++++++++------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index 03e6882..21db96b 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -24,18 +24,25 @@ pub async fn handle_token( dir: &PathBuf, ) -> Result<()> { let pb_style = ProgressStyle::with_template( - "{spinner:.magenta} {wide_msg} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos:>3}/{len} ({eta:>3})", + "{spinner:.magenta} {wide_msg} {pos:>3}/{len} [{bar:40.yellow/}] [{elapsed_precise:.bold.blue}]", ) .unwrap() .progress_chars("█▉▊▋▌▍▎▏ ") .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]); + // let debug_style = ProgressStyle::with_template("{wide_msg}").unwrap(); let image = token.image; - let name = match token.name { - Some(name) => name, - None => return Err(eyre!("Image data not found for {:#?}", token.token_id)), - }; - let msg = format!("downloading {}", &name); + let name = if let Some(name) = token.name { + name + } else if let (Some(collection_name), Some(id)) = (&token.collection_name, &token.token_id) { + format!("{} #{}", collection_name, id) + } else { + return Err(eyre!("Image data not found for {:#?}", token.token_id)); + } + .replace("/", " ") + .replace("\\", " "); + + let msg = format!("{}", &name); let (url, mime) = match image { NftImage::Object { @@ -46,14 +53,28 @@ pub async fn handle_token( NftImage::Url(url) => (url, None), _ => return Err(eyre!("No image URL found for {name}")), }; - let extension = if url.starts_with("data:image/svg") { "svg".to_string() } else if let Some(mime) = mime { mime.rsplit("/").next().unwrap_or_default().to_string() + } else if url.starts_with("ipfs") { + // This is probably not going to be an image, but let's take a shot and see what happens + // println!("{} {}", name, url); + "png".to_string() + } else if url.starts_with("ens") { + // println!("{} {}", name, url); + return Err(eyre!("{name} is not an image")); } else { - url.rsplit('.').next().unwrap_or_default().to_lowercase() + let ext = url.rsplit('.').next().unwrap_or_default().to_lowercase(); + if ext.len() > 5 { + return Err(eyre!("No suitable extension found for {} {}", name, url)); + } else { + ext + } }; + // TODO: Timeout if download takes too long + // TODO: Maybe panic automatically on unrecognized file types + // TODO: Some SVGs seem to be having issues let file_path = dir.join(format!("{name}.{extension}")); @@ -90,10 +111,11 @@ pub async fn handle_token( let url = if url.starts_with("ipfs") { // append IPFS gateway - let parts: Vec<&str> = url.split('/').collect(); - let hash = parts.iter().find(|&&part| part.starts_with("Qm")); + let hash = url + .split('/') + .into_iter() + .find(|&part| part.starts_with("Qm")); - // Handle the case where the hash is not found match hash { Some(hash) => format!("https://ipfs.io/ipfs/{}", hash), None => { @@ -128,18 +150,14 @@ pub async fn handle_token( drop(permit); }); } - - if DEBUG { - println!("{name} saved successfully"); - } Ok(()) } async fn download_image( client: &Client, image_url: &str, - file_path: PathBuf, - pb: &ProgressBar + file_path: &PathBuf, + pb: &ProgressBar, ) -> Result<()> { let response = client.get(image_url).send().await?; let content_length = response.content_length().unwrap_or(100); From 71801e2a14e222a323d7cee7fef1139b2d7c2737 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 21:14:16 +0100 Subject: [PATCH 13/18] remove unused dependencies --- Cargo.lock | 11 ----------- nft-folder/Cargo.toml | 2 -- 2 files changed, 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eed65fe..f2c96a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,15 +537,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -1890,8 +1881,6 @@ dependencies = [ "base64 0.13.1", "clap", "console", - "crossbeam-channel", - "crossbeam-utils", "ethers", "ethers-providers", "eyre", diff --git a/nft-folder/Cargo.toml b/nft-folder/Cargo.toml index 9591727..38525d5 100644 --- a/nft-folder/Cargo.toml +++ b/nft-folder/Cargo.toml @@ -25,5 +25,3 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["macros", "rt-multi-thread", "fs", "sync"] } tokio-util = { version = "0.7.10", features = ["io-util"] } -crossbeam-channel = "0.5.13" -crossbeam-utils = "0.8.20" From 6f88465b836555791f760086a56ed839726ae9f0 Mon Sep 17 00:00:00 2001 From: ibro Date: Fri, 28 Jun 2024 21:22:22 +0100 Subject: [PATCH 14/18] move new main --- nft-folder/src/{main.rs => main_v2.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename nft-folder/src/{main.rs => main_v2.rs} (100%) diff --git a/nft-folder/src/main.rs b/nft-folder/src/main_v2.rs similarity index 100% rename from nft-folder/src/main.rs rename to nft-folder/src/main_v2.rs From e2dca6804a47f8215298e3e7b472bc387a87b923 Mon Sep 17 00:00:00 2001 From: ibro Date: Sat, 29 Jun 2024 13:49:31 +0100 Subject: [PATCH 15/18] move old-main --- nft-folder/src/{main-bk.rs => main.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename nft-folder/src/{main-bk.rs => main.rs} (100%) diff --git a/nft-folder/src/main-bk.rs b/nft-folder/src/main.rs similarity index 100% rename from nft-folder/src/main-bk.rs rename to nft-folder/src/main.rs From f9e931e1094ea97ba81a289ca99d42c379b49435 Mon Sep 17 00:00:00 2001 From: ibro Date: Sat, 29 Jun 2024 13:54:53 +0100 Subject: [PATCH 16/18] move main-v2 --- nft-folder/src/{main_v2.rs => main.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename nft-folder/src/{main_v2.rs => main.rs} (100%) diff --git a/nft-folder/src/main_v2.rs b/nft-folder/src/main.rs similarity index 100% rename from nft-folder/src/main_v2.rs rename to nft-folder/src/main.rs From 7681af9ac215681c66326a2724862f3c947597b8 Mon Sep 17 00:00:00 2001 From: ibro Date: Sun, 30 Jun 2024 05:52:46 +0100 Subject: [PATCH 17/18] merge main: fix errors --- nft-folder/src/download.rs | 30 ++--- nft-folder/src/main.rs | 251 ++++--------------------------------- nft-folder/src/request.rs | 14 +-- 3 files changed, 45 insertions(+), 250 deletions(-) diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index 21db96b..8da5692 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -23,12 +23,14 @@ pub async fn handle_token( mp: &MultiProgress, dir: &PathBuf, ) -> Result<()> { + + // "{wide_msg} {pos:>7}/{len:7} {bar.cyan/blue} {percent}", let pb_style = ProgressStyle::with_template( - "{spinner:.magenta} {wide_msg} {pos:>3}/{len} [{bar:40.yellow/}] [{elapsed_precise:.bold.blue}]", + "{spinner:.magenta} {wide_msg} {pos:>3}/{len} [{bar:40.yellow}] [{elapsed_precise:.bold.blue}]", ) .unwrap() .progress_chars("█▉▊▋▌▍▎▏ ") - .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]); + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶", "⣿"]); // let debug_style = ProgressStyle::with_template("{wide_msg}").unwrap(); let image = token.image; @@ -187,30 +189,28 @@ async fn download_image( Ok(()) } -pub async fn create_directory(dir_path: &PathBuf) -> Result { - let res = match fs::metadata(dir_path) { +pub async fn create_directory(dir_path: PathBuf) -> Result { + let copy = dir_path.clone(); + match fs::metadata(copy) { Ok(metadata) => { if !metadata.is_dir() { - return Err(io::Error::new( + Err(io::Error::new( ErrorKind::InvalidInput, format!("{:?} is not a directory", dir_path), ) - .into()); + .into()) + } else { + Ok(dir_path) } - dir_path.to_path_buf() } Err(e) if e.kind() == ErrorKind::NotFound => { - fs::create_dir_all(dir_path)?; - if DEBUG { - println!("created directory: {:?}", dir_path); - } - dir_path.to_path_buf() + fs::create_dir_all(&dir_path)?; + Ok(dir_path) } Err(e) => { - return Err(e.into()); + Err(e.into()) } - }; - Ok(res) + } } fn decode_and_save(base64_data: &str, file_path: PathBuf) -> Result<()> { diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index e629a92..a070d96 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,22 +1,18 @@ mod download; mod request; -use std::path::PathBuf; use download::create_directory; use request::handle_processing; -use crate::request::NftResponse; use ::core::time; use clap::{Args, Parser, Subcommand}; use console::style; use ethers::utils::hex::encode; use ethers_providers::{Http, Middleware, Provider}; use eyre::Result; -use futures::stream::{self, StreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use reqwest::Client; use std::borrow::Borrow; -use tokio::sync::mpsc; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -66,11 +62,9 @@ async fn main() -> Result<()> { let provider = Provider::::try_from(args.rpc)?; let account = match args.address { arg if arg.split(".").last().unwrap() == "eth" => { - // format!("{spinner} {} {msg}", style("INFO").bright()); - let spinner = pending(&multi_pb, "Resolving address...".to_string()); - let address = resolve_ens_name(&arg, &provider).await?; - spinner.finish_with_message(format!("Resolving address: {}", address)); - + let spinner = pending(&multi_pb, "ENS Detected. Resolving address...".to_string()); + let address = resolve_ens_name(&arg, provider).await?; + spinner.finish_with_message(format!("Name Resolved to {address}")); Account { name: Some(arg.to_string()), address, @@ -86,92 +80,37 @@ async fn main() -> Result<()> { style("Invalid address").red() )) } - }; - // Request - let spinner = pending(&multi_pb, "Requesting collection data...".to_string()); - let nodes = NftResponse::request(&account.address).await?.tokens.nodes; - spinner - .finish_with_message(format!("Found {} NFTs. Starting download...", nodes.len())); + }; let path = match account.name { - Some(name) => args.path.join(name), - None => args.path.join(account.address), - }; - - match create_directory(&path).await { - Ok(path) => path, - Err(err) => return Err(eyre::eyre!("{} {err}", style("Invalid Path").red())), + Some(name) => { + let spinner = pending(&multi_pb, format!("Saving files to {}", name)); + match create_directory(args.path.join(name)).await { + Ok(path) => { + spinner.finish(); + path + }, + Err(err) => return Err(eyre::eyre!("{} {err}", style("Invalid Path").red())), + } + } + None => args.path.join(&account.address), }; let client = Client::new(); - let address = "0xa3a4548b39da96eb065ff91811ca30da40431c0d"; - let mut path = PathBuf::from("test"); - path.push(&address); - // println!("{:#?}", &path); - - match create_directory(&path).await { - Ok(path) => { - if let Err(e) = handle_processing(&client, address, path).await { - println!("Error: {}", e); - }; - return Ok(()) - } - Err(err) => return Err(err) - } + if let Err(e) = handle_processing(&client, account.address.as_str(), path, args.max_concurrent_downloads).await { + println!("Error: {}", e); + }; - let main_pb = multi_pb.add(ProgressBar::new(nodes.len() as u64)); - main_pb.set_style( - ProgressStyle::with_template( - "Total ({pos:>7}/{len:7}) {wide_bar.cyan/blue} {percent}", - ) - .unwrap() - .progress_chars("█░ "), - ); /* - :: Remove make dependencies after install? [y/N] :: (1/6) ENS Name Detected. Resolving name :: (2/6) Name resolved to 0x21B0...42fa :: (3/6) Saving files to name.eth :: (4/6) Requesting NFT Data :: (5/6) 45 NFTs found. Starting download */ - let (tx, rx): (mpsc::Sender, mpsc::Receiver) = - mpsc::channel(100); - - let tasks = stream::iter(nodes.into_iter().map(|node| { - let pb = multi_pb.insert_before(&main_pb, ProgressBar::new(0)); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg} {pos:>7}/{len:7} {bar.cyan/blue} {percent}", - ) - .unwrap() - .progress_chars("█░ "), - ); - let path = path.clone(); - let client = client.clone(); - let tx = tx.clone(); - async move { - // test_progress(node, tx).await; - handle_download(node, &path, &client).await - } - })) - .buffer_unordered(args.max_concurrent_downloads); - - tasks - .for_each(|result| async { - match result { - Ok(()) => { - // println!("finished with success"); - main_pb.inc(1); - } - Err(_err) => todo!("save output for failed downloads"), // println!("finished with err"), - } - }) - .await; - - main_pb.finish(); Ok(()) } + Commands::Test => { let nodes = vec![ "first".to_string(), @@ -193,119 +132,23 @@ async fn main() -> Result<()> { .progress_chars("█░ "), ); - // let (tx, rx) = mpsc::channel::(100); - - let tasks = total_pb - .wrap_stream(stream::iter(nodes.into_iter().map(|node| { - // let tx = tx.clone(); - async move { - track_task(node, &multi_pb).await - // worker2(node, tx).await } - } - }))) - .buffer_unordered(3) - .collect::>(); - - let _x = tasks.await; - /* tasks - .for_each(|result| async move { - let pb = multi_pb.insert_from_back(1, ProgressBar::new(100)); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", - ) - .unwrap() - .progress_chars("██ "), - ); - match result { - Ok(res) => { - pb.set_message(res); - - //******** */ - while let Some(recv) = rx.recv().await { - let pos = recv.progress * 100 / recv.total; - pb.set_position(pos); - } - } - Err(_err) => pb.abandon_with_message("Error during download"), - } - }) - .await; */ - // tokio::spawn(tasks); - // Wait for the worker thread to finish and receive the result - // let result = rx.recv().unwrap(); - - // Print the received result - - /* for recv in rx { - pb.set_message(recv.node); - let pos = recv.progress * 100 / recv.total; - - pb.set_position(pos); - // println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos); - } */ Ok(()) } } } -async fn track_task(node: String, _multi_pb: &MultiProgress) -> Result<()> { - let (tx, rx) = std::sync::mpsc::channel::(); - - let pb = ProgressBar::new(100); - let _ = worker2(&node, tx); - pb.set_message(node); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", - ) - .unwrap() - .progress_chars("██ "), - ); - for recv in rx { - let pos = recv.progress * 100 / recv.total; - - pb.set_position(pos); - // println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos); - } /* - while let Ok(recv) = rx.recv() { - let pos = recv.progress * 100 / recv.total; - println!("{pos}"); - pb.set_position(pos); - }; */ - Ok(()) -} -#[derive(Debug)] -struct Ret { - progress: u64, - total: u64, -} - -fn worker2(node: &String, progress_tx: std::sync::mpsc::Sender) -> Result<()> { - let total = 100; - for n in 0..total { - std::thread::sleep(tokio::time::Duration::from_millis(10)); - progress_tx - .send(Ret { - progress: n, - total: total, - }) - .unwrap(); - } - Ok(()) -} -async fn resolve_ens_name(ens_name: &str, provider: &Provider) -> Result { +async fn resolve_ens_name(ens_name: &str, provider: Provider) -> Result { let address = provider.resolve_name(ens_name).await?; Ok(format!("0x{}", encode(address))) } -/* function which wraps a generic action with a spinner then returns it's reult */ +/// Wrapsa generic action with a spinner then return it's result fn pending(multi_pb: &MultiProgress, msg: String) -> ProgressBar { // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json let style = ProgressStyle::default_spinner() - .template("{spinner} {prefix:.bold.blue} {msg}") + .template("{spinner:.magenta} {prefix:.bold.blue} {msg}") .unwrap() - .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]); + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶", "⣿"]); let spinner = multi_pb.add(ProgressBar::new_spinner().with_style(style)); spinner.set_prefix("INFO"); spinner.set_message(msg); @@ -313,51 +156,3 @@ fn pending(multi_pb: &MultiProgress, msg: String) -> ProgressBar { spinner } - -/* -#[tokio::main] -async fn main() -> eyre::Result<()>{ - let nodes = vec![ - "first".to_string(), - "second".to_string(), - "third".to_string(), - "fourth node".to_string(), - ]; - - let (tx, rx) = std::sync::mpsc::channel::(); - let tasks = stream::iter(nodes.into_iter().map(|node| { - let tx = tx.clone(); - async move { worker2(node, tx).await } - })) - .buffer_unordered(2); - - // indicatif Multiprogress - let multi_pb = MultiProgress::new(); - // Tracks total progress of nodes - let total_pb = multi_pb.add(ProgressBar::new(nodes.len().try_into()?)); - - tasks - .for_each(|result| async { - let pb = multi_pb.insert_before(&total_pb, ProgressBar::new(100)); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", - ) - .unwrap() - .progress_chars("██ "), - ); - match result { - Ok(res) => { - pb.set_message(res); - while let Ok(recv) = rx.recv() { - let pos = recv.progress * 100 / recv.total; - pb.set_position(pos); - } - total_pb.inc(1); - } - Err(_err) => pb.abandon_with_message("Error during download"), - } - }) - .await; - Ok(()) -} */ diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 9341b48..7203593 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -155,10 +155,7 @@ enum PageResult { Data(NftToken), Completed, } -pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> eyre::Result<()> { - let semaphore = Arc::new(Semaphore::new(4)); - let mp = MultiProgress::new(); - +pub async fn handle_processing(client: &Client, address: &str, path: PathBuf, max: usize) -> eyre::Result<()> { let cursor = None; let requests = stream::unfold(cursor, move |cursor| async move { match fetch_page(&client, cursor, address).await { @@ -188,16 +185,18 @@ pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> .flatten(); tokio::pin!(requests); + let semaphore = Arc::new(Semaphore::new(max)); + let mp = MultiProgress::new(); mp.set_alignment(indicatif::MultiProgressAlignment::Bottom); let total_pb = mp.add(ProgressBar::new(0)); total_pb.set_style(ProgressStyle::with_template( - "{spinner:.magenta} {pos:>3.bold.blue} of {len:>3.bold.blue} {msg}", + "Found: {len:>3.bold.blue} Saved: {pos:>3.bold.blue} {msg}", ) .unwrap() .progress_chars("█▉▊▋▌▍▎▏ ")); // .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])); - total_pb.set_message("Complete"); + // total_pb.set_message("Complete"); while let Some(token) = requests.next().await { @@ -216,7 +215,8 @@ pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> } } PageResult::Completed => { - total_pb.abandon_with_message("Completed Sucessfully"); + // total_pb.abandon_with_message("Completed Sucessfully"); + total_pb.abandon(); return Ok(()) }, } From 9b25372a2e39f50273dc8430420e99c0e6511f04 Mon Sep 17 00:00:00 2001 From: ibro Date: Sun, 30 Jun 2024 06:06:58 +0100 Subject: [PATCH 18/18] update version --- Cargo.lock | 2 +- nft-folder/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2c96a3..b22ecd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1876,7 +1876,7 @@ checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" [[package]] name = "nft-folder" -version = "0.2.1" +version = "0.3.1" dependencies = [ "base64 0.13.1", "clap", diff --git a/nft-folder/Cargo.toml b/nft-folder/Cargo.toml index 38525d5..4a8d0da 100644 --- a/nft-folder/Cargo.toml +++ b/nft-folder/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nft-folder" -version = "0.2.1" +version = "0.3.1" edition = "2021" authors = ["ibro "] license = "MIT OR Apache-2.0"