Skip to content

Commit

Permalink
Query existing rows from metadata tables
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Mar 29, 2024
1 parent 2856621 commit 38a3f1b
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 62 deletions.
176 changes: 117 additions & 59 deletions src/maple/cleanup.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,137 @@
use catenary::postgres_tools::CatenaryPostgresPool;
use std::error::Error;
use diesel::query_dsl::methods::FilterDsl;
use diesel::ExpressionMethods;
use std::sync::Arc;
use diesel::BoolExpressionMethods;
use diesel::ExpressionMethods;
use diesel_async::RunQueryDsl;
use std::error::Error;
use std::sync::Arc;

pub async fn delete_attempt_objects(feed_id: &str, attempt_id: &str, pool: Arc<CatenaryPostgresPool>) -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
use catenary::schema::gtfs::trips::dsl::trips as trips_table;
pub async fn delete_attempt_objects(
feed_id: &str,
attempt_id: &str,
pool: Arc<CatenaryPostgresPool>,
) -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
use catenary::schema::gtfs::trips;
use catenary::schema::gtfs::trips::dsl::trips as trips_table;

let conn_pool = pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

let _ = diesel::delete(trips_table.filter(trips::dsl::onestop_feed_id.eq(&feed_id).and(trips::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;
let _ = diesel::delete(
trips_table.filter(
trips::dsl::onestop_feed_id
.eq(&feed_id)
.and(trips::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::stoptimes::dsl::stoptimes as stop_times_table;
use catenary::schema::gtfs::stoptimes;
use catenary::schema::gtfs::stoptimes::dsl::stoptimes as stop_times_table;

let _ = diesel::delete(stop_times_table.filter(stoptimes::dsl::onestop_feed_id.eq(&feed_id).and(stoptimes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;
let _ = diesel::delete(
stop_times_table.filter(
stoptimes::dsl::onestop_feed_id
.eq(&feed_id)
.and(stoptimes::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

//cleanup trip freq

use catenary::schema::gtfs::trip_frequencies::dsl::trip_frequencies as frequencies_table;
use catenary::schema::gtfs::trip_frequencies;
let _ = diesel::delete(frequencies_table.filter(trip_frequencies::dsl::onestop_feed_id.eq(&feed_id).and(trip_frequencies::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::agencies::dsl::agencies as agencies_table;
use catenary::schema::gtfs::agencies;

let _ = diesel::delete(agencies_table.filter(agencies::dsl::static_onestop_id.eq(&feed_id).and(agencies::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::calendar_dates::dsl::calendar_dates as calendar_dates_table;
use catenary::schema::gtfs::calendar_dates;

let _ = diesel::delete(calendar_dates_table.filter(calendar_dates::dsl::onestop_feed_id.eq(&feed_id).and(calendar_dates::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::calendar::dsl::calendar as calendar_table;
use catenary::schema::gtfs::calendar;

let _ = diesel::delete(calendar_table.filter(calendar::dsl::onestop_feed_id.eq(&feed_id).and(calendar::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::routes::dsl::routes as routes_table;
use catenary::schema::gtfs::routes;

let _ = diesel::delete(routes_table.filter(routes::dsl::onestop_feed_id.eq(&feed_id).and(routes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::shapes::dsl::shapes as shapes_table;
use catenary::schema::gtfs::shapes;

let _ = diesel::delete(shapes_table.filter(shapes::dsl::onestop_feed_id.eq(&feed_id).and(shapes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::stops::dsl::stops as stops_table;
use catenary::schema::gtfs::stops;

let _ = diesel::delete(stops_table.filter(stops::dsl::onestop_feed_id.eq(&feed_id).and(stops::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;
use catenary::schema::gtfs::trip_frequencies::dsl::trip_frequencies as frequencies_table;
let _ = diesel::delete(
frequencies_table.filter(
trip_frequencies::dsl::onestop_feed_id
.eq(&feed_id)
.and(trip_frequencies::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::agencies;
use catenary::schema::gtfs::agencies::dsl::agencies as agencies_table;

let _ = diesel::delete(
agencies_table.filter(
agencies::dsl::static_onestop_id
.eq(&feed_id)
.and(agencies::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::calendar_dates;
use catenary::schema::gtfs::calendar_dates::dsl::calendar_dates as calendar_dates_table;

let _ = diesel::delete(
calendar_dates_table.filter(
calendar_dates::dsl::onestop_feed_id
.eq(&feed_id)
.and(calendar_dates::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::calendar;
use catenary::schema::gtfs::calendar::dsl::calendar as calendar_table;

let _ = diesel::delete(
calendar_table.filter(
calendar::dsl::onestop_feed_id
.eq(&feed_id)
.and(calendar::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::routes;
use catenary::schema::gtfs::routes::dsl::routes as routes_table;

let _ = diesel::delete(
routes_table.filter(
routes::dsl::onestop_feed_id
.eq(&feed_id)
.and(routes::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::shapes;
use catenary::schema::gtfs::shapes::dsl::shapes as shapes_table;

let _ = diesel::delete(
shapes_table.filter(
shapes::dsl::onestop_feed_id
.eq(&feed_id)
.and(shapes::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::stops;
use catenary::schema::gtfs::stops::dsl::stops as stops_table;

let _ = diesel::delete(
stops_table.filter(
stops::dsl::onestop_feed_id
.eq(&feed_id)
.and(stops::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

Ok(())
}
}
8 changes: 5 additions & 3 deletions src/maple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ async fn run_ingest() -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {

// todo! perform additional checks to ensure feed is not a zip bomb

// create thread pool
// process GTFS and insert into system

let attempt_ids: HashMap<String, String> = {
let mut attempt_ids = HashMap::new();
for (feed_id, _) in unzip_feeds.iter() {
Expand All @@ -313,6 +310,9 @@ async fn run_ingest() -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {

let unzip_feeds_clone = unzip_feeds.clone();

// 5. Process GTFS feeds

//Stream the feeds into the processing function
futures::stream::iter(
unzip_feeds_clone
.into_iter()
Expand All @@ -328,9 +328,11 @@ async fn run_ingest() -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
.map(|x| x.unwrap())
.map(|(feed_id, attempt_id, chateau_id)| {

//clone the smart reference to the connection pool
let arc_conn_pool = Arc::clone(&arc_conn_pool);
let download_feed_info_hashmap = Arc::clone(&download_feed_info_hashmap);
async move {
//connect to postgres
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre.unwrap();
Expand Down
22 changes: 22 additions & 0 deletions src/maple/refresh_metadata_tables.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use chateau::Chateau;
use diesel::query_dsl::methods::SelectDsl;
use diesel::SelectableHelper;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use dmfr_folder_reader::ReturnDmfrAnalysis;
use std::collections::HashMap;
use std::error::Error;
Expand All @@ -15,6 +18,25 @@ pub async fn refresh_metadata_assignments(
) -> Result<(), Box<dyn Error + Sync + Send>> {
//update or create realtime tables and static tables

let conn_pool = pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

let existing_chateaus = catenary::schema::gtfs::chateaus::table
.select(catenary::models::Chateau::as_select())
.load::<catenary::models::Chateau>(conn)
.await?;

let existing_realtime_feeds = catenary::schema::gtfs::realtime_feeds::table
.select(catenary::models::RealtimeFeed::as_select())
.load::<catenary::models::RealtimeFeed>(conn)
.await?;

let existing_static_feeds = catenary::schema::gtfs::static_feeds::table
.select(catenary::models::StaticFeed::as_select())
.load::<catenary::models::StaticFeed>(conn)
.await?;

// if the new chateau id is different for any of the feeds, run the update function
Ok(())
}
2 changes: 2 additions & 0 deletions src/maple/transitland_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ struct StaticFeedToDownload {
//You are required under the AGPL license to retain this annotation

//It's giving UC Berkeley lab assignment!!! 🐻💅🐻💅
//context for this joke: https://inst.eecs.berkeley.edu/~cs162/fa22/static/hw/hw-map-reduce-rs/
// UC Berkeley's Rust computing course has exercises that pack massive structs as results
#[derive(Clone)]
pub struct DownloadedFeedsInformation {

Check warning

Code scanning / clippy

field duration_download is never read Warning

field duration\_download is never read

Check warning

Code scanning / clippy

field duration_download is never read Warning

field duration\_download is never read
pub feed_id: String,
Expand Down

0 comments on commit 38a3f1b

Please sign in to comment.