Commit

Assign Production tables in maple
kylerchin committed Mar 30, 2024
1 parent 89ebc51 commit 09b77c1
Showing 12 changed files with 311 additions and 76 deletions.
12 changes: 6 additions & 6 deletions migrations/2024-03-26-004608_init/up.sql
@@ -75,18 +75,18 @@ CREATE TABLE gtfs.chateaus (
-- switch data ASAP if the start date is before the current date
-- enable new data when the current feed expires
CREATE TABLE gtfs.feed_info (
onestop_feed_id text,
feed_publisher_name text,
feed_publisher_url text,
feed_lang text,
onestop_feed_id text NOT NULL,
feed_publisher_name text NOT NULL,
feed_publisher_url text NOT NULL,
feed_lang text NOT NULL,
default_lang text,
feed_start_date DATE,
feed_end_date DATE,
feed_version text,
feed_contact_email text,
feed_contact_url text,
attempt_id text,
chateau text,
attempt_id text NOT NULL,
chateau text NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, feed_publisher_name)
);

188 changes: 188 additions & 0 deletions src/maple/assign_production_tables.rs
@@ -0,0 +1,188 @@
use catenary::postgres_tools::CatenaryPostgresPool;
use chrono::naive::NaiveDate;
use chrono::offset::Utc;
use diesel::query_dsl::methods::FilterDsl;
use diesel::query_dsl::methods::SelectDsl;
use diesel::SelectableHelper;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;
use std::error::Error;
use std::ops::Sub;
use std::sync::Arc;

struct FeedTimeRange {
attempt_id: String,
start_date: Option<NaiveDate>,
expiration_date: Option<NaiveDate>,
}

pub async fn assign_production_tables(
feed_id: &str,
attempt_id: &str,
arc_conn_pool: Arc<CatenaryPostgresPool>,
) -> Result<(), Box<dyn Error + Sync + Send>> {
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

//determine if the old one should be deleted and, if so, delete it

//algorithm:
// If the latest file does not contain a feed info, wipe all old feeds and put the latest file into production

//call function to clean old gtfs feeds, accepting feed_id and the postgres connection pool as arguments
//greedy algorithm: start from the newest feed and work successively towards older feeds, assigning each feed only the date range not already claimed by a newer feed
//data structure can be a Vec of (start_date, end_date, attempt_id or hash)
// older feeds cannot claim dates that are after a newer feed's expiration date
//any feed that no longer has a date range, or is sufficiently old (over 7 days old), is wiped

//query all the feeds in the production table

//end date is always inclusive
// "service in the period from the beginning of the feed_start_date day to the end of the feed_end_date day"
//https://gtfs.org/schedule/reference/#feed_infotxt

use catenary::schema::gtfs::ingested_static::dsl::ingested_static;
use diesel::ExpressionMethods;

let all_feeds: Vec<catenary::models::IngestedStatic> = ingested_static
.select(catenary::models::IngestedStatic::as_select())
.load::<catenary::models::IngestedStatic>(conn)
.await?;

//filter only successful ingests
let mut sorted_feeds: Vec<catenary::models::IngestedStatic> = all_feeds
.clone()
.into_iter()
.filter(|ingested| {
ingested.ingestion_successfully_finished == true && ingested.deleted == false
})
.collect();

//sort all feeds by ingest_start_unix_time_ms

sorted_feeds.sort_by(|a, b| {
a.ingest_start_unix_time_ms
.cmp(&b.ingest_start_unix_time_ms)
});

let mut feed_time_ranges: Vec<FeedTimeRange> = Vec::new();

let mut drop_attempt_list: Vec<String> = Vec::new();

let mut last_claimed_start_time: Option<Option<NaiveDate>> = None;

// go from latest ingest feed to earliest ingest feed
for (i, ingested_item) in sorted_feeds.into_iter().rev().enumerate() {
//i = 0 is latest, followed by earlier and earlier data
match last_claimed_start_time {
None => {
last_claimed_start_time = Some(ingested_item.feed_start_date);

feed_time_ranges.push(FeedTimeRange {
attempt_id: ingested_item.attempt_id.clone(),
start_date: ingested_item.feed_start_date,
expiration_date: ingested_item.feed_expiration_date,
});
}
Some(last_claimed_start_time_prev) => {
//if the newer feed's claimed start time is None, it claims all dates, so drop this older attempt

match last_claimed_start_time_prev {
None => {
drop_attempt_list.push(ingested_item.attempt_id.clone());
}
Some(last_claimed_start_time_prev) => {
// calculate new expiration date

//does this feed have an expiration date
let new_expiration_date = match ingested_item.feed_expiration_date {
Some(this_feed_expiration) => last_claimed_start_time_prev
.sub(chrono::Days::new(1))
.min(this_feed_expiration),
// look at the previous feed's start date and subtract 1 as the expiration date
None => last_claimed_start_time_prev.sub(chrono::Days::new(1)),
};

// if the new expiration date is more than 5 days ago, then drop it

let now: NaiveDate = Utc::now().naive_utc().date();

if new_expiration_date < now.sub(chrono::Days::new(5)) {
//drop the feed
drop_attempt_list.push(ingested_item.attempt_id.clone());
} else {
// add to the feed time range, claim the time range
last_claimed_start_time = Some(ingested_item.feed_start_date);

feed_time_ranges.push(FeedTimeRange {
attempt_id: ingested_item.attempt_id.clone(),
start_date: ingested_item.feed_start_date,
expiration_date: ingested_item.feed_expiration_date,
});
}
}
}
}
}
}

//prepare data for the commit
let drop_attempt_list_transaction = drop_attempt_list.clone();
// attempt ids to mark as in production
let production_list_ids: Vec<String> = feed_time_ranges
.iter()
.map(|feed_time_range| feed_time_range.attempt_id.clone())
.collect();

//mark old feeds as not in production anymore and new feeds as in production
conn.transaction::<_, diesel::result::Error, _>(|conn| {
{
async move {
use catenary::schema::gtfs::ingested_static::dsl::*;
for production_list_id in production_list_ids {
let _ = diesel::update(
ingested_static
.filter(onestop_feed_id.lt(&feed_id))
.filter(attempt_id.lt(&production_list_id)),
)
.set((deleted.eq(false), production.eq(true)))
.execute(conn)
.await?;
}

for drop_id in drop_attempt_list_transaction {
let _ = diesel::update(
ingested_static
.filter(onestop_feed_id.lt(&feed_id))
.filter(attempt_id.lt(&drop_id)),
)
.set((deleted.eq(true), production.eq(false)))
.execute(conn)
.await?;
}

Ok(())
}
}
.scope_boxed()
})
.await?;

//drop and cleanup everything in drop_attempt_list

for drop_attempt_id in &drop_attempt_list {
use crate::cleanup::delete_attempt_objects;

println!(
"Deleting old data {} in feed {}",
&drop_attempt_id, &feed_id
);
let _ =
delete_attempt_objects(&feed_id, &drop_attempt_id, Arc::clone(&arc_conn_pool)).await;
}

Ok(())
}
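
To make the greedy date-range assignment above easier to follow, here is a minimal standalone sketch of the same pass. It is an illustration only, not the committed implementation: ToyIngest is a hypothetical stand-in for the IngestedStatic model, the dates are made up, and the only assumed dependency is the chrono crate. The 5-day drop cutoff mirrors the check in assign_production_tables.

use chrono::NaiveDate;

// Hypothetical stand-in for catenary::models::IngestedStatic.
struct ToyIngest {
    attempt_id: &'static str,
    feed_start_date: Option<NaiveDate>,
    feed_expiration_date: Option<NaiveDate>,
}

fn main() {
    let d = |y, m, day| NaiveDate::from_ymd_opt(y, m, day).unwrap();
    let today = d(2024, 3, 30);

    // Already sorted oldest -> newest by ingest start time, as in the real code.
    let sorted = vec![
        ToyIngest { attempt_id: "a1", feed_start_date: Some(d(2024, 1, 1)), feed_expiration_date: Some(d(2024, 6, 30)) },
        ToyIngest { attempt_id: "a2", feed_start_date: Some(d(2024, 3, 1)), feed_expiration_date: Some(d(2024, 9, 1)) },
        ToyIngest { attempt_id: "a3", feed_start_date: Some(d(2024, 4, 1)), feed_expiration_date: None },
    ];

    let mut in_production: Vec<&str> = Vec::new();
    let mut to_drop: Vec<&str> = Vec::new();
    // Start date claimed by the most recently processed (i.e. newer) feed.
    let mut last_claimed_start: Option<Option<NaiveDate>> = None;

    // Walk from the newest ingest to the oldest.
    for item in sorted.iter().rev() {
        match last_claimed_start {
            // The newest feed always goes to production and claims its own start date.
            None => {
                last_claimed_start = Some(item.feed_start_date);
                in_production.push(item.attempt_id);
            }
            // A newer feed with no start date claims every date, so drop the older feed.
            Some(None) => to_drop.push(item.attempt_id),
            Some(Some(newer_start)) => {
                // An older feed may only serve up to the day before the newer feed starts,
                // and never past its own (inclusive) feed_end_date.
                let capped_end = newer_start - chrono::Days::new(1);
                let effective_end = match item.feed_expiration_date {
                    Some(own_end) => capped_end.min(own_end),
                    None => capped_end,
                };
                if effective_end < today - chrono::Days::new(5) {
                    // Its usable window ended too long ago: schedule it for deletion.
                    to_drop.push(item.attempt_id);
                } else {
                    // Claim the remaining range for this feed.
                    last_claimed_start = Some(item.feed_start_date);
                    in_production.push(item.attempt_id);
                }
            }
        }
    }

    // Prints: in production: ["a3", "a2"], to drop: ["a1"]
    // (a1's capped end date, 2024-02-29, is more than 5 days before `today`.)
    println!("in production: {:?}, to drop: {:?}", in_production, to_drop);
}
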
12 changes: 12 additions & 0 deletions src/maple/cleanup.rs
@@ -133,5 +133,17 @@ pub async fn delete_attempt_objects(
.execute(conn)
.await;

//delete ingested static_download_attempts
/*
use catenary::schema::gtfs::static_download_attempts;
use catenary::schema::gtfs::static_download_attempts::dsl::static_download_attempts as static_download_attempts_table;
let _ = diesel::delete(
static_download_attempts_table
.filter(static_download_attempts::dsl::onestop_feed_id.eq(&feed_id).and(static_download_attempts::dsl::attempt_id.eq(&attempt_id)))
).execute(conn).await;
*/

Ok(())
}
2 changes: 1 addition & 1 deletion src/maple/gtfs_handlers/gtfs_to_int.rs
@@ -36,4 +36,4 @@ pub fn availability_to_int(input: &gtfs_structures::Availability) -> i16 {
gtfs_structures::Availability::Unknown(unknown) => *unknown,
gtfs_structures::Availability::InformationNotAvailable => 0,
}
}
}
2 changes: 1 addition & 1 deletion src/maple/gtfs_ingestion_sequence/mod.rs
@@ -1,2 +1,2 @@
pub mod shapes_into_postgres;
pub mod stops_into_postgres;
pub mod stops_into_postgres;
2 changes: 1 addition & 1 deletion src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs
@@ -8,8 +8,8 @@ use std::sync::Arc;
use crate::gtfs_handlers::colour_correction;
use crate::gtfs_handlers::enum_to_int::route_type_to_int;
use crate::gtfs_handlers::rename_route_labels::*;
use catenary::postgres_tools::CatenaryPostgresPool;
use catenary::postgres_tools::CatenaryConn;
use catenary::postgres_tools::CatenaryPostgresPool;

pub async fn shapes_into_postgres(
gtfs: &gtfs_structures::Gtfs,
84 changes: 48 additions & 36 deletions src/maple/gtfs_ingestion_sequence/stops_into_postgres.rs
@@ -1,22 +1,22 @@
use std::collections::{HashMap, HashSet};
use titlecase::titlecase;
use catenary::postgres_tools::CatenaryPostgresPool;
use std::sync::Arc;
use diesel_async::RunQueryDsl;
use diesel_async::AsyncConnection;
use catenary::schema::gtfs::stops::dsl::stops as stops_table;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use titlecase::titlecase;

pub async fn stops_into_postgres(gtfs: &gtfs_structures::Gtfs,
pub async fn stops_into_postgres(
gtfs: &gtfs_structures::Gtfs,
feed_id: &str,
arc_conn_pool: Arc<CatenaryPostgresPool>,
chateau_id: &str,
attempt_id: &str,
stop_ids_to_route_types: &HashMap<String, HashSet<i16>>,
stop_ids_to_route_ids: &HashMap<String, HashSet<String>>,
stop_id_to_children_ids: &HashMap<String, HashSet<String>>,
stop_id_to_children_route: & HashMap<String, HashSet<i16>>
) -> Result<(), Box<dyn std::error::Error + Sync + Send>>
{
stop_id_to_children_route: &HashMap<String, HashSet<i16>>,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
for (stop_id, stop) in &gtfs.stops {
let name: Option<String> = titlecase_process_new(stop.name.as_ref());
let display_name: Option<String> = match &name {
@@ -30,7 +30,7 @@ pub async fn stops_into_postgres(gtfs: &gtfs_structures::Gtfs,
),
None => None,
};

let stop_pg = catenary::models::Stop {
onestop_feed_id: feed_id.to_string(),
chateau: chateau_id.to_string(),
@@ -42,56 +42,67 @@ pub async fn stops_into_postgres(gtfs: &gtfs_structures::Gtfs,
code: stop.code.clone(),
gtfs_desc: stop.description.clone(),
gtfs_desc_translations: None,
location_type: crate::gtfs_handlers::gtfs_to_int::location_type_conversion(&stop.location_type),
location_type: crate::gtfs_handlers::gtfs_to_int::location_type_conversion(
&stop.location_type,
),
children_ids: match stop_id_to_children_ids.get(&stop.id) {
Some(children_ids) => {
children_ids.iter().map(|x| Some(x.clone())).collect::<Vec<Option<String>>>()
},
None => vec![]
Some(children_ids) => children_ids
.iter()
.map(|x| Some(x.clone()))
.collect::<Vec<Option<String>>>(),
None => vec![],
},
location_alias: None,
hidden: false,
parent_station: stop.parent_station.clone(),
zone_id: stop.zone_id.clone(),
url: stop.url.clone(),
point: match stop.latitude.is_some() && stop.longitude.is_some() {
true => {
Some(postgis_diesel::types::Point::new(stop.longitude.unwrap(), stop.latitude.unwrap(), Some(4326)))
},
false => None
true => Some(postgis_diesel::types::Point::new(
stop.longitude.unwrap(),
stop.latitude.unwrap(),
Some(4326),
)),
false => None,
},
timezone: stop.timezone.clone(),
level_id: stop.level_id.clone(),
station_feature: false,
wheelchair_boarding: crate::gtfs_handlers::gtfs_to_int::availability_to_int(&stop.wheelchair_boarding),
wheelchair_boarding: crate::gtfs_handlers::gtfs_to_int::availability_to_int(
&stop.wheelchair_boarding,
),
primary_route_type: match stop_ids_to_route_types.get(&stop.id) {
Some(route_types) => {
let mut route_types = route_types.iter().map(|x| x.clone()).collect::<Vec<i16>>();
let mut route_types =
route_types.iter().map(|x| x.clone()).collect::<Vec<i16>>();
Some(route_types[0])
},
None => None
}
None => None,
},
platform_code: stop.platform_code.clone(),
routes: match stop_ids_to_route_ids.get(&stop.id) {
Some(route_ids) => {
route_ids.iter().map(|x| Some(x.clone())).collect::<Vec<Option<String>>>()
},
None => vec![]
Some(route_ids) => route_ids
.iter()
.map(|x| Some(x.clone()))
.collect::<Vec<Option<String>>>(),
None => vec![],
},
children_route_types: match stop_id_to_children_route.get(&stop.id) {
Some(route_types) => {
route_types.iter().map(|x| Some(x.clone())).collect::<Vec<Option<i16>>>()
},
None => vec![]
Some(route_types) => route_types
.iter()
.map(|x| Some(x.clone()))
.collect::<Vec<Option<i16>>>(),
None => vec![],
},
tts_name: stop.tts_name.clone(),
tts_name_translations: None,
platform_code_translations: None,
route_types: match stop_ids_to_route_types.get(&stop.id) {
Some(route_types) => {
route_types.iter().map(|x| Some(x.clone())).collect::<Vec<Option<i16>>>()
},
None => vec![]
Some(route_types) => route_types
.iter()
.map(|x| Some(x.clone()))
.collect::<Vec<Option<i16>>>(),
None => vec![],
},
};

@@ -101,7 +112,8 @@ pub async fn stops_into_postgres(gtfs: &gtfs_structures::Gtfs,

diesel::insert_into(stops_table)
.values(stop_pg)
.execute(conn).await?;
.execute(conn)
.await?;
}

Ok(())