Skip to content

Commit

Permalink
DOWNLOADS!!*
Browse files Browse the repository at this point in the history
*mostly
  • Loading branch information
sameoldlab committed Jun 28, 2024
1 parent c1050ea commit f72e404
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 116 deletions.
175 changes: 69 additions & 106 deletions nft-folder/src/download.rs
Original file line number Diff line number Diff line change
@@ -1,91 +1,49 @@
use crate::request::{NftImage, NftNode, NftToken};
use crate::request::{NftImage, NftToken};

use base64::decode;
use eyre::{eyre, Result};
use futures::{stream::StreamExt, Stream};
use futures::stream::StreamExt;
use reqwest::Client;
use std::fmt::format;
use std::sync::Arc;
use std::{fs, io::Cursor, path::PathBuf, time::Duration};
use std::{fs, path::PathBuf};
use std::{
fs::File,
io::{self, ErrorKind, Read, Write},
io::{self, ErrorKind, Write},
};
use tokio::{sync::mpsc, time::sleep};
use tokio_util::bytes::Bytes;

use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use tokio::sync::Semaphore;

pub async fn download_image(semaphore: Arc<Semaphore>, data: NftToken, mp: &MultiProgress) {
// println!("==============================================");
// let mp = mp.clone();
let pb = mp.add(ProgressBar::new(100));
tokio::spawn(async move {
let permit = semaphore.acquire().await.unwrap();
// println!("{:#?}", data);
const DEBUG: bool = false;

pb.set_style(
ProgressStyle::with_template(
pub async fn handle_token(
semaphore: Arc<Semaphore>,
token: NftToken,
client: &Client,
mp: &MultiProgress,
dir: &PathBuf,
) -> Result<()> {
let pb_style = ProgressStyle::with_template(
"{spinner:.magenta} {wide_msg} [{elapsed_precise:.bold.blue}] [{bar:40.yellow/}] {pos:>3}/{len} ({eta:>3})",
)
.unwrap()
.progress_chars("█▉▊▋▌▍▎▏ ")
.tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"])
);
let name = data.name.unwrap();
pb.set_message(format!("downloading {}", &name));
// println!("Get image {} on t:{t}", url);
// println!("starting download for {}", &name);
for i in 0..100 {
let r: u64 = ethers::core::rand::random::<u64>() / 60009360303000000;
std::thread::sleep(Duration::from_millis(r));
pb.set_position(i);
// println!("downloading {} | progress {i}%", &name);
}
// println!("url: {url}");
// println!("Downloaded {} on t:{t}", url);
pb.finish();
// println!("==============================================");
drop(permit); //`semaphore` dropped here while still borrowed
});
}
/*
pub async fn test_progress(node: NftNode, progress_tx: mpsc::Sender<DownloadResult>) {
let file_path = match node.token.name {
Some(name) => name,
None => "d".to_string(),
};
.progress_chars("█▉▊▋▌▍▎▏ ")
.tick_strings(&["⣼", "⣹", "⢻", "⠿", "⡟", "⣏", "⣧", "⣶"]);

for n in 0..100 {
sleep(Duration::from_millis(500)).await;
let file_path = file_path.clone();
progress_tx
.send(DownloadResult {
file_path,
progress: n,
total: 100,
})
.await
.unwrap();
}
}
const DEBUG: bool = false;
pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> Result<()> {
/* Pin<Box<dyn Stream<Item = Result<DownloadResult>>>> */
let image = &node.token.image;
let name = match &node.token.name {
let image = token.image;
let name = match token.name {
Some(name) => name,
None => return Err(eyre!("Image data not found for {:#?}", node)),
None => return Err(eyre!("Image data not found for {:#?}", token.token_id)),
};
let msg = format!("downloading {}", &name);

let (url, mime) = match image {
NftImage::Object {
url,
mime_type,
size: _,
} => (url, mime_type),
NftImage::Url(url) => (url, &None), //meant here
NftImage::Url(url) => (url, None),
_ => return Err(eyre!("No image URL found for {name}")),
};

Expand All @@ -100,73 +58,80 @@ pub async fn handle_download(node: NftNode, dir: &PathBuf, client: &Client) -> R
let file_path = dir.join(format!("{name}.{extension}"));

if file_path.is_file() {
if DEBUG {
println!("Skipping {name}");
}
// progress.inc(1);
let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style));
pb.finish_with_message(format!("Already downloaded {name}"));
return Ok(());
}

if DEBUG {
println!("Downloading {name} to {:?}", file_path);
}

let (progress_tx, mut _progress_rx) = mpsc::channel(10); // Adjust the buffer size as needed
match url {
// Decode and save svg
url if url.starts_with("data:image/svg") => save_base64_image(
if url.starts_with("data:image/svg") {
let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style));
decode_and_save(
&url.strip_prefix("data:image/svg+xml;base64,")
.unwrap_or(&url),
file_path,
)?,
// append IPFS gateway
url if url.starts_with("ipfs") => {
)?;
pb.finish();
} else {
let permit = semaphore.acquire_owned().await.unwrap();
let pb = mp.add(ProgressBar::new(100).with_message(msg).with_style(pb_style));

let url = if url.starts_with("ipfs") {
// append IPFS gateway
let parts: Vec<&str> = url.split('/').collect();
if let Some(hash) = parts.iter().find(|&&part| part.starts_with("Qm")) {
let ipfs_url = format!("https://ipfs.io/ipfs/{hash}");
if let Err(error) = download_image(&client, &ipfs_url, file_path, progress_tx).await
{
return Err(eyre::eyre!("Error downloading image {}: {}", name, error));
}
let hash = parts.iter().find(|&&part| part.starts_with("Qm"));

// Handle the case where the hash is not found
match hash {
Some(hash) => format!("https://ipfs.io/ipfs/{}", hash),
None => {
// pb.finish_with_message(format!("IPFS hash not found in URL for {name}"));
return Err(eyre::eyre!("IPFS hash not found in URL"));
} //if a single image fails I want to finish it immediately without disrupting other ongoing processess
}
}
url => {
if let Err(error) = download_image(&client, &url, file_path, progress_tx).await {
return Err(eyre::eyre!("Error downloading image {}: {}", name, error));
} else {
url.to_owned()
};

let client = client.clone();
let name_cp = name.clone();

tokio::spawn(async move {
// pb.set_position(i);
match download_image(&client, &url, file_path, &pb).await {
Ok(()) => pb.finish(),
Err(error) => {
pb.finish_with_message(format!(
"Error downloading image {}: {}",
name_cp, error
));
// return Err(eyre::eyre!("Error downloading image {}: {}", name, error));
}
};
}

drop(permit);
});
}

if DEBUG {
println!("{name} saved successfully");
}
Ok(())
}
// async fn get_address()
pub struct DownloadProgress {
pub name: String,
pub progress: u64,
pub total: u64,
}
#[derive(Debug)]
pub struct DownloadResult {
file_path: String,
progress: u64,
total: u64,
}

async fn download_image(
client: &Client,
image_url: &str,
file_path: PathBuf,
progress_tx: mpsc::Sender<(u64, u64)>,
pb: &ProgressBar
) -> Result<()> {
let response = client.get(image_url).send().await?;
let content_length = response.content_length().unwrap_or(0);
let content_length = response.content_length().unwrap_or(100);
let mut byte_stream = response.bytes_stream();
pb.set_length(content_length);

let mut progress: u64 = 0;
let mut file = File::create(file_path)?;
Expand All @@ -179,8 +144,7 @@ async fn download_image(
file.write_all(&chunk)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

// Send progress update through the channel
let _ = progress_tx.send((progress, content_length)).await;
pb.set_position(progress);
}

if content_length != progress {
Expand Down Expand Up @@ -218,7 +182,7 @@ pub async fn create_directory(dir_path: &PathBuf) -> Result<PathBuf> {
Ok(res)
}

fn save_base64_image(base64_data: &str, file_path: PathBuf) -> Result<()> {
fn decode_and_save(base64_data: &str, file_path: PathBuf) -> Result<()> {
let decoded_data = decode(base64_data)?;
let mut file = File::create(file_path)?;
file.write_all(&decoded_data)?;
Expand All @@ -242,4 +206,3 @@ mod tests {
}
*/
}
*/
24 changes: 19 additions & 5 deletions nft-folder/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
mod request;
mod download;

use std::path::PathBuf;

use download::create_directory;
use eyre::Result;
use request::handle_processing;
use reqwest::Client;

#[tokio::main]
async fn main() {
async fn main() -> Result<()>{
let client = Client::new();
let address = "0x495f947276749Ce646f68AC8c248420045cb7b5e";

if let Err(e) = handle_processing(&client, address).await {
println!("Error: {}", e);
let address = "0xa3a4548b39da96eb065ff91811ca30da40431c0d";
let mut path = PathBuf::from("test");
path.push(&address);
println!("{:#?}", &path);

match create_directory(&path).await {
Ok(path) => {
if let Err(e) = handle_processing(&client, address, path).await {
println!("Error: {}", e);
};
}
Err(err) => return Err(err)
}

Ok(())
}
10 changes: 5 additions & 5 deletions nft-folder/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::download::download_image;
use crate::download::handle_token;
use eyre::{eyre, Result};
use futures::{stream, StreamExt};
use indicatif::{MultiProgress, ProgressBar};
use indicatif::MultiProgress;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::to_value;
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use tokio::sync::Semaphore;

#[derive(Serialize, Deserialize, Debug)]
Expand Down Expand Up @@ -152,7 +152,7 @@ pub async fn fetch_page(
Ok(data.tokens)
}

pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<()> {
pub async fn handle_processing(client: &Client, address: &str, path: PathBuf) -> eyre::Result<()> {
let semaphore = Arc::new(Semaphore::new(4));
let mp = MultiProgress::new();

Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn handle_processing(client: &Client, address: &str) -> eyre::Result<(

while let Some(token) = requests.next().await {
// let url = token.token_url.unwrap();
download_image(Arc::clone(&semaphore), token, &mp).await;
let _ = handle_token(Arc::clone(&semaphore), token, &client, &mp, &path).await;
}

Ok(())
Expand Down

0 comments on commit f72e404

Please sign in to comment.