diff --git a/Cargo.toml b/Cargo.toml
index 34ce8001..7465e035 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "catenary-backend"
-version = "0.1.0"
+version = "0.3.0"
 edition = "2021"
 
 [library]
diff --git a/src/maple/main.rs b/src/maple/main.rs
index 105db772..6681a816 100644
--- a/src/maple/main.rs
+++ b/src/maple/main.rs
@@ -4,9 +4,14 @@ use catenary::postgres_tools::CatenaryPostgresPool;
 // This was heavily inspired and copied from Emma Alexia, thank you Emma!
 // Removal of the attribution is not allowed, as covered under the AGPL license
 use catenary::postgres_tools::get_connection_pool;
+use diesel::insert_into;
 use diesel::prelude::*;
+use diesel_async::RunQueryDsl;
 use dotenvy::dotenv;
+use futures::StreamExt;
+use git2::Repository;
 use service::quicli::prelude::info;
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::env;
 use std::error::Error;
@@ -16,10 +21,6 @@ use std::sync::Arc;
 use std::time::Duration;
 use threadpool::ThreadPool;
 use tokio::runtime;
-use std::collections::HashMap;
-use diesel::insert_into;
-use futures::StreamExt;
-use git2::Repository;
 
 mod gtfs_handlers;
 mod gtfs_ingestion_sequence;
@@ -183,30 +184,43 @@ async fn run_ingest() -> Result<(), Box<dyn Error + Sync + Send>> {
 
     // 2. update metadata
     futures::stream::iter(eligible_feeds.iter().map(|eligible_feed| {
-        async move {
-            /* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts
-            (onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version)
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
-            ",
-            eligible_feed.feed_id,
-            eligible_feed.url,
-            match eligible_feed.hash {
-                Some(hash) => Some(format!("{}", hash)),
-                None => None
-            },
-            eligible_feed.download_timestamp_ms as i64,
-            false,
-            !eligible_feed.operation_success,
-            eligible_feed.http_response_code,
-            false,
-            MAPLE_INGESTION_VERSION
-            );
-
-            let _ = insert_into()*/
+        {
+            let arc_conn_pool = Arc::clone(&arc_conn_pool);
+            async move {
+                let conn_pool = arc_conn_pool.as_ref();
+                let conn_pre = conn_pool.get().await;
+                let conn = &mut conn_pre?;
+
+                use catenary::models::StaticDownloadAttempt;
+                use catenary::schema::gtfs::static_download_attempts::dsl::*;
+
+                //save the download attempt happened and information about it
+                let this_attempt = StaticDownloadAttempt {
+                    onestop_feed_id: eligible_feed.feed_id.clone(),
+                    url: eligible_feed.url.clone(),
+                    file_hash: match eligible_feed.hash {
+                        Some(hash) => Some(format!("{}", hash)),
+                        None => None,
+                    },
+                    downloaded_unix_time_ms: eligible_feed.download_timestamp_ms as i64,
+                    ingested: false,
+                    failed: !eligible_feed.operation_success,
+                    http_response_code: eligible_feed.http_response_code.clone(),
+                    mark_for_redo: false,
+                    ingestion_version: MAPLE_INGESTION_VERSION,
+                };
+
+                diesel::insert_into(static_download_attempts)
+                    .values(&this_attempt)
+                    .execute(conn)
+                    .await?;
+
+                Ok(())
+            }
         }
     }))
-    .buffer_unordered(100)
-    .collect::<Vec<_>>()
+    .buffer_unordered(64)
+    .collect::<Vec<Result<(), Box<dyn Error + Sync + Send>>>>()
     .await;
 
     // 3. Assign Attempt IDs to each feed_id that is ready to ingest
@@ -259,10 +273,11 @@ async fn run_ingest() -> Result<(), Box<dyn Error + Sync + Send>> {
         .build()
         .unwrap();
 
-    let attempt_ids:HashMap<String, String> = {
+    let attempt_ids: HashMap<String, String> = {
         let mut attempt_ids = HashMap::new();
         for (feed_id, _) in unzip_feeds.iter() {
-            let attempt_id = format!("{}-{}", feed_id, chrono::Utc::now().timestamp_millis());
+            let attempt_id =
+                format!("{}-{}", feed_id, chrono::Utc::now().timestamp_millis());
             attempt_ids.insert(feed_id.clone(), attempt_id);
         }
         attempt_ids
@@ -307,6 +322,8 @@ async fn run_ingest() -> Result<(), Box<dyn Error + Sync + Send>> {
             // more or is sufficiently old (over 5 days old) is wiped
         } else {
             //UPDATE gtfs.static_download_attempts where onstop_feed_id and download_unix_time_ms match as failure
+
+            //Delete objects from the attempt
         }
     }
 }
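
Note on the "2. update metadata" change above: each eligible feed becomes its own future that clones the shared connection-pool Arc, up to 64 of those futures run concurrently through buffer_unordered, and every insert resolves to its own Result, so one failed download-attempt record does not abort the rest of the batch. The sketch below is a minimal, self-contained illustration of that stream pattern only; FakePool, record_attempt, and the sample feed IDs are stand-ins invented for the example, not part of the catenary codebase, which uses the diesel-async Postgres pool shown in the diff.

use futures::StreamExt;
use std::error::Error;
use std::sync::Arc;

// Hypothetical stand-in for the diesel-async connection pool; it only exists
// to keep this sketch self-contained and runnable.
struct FakePool;

impl FakePool {
    async fn record_attempt(&self, feed_id: &str) -> Result<(), Box<dyn Error + Sync + Send>> {
        // Stand-in for checking out a connection and running
        // diesel::insert_into(...).values(...).execute(conn).await
        println!("recorded static download attempt for {feed_id}");
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let arc_pool = Arc::new(FakePool);
    let feed_ids = vec!["f-sample~one".to_string(), "f-sample~two".to_string()];

    // Same shape as the diff: iter -> map to futures -> buffer_unordered -> collect.
    let results: Vec<Result<(), Box<dyn Error + Sync + Send>>> =
        futures::stream::iter(feed_ids.iter().map(|feed_id| {
            // Clone the Arc before the async block so each future owns its own handle,
            // mirroring `let arc_conn_pool = Arc::clone(&arc_conn_pool);` in the diff.
            let pool = Arc::clone(&arc_pool);
            async move { pool.record_attempt(feed_id).await }
        }))
        .buffer_unordered(64)
        .collect()
        .await;

    let failures = results.iter().filter(|r| r.is_err()).count();
    println!("{} attempts recorded, {} failed", results.len() - failures, failures);
}

Collecting a Vec of per-feed Results rather than short-circuiting is what isolates failures; the diff takes the same approach by collecting the Results and, for now, not acting on the individual errors.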