diff --git a/Cargo.toml b/Cargo.toml
index cd34e962..063403b9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -78,6 +78,7 @@ chateau = {git = "https://github.com/catenarytransit/chateau"}
 dmfr-folder-reader = {git = "https://github.com/catenarytransit/dmfr-folder-reader"}
 tokio-zookeeper = "0.2.1"
 uuid = "1.8.0"
+zip = "0.6.6"
 
 [[bin]]
 name = "maple"
diff --git a/migrations/20240110110356_init.sql b/migrations/20240110110356_init.sql
index eee9acfb..85fcbae9 100644
--- a/migrations/20240110110356_init.sql
+++ b/migrations/20240110110356_init.sql
@@ -12,6 +12,8 @@ CREATE TABLE IF NOT EXISTS gtfs.static_download_attempts (
     downloaded_unix_time_ms bigint NOT NULL,
     ingested boolean NOT NULL,
     failed boolean NOT NULL,
+    ingestion_version integer NOT NULL,
+    mark_for_redo boolean NOT NULL,
     PRIMARY KEY (onestop_feed_id, downloaded_unix_time_ms)
 );
 
diff --git a/src/maple/gtfs_handlers/flatten.rs b/src/maple/gtfs_handlers/flatten.rs
index be852e24..cfffcf24 100644
--- a/src/maple/gtfs_handlers/flatten.rs
+++ b/src/maple/gtfs_handlers/flatten.rs
@@ -2,7 +2,7 @@ use std::error::Error;
 use std::fs;
 //use std::path::PathBuf;
 
-fn flatten_feed(feed_id: &str, zip_name: &str) -> Result<(), Box<dyn Error>> {
+fn flatten_feed(feed_id: &str) -> Result<(), Box<dyn Error>> {
     let _ = fs::create_dir("gtfs_uncompressed");
 
     // unzip
diff --git a/src/maple/gtfs_handlers/mod.rs b/src/maple/gtfs_handlers/mod.rs
index f9af3035..92624ef8 100644
--- a/src/maple/gtfs_handlers/mod.rs
+++ b/src/maple/gtfs_handlers/mod.rs
@@ -1,3 +1,14 @@
 mod colour_correction;
 mod convex_hull;
-mod flatten;
\ No newline at end of file
+mod flatten;
+
+#[derive(Debug, Clone)]
+pub struct DownloadAttempt {
+    pub onestop_feed_id: String,
+    pub file_hash: String,
+    pub downloaded_unix_time_ms: i64,
+    pub ingested: bool,
+    pub failed: bool,
+    pub mark_for_redo: bool,
+    pub ingestion_version: i32,
+}
\ No newline at end of file
diff --git a/src/maple/main.rs b/src/maple/main.rs
index a441c8bc..97e5675d 100644
--- a/src/maple/main.rs
+++ b/src/maple/main.rs
@@ -27,6 +27,8 @@
 use crate::transitland_download::DownloadedFeedsInformation;
 
 async fn run_ingest() -> Result<(), Box<dyn Error>> {
+    const maple_ingestion_version: i32 = 1;
+
     // TODO! Ensure git submodule transitland-atlas downloads and updates correctly
 
     //These feeds should be discarded because they are duplicated in a larger dataset called `f-sf~bay~area~rg`, which has everything in a single zip file
@@ -114,6 +116,8 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
         }
 
         //determine if the old one should be deleted, if so, delete it
+    } else {
+        eprintln!("Not enough data in transitland!");
     }
 
     Ok(())
diff --git a/src/maple/transitland_download.rs b/src/maple/transitland_download.rs
index 1486ce60..fec0d051 100644
--- a/src/maple/transitland_download.rs
+++ b/src/maple/transitland_download.rs
@@ -10,22 +10,16 @@ use std::io::copy;
 use std::io::Write;
 use std::sync::Arc;
 use std::time::SystemTime;
+use std::time::Duration;
 use std::time::UNIX_EPOCH;
+use crate::gtfs_handlers::DownloadAttempt;
 
 #[derive(Clone)]
 struct StaticFeedToDownload {
-    feed_id: String,
-    url: String,
+    pub feed_id: String,
+    pub url: String,
 }
 
-#[derive(Debug, Clone)]
-pub struct DownloadAttempt {
-    pub onestop_feed_id: String,
-    pub file_hash: String,
-    pub downloaded_unix_time_ms: i64,
-    pub ingested: bool,
-    pub failed: bool,
-}
 
 //Written by Kyler Chin
 //You are required under the APGL license to retain this annotation
@@ -66,7 +60,7 @@ pub async fn download_return_eligible_feeds(
     transitland_meta: &ReturnDmfrAnalysis,
     pool: &sqlx::Pool<sqlx::Postgres>,
 ) -> Result<Vec<DownloadedFeedsInformation>, ()> {
-    let threads: usize = 32;
+    let threads: usize = 16;
 
     let _ = fs::create_dir("gtfs_static_zips");
 
@@ -81,134 +75,163 @@ pub async fn download_return_eligible_feeds(
         } && feed.urls.static_current.is_some()).map(|(string, feed)| StaticFeedToDownload {
             feed_id: feed.id.clone(),
             url: feed.urls.static_current.as_ref().unwrap().to_string(),
-        });
+        }).collect::<Vec<StaticFeedToDownload>>();
+
+    let download_progress: Arc<std::sync::Mutex<usize>> = Arc::new(std::sync::Mutex::new(0));
+    let total_feeds_to_download = feeds_to_download.len();
 
     let static_fetches =
     //perform the downloads as a future stream, so only the thread count is allowed
-    futures::stream::iter(feeds_to_download.into_iter().map(|staticfeed| async move {
-
-        //allow various compression algorithms to be used during the download process, as enabled in Cargo.toml
-        let client = reqwest::ClientBuilder::new()
-            .deflate(true)
-            .gzip(true)
-            .brotli(true)
-            .build()
-            .unwrap();
-
-        let request = client.get(&staticfeed.url);
-
-        //calculate how long the download takes
-        let start = SystemTime::now();
-        let current_unix_ms_time = start
-            .duration_since(UNIX_EPOCH)
-            .expect("Time went backwards")
-            .as_millis();
-
-        let response = request.send().await;
-
-        let duration = SystemTime::now()
-            .duration_since(start)
-            .expect("Time went backwards")
-            .as_millis();
-
-        // say that the download state was unsuccessful by default, and insert the duration
-        let mut answer = DownloadedFeedsInformation {
-            feed_id: staticfeed.feed_id.clone(),
-            url: staticfeed.url.clone(),
-            hash: None,
-            download_timestamp_ms: current_unix_ms_time as u64,
-            operation_success: false,
-            ingest: false,
-            byte_size: None,
-            duration_download: Some(duration as u64),
-            http_response_code: None,
-        };
-
-        match response {
-            // The download request did return a response and the connection did not drop
-            Ok(response) => {
-                answer.http_response_code = Some(response.status().as_str().to_string());
-                let mut out = File::create(format!(
-                    "gtfs_static_zips/{}.zip",
-                    staticfeed.feed_id.clone()
-                ))
-                .expect("failed to create file");
-
-                // get raw bytes
-                let bytes_result = response.bytes().await;
-
-                if let Ok(bytes_result) = bytes_result {
-                    let data = bytes_result.as_ref();
-                    let byte_length = data.len();
-                    // fast hashing algorithm of the bytes
-                    let hash = seahash::hash(data);
-
-                    answer.hash = Some(hash);
-                    answer.byte_size = Some(byte_length as u64);
-
-                    // stringify the hash
-                    let hash_str = hash.to_string();
-
-                    //query the SQL database for any ingests that have the same zip
-                    //maybe switch to pgx for this query?
-                    let download_attempts_postgres_lookup = sqlx::query_as!(
-                        DownloadAttempt,
-                        "SELECT * FROM gtfs.static_download_attempts WHERE file_hash = $1;",
-                        hash_str
-                    )
-                    .fetch_all(pool)
-                    .await;
-
-                    //if the dataset is brand new, mark as success, save the file
-
-                    // this is accomplished by checking in the sql table `gtfs.static_download_attempts`
-                    //if hash exists in the table AND the ingestion operation did not fail, cancel.
-                    //if hash doesn't exist write the file to disk
-
-                    match download_attempts_postgres_lookup {
-                        Ok( download_attempts_postgres_lookup) => {
-                            answer.operation_success = true;
-
-                            // this zip file has never been seen before! Insert it!
-                            if download_attempts_postgres_lookup.len() == 0 {
-                                answer.ingest = true;
+    futures::stream::iter(feeds_to_download.into_iter().map(
+        |staticfeed|
+        {
+            let download_progress = Arc::clone(&download_progress);
+
+            async move {
+                //allow various compression algorithms to be used during the download process, as enabled in Cargo.toml
+                let client = reqwest::ClientBuilder::new()
+                    //timeout queries after 10 minutes
+                    .timeout(Duration::from_secs(60 * 10))
+                    .deflate(true)
+                    .gzip(true)
+                    .brotli(true)
+                    .cookie_store(true)
+                    .build()
+                    .unwrap();
+
+                let request = client.get(&staticfeed.url);
+
+                //calculate how long the download takes
+                let start = SystemTime::now();
+                let current_unix_ms_time = start
+                    .duration_since(UNIX_EPOCH)
+                    .expect("Time went backwards")
+                    .as_millis();
+
+                let response = request.send().await;
+
+                let duration = SystemTime::now()
+                    .duration_since(start)
+                    .expect("Time went backwards");
+
+                let duration_ms = duration.as_millis();
+
+                // say that the download state was unsuccessful by default, and insert the duration
+                let mut answer = DownloadedFeedsInformation {
+                    feed_id: staticfeed.feed_id.clone(),
+                    url: staticfeed.url.clone(),
+                    hash: None,
+                    download_timestamp_ms: current_unix_ms_time as u64,
+                    operation_success: false,
+                    ingest: false,
+                    byte_size: None,
+                    duration_download: Some(duration_ms as u64),
+                    http_response_code: None,
+                };
+
+                match response {
+                    // The download request did return a response and the connection did not drop
+                    Ok(response) => {
+                        answer.http_response_code = Some(response.status().as_str().to_string());
+                        let mut out = File::create(format!(
+                            "gtfs_static_zips/{}.zip",
+                            staticfeed.feed_id.clone()
+                        ))
+                        .expect("failed to create file");
+
+                        if response.status().is_success() {
+                            // get raw bytes
+                            let bytes_result = response.bytes().await;
+
+                            if let Ok(bytes_result) = bytes_result {
+                                let data = bytes_result.as_ref();
+                                let byte_length = data.len();
+                                // fast hashing algorithm of the bytes
+                                let hash = seahash::hash(data);
+
+                                answer.hash = Some(hash);
+                                answer.byte_size = Some(byte_length as u64);
+
+                                // stringify the hash
+                                let hash_str = hash.to_string();
+
+                                //query the SQL database for any ingests that have the same zip
+                                //maybe switch to pgx for this query?
+                                let download_attempts_postgres_lookup = sqlx::query_as!(
+                                    DownloadAttempt,
+                                    "SELECT * FROM gtfs.static_download_attempts WHERE file_hash = $1;",
+                                    hash_str
+                                )
+                                .fetch_all(pool)
+                                .await;
+
+                                //if the dataset is brand new, mark as success, save the file
+
+                                // this is accomplished by checking in the sql table `gtfs.static_download_attempts`
+                                //if hash exists in the table AND the ingestion operation did not fail, cancel.
+                                //if hash doesn't exist write the file to disk
+
+                                match download_attempts_postgres_lookup {
+                                    Ok( download_attempts_postgres_lookup) => {
+                                        answer.operation_success = true;
+
+                                        // this zip file has never been seen before! Insert it!
+                                        if download_attempts_postgres_lookup.len() == 0 {
+                                            answer.ingest = true;
+                                        } else {
+
+                                            // a previous succcessful ingest has happened
+                                            let check_for_previous_insert_sucesses = download_attempts_postgres_lookup
+                                                .iter()
+                                                .find(|&x| x.ingested == true);
+
+                                            //thus, don't perform the ingest
+                                            if check_for_previous_insert_sucesses.is_some() {
+                                                answer.ingest = false;
+                                            } else {
+                                                //no successes have occured, reattempt this zip file
+                                                //search through zookeeper tree for current pending operations (todo!)
+                                                answer.ingest = true;
+                                            }
+                                        }
+                                    }
+                                    Err(error) => {
+                                        //could not connect to the postgres, or this query failed. Don't ingest without access to postgres
+                                        answer.operation_success = false;
+                                    }
+                                }
+
+                                let _ = out.write(&(bytes_result));
+                                let mut download_progress = download_progress.lock().unwrap();
+                                *download_progress += 1;
+
+                                println!("Finished writing {}/{} [{:.2}%]: {}, took {:.3}s",download_progress, total_feeds_to_download, (download_progress.clone() as f32/total_feeds_to_download as f32) * 100.0, &staticfeed.clone().feed_id, duration_ms as f32 / 1000.0);
+                            }
                         } else {
+                            let mut download_progress = download_progress.lock().unwrap();
+                            *download_progress += 1;
-                            // a previous succcessful ingest has happened
-                            let check_for_previous_insert_sucesses = download_attempts_postgres_lookup
-                                .iter()
-                                .find(|&x| x.ingested == true);
-
-                            //thus, don't perform the ingest
-                            if check_for_previous_insert_sucesses.is_some() {
-                                answer.ingest = false;
-                            } else {
-                                //no successes have occured, reattempt this zip file
-                                //search through zookeeper tree for current pending operations (todo!)
-                                answer.ingest = true;
-                            }
+                            println!("Failed to download {}/{} [{:.2}%]: {} responding with {}, took {:.3}s",download_progress, total_feeds_to_download, (download_progress.clone() as f32/total_feeds_to_download as f32) * 100.0, &staticfeed.clone().feed_id, response.status().as_str(), duration_ms as f32 / 1000.0);
                         }
                     }
                     Err(error) => {
-                        //could not connect to the postgres, or this query failed. Don't ingest without access to postgres
-                        answer.operation_success = false;
+
+                        let mut download_progress = download_progress.lock().unwrap();
+                        *download_progress += 1;
+
+                        println!(
+                            "Error with downloading {}: {}",
+                            &staticfeed.feed_id, &staticfeed.url
+                        );
                     }
                 }
-
-                let _ = out.write(&(bytes_result));
-                println!("Finished writing {}", &staticfeed.clone().feed_id);
+
+                answer
             }
-        }
-        Err(error) => {
-            println!(
-                "Error with downloading {}: {}",
-                &staticfeed.feed_id, &staticfeed.url
-            );
-        }
     }
+
-        answer
-    }))
+    ))
     .buffer_unordered(threads)
     .collect::<Vec<DownloadedFeedsInformation>>();
diff --git a/transitland-atlas b/transitland-atlas
new file mode 160000
index 00000000..798a4cb5
--- /dev/null
+++ b/transitland-atlas
@@ -0,0 +1 @@
+Subproject commit 798a4cb56f05b22e66bd49102a87b98727ff1822
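
Note (not part of the patch): this change adds the `zip = "0.6.6"` dependency, but flatten.rs still leaves the extraction step as a "// unzip" TODO. Below is a minimal sketch of how flatten_feed might use the new crate, reusing the gtfs_static_zips and gtfs_uncompressed paths that the rest of the patch already uses; the body is illustrative only and is not code from this repository.

use std::error::Error;
use std::fs::{self, File};

fn flatten_feed(feed_id: &str) -> Result<(), Box<dyn Error>> {
    let _ = fs::create_dir("gtfs_uncompressed");

    // open the zip that transitland_download.rs wrote to disk
    let zip_file = File::open(format!("gtfs_static_zips/{}.zip", feed_id))?;
    let mut archive = zip::ZipArchive::new(zip_file)?;

    // extract everything under a per-feed directory
    let dest = format!("gtfs_uncompressed/{}", feed_id);
    let _ = fs::create_dir(&dest);
    archive.extract(&dest)?;

    Ok(())
}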
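
Note (not part of the patch): the new ingestion_version and mark_for_redo columns are created here but not yet queried anywhere in the diff. One hypothetical follow-up, sketched under the assumption that re-ingestion will be driven by these columns, is a helper like the one below; the function name and WHERE clause are illustrative, only the table, the DownloadAttempt struct, and the maple ingestion version constant come from this patch.

use crate::gtfs_handlers::DownloadAttempt;

// Find download attempts that should be re-ingested: either explicitly marked for redo,
// or ingested with an older version of the maple ingestion code.
async fn attempts_needing_redo(
    pool: &sqlx::Pool<sqlx::Postgres>,
    current_version: i32,
) -> Result<Vec<DownloadAttempt>, sqlx::Error> {
    sqlx::query_as!(
        DownloadAttempt,
        "SELECT * FROM gtfs.static_download_attempts
         WHERE mark_for_redo = true OR (ingested = true AND ingestion_version < $1);",
        current_version
    )
    .fetch_all(pool)
    .await
}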