Add ingestion version, timeout download after 5min
kylerchin committed Mar 20, 2024
1 parent 6b2984d commit 162858d
Showing 7 changed files with 170 additions and 128 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -78,6 +78,7 @@ chateau = {git = "https://github.com/catenarytransit/chateau"}
dmfr-folder-reader = {git = "https://github.com/catenarytransit/dmfr-folder-reader"}
tokio-zookeeper = "0.2.1"
uuid = "1.8.0"
zip = "0.6.6"

[[bin]]
name = "maple"
2 changes: 2 additions & 0 deletions migrations/20240110110356_init.sql
@@ -12,6 +12,8 @@ CREATE TABLE IF NOT EXISTS gtfs.static_download_attempts (
downloaded_unix_time_ms bigint NOT NULL,
ingested boolean NOT NULL,
failed boolean NOT NULL,
ingestion_version integer NOT NULL,
mark_for_redo boolean NOT NULL,
PRIMARY KEY (onestop_feed_id, downloaded_unix_time_ms)
);

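The two new columns give the ingest pipeline a way to flag rows for another pass. A minimal sketch of how they might be queried with sqlx, reusing the DownloadAttempt struct added in src/maple/gtfs_handlers/mod.rs below; this helper and its exact query are illustrative, not part of the commit:

use sqlx::{Pool, Postgres};
use crate::gtfs_handlers::DownloadAttempt;

// Hypothetical helper: fetch attempts that should be ingested again, either
// because they were explicitly marked for redo or because they were produced
// by an older pipeline version. Not shown in this commit.
async fn attempts_needing_reingest(
    pool: &Pool<Postgres>,
    current_version: i32,
) -> Result<Vec<DownloadAttempt>, sqlx::Error> {
    sqlx::query_as!(
        DownloadAttempt,
        "SELECT * FROM gtfs.static_download_attempts
         WHERE mark_for_redo = true OR ingestion_version < $1;",
        current_version
    )
    .fetch_all(pool)
    .await
}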
2 changes: 1 addition & 1 deletion src/maple/gtfs_handlers/flatten.rs
@@ -2,7 +2,7 @@ use std::error::Error;
use std::fs;
//use std::path::PathBuf;

fn flatten_feed(feed_id: &str, zip_name: &str) -> Result<(), Box<dyn Error>> {
fn flatten_feed(feed_id: &str) -> Result<(), Box<dyn Error>> {
let _ = fs::create_dir("gtfs_uncompressed");

// unzip
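The zip crate added to Cargo.toml above is presumably what backs this unzip step. A rough sketch of what the body of flatten_feed might do with zip 0.6; the per-feed output directory is an assumption, not something this commit shows:

use std::error::Error;
use std::fs::{self, File};

// Hypothetical sketch: extract a downloaded GTFS zip into its own directory.
fn unzip_feed(feed_id: &str) -> Result<(), Box<dyn Error>> {
    let zip_path = format!("gtfs_static_zips/{}.zip", feed_id);
    let out_dir = format!("gtfs_uncompressed/{}", feed_id);
    fs::create_dir_all(&out_dir)?;

    let file = File::open(zip_path)?;
    let mut archive = zip::ZipArchive::new(file)?;
    // Extract every entry of the archive into the per-feed directory.
    archive.extract(&out_dir)?;
    Ok(())
}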
13 changes: 12 additions & 1 deletion src/maple/gtfs_handlers/mod.rs
@@ -1,3 +1,14 @@
mod colour_correction;
mod convex_hull;
mod flatten;
mod flatten;

#[derive(Debug, Clone)]
pub struct DownloadAttempt {
pub onestop_feed_id: String,
pub file_hash: String,
pub downloaded_unix_time_ms: i64,
pub ingested: bool,
pub failed: bool,
pub mark_for_redo: bool,
pub ingestion_version: i32,
}
4 changes: 4 additions & 0 deletions src/maple/main.rs
@@ -27,6 +27,8 @@ use crate::transitland_download::DownloadedFeedsInformation;

async fn run_ingest() -> Result<(), Box<dyn Error>> {

const maple_ingestion_version: i32 = 1;

// TODO! Ensure git submodule transitland-atlas downloads and updates correctly

//These feeds should be discarded because they are duplicated in a larger dataset called `f-sf~bay~area~rg`, which has everything in a single zip file
@@ -114,6 +116,8 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
}

//determine if the old one should be deleted, if so, delete it
} else {
eprintln!("Not enough data in transitland!");
}

Ok(())
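The new maple_ingestion_version constant is presumably written into every row of gtfs.static_download_attempts so later runs can tell which pipeline version produced an ingest. A hedged sketch of what that insert could look like; the commit does not show this statement, and the false defaults are assumptions:

use sqlx::{Pool, Postgres};

// Hypothetical sketch: record a fresh download attempt stamped with the
// current ingestion version. Columns follow the migration above.
async fn record_attempt(
    pool: &Pool<Postgres>,
    feed_id: &str,
    file_hash: &str,
    downloaded_unix_time_ms: i64,
    ingestion_version: i32,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "INSERT INTO gtfs.static_download_attempts
         (onestop_feed_id, file_hash, downloaded_unix_time_ms, ingested, failed, ingestion_version, mark_for_redo)
         VALUES ($1, $2, $3, false, false, $4, false);",
        feed_id,
        file_hash,
        downloaded_unix_time_ms,
        ingestion_version
    )
    .execute(pool)
    .await?;
    Ok(())
}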
275 changes: 149 additions & 126 deletions src/maple/transitland_download.rs
@@ -10,22 +10,16 @@ use std::io::copy;
use std::io::Write;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::Duration;
use std::time::UNIX_EPOCH;
use crate::gtfs_handlers::DownloadAttempt;

#[derive(Clone)]
struct StaticFeedToDownload {
feed_id: String,
url: String,
pub feed_id: String,
pub url: String,
}

#[derive(Debug, Clone)]
pub struct DownloadAttempt {
pub onestop_feed_id: String,
pub file_hash: String,
pub downloaded_unix_time_ms: i64,
pub ingested: bool,
pub failed: bool,
}
//Written by Kyler Chin
//You are required under the AGPL license to retain this annotation

@@ -66,7 +60,7 @@ pub async fn download_return_eligible_feeds(
transitland_meta: &ReturnDmfrAnalysis,
pool: &sqlx::Pool<sqlx::Postgres>,
) -> Result<Vec<DownloadedFeedsInformation>, ()> {
let threads: usize = 32;
let threads: usize = 16;

let _ = fs::create_dir("gtfs_static_zips");

@@ -81,134 +75,163 @@
} && feed.urls.static_current.is_some()).map(|(string, feed)| StaticFeedToDownload {
feed_id: feed.id.clone(),
url: feed.urls.static_current.as_ref().unwrap().to_string(),
});
}).collect::<Vec<StaticFeedToDownload>>();

let download_progress: Arc<std::sync::Mutex<u16>> = Arc::new(std::sync::Mutex::new(0));
let total_feeds_to_download = feeds_to_download.len();

let static_fetches =
//perform the downloads as a future stream, so only the thread count is allowed
futures::stream::iter(feeds_to_download.into_iter().map(|staticfeed| async move {

//allow various compression algorithms to be used during the download process, as enabled in Cargo.toml
let client = reqwest::ClientBuilder::new()
.deflate(true)
.gzip(true)
.brotli(true)
.build()
.unwrap();

let request = client.get(&staticfeed.url);

//calculate how long the download takes
let start = SystemTime::now();
let current_unix_ms_time = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();

let response = request.send().await;

let duration = SystemTime::now()
.duration_since(start)
.expect("Time went backwards")
.as_millis();

// say that the download state was unsuccessful by default, and insert the duration
let mut answer = DownloadedFeedsInformation {
feed_id: staticfeed.feed_id.clone(),
url: staticfeed.url.clone(),
hash: None,
download_timestamp_ms: current_unix_ms_time as u64,
operation_success: false,
ingest: false,
byte_size: None,
duration_download: Some(duration as u64),
http_response_code: None,
};

match response {
// The download request did return a response and the connection did not drop
Ok(response) => {
answer.http_response_code = Some(response.status().as_str().to_string());
let mut out = File::create(format!(
"gtfs_static_zips/{}.zip",
staticfeed.feed_id.clone()
))
.expect("failed to create file");

// get raw bytes
let bytes_result = response.bytes().await;

if let Ok(bytes_result) = bytes_result {
let data = bytes_result.as_ref();
let byte_length = data.len();
// fast hashing algorithm of the bytes
let hash = seahash::hash(data);

answer.hash = Some(hash);
answer.byte_size = Some(byte_length as u64);

// stringify the hash
let hash_str = hash.to_string();

//query the SQL database for any ingests that have the same zip
//maybe switch to pgx for this query?
let download_attempts_postgres_lookup = sqlx::query_as!(
DownloadAttempt,
"SELECT * FROM gtfs.static_download_attempts WHERE file_hash = $1;",
hash_str
)
.fetch_all(pool)
.await;

//if the dataset is brand new, mark as success, save the file

// this is accomplished by checking in the sql table `gtfs.static_download_attempts`
//if hash exists in the table AND the ingestion operation did not fail, cancel.
//if hash doesn't exist write the file to disk

match download_attempts_postgres_lookup {
Ok( download_attempts_postgres_lookup) => {
answer.operation_success = true;

// this zip file has never been seen before! Insert it!
if download_attempts_postgres_lookup.len() == 0 {
answer.ingest = true;
futures::stream::iter(feeds_to_download.into_iter().map(
|staticfeed|
{
let download_progress = Arc::clone(&download_progress);

async move {
//allow various compression algorithms to be used during the download process, as enabled in Cargo.toml
let client = reqwest::ClientBuilder::new()
//timeout queries after 10 minutes
.timeout(Duration::from_secs(60 * 10))
.deflate(true)
.gzip(true)
.brotli(true)
.cookie_store(true)
.build()
.unwrap();

let request = client.get(&staticfeed.url);

//calculate how long the download takes
let start = SystemTime::now();
let current_unix_ms_time = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();

let response = request.send().await;

let duration = SystemTime::now()
.duration_since(start)
.expect("Time went backwards");

let duration_ms = duration.as_millis();

// say that the download state was unsuccessful by default, and insert the duration
let mut answer = DownloadedFeedsInformation {
feed_id: staticfeed.feed_id.clone(),
url: staticfeed.url.clone(),
hash: None,
download_timestamp_ms: current_unix_ms_time as u64,
operation_success: false,
ingest: false,
byte_size: None,
duration_download: Some(duration_ms as u64),
http_response_code: None,
};

match response {
// The download request did return a response and the connection did not drop
Ok(response) => {
answer.http_response_code = Some(response.status().as_str().to_string());
let mut out = File::create(format!(
"gtfs_static_zips/{}.zip",
staticfeed.feed_id.clone()
))
.expect("failed to create file");

if response.status().is_success() {
// get raw bytes
let bytes_result = response.bytes().await;

if let Ok(bytes_result) = bytes_result {
let data = bytes_result.as_ref();
let byte_length = data.len();
// fast hashing algorithm of the bytes
let hash = seahash::hash(data);

answer.hash = Some(hash);
answer.byte_size = Some(byte_length as u64);

// stringify the hash
let hash_str = hash.to_string();

//query the SQL database for any ingests that have the same zip
//maybe switch to pgx for this query?
let download_attempts_postgres_lookup = sqlx::query_as!(
DownloadAttempt,
"SELECT * FROM gtfs.static_download_attempts WHERE file_hash = $1;",
hash_str
)
.fetch_all(pool)
.await;

//if the dataset is brand new, mark as success, save the file

// this is accomplished by checking in the sql table `gtfs.static_download_attempts`
//if hash exists in the table AND the ingestion operation did not fail, cancel.
//if hash doesn't exist write the file to disk

match download_attempts_postgres_lookup {
Ok( download_attempts_postgres_lookup) => {
answer.operation_success = true;

// this zip file has never been seen before! Insert it!
if download_attempts_postgres_lookup.len() == 0 {
answer.ingest = true;
} else {

// a previous successful ingest has happened
let check_for_previous_insert_sucesses = download_attempts_postgres_lookup
.iter()
.find(|&x| x.ingested == true);

//thus, don't perform the ingest
if check_for_previous_insert_sucesses.is_some() {
answer.ingest = false;
} else {
//no successes have occurred, reattempt this zip file
//search through zookeeper tree for current pending operations (todo!)
answer.ingest = true;
}
}
}
Err(error) => {
//could not connect to the postgres, or this query failed. Don't ingest without access to postgres
answer.operation_success = false;
}
}

let _ = out.write(&(bytes_result));
let mut download_progress = download_progress.lock().unwrap();
*download_progress += 1;

println!("Finished writing {}/{} [{:.2}%]: {}, took {:.3}s",download_progress, total_feeds_to_download, (download_progress.clone() as f32/total_feeds_to_download as f32) * 100.0, &staticfeed.clone().feed_id, duration_ms as f32 / 1000.0);
}
} else {
let mut download_progress = download_progress.lock().unwrap();
*download_progress += 1;

// a previous successful ingest has happened
let check_for_previous_insert_sucesses = download_attempts_postgres_lookup
.iter()
.find(|&x| x.ingested == true);

//thus, don't perform the ingest
if check_for_previous_insert_sucesses.is_some() {
answer.ingest = false;
} else {
//no successes have occurred, reattempt this zip file
//search through zookeeper tree for current pending operations (todo!)
answer.ingest = true;
}
println!("Failed to download {}/{} [{:.2}%]: {} responding with {}, took {:.3}s",download_progress, total_feeds_to_download, (download_progress.clone() as f32/total_feeds_to_download as f32) * 100.0, &staticfeed.clone().feed_id, response.status().as_str(), duration_ms as f32 / 1000.0);
}
}
Err(error) => {
//could not connect to the postgres, or this query failed. Don't ingest without access to postgres
answer.operation_success = false;

let mut download_progress = download_progress.lock().unwrap();
*download_progress += 1;

println!(
"Error with downloading {}: {}",
&staticfeed.feed_id, &staticfeed.url
);
}
}

let _ = out.write(&(bytes_result));
println!("Finished writing {}", &staticfeed.clone().feed_id);

answer
}
}
Err(error) => {
println!(
"Error with downloading {}: {}",
&staticfeed.feed_id, &staticfeed.url
);
}
}


answer
}))
))
.buffer_unordered(threads)
.collect::<Vec<DownloadedFeedsInformation>>();

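Stripped of the hashing and Postgres bookkeeping, the rewritten download loop above is a bounded-concurrency stream of reqwest calls with a per-request timeout and a shared progress counter. A minimal standalone sketch of that pattern, assuming the same crates (reqwest, futures); the names download_all and MAX_CONCURRENT are illustrative, not from the commit:

use std::sync::{Arc, Mutex};
use std::time::Duration;
use futures::StreamExt;

async fn download_all(urls: Vec<String>) -> Vec<Result<Vec<u8>, reqwest::Error>> {
    const MAX_CONCURRENT: usize = 16;

    // One shared client; each request is abandoned after 10 minutes instead of hanging forever.
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(60 * 10))
        .build()
        .expect("failed to build reqwest client");

    let total = urls.len();
    let progress = Arc::new(Mutex::new(0usize));

    futures::stream::iter(urls.into_iter().map(|url| {
        let client = client.clone();
        let progress = Arc::clone(&progress);
        async move {
            // Pull the whole body into memory, propagating timeout or connection errors.
            let result = match client.get(&url).send().await {
                Ok(response) => response.bytes().await.map(|b| b.to_vec()),
                Err(e) => Err(e),
            };
            let mut done = progress.lock().unwrap();
            *done += 1;
            println!("{}/{} finished: {}", *done, total, url);
            result
        }
    }))
    .buffer_unordered(MAX_CONCURRENT) // at most 16 downloads in flight at once
    .collect::<Vec<_>>()
    .await
}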
1 change: 1 addition & 0 deletions transitland-atlas
Submodule transitland-atlas added at 798a4c
