diff --git a/Cargo.lock b/Cargo.lock index b737328..b22ecd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -558,9 +558,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.19" +version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" [[package]] name = "crunchy" @@ -1876,7 +1876,7 @@ checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" [[package]] name = "nft-folder" -version = "0.2.1" +version = "0.3.1" dependencies = [ "base64 0.13.1", "clap", diff --git a/nft-folder/Cargo.toml b/nft-folder/Cargo.toml index 38525d5..4a8d0da 100644 --- a/nft-folder/Cargo.toml +++ b/nft-folder/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "nft-folder" -version = "0.2.1" +version = "0.3.1" edition = "2021" authors = ["ibro "] license = "MIT OR Apache-2.0" diff --git a/nft-folder/src/download.rs b/nft-folder/src/download.rs index af2ef29..8da5692 100644 --- a/nft-folder/src/download.rs +++ b/nft-folder/src/download.rs @@ -1,37 +1,50 @@ -use crate::request::{NftImage, NftNode}; +use crate::request::{NftImage, NftToken}; + use base64::decode; use eyre::{eyre, Result}; -use futures::{stream::StreamExt, Stream}; +use futures::stream::StreamExt; use reqwest::Client; -use std::{fs, io::Cursor, path::PathBuf, time::Duration}; +use std::sync::Arc; +use std::{fs, path::PathBuf}; use std::{ fs::File, - io::{self, ErrorKind, Read, Write}, + io::{self, ErrorKind, Write}, }; -use tokio::{sync::mpsc, time::sleep}; -use tokio_util::bytes::Bytes; - -pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender) { - let file_path = match node.token.name { - Some(name) => name, - None => "d".to_string() - }; - - for n in 0..100 { - sleep(Duration::from_millis(500)).await; - let file_path = file_path.clone(); - progress_tx.send(DownloadResult {file_path, progress: n, total: 100}).await.unwrap(); - } -} + +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use tokio::sync::Semaphore; const DEBUG: bool = false; -pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> Result<()> { - /* Pin>>> */ - let image = &node.token.image; - let name = match &node.token.name { - Some(name) => name, - None => return Err(eyre!("Image data not found for {:#?}", node)), - }; + +pub async fn handle_token( + semaphore: Arc, + token: NftToken, + client: &Client, + mp: &MultiProgress, + dir: &PathBuf, +) -> Result<()> { + + // "{wide_msg} {pos:>7}/{len:7} {bar.cyan/blue} {percent}", + let pb_style = ProgressStyle::with_template( + "{spinner:.magenta} {wide_msg} {pos:>3}/{len} [{bar:40.yellow}] [{elapsed_precise:.bold.blue}]", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ ") + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶", "⣿"]); + // let debug_style = ProgressStyle::with_template("{wide_msg}").unwrap(); + + let image = token.image; + let name = if let Some(name) = token.name { + name + } else if let (Some(collection_name), Some(id)) = (&token.collection_name, &token.token_id) { + format!("{} #{}", collection_name, id) + } else { + return Err(eyre!("Image data not found for {:#?}", token.token_id)); + } + .replace("/", " ") + .replace("\\", " "); + + let msg = format!("{}", &name); let (url, mime) = match image { NftImage::Object { @@ -39,25 +52,40 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R mime_type, size: _, } => (url, mime_type), - NftImage::Url(url) => (url, &None), //meant here + NftImage::Url(url) => (url, None), _ => return Err(eyre!("No image URL found for {name}")), }; - let extension = if url.starts_with("data:image/svg") { "svg".to_string() } else if let Some(mime) = mime { mime.rsplit("/").next().unwrap_or_default().to_string() + } else if url.starts_with("ipfs") { + // This is probably not going to be an image, but let's take a shot and see what happens + // println!("{} {}", name, url); + "png".to_string() + } else if url.starts_with("ens") { + // println!("{} {}", name, url); + return Err(eyre!("{name} is not an image")); } else { - url.rsplit('.').next().unwrap_or_default().to_lowercase() + let ext = url.rsplit('.').next().unwrap_or_default().to_lowercase(); + if ext.len() > 5 { + return Err(eyre!("No suitable extension found for {} {}", name, url)); + } else { + ext + } }; + // TODO: Timeout if download takes too long + // TODO: Maybe panic automatically on unrecognized file types + // TODO: Some SVGs seem to be having issues let file_path = dir.join(format!("{name}.{extension}")); - + if file_path.is_file() { - if DEBUG { - println!("Skipping {name}"); - } - // progress.inc(1); + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); + pb.finish_with_message(format!("Already saved {name}")); return Ok(()); } @@ -65,123 +93,92 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R println!("Downloading {name} to {:?}", file_path); } - let (progress_tx, mut _progress_rx) = mpsc::channel(10); // Adjust the buffer size as needed - match url { - // Decode and save svg - url if url.starts_with("data:image/svg") => save_base64_image( + if url.starts_with("data:image/svg") { + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); + decode_and_save( &url.strip_prefix("data:image/svg+xml;base64,") .unwrap_or(&url), file_path, - )?, - // append IPFS gateway - url if url.starts_with("ipfs") => { - let parts: Vec<&str> = url.split('/').collect(); - if let Some(hash) = parts.iter().find(|&&part| part.starts_with("Qm")) { - let ipfs_url = format!("https://ipfs.io/ipfs/{hash}"); - if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await { - return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); + )?; + pb.finish(); + } else { + let permit = semaphore.acquire_owned().await.unwrap(); + let pb = mp.insert( + 0, + ProgressBar::new(100).with_message(msg).with_style(pb_style), + ); + + let url = if url.starts_with("ipfs") { + // append IPFS gateway + let hash = url + .split('/') + .into_iter() + .find(|&part| part.starts_with("Qm")); + + match hash { + Some(hash) => format!("https://ipfs.io/ipfs/{}", hash), + None => { + // Handle the case where the hash is not found + pb.abandon_with_message(format!("IPFS hash not found in URL for {name}")); + return Err(eyre::eyre!("IPFS hash not found in URL")); } } - } - url => { - if let Err(error) = download_image(&client, &url, file_path, progress_tx).await { - return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); - }; - } - } - - if DEBUG { - println!("{name} saved successfully"); - } - - Ok(()) -} -// async fn get_address() - -pub struct DownloadProgress { - pub name: String, - pub progress: u64, - pub total: u64, -} - -#[derive(Debug)] -pub struct DownloadResult { - file_path: String, - progress: u64, - total: u64, -} - -struct ProgressTracker { - progress: u64, -} -impl ProgressTracker { - fn new() -> Self { - ProgressTracker { progress: 0 } - } + } else { + url.to_owned() + }; - // async fn track_progress( - async fn track_progress> + Unpin>( - &mut self, - index: usize, - mut reader_stream: R, - mut file: File, - progress_tx: &mpsc::Sender<(usize, u64)>, - ) -> Result<()> { - let mut buffer = [0; 8192]; - while let Some(chunk_result) = reader_stream.next().await { - let chunk = match chunk_result { - Ok(chunk) => chunk, - Err(e) => return Err(e.into()), - }; + let client = client.clone(); - let mut cursor = Cursor::new(chunk); - let bytes_read = cursor.read(&mut buffer)?; - file.write_all(&buffer[..bytes_read])?; - self.progress += bytes_read as u64; + tokio::spawn(async move { + // pb.set_position(i); + match download_image(&client, &url, &file_path, &pb).await { + Ok(()) => { + pb.finish_with_message(format!("Completed {name}")); - match progress_tx.try_send((index, self.progress)) { - Ok(_) => { - // The progress update was sent successfully. - } - Err(mpsc::error::TrySendError::Full(_)) => { - // The receiver's buffer is full, you can either: - // 1. Drop the progress update and continue downloading - // 2. Wait for the receiver to process some messages before sending more updates + // Ok(()) } - Err(mpsc::error::TrySendError::Closed(_)) => { - // The receiver was dropped, so we stop sending progress updates. - break; + Err(error) => { + pb.abandon_with_message(format!( + "Download failed: {} to {:?}. {}", + name, file_path, error + )); + // return Err(eyre::eyre!("Error downloading image {}: {}", name, error)); } - } - } - Ok(()) + }; + + drop(permit); + }); } + Ok(()) } async fn download_image( client: &Client, image_url: &str, - file_path: PathBuf, - progress_tx: mpsc::Sender<(u64, u64)>, + file_path: &PathBuf, + pb: &ProgressBar, ) -> Result<()> { let response = client.get(image_url).send().await?; - let content_length = response.content_length().unwrap_or(0); + let content_length = response.content_length().unwrap_or(100); let mut byte_stream = response.bytes_stream(); + pb.set_length(content_length); - let mut progress: u64 = 0; + let mut progress: u64 = 0; let mut file = File::create(file_path)?; while let Some(chunk) = byte_stream.next().await { - let chunk = chunk?; - let chunk_len = chunk.len(); + let chunk = chunk?; + let chunk_len = chunk.len(); - progress += chunk_len as u64; - file.write_all(&chunk) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + progress += chunk_len as u64; + file.write_all(&chunk) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - // Send progress update through the channel - let _ = progress_tx.send((progress, content_length)).await; - } + pb.set_position(progress); + } if content_length != progress { return Err(eyre::eyre!( @@ -192,32 +189,31 @@ async fn download_image( Ok(()) } -pub async fn create_directory(dir_path: &PathBuf) -> Result - { - let res = match fs::metadata(dir_path) { +pub async fn create_directory(dir_path: PathBuf) -> Result { + let copy = dir_path.clone(); + match fs::metadata(copy) { Ok(metadata) => { if !metadata.is_dir() { - return Err(io::Error::new( + Err(io::Error::new( ErrorKind::InvalidInput, format!("{:?} is not a directory", dir_path), ) - .into()); + .into()) + } else { + Ok(dir_path) } - dir_path.to_path_buf() } Err(e) if e.kind() == ErrorKind::NotFound => { - fs::create_dir_all(dir_path)?; - if DEBUG { println!("created directory: {:?}", dir_path);} - dir_path.to_path_buf() + fs::create_dir_all(&dir_path)?; + Ok(dir_path) } Err(e) => { - return Err(e.into()); + Err(e.into()) } - }; - Ok(res) + } } -fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> { +fn decode_and_save(base64_data: &str, file_path: PathBuf) -> Result<()> { let decoded_data = decode(base64_data)?; let mut file = File::create(file_path)?; file.write_all(&decoded_data)?; @@ -226,8 +222,8 @@ fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> { #[cfg(test)] mod tests { - /* - use super::*; + /* + use super::*; #[test] async fn resolve() { @@ -239,5 +235,5 @@ mod tests { assert_eq!(result, "0x"); } - */ - } + */ +} diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index ef0e6ce..a070d96 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,18 +1,18 @@ mod download; mod request; + +use download::create_directory; +use request::handle_processing; + use ::core::time; use clap::{Args, Parser, Subcommand}; use console::style; use ethers::utils::hex::encode; use ethers_providers::{Http, Middleware, Provider}; use eyre::Result; -use futures::stream::{self, StreamExt}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use crate::download::{create_directory, handle_download, DownloadResult}; -use crate::request::NftResponse; use reqwest::Client; use std::borrow::Borrow; -use tokio::sync::mpsc; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -57,15 +57,14 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.command { Commands::Create(args) => { + let multi_pb = MultiProgress::new(); let provider = Provider::::try_from(args.rpc)?; let account = match args.address { arg if arg.split(".").last().unwrap() == "eth" => { - // format!("{spinner} {} {msg}", style("INFO").bright()); - let spinner = pending(&multi_pb, "Resolving address...".to_string()); - let address = resolve_ens_name(&arg, &provider).await?; - spinner.finish_with_message(format!("Resolving address: {}", address)); - + let spinner = pending(&multi_pb, "ENS Detected. Resolving address...".to_string()); + let address = resolve_ens_name(&arg, provider).await?; + spinner.finish_with_message(format!("Name Resolved to {address}")); Account { name: Some(arg.to_string()), address, @@ -81,79 +80,37 @@ async fn main() -> Result<()> { style("Invalid address").red() )) } - }; - // Request - let spinner = pending(&multi_pb, "Requesting collection data...".to_string()); - let nodes = NftResponse::request(&account.address) - .await?.tokens.nodes; - spinner - .finish_with_message(format!("Found {} NFTs. Starting download...", nodes.len())); + }; let path = match account.name { - Some(name) => args.path.join(name), - None => args.path.join(account.address), - }; - - match create_directory(&path).await { - Ok(path) => path, - Err(err) => return Err(eyre::eyre!("{} {err}", style("Invalid Path").red())), + Some(name) => { + let spinner = pending(&multi_pb, format!("Saving files to {}", name)); + match create_directory(args.path.join(name)).await { + Ok(path) => { + spinner.finish(); + path + }, + Err(err) => return Err(eyre::eyre!("{} {err}", style("Invalid Path").red())), + } + } + None => args.path.join(&account.address), }; let client = Client::new(); + if let Err(e) = handle_processing(&client, account.address.as_str(), path, args.max_concurrent_downloads).await { + println!("Error: {}", e); + }; - let main_pb = multi_pb.add(ProgressBar::new(nodes.len() as u64)); - main_pb.set_style( - ProgressStyle::with_template( - "Total ({pos:>7}/{len:7}) {wide_bar.cyan/blue} {percent}", - ) - .unwrap() - .progress_chars("█░ "), - ); /* - :: Remove make dependencies after install? [y/N] :: (1/6) ENS Name Detected. Resolving name :: (2/6) Name resolved to 0x21B0...42fa :: (3/6) Saving files to name.eth :: (4/6) Requesting NFT Data :: (5/6) 45 NFTs found. Starting download */ - let (tx, rx): (mpsc::Sender, mpsc::Receiver) = - mpsc::channel(100); - - let tasks = stream::iter(nodes.into_iter().map(|node| { - let pb = multi_pb.insert_before(&main_pb, ProgressBar::new(0)); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg} {pos:>7}/{len:7} {bar.cyan/blue} {percent}", - ) - .unwrap() - .progress_chars("█░ "), - ); - let path = path.clone(); - let client = client.clone(); - let tx = tx.clone(); - async move { - // test_progress(node, tx).await; - handle_download(node, &path, &client).await - } - })) - .buffer_unordered(args.max_concurrent_downloads); - - tasks - .for_each(|result| async { - match result { - Ok(()) => { - // println!("finished with success"); - main_pb.inc(1); - } - Err(_err) => todo!("save output for failed downloads"), // println!("finished with err"), - } - }) - .await; - - main_pb.finish(); Ok(()) } + Commands::Test => { let nodes = vec![ "first".to_string(), @@ -175,123 +132,25 @@ async fn main() -> Result<()> { .progress_chars("█░ "), ); - // let (tx, rx) = mpsc::channel::(100); - - let tasks = total_pb - .wrap_stream(stream::iter(nodes.into_iter().map(|node| { - // let tx = tx.clone(); - async move { - track_task(node, &multi_pb).await - // worker2(node, tx).await } - } - }))) - .buffer_unordered(3) - .collect::>(); - - let _x = tasks.await; - /* tasks - .for_each(|result| async move { - let pb = multi_pb.insert_from_back(1, ProgressBar::new(100)); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", - ) - .unwrap() - .progress_chars("██ "), - ); - match result { - Ok(res) => { - pb.set_message(res); - - //******** */ - while let Some(recv) = rx.recv().await { - let pos = recv.progress * 100 / recv.total; - pb.set_position(pos); - } - } - Err(_err) => pb.abandon_with_message("Error during download"), - } - }) - .await; */ - // tokio::spawn(tasks); - // Wait for the worker thread to finish and receive the result - // let result = rx.recv().unwrap(); - - // Print the received result - - /* for recv in rx { - pb.set_message(recv.node); - let pos = recv.progress * 100 / recv.total; - - pb.set_position(pos); - // println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos); - } */ Ok(()) } } } -async fn track_task(node: String, _multi_pb: &MultiProgress) -> Result<()> { - let (tx, rx) = std::sync::mpsc::channel::(); - let pb = ProgressBar::new(100); - let _ = worker2(&node, tx); - pb.set_message(node); - pb.set_style( - ProgressStyle::with_template( - "{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ", - ) - .unwrap() - .progress_chars("██ "), - ); - for recv in rx { - let pos = recv.progress * 100 / recv.total; - - pb.set_position(pos); - // println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos); - } /* - while let Ok(recv) = rx.recv() { - let pos = recv.progress * 100 / recv.total; - println!("{pos}"); - pb.set_position(pos); - }; */ - Ok(()) -} -#[derive(Debug)] -struct Ret { - progress: u64, - total: u64, -} - -fn worker2(node: &String, progress_tx: std::sync::mpsc::Sender) -> Result<()> { - let total = 100; - for n in 0..total { - std::thread::sleep(tokio::time::Duration::from_millis(10)); - progress_tx - .send(Ret { - progress: n, - total: total, - }) - .unwrap(); - } - Ok(()) -} - -async fn resolve_ens_name(ens_name: &str, provider: &Provider) -> Result { +async fn resolve_ens_name(ens_name: &str, provider: Provider) -> Result { let address = provider.resolve_name(ens_name).await?; Ok(format!("0x{}", encode(address))) } -/* function which wraps a generic action with a spinner then returns it's reult */ +/// Wrapsa generic action with a spinner then return it's result fn pending(multi_pb: &MultiProgress, msg: String) -> ProgressBar { - let spinner = multi_pb.add( - ProgressBar::new_spinner().with_style( - ProgressStyle::default_spinner() - .template("{spinner} {prefix} {msg}") - .unwrap(), // .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"), - ), - ); - - spinner.set_prefix(format!("{}", style("INFO").bold().on_blue())); + // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json + let style = ProgressStyle::default_spinner() + .template("{spinner:.magenta} {prefix:.bold.blue} {msg}") + .unwrap() + .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶", "⣿"]); + let spinner = multi_pb.add(ProgressBar::new_spinner().with_style(style)); + spinner.set_prefix("INFO"); spinner.set_message(msg); spinner.enable_steady_tick(time::Duration::from_millis(100)); diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index 86681c2..7203593 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,9 +1,13 @@ +use crate::download::handle_token; use eyre::{eyre, Result}; +use futures::{stream, StreamExt}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; +use std::{path::PathBuf, sync::Arc}; +use tokio::sync::Semaphore; -const DEBUG: bool = false; #[derive(Serialize, Deserialize, Debug)] #[serde(untagged)] #[serde(rename_all = "camelCase")] @@ -33,20 +37,19 @@ pub struct NftNode { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct PageInfo { - end_cursor: String, - has_next_page: bool, + pub end_cursor: Option, + pub has_next_page: bool, limit: i32, } - #[derive(Serialize, Deserialize, Debug)] -pub struct NftTokens { +#[serde(rename_all = "camelCase")] +pub struct NftNodes { pub nodes: Vec, - #[serde(rename = "camelCase")] pub page_info: PageInfo, } #[derive(Serialize, Deserialize, Debug)] pub struct NftData { - pub tokens: NftTokens, + pub tokens: NftNodes, } #[derive(Deserialize, Serialize, Debug)] pub struct FailedRequest { @@ -61,87 +64,162 @@ struct ErrorLocation { } #[derive(Serialize, Deserialize, Debug)] -#[serde(untagged)] -pub enum NftResponse { - Success { data: NftData }, - Error { errors: FailedRequest }, +pub struct NftResponse { + data: Option, + error: Option, } impl NftResponse { fn handle_errors(self) -> Option { - match self { - NftResponse::Success { data } => Some(data), - NftResponse::Error { errors } => { - println!("Errors: {:?}", errors); + match self.data { + Some(data) => Some(data), + None => { + eprintln!("Errors: {:?}", self.error); None } } } +} + +pub async fn fetch_page( + client: &Client, + cursor: Option, + address: &str, +) -> Result { + let cursor = match cursor { + Some(c) => format!(r#", after: "{}""#, c), + None => "".to_owned(), + }; - pub async fn request(address: &str) -> Result { - let query = format!( - r#" - query NFTsForAddress {{ - tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], - pagination: {{limit: 32}}, - where: {{ownerAddresses: "{}"}}) {{ - nodes {{ - token {{ - tokenId - tokenUrl - collectionName - name - image {{ - url - size - mimeType - }} - metadata - }} - pageInfo {{ - endCursor - hasNextPage - limit - }} - }} - }} - }} - "#, - address - ); + let query = format!( + r#" + query NFTsForAddress {{ + tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], + pagination: {{limit: 20 {} }}, + where: {{ownerAddresses: "{}"}}) {{ + nodes {{ + token {{ + tokenId + tokenUrl + collectionName + name + image {{ + url + size + mimeType + }} + }} + }} + pageInfo {{ + endCursor + hasNextPage + limit + }} + }} + }} + "#, + cursor, address + ); - let request_body = to_value(serde_json::json!({ + let request_body = to_value(serde_json::json!({ "query": query, "variables": null, - }))?; + }))?; - let response = Client::new() - .post("https://api.zora.co/graphql") - .json(&request_body) - .send() - .await - .map_err(|err| eyre!("Failed to send request: {}", err))?; - let mut response_body = response.bytes_stream(); + let response = client + .post("https://api.zora.co/graphql") + .json(&request_body) + .send() + .await + .map_err(|err| eyre!("Failed to send request: {}", err))?; + let mut response_body = response.bytes_stream(); - let mut response_data = Vec::new(); - while let Some(item) = futures::StreamExt::next(&mut response_body).await { - let chunk = item.map_err(|err| eyre!("Failed to read response: {}", err))?; - response_data.extend_from_slice(&chunk); - } + let mut response_data = Vec::new(); + while let Some(item) = futures::StreamExt::next(&mut response_body).await { + let chunk = item.map_err(|err| eyre!("Failed to read response: {}", err))?; + response_data.extend_from_slice(&chunk); + } - let response_str = String::from_utf8(response_data) - .map_err(|err| eyre!("Failed to convert response to string: {}", err))?; - if DEBUG { - println!("{}", &response_str); - } - let response: NftResponse = serde_json::from_str(&response_str) - .map_err(|err| eyre!("Failed to parse JSON response: {}", err))?; + let response_str = String::from_utf8(response_data) + .map_err(|err| eyre!("Failed to convert response to string: {}", err))?; - let data = response.handle_errors().unwrap(); - if DEBUG { - println!("{:#?}", &data); + let response: NftResponse = serde_json::from_str(&response_str) + .map_err(|err| eyre!("Failed to parse JSON response: {}", err))?; + // println!("{:?}", response); + + let data = response.handle_errors().unwrap(); + Ok(data.tokens) +} + +enum PageResult { + Data(NftToken), + Completed, +} +pub async fn handle_processing(client: &Client, address: &str, path: PathBuf, max: usize) -> eyre::Result<()> { + let cursor = None; + let requests = stream::unfold(cursor, move |cursor| async move { + match fetch_page(&client, cursor, address).await { + Ok(response) => { + let items = stream::iter(response.nodes.into_iter().map(move |node| { + if response.page_info.has_next_page { + PageResult::Data(node.token) + } else { + PageResult::Completed + } + })); + let next_cursor = if response.page_info.has_next_page { + response.page_info.end_cursor + } else { + None + }; + // Max 30 requests per min to public Zora API + std::thread::sleep(std::time::Duration::from_millis(2000)); + Some((items, next_cursor)) + } + Err(err) => { + eprintln!("Error fetching data: {}", err); + None + } } + }) + .flatten(); + tokio::pin!(requests); - Ok(data) + let semaphore = Arc::new(Semaphore::new(max)); + let mp = MultiProgress::new(); + + mp.set_alignment(indicatif::MultiProgressAlignment::Bottom); + let total_pb = mp.add(ProgressBar::new(0)); + total_pb.set_style(ProgressStyle::with_template( + "Found: {len:>3.bold.blue} Saved: {pos:>3.bold.blue} {msg}", + ) + .unwrap() + .progress_chars("█▉▊▋▌▍▎▏ ")); + // .tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])); + // total_pb.set_message("Complete"); + + + while let Some(token) = requests.next().await { + total_pb.inc_length(1); + // let url = token.token_url.unwrap(); + match token { + PageResult::Data(token) => { + // println!("Sending {:?}", token.name); + match handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await { + Ok(()) => { + total_pb.inc(1); + } + Err(err) => { + total_pb.println(format!("{}", err)); + } + } + } + PageResult::Completed => { + // total_pb.abandon_with_message("Completed Sucessfully"); + total_pb.abandon(); + return Ok(()) + }, + } } + Ok(()) }