diff --git a/nft-folder/src/main.rs b/nft-folder/src/main.rs index a1b4925..1450ebb 100644 --- a/nft-folder/src/main.rs +++ b/nft-folder/src/main.rs @@ -1,10 +1,9 @@ // mod download; mod request; -use futures::StreamExt; -use tokio::sync::{Semaphore, SemaphorePermit}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use request::request; +use request::handle_processing; +use reqwest::Client; fn download_image(url: &String, mp: &MultiProgress) { println!("spawning thread"); @@ -31,36 +30,11 @@ fn download_image(url: &String, mp: &MultiProgress) { } #[tokio::main] -async fn main() -> Result<(), Box> { - let (s, r) = futures::channel::mpsc::unbounded(); - - let mp = MultiProgress::new(); - - let receiver_stream = r.for_each(move |url| { - let mp = mp.clone(); - tokio::spawn(async move { - while let Some(url) = &r.next().await { - download_image(url, &mp); - } - }); - }); - +async fn main() { + let client = Client::new(); let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e"; - let client = reqwest::Client::new(); - let stream = request(&client, &address).await; - tokio::pin!(stream); - - while let Some(result) = stream.next().await { - let token = result?; - println!("received token: {:?}", token); - let url = token.token_url.unwrap(); - if let Err(e) = s.unbounded_send(url) { - eprintln!("Error sending url to download task: {}", e); - } + if let Err(e) = handle_processing(&client, address).await { + println!("Error: {}", e); } - - drop(s); - - Ok(()) } diff --git a/nft-folder/src/request.rs b/nft-folder/src/request.rs index fe318df..4b243b7 100644 --- a/nft-folder/src/request.rs +++ b/nft-folder/src/request.rs @@ -1,5 +1,5 @@ use eyre::{eyre, Result}; -use futures::{stream, Stream, StreamExt}; +use futures::{stream, StreamExt}; use reqwest::Client; use serde::{Deserialize, Serialize}; use serde_json::to_value; @@ -35,14 +35,14 @@ pub struct NftNode { #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct PageInfo { - end_cursor: Option, - has_next_page: bool, + pub end_cursor: Option, + pub has_next_page: bool, limit: i32, } #[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] pub struct NftNodes { pub nodes: Vec, - #[serde(rename = "camelCase")] pub page_info: PageInfo, } #[derive(Serialize, Deserialize, Debug)] @@ -62,27 +62,30 @@ struct ErrorLocation { } #[derive(Serialize, Deserialize, Debug)] -#[serde(untagged)] -pub enum NftResponse { - Success { data: NftData }, - Error { errors: FailedRequest }, +pub struct NftResponse { + data: Option, + error: Option, } impl NftResponse { fn handle_errors(self) -> Option { - match self { - NftResponse::Success { data } => Some(data), - NftResponse::Error { errors } => { - eprintln!("Errors: {:?}", errors); + match self.data { + Some(data) => Some(data), + None => { + eprintln!("Errors: {:?}", self.error); None } } } } -async fn fetch_page(client: &Client, cursor: Option, address: &str) -> Result { +pub async fn fetch_page( + client: &Client, + cursor: Option, + address: &str, +) -> Result { let cursor = match cursor { - Some(c) => format!(r#"after: "{}"""#, c), + Some(c) => format!(r#", after: "{}""#, c), None => "".to_owned(), }; @@ -90,7 +93,7 @@ async fn fetch_page(client: &Client, cursor: Option, address: &str) -> R r#" query NFTsForAddress {{ tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}], - pagination: {{limit: 20, {} }}, + pagination: {{limit: 2 {} }}, where: {{ownerAddresses: "{}"}}) {{ nodes {{ token {{ @@ -145,41 +148,39 @@ async fn fetch_page(client: &Client, cursor: Option, address: &str) -> R })?; let data = response.handle_errors().unwrap(); - - - /* if data.tokens.page_info.has_next_page == false { - let _ = sender.send(QueryResult::Finished); - drop(sender); - // return; - } else { - let _ = sender.send(QueryResult::Data(data.tokens.nodes)); - } */ - Ok(data.tokens) } -pub async fn request<'a>( +pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> { client: &'a Client, address: &'a str, ) -> impl Stream> + 'a { let cursor = None; - - stream::unfold(cursor, move |cursor| async move { + let requests = stream::unfold(cursor, move |cursor| async move { match fetch_page(&client, cursor, address).await { Ok(response) => { - println!("SUCCESS"); - println!("SUCCESS"); - println!("SUCCESS"); - let items = stream::iter(response.nodes.into_iter().map(|node| Ok(node.token))); + let items = stream::iter(response.nodes.into_iter().map(|node| node.token)); let next_cursor = if response.page_info.has_next_page { response.page_info.end_cursor } else { None }; + // Max 30 requests per min to public Zora API + std::thread::sleep(std::time::Duration::from_millis(2000)); Some((items, next_cursor)) } - Err(_) => None, + Err(err) => { + eprintln!("Error fetching data: {}", err); + None + } } }) - .flatten() + .flatten(); + tokio::pin!(requests); + + while let Some(data) = requests.next().await { + println!("{:#?}", data); + } + + Ok(()) }