
Save download attempt metadata preingest
kylerchin committed Mar 27, 2024
1 parent b24fabb commit 7b50f3d
Showing 2 changed files with 46 additions and 29 deletions.
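In short: the commit bumps the crate version to 0.3.0 and replaces a commented-out sqlx query with a typed diesel-async insert, so every download attempt is recorded in gtfs.static_download_attempts before ingestion. The new code imports a StaticDownloadAttempt model from catenary::models; the diff does not show that definition, but a plausible sketch of it, with field types inferred from the values assigned in the diff below (the real struct may differ), is:

use diesel::prelude::*;

// Hypothetical reconstruction; the actual model lives in catenary::models
// and its exact field types may differ.
#[derive(Insertable)]
#[diesel(table_name = catenary::schema::gtfs::static_download_attempts)]
pub struct StaticDownloadAttempt {
    pub onestop_feed_id: String,
    pub url: String,
    pub file_hash: Option<String>,
    pub downloaded_unix_time_ms: i64,
    pub ingested: bool,
    pub failed: bool,
    pub http_response_code: Option<String>, // inferred: the diff clones this field
    pub mark_for_redo: bool,
    pub ingestion_version: i32, // inferred from MAPLE_INGESTION_VERSION
}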
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "catenary-backend"
version = "0.1.0"
version = "0.3.0"
edition = "2021"

[library]
73 changes: 45 additions & 28 deletions src/maple/main.rs
@@ -4,9 +4,14 @@ use catenary::postgres_tools::CatenaryPostgresPool;
// This was heavily inspired and copied from Emma Alexia, thank you Emma!
// Removal of the attribution is not allowed, as covered under the AGPL license
use catenary::postgres_tools::get_connection_pool;
use diesel::insert_into;
use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use dotenvy::dotenv;
use futures::StreamExt;
use git2::Repository;
use service::quicli::prelude::info;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::error::Error;
@@ -16,10 +21,6 @@ use std::sync::Arc;
use std::time::Duration;
use threadpool::ThreadPool;
use tokio::runtime;
use std::collections::HashMap;
use diesel::insert_into;
use futures::StreamExt;
use git2::Repository;

mod gtfs_handlers;
mod gtfs_ingestion_sequence;
@@ -183,30 +184,43 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {

// 2. update metadata
futures::stream::iter(eligible_feeds.iter().map(|eligible_feed| {
async move {
/* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts
(onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
",
eligible_feed.feed_id,
eligible_feed.url,
match eligible_feed.hash {
Some(hash) => Some(format!("{}", hash)),
None => None
},
eligible_feed.download_timestamp_ms as i64,
false,
!eligible_feed.operation_success,
eligible_feed.http_response_code,
false,
MAPLE_INGESTION_VERSION
);
let _ = insert_into()*/
{
let arc_conn_pool = Arc::clone(&arc_conn_pool);
async move {
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

use catenary::models::StaticDownloadAttempt;
use catenary::schema::gtfs::static_download_attempts::dsl::*;

// Record that the download attempt happened, along with its metadata
let this_attempt = StaticDownloadAttempt {
onestop_feed_id: eligible_feed.feed_id.clone(),
url: eligible_feed.url.clone(),
file_hash: match eligible_feed.hash {
Some(hash) => Some(format!("{}", hash)),
None => None,
},
downloaded_unix_time_ms: eligible_feed.download_timestamp_ms as i64,
ingested: false,
failed: !eligible_feed.operation_success,
http_response_code: eligible_feed.http_response_code.clone(),
mark_for_redo: false,
ingestion_version: MAPLE_INGESTION_VERSION,
};

diesel::insert_into(static_download_attempts)
.values(&this_attempt)
.execute(conn)
.await?;

Ok(())
}
}
}))
.buffer_unordered(100)
.collect::<Vec<_>>()
.buffer_unordered(64)
.collect::<Vec<Result<(), Box<dyn Error>>>>()
.await;

// 3. Assign Attempt IDs to each feed_id that is ready to ingest
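The metadata update above fans its inserts out through futures::stream::iter and buffer_unordered; the commit lowers the concurrency bound from 100 to 64 and types the collect as Vec<Result<(), Box<dyn Error>>> so per-feed failures are kept rather than hidden behind Vec<_>. A minimal standalone sketch of that bounded-concurrency pattern (illustrative values only, not repository code):

use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Turn each item into a future, run at most 2 at once,
    // and keep every Result instead of short-circuiting on the first error.
    let results: Vec<Result<u32, String>> =
        futures::stream::iter((1u32..=5).map(|n| async move {
            if n % 2 == 0 {
                Ok(n * 10)
            } else {
                Err(format!("item {} failed", n))
            }
        }))
        .buffer_unordered(2) // the diff uses 64 here
        .collect::<Vec<Result<u32, String>>>()
        .await;

    for r in &results {
        println!("{:?}", r);
    }
}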
@@ -259,10 +273,11 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
.build()
.unwrap();

let attempt_ids:HashMap<String,String> = {
let attempt_ids: HashMap<String, String> = {
let mut attempt_ids = HashMap::new();
for (feed_id, _) in unzip_feeds.iter() {
let attempt_id = format!("{}-{}", feed_id, chrono::Utc::now().timestamp_millis());
let attempt_id =
format!("{}-{}", feed_id, chrono::Utc::now().timestamp_millis());
attempt_ids.insert(feed_id.clone(), attempt_id);
}
attempt_ids
@@ -307,6 +322,8 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
// more or is sufficiently old (over 5 days old) is wiped
} else {
//UPDATE gtfs.static_download_attempts rows where onestop_feed_id and downloaded_unix_time_ms match, marking the attempt as failed

//Delete objects from the attempt
}
}
}
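The TODO comments in the final hunk describe follow-up work: updating matching gtfs.static_download_attempts rows as failures and deleting the attempt's objects. As a hypothetical illustration (not part of this commit), the update half could be written with diesel-async along these lines, reusing the column names seen above:

use catenary::schema::gtfs::static_download_attempts::dsl::*;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

// Hypothetical sketch of the TODO: mark an attempt as failed, matched on
// feed id and download timestamp, as the comment describes.
async fn mark_attempt_failed(
    conn: &mut AsyncPgConnection,
    feed_id: &str,
    timestamp_ms: i64,
) -> Result<usize, diesel::result::Error> {
    diesel::update(
        static_download_attempts
            .filter(onestop_feed_id.eq(feed_id))
            .filter(downloaded_unix_time_ms.eq(timestamp_ms)),
    )
    .set(failed.eq(true))
    .execute(conn)
    .await
}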
