Skip to content

Commit

Permalink
basic "error handling" for failed requests
Browse files Browse the repository at this point in the history
  • Loading branch information
sameoldlab committed Apr 28, 2024
1 parent f6bdccc commit 15e8aa7
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 9 deletions.
52 changes: 47 additions & 5 deletions nft-folder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,56 @@ pub struct NftToken {
pub struct NftNode {
token: NftToken,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PageInfo {
end_cursor: String,
has_next_page: bool,
limit: i32,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NftTokens {
pub nodes: Vec<NftNode>,
#[serde(rename = "camelCase")]
pub page_info: PageInfo,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct NftData {
pub tokens: NftTokens,
}
#[derive(Deserialize, Serialize, Debug)]
pub struct FailedRequest {
message: String,
locations: Vec<ErrorLocation>,
path: Vec<String>,
}
#[derive(Deserialize, Serialize, Debug)]
struct ErrorLocation {
line: u64,
column: u64,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct NftResponse {
pub data: NftData,
#[serde(untagged)]
pub enum NftResponse {
Success { data: NftData },
Error { errors: FailedRequest},
}


impl NftResponse {
pub async fn request(address: &str) -> Result<NftResponse> {
fn handle_errors(self) -> Option<NftData> {
match self {
NftResponse::Success { data } => Some(data),
NftResponse::Error { errors } => {
println!("Errors: {:?}", errors);
None
}
}
}

pub async fn request(address: &str) -> Result<NftData> {
let query = format!(
r#"
query NFTsForAddress {{
Expand All @@ -288,6 +323,11 @@ impl NftResponse {
}}
metadata
}}
pageInfo {{
endCursor
hasNextPage
limit
}}
}}
}}
}}
Expand Down Expand Up @@ -321,11 +361,13 @@ impl NftResponse {
}
let response: NftResponse = serde_json::from_str(&response_str)
.map_err(|err| eyre!("Failed to parse JSON response: {}", err))?;

let data = response.handle_errors().unwrap();
if DEBUG {
println!("{:#?}", &response.data.tokens.nodes);
println!("{:#?}", &data);
}

Ok(response)
Ok(data)
}
}

Expand Down
126 changes: 122 additions & 4 deletions nft-folder/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use futures::stream::{self, StreamExt};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use nft_folder::{self, create_directory, handle_download, NftResponse};
use reqwest::Client;
use std::borrow::Borrow;
use tokio::sync::mpsc;

#[derive(Parser)]
#[command(version, about, long_about = None)]
Expand All @@ -21,6 +23,7 @@ struct Cli {
enum Commands {
/// Create a folder for the provided address
Create(CreateArgs),
Test,
}

#[derive(Args)]
Expand Down Expand Up @@ -79,10 +82,7 @@ async fn main() -> Result<()> {
// Request
let spinner = pending(&multi_pb, "Requesting collection data...".to_string());
let nodes = NftResponse::request(&account.address)
.await?
.data
.tokens
.nodes;
.await?.tokens.nodes;
spinner
.finish_with_message(format!("Found {} NFTs. Starting download...", nodes.len()));

Expand Down Expand Up @@ -151,6 +151,124 @@ async fn main() -> Result<()> {
main_pb.finish();
Ok(())
}
Commands::Test => {
let nodes = vec![
"first".to_string(),
"second".to_string(),
"third".to_string(),
"fourth node".to_string(),
];

// indicatif Multiprogress
// Tracks total progress of nodes
let multi_pb = MultiProgress::new();
let multi_pb = multi_pb.borrow();
let total_pb = multi_pb.add(ProgressBar::new(nodes.len().try_into()?));
total_pb.set_style(
ProgressStyle::with_template(
"Total [{pos:>}/{len:>}] {elapsed:>} {bar:40} {percent:>3}% ",
)
.unwrap()
.progress_chars("█░ "),
);

// let (tx, rx) = mpsc::channel::<Ret>(100);

let tasks = total_pb
.wrap_stream(stream::iter(nodes.into_iter().map(|node| {
// let tx = tx.clone();
async move {
track_task(node, &multi_pb).await
// worker2(node, tx).await }
}
})))
.buffer_unordered(3)
.collect::<Vec<_>>();

let _x = tasks.await;
/* tasks
.for_each(|result| async move {
let pb = multi_pb.insert_from_back(1, ProgressBar::new(100));
pb.set_style(
ProgressStyle::with_template(
"{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ",
)
.unwrap()
.progress_chars("██ "),
);
match result {
Ok(res) => {
pb.set_message(res);
//******** */
while let Some(recv) = rx.recv().await {
let pos = recv.progress * 100 / recv.total;
pb.set_position(pos);
}
}
Err(_err) => pb.abandon_with_message("Error during download"),
}
})
.await; */
// tokio::spawn(tasks);
// Wait for the worker thread to finish and receive the result
// let result = rx.recv().unwrap();

// Print the received result

/* for recv in rx {
pb.set_message(recv.node);
let pos = recv.progress * 100 / recv.total;
pb.set_position(pos);
// println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos);
} */
Ok(())
}
}
}
async fn track_task(node: String, _multi_pb: &MultiProgress) -> Result<()> {
let (tx, rx) = std::sync::mpsc::channel::<Ret>();

let pb = ProgressBar::new(100);
let _ = worker2(&node, tx);
pb.set_message(node);
pb.set_style(
ProgressStyle::with_template(
"{wide_msg:!} {bytes_per_sec} {elapsed:>} {bar:40} {percent:>3}% ",
)
.unwrap()
.progress_chars("██ "),
);
for recv in rx {
let pos = recv.progress * 100 / recv.total;

pb.set_position(pos);
// println!("{:?} / {:?} = {:?}", recv.progress, recv.total, pos);
} /*
while let Ok(recv) = rx.recv() {
let pos = recv.progress * 100 / recv.total;
println!("{pos}");
pb.set_position(pos);
}; */
Ok(())
}
#[derive(Debug)]
struct Ret {
progress: u64,
total: u64,
}

fn worker2(node: &String, progress_tx: std::sync::mpsc::Sender<Ret>) -> Result<()> {
let total = 100;
for n in 0..total {
std::thread::sleep(tokio::time::Duration::from_millis(10));
progress_tx
.send(Ret {
progress: n,
total: total,
})
.unwrap();
}
Ok(())
}
Expand Down

0 comments on commit 15e8aa7

Please sign in to comment.