Skip to content

Commit

Permalink
no more lifetimes, moved request logic handle_processing
Browse files Browse the repository at this point in the history
  • Loading branch information
sameoldlab committed Jun 28, 2024
1 parent 6ccf237 commit 005c5e7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 66 deletions.
38 changes: 6 additions & 32 deletions nft-folder/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// mod download;
mod request;

use futures::StreamExt;
use tokio::sync::{Semaphore, SemaphorePermit};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use request::request;
use request::handle_processing;
use reqwest::Client;

fn download_image(url: &String, mp: &MultiProgress) {
println!("spawning thread");
Expand All @@ -31,36 +30,11 @@ fn download_image(url: &String, mp: &MultiProgress) {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (s, r) = futures::channel::mpsc::unbounded();

let mp = MultiProgress::new();

let receiver_stream = r.for_each(move |url| {
let mp = mp.clone();
tokio::spawn(async move {
while let Some(url) = &r.next().await {
download_image(url, &mp);
}
});
});

async fn main() {
let client = Client::new();
let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e";
let client = reqwest::Client::new();

let stream = request(&client, &address).await;
tokio::pin!(stream);

while let Some(result) = stream.next().await {
let token = result?;
println!("received token: {:?}", token);
let url = token.token_url.unwrap();
if let Err(e) = s.unbounded_send(url) {
eprintln!("Error sending url to download task: {}", e);
}
if let Err(e) = handle_processing(&client, address).await {
println!("Error: {}", e);
}

drop(s);

Ok(())
}
69 changes: 35 additions & 34 deletions nft-folder/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use eyre::{eyre, Result};
use futures::{stream, Stream, StreamExt};
use futures::{stream, StreamExt};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::to_value;
Expand Down Expand Up @@ -35,14 +35,14 @@ pub struct NftNode {
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PageInfo {
end_cursor: Option<String>,
has_next_page: bool,
pub end_cursor: Option<String>,
pub has_next_page: bool,
limit: i32,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct NftNodes {
pub nodes: Vec<NftNode>,
#[serde(rename = "camelCase")]
pub page_info: PageInfo,
}
#[derive(Serialize, Deserialize, Debug)]
Expand All @@ -62,35 +62,38 @@ struct ErrorLocation {
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum NftResponse {
Success { data: NftData },
Error { errors: FailedRequest },
pub struct NftResponse {
data: Option<NftData>,
error: Option<FailedRequest>,
}

impl NftResponse {
fn handle_errors(self) -> Option<NftData> {
match self {
NftResponse::Success { data } => Some(data),
NftResponse::Error { errors } => {
eprintln!("Errors: {:?}", errors);
match self.data {
Some(data) => Some(data),
None => {
eprintln!("Errors: {:?}", self.error);
None
}
}
}
}

async fn fetch_page(client: &Client, cursor: Option<String>, address: &str) -> Result<NftNodes> {
pub async fn fetch_page(
client: &Client,
cursor: Option<String>,
address: &str,
) -> Result<NftNodes> {
let cursor = match cursor {
Some(c) => format!(r#"after: "{}"""#, c),
Some(c) => format!(r#", after: "{}""#, c),
None => "".to_owned(),
};

let query = format!(
r#"
query NFTsForAddress {{
tokens(networks: [{{network: ETHEREUM, chain: MAINNET}}],
pagination: {{limit: 20, {} }},
pagination: {{limit: 2 {} }},
where: {{ownerAddresses: "{}"}}) {{
nodes {{
token {{
Expand Down Expand Up @@ -145,41 +148,39 @@ async fn fetch_page(client: &Client, cursor: Option<String>, address: &str) -> R
})?;

let data = response.handle_errors().unwrap();


/* if data.tokens.page_info.has_next_page == false {
let _ = sender.send(QueryResult::Finished);
drop(sender);
// return;
} else {
let _ = sender.send(QueryResult::Data(data.tokens.nodes));
} */

Ok(data.tokens)
}

pub async fn request<'a>(
pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> {
client: &'a Client,
address: &'a str,
) -> impl Stream<Item = eyre::Result<NftToken>> + 'a {
let cursor = None;

stream::unfold(cursor, move |cursor| async move {
let requests = stream::unfold(cursor, move |cursor| async move {
match fetch_page(&client, cursor, address).await {
Ok(response) => {
println!("SUCCESS");
println!("SUCCESS");
println!("SUCCESS");
let items = stream::iter(response.nodes.into_iter().map(|node| Ok(node.token)));
let items = stream::iter(response.nodes.into_iter().map(|node| node.token));
let next_cursor = if response.page_info.has_next_page {
response.page_info.end_cursor
} else {
None
};
// Max 30 requests per min to public Zora API
std::thread::sleep(std::time::Duration::from_millis(2000));
Some((items, next_cursor))
}
Err(_) => None,
Err(err) => {
eprintln!("Error fetching data: {}", err);
None
}
}
})
.flatten()
.flatten();
tokio::pin!(requests);

while let Some(data) = requests.next().await {
println!("{:#?}", data);
}

Ok(())
}

0 comments on commit 005c5e7

Please sign in to comment.