Skip to content

Commit

Permalink
Improve schema for compression of stoptimes
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 6, 2024
1 parent e9b2016 commit 70efc14
Show file tree
Hide file tree
Showing 6 changed files with 11 additions and 307 deletions.
13 changes: 8 additions & 5 deletions migrations/2024-04-06-053500_timetable-compression-v1/down.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-- This file should undo anything in `up.sql`
CREATE TABLE gtfs.stoptimes (
CREATE TABLE IF NOT EXISTS gtfs.stoptimes (
onestop_feed_id text NOT NULL,
attempt_id text NOT NULL,
trip_id text NOT NULL,
Expand All @@ -22,9 +22,9 @@ CREATE TABLE gtfs.stoptimes (
PRIMARY KEY (onestop_feed_id, attempt_id, trip_id, stop_sequence)
);

CREATE INDEX stoptimes_chateau_idx ON gtfs.stoptimes (chateau);
CREATE INDEX IF NOT EXISTS stoptimes_chateau_idx ON gtfs.stoptimes (chateau);

CREATE TABLE gtfs.trips (
CREATE TABLE IF NOT EXISTS gtfs.trips (
onestop_feed_id text NOT NULL,
trip_id text NOT NULL,
attempt_id text NOT NULL,
Expand All @@ -46,7 +46,7 @@ CREATE TABLE gtfs.trips (
PRIMARY KEY (onestop_feed_id, attempt_id, trip_id)
);

CREATE TABLE gtfs.trip_frequencies (
CREATE TABLE IF NOT EXISTS gtfs.trip_frequencies (
onestop_feed_id text NOT NULL,
trip_id text NOT NULL,
attempt_id text NOT NULL,
Expand All @@ -63,4 +63,7 @@ CREATE INDEX IF NOT EXISTS trips_chateau ON gtfs.trips (chateau);

DROP TABLE gtfs.itinerary_pattern CASCADE;
DROP TABLE gtfs.itinerary_pattern_meta CASCADE;
DROP TABLE gtfs.trips_compressed CASCADE;
DROP TABLE gtfs.trips_compressed CASCADE;

DROP INDEX IF EXISTS itinerary_pattern_chateau_idx;
DROP INDEX IF EXISTS trips_compressed_chateau_idx;
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ CREATE TABLE gtfs.itinerary_pattern_meta (
trip_headsign text,
trip_headsign_translations jsonb,
shape_id text,
timezone text NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, itinerary_pattern_id)
);

Expand Down
39 changes: 0 additions & 39 deletions src/maple/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,11 @@ pub async fn delete_attempt_objects(
attempt_id: &str,
pool: Arc<CatenaryPostgresPool>,
) -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
use catenary::schema::gtfs::trips;
use catenary::schema::gtfs::trips::dsl::trips as trips_table;

let conn_pool = pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

let _ = diesel::delete(
trips_table.filter(
trips::dsl::onestop_feed_id
.eq(&feed_id)
.and(trips::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::stoptimes;
use catenary::schema::gtfs::stoptimes::dsl::stoptimes as stop_times_table;

let _ = diesel::delete(
stop_times_table.filter(
stoptimes::dsl::onestop_feed_id
.eq(&feed_id)
.and(stoptimes::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

//cleanup trip freq

use catenary::schema::gtfs::trip_frequencies;
use catenary::schema::gtfs::trip_frequencies::dsl::trip_frequencies as frequencies_table;
let _ = diesel::delete(
frequencies_table.filter(
trip_frequencies::dsl::onestop_feed_id
.eq(&feed_id)
.and(trip_frequencies::dsl::attempt_id.eq(&attempt_id)),
),
)
.execute(conn)
.await;

use catenary::schema::gtfs::agencies;
use catenary::schema::gtfs::agencies::dsl::agencies as agencies_table;

Expand Down
155 changes: 0 additions & 155 deletions src/maple/gtfs_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use catenary::models::Route as RoutePgModel;
use catenary::postgres_tools::CatenaryConn;
use catenary::postgres_tools::CatenaryPostgresPool;
use catenary::schema::gtfs::chateaus::languages_avaliable;
use catenary::schema::gtfs::stoptimes::continuous_drop_off;
use chrono::NaiveDate;
use diesel::ExpressionMethods;
use diesel_async::RunQueryDsl;
Expand Down Expand Up @@ -171,160 +170,6 @@ pub async fn gtfs_process_feed(
)
.await?;

//insert trip
for (trip_id, trip) in &gtfs.trips {
let mut stop_headsigns: HashSet<String> = HashSet::new();

for stop_time in &trip.stop_times {
if let Some(stop_headsign) = stop_time.stop_headsign.as_ref() {
stop_headsigns.insert(stop_headsign.clone());
}
}

let stop_headsigns = stop_headsigns
.into_iter()
.map(|stop_headsign| Some(stop_headsign))
.collect::<Vec<Option<String>>>();

let has_stop_headsigns = stop_headsigns.len() > 0;

use gtfs_structures::DirectionType;

let frequencies_vec: Option<Vec<Option<catenary::models::TripFrequencyModel>>> =
match trip.frequencies.len() {
0 => None,
_ => Some(
trip.frequencies
.iter()
.map(|freq| {
Some(catenary::models::TripFrequencyModel {
start_time: freq.start_time as i32,
end_time: freq.end_time as i32,
headway_secs: freq.headway_secs as i32,
exact_times: match freq.exact_times {
Some(exact_times_num) => match exact_times_num {
ExactTimes::FrequencyBased => false,
ExactTimes::ScheduleBased => true,
},
None => false,
},
})
})
.collect(),
),
};

let trip_pg = catenary::models::Trip {
onestop_feed_id: feed_id.to_string(),
trip_id: trip_id.clone(),
attempt_id: attempt_id.to_string(),
service_id: trip.service_id.clone(),
trip_headsign: trip.trip_headsign.clone(),
trip_headsign_translations: None,
route_id: trip.route_id.clone(),
has_stop_headsigns: has_stop_headsigns,
stop_headsigns: match stop_headsigns.len() {
0 => None,
_ => Some(stop_headsigns.clone()),
},
trip_short_name: trip.trip_short_name.clone(),
direction_id: match trip.direction_id {
Some(direction) => Some(match direction {
DirectionType::Outbound => 0,
DirectionType::Inbound => 1,
}),
None => None,
},
bikes_allowed: bikes_allowed_to_int(&trip.bikes_allowed),
block_id: trip.block_id.clone(),
shape_id: trip.shape_id.clone(),
wheelchair_accessible: availability_to_int(&trip.wheelchair_accessible),
chateau: chateau_id.to_string(),
frequencies: None,
has_frequencies: trip.frequencies.len() >= 1,
};

use catenary::schema::gtfs::trips::dsl::trips;

diesel::insert_into(trips)
.values(trip_pg)
.execute(conn)
.await?;

//insert trip frequencies into seperate table for now until custom types are fixed in diesel or i find a better solution
use catenary::models::TripFrequencyTableRow;

let frequencies_for_table = trip
.frequencies
.iter()
.enumerate()
.map(|(i, freq)| catenary::models::TripFrequencyTableRow {
trip_id: trip_id.clone(),
onestop_feed_id: feed_id.to_string(),
attempt_id: attempt_id.to_string(),
index: i as i16,
start_time: freq.start_time,
end_time: freq.end_time,
headway_secs: freq.headway_secs,
exact_times: match freq.exact_times {
Some(exact_times_num) => match exact_times_num {
ExactTimes::FrequencyBased => false,
ExactTimes::ScheduleBased => true,
},
None => false,
},
})
.collect::<Vec<TripFrequencyTableRow>>();

use catenary::schema::gtfs::trip_frequencies::dsl::trip_frequencies;

diesel::insert_into(trip_frequencies)
.values(frequencies_for_table)
.execute(conn)
.await?;

//inside insert stoptimes

let stop_times_pg = trip
.stop_times
.iter()
.enumerate()
.map(|(stop_time_i, stop_time)| catenary::models::StopTime {
onestop_feed_id: feed_id.to_string(),
route_id: trip.route_id.clone(),
stop_headsign_translations: None,
trip_id: trip_id.clone(),
attempt_id: attempt_id.to_string(),
stop_id: stop_time.stop.id.clone(),
stop_sequence: stop_time_i as i32,
arrival_time: stop_time.arrival_time,
departure_time: stop_time.departure_time,
stop_headsign: stop_time.stop_headsign.clone(),
pickup_type: pickup_dropoff_to_i16(&stop_time.pickup_type),
drop_off_type: pickup_dropoff_to_i16(&stop_time.drop_off_type),
shape_dist_traveled: stop_time.shape_dist_traveled,
timepoint: match stop_time.timepoint {
gtfs_structures::TimepointType::Exact => true,
gtfs_structures::TimepointType::Approximate => false,
},
chateau: chateau_id.to_string(),
continuous_pickup: continuous_pickup_drop_off_to_i16(&stop_time.continuous_pickup),
continuous_drop_off: continuous_pickup_drop_off_to_i16(
&stop_time.continuous_drop_off,
),
})
.collect::<Vec<catenary::models::StopTime>>();

use catenary::schema::gtfs::stoptimes::dsl::stoptimes;

for stop_chunk in stop_times_pg.chunks(80) {
diesel::insert_into(stoptimes)
.values(stop_chunk)
.execute(conn)
.await?;
}
}

//insert stops
let _ = stops_into_postgres(
&gtfs,
Expand Down
108 changes: 0 additions & 108 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,114 +162,6 @@ pub struct Agency {
pub chateau: String,
}

/// One GTFS trip row in `gtfs.trips`, keyed by (feed, attempt, trip).
///
/// NOTE(review): `Queryable` maps result columns to fields *positionally*,
/// so the field order here must match the column order declared in
/// `crate::schema::gtfs::trips` — do not reorder fields.
#[derive(Queryable, Selectable, Insertable, Debug, Clone, Serialize, Deserialize)]
#[diesel(table_name = crate::schema::gtfs::trips)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Trip {
    // Composite identity: which feed, which ingest attempt, which trip.
    pub onestop_feed_id: String,
    pub trip_id: String,
    pub attempt_id: String,
    pub route_id: String,
    pub service_id: String,
    // Rider-facing headsign and its translations (JSON blob).
    pub trip_headsign: Option<String>,
    pub trip_headsign_translations: Option<Value>,
    // True when any stop_time on the trip carries its own headsign.
    pub has_stop_headsigns: bool,
    // Distinct per-stop headsigns; None when the trip has none.
    pub stop_headsigns: Option<Vec<Option<String>>>,
    pub trip_short_name: Option<String>,
    // 0 = outbound, 1 = inbound (see the DirectionType mapping at the
    // insertion site); None when the feed omits direction_id.
    pub direction_id: Option<i16>,
    pub block_id: Option<String>,
    pub shape_id: Option<String>,
    // GTFS availability enums encoded as small ints via availability_to_int /
    // bikes_allowed_to_int at the insertion site.
    pub wheelchair_accessible: i16,
    pub bikes_allowed: i16,
    pub chateau: String,
    // Inline frequencies are always inserted as None; the data actually goes
    // into gtfs.trip_frequencies until the custom composite type works.
    pub frequencies: Option<Vec<Option<TripFrequencyModel>>>,
    pub has_frequencies: bool,
}

//Attempted custom type, still doesn't work for some reason
//Error inserting trip: SerializationError(FailedToLookupTypeError(PgMetadataCacheKey { schema: Some("public"), type_name: "trip_frequency" }))
//Even though the type clearly exists
/// Rust-side mirror of the Postgres composite type `trip_frequency`
/// (see `crate::custom_pg_types::TripFrequency`).
///
/// NOTE(review): the field order defines the tuple order used by the
/// ToSql/FromSql impls below ((Int4, Int4, Int4, Bool)) — do not reorder.
#[derive(Clone, Debug, PartialEq, AsExpression, Serialize, Deserialize)]
#[diesel(sql_type = crate::custom_pg_types::TripFrequency)]
pub struct TripFrequencyModel {
    // Start/end of the frequency window — presumably seconds since midnight
    // per GTFS frequencies.txt; TODO confirm against the ingest code.
    pub start_time: i32,
    pub end_time: i32,
    // Interval between departures within the window, in seconds.
    pub headway_secs: i32,
    // true = schedule-based (exact_times = 1), false = frequency-based,
    // per the ExactTimes mapping at the insertion site.
    pub exact_times: bool,
}

use diesel::serialize::Output;
use diesel::serialize::WriteTuple;

// Learned from https://inve.rs/postgres-diesel-composite/
// https://docs.diesel.rs/2.0.x/diesel/deserialize/trait.FromSql.html
// https://docs.diesel.rs/2.0.x/diesel/serialize/trait.ToSql.html

// Imports https://docs.diesel.rs/master/diesel/pg/struct.PgValue.html as backend raw value
// Serializes a TripFrequencyModel as the Postgres composite tuple
// (int4, int4, int4, bool), matching the `trip_frequency` type.
impl ToSql<TripFrequency, Pg> for TripFrequencyModel {
    fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> diesel::serialize::Result {
        // i32 and bool are Copy, so the values can be passed directly —
        // the previous `.clone()` calls were redundant (clippy: clone_on_copy).
        WriteTuple::<(Int4, Int4, Int4, Bool)>::write_tuple(
            &(
                self.start_time,
                self.end_time,
                self.headway_secs,
                self.exact_times,
            ),
            out,
        )
    }
}

// Deserializes the Postgres composite `trip_frequency` back into a
// TripFrequencyModel. Tuple order must mirror the ToSql impl above.
impl FromSql<TripFrequency, Pg> for TripFrequencyModel {
    fn from_sql(bytes: diesel::pg::PgValue) -> diesel::deserialize::Result<Self> {
        // Decode the record as its Rust tuple equivalent, then name the parts.
        let record: (i32, i32, i32, bool) =
            FromSql::<Record<(Int4, Int4, Int4, Bool)>, Pg>::from_sql(bytes)?;

        Ok(Self {
            start_time: record.0,
            end_time: record.1,
            headway_secs: record.2,
            exact_times: record.3,
        })
    }
}

/// One row of `gtfs.trip_frequencies` — the flattened fallback table used
/// while the custom `trip_frequency` composite type is broken in diesel
/// (see the error note above the ToSql/FromSql impls).
///
/// NOTE(review): `Queryable` is positional — field order must match the
/// schema's column order; do not reorder.
#[derive(Queryable, Selectable, Insertable, Debug, Clone)]
#[diesel(table_name = crate::schema::gtfs::trip_frequencies)]
pub struct TripFrequencyTableRow {
    pub onestop_feed_id: String,
    pub trip_id: String,
    pub attempt_id: String,
    // Position of this frequency entry within the trip's frequency list.
    pub index: i16,
    // NOTE(review): diesel's Pg backend has no native ToSql for u32 on Int4
    // columns — verify these unsigned fields actually round-trip, or whether
    // they should be i32 like TripFrequencyModel.
    pub start_time: u32,
    pub end_time: u32,
    pub headway_secs: u32,
    pub exact_times: bool,
}

/// One GTFS stop_time row in `gtfs.stoptimes`, keyed by
/// (feed, attempt, trip, stop_sequence).
///
/// NOTE(review): `Queryable` is positional — field order must match the
/// schema's column order; do not reorder.
#[derive(Queryable, Selectable, Insertable, Debug, Clone)]
#[diesel(table_name = crate::schema::gtfs::stoptimes)]
pub struct StopTime {
    pub onestop_feed_id: String,
    pub attempt_id: String,
    pub trip_id: String,
    // At the insertion site this is the enumerate() index of the stop_time,
    // not the raw GTFS stop_sequence value.
    pub stop_sequence: i32,
    // Presumably seconds since midnight per GTFS — TODO confirm; None when
    // the feed omits the time (interpolated stops).
    pub arrival_time: Option<u32>,
    pub departure_time: Option<u32>,
    pub stop_id: String,
    pub stop_headsign: Option<String>,
    pub stop_headsign_translations: Option<Value>,
    // GTFS pickup/drop-off enums encoded via pickup_dropoff_to_i16 at the
    // insertion site.
    pub pickup_type: i16,
    pub drop_off_type: i16,
    pub shape_dist_traveled: Option<f32>,
    // true = Exact timepoint, false = Approximate (TimepointType mapping at
    // the insertion site).
    pub timepoint: bool,
    pub continuous_pickup: i16,
    pub continuous_drop_off: i16,
    // pub point: Option<postgis_diesel::types::Point>,
    pub route_id: String,
    pub chateau: String,
}

#[derive(Queryable, Selectable, Insertable, Debug, Clone)]
#[diesel(table_name = crate::schema::gtfs::stops)]
pub struct Stop {
Expand Down
2 changes: 2 additions & 0 deletions src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ pub mod gtfs {
trip_headsign -> Nullable<Text>,
trip_headsign_translations -> Nullable<Jsonb>,
shape_id -> Nullable<Text>,
timezone -> Text,
}
}

Expand Down Expand Up @@ -420,6 +421,7 @@ pub mod gtfs {
frequencies -> Nullable<Array<Nullable<TripFrequency>>>,
has_frequencies -> Bool,
itinerary_pattern_id -> Text,
compressed_trip_frequencies -> Nullable<Text>,
}
}

Expand Down

0 comments on commit 70efc14

Please sign in to comment.