Skip to content

Commit

Permalink
Move status to in progress table, create calendar feedinfo models
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Mar 29, 2024
1 parent 3a05863 commit 2856621
Show file tree
Hide file tree
Showing 9 changed files with 585 additions and 81 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,4 @@ path = "src/spruce/main.rs"
#Test binaries
[[bin]]
name = "pg_tests"
path = "src/pg_tests?main.rs"
path = "src/pg_tests/main.rs"
60 changes: 47 additions & 13 deletions migrations/2024-03-26-004608_init/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@ CREATE TABLE gtfs.ingested_static (
file_hash text NOT NULL,
attempt_id text NOT NULL,
ingest_start_unix_time_ms bigint NOT NULL,
ingest_end_unix_time_ms bigint NOT NULL,
ingest_duration_ms int NOT NULL,
ingesting_in_progress boolean NOT NULL,
ingestion_successfully_finished boolean NOT NULL,
ingestion_errored boolean NOT NULL,
production boolean NOT NULL,
deleted boolean NOT NULL,
feed_expiration_date date,
feed_start_date date,
default_lang text,
languages_avaliable text[] NOT NULL,
ingestion_version integer NOT NULL,
PRIMARY KEY (onestop_feed_id, ingest_start_unix_time_ms)
PRIMARY KEY (onestop_feed_id, attempt_id)
);

-- Tracks static GTFS feed ingests that are currently running.
-- A row identifies one ingest attempt of one feed by (onestop_feed_id, attempt_id);
-- presumably rows are removed (and recorded in gtfs.ingested_static) when the
-- attempt finishes -- TODO(review): confirm against the ingestion code.
CREATE TABLE gtfs.in_progress_static_ingests (
onestop_feed_id text NOT NULL,
-- hash of the downloaded feed archive
file_hash text NOT NULL,
attempt_id text NOT NULL,
-- wall-clock start of the ingest, Unix epoch milliseconds
ingest_start_unix_time_ms bigint NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id)
);

CREATE INDEX IF NOT EXISTS gtfs_static_download_attempts_file_hash ON gtfs.static_download_attempts (file_hash);
Expand Down Expand Up @@ -188,8 +199,8 @@ check (
);

CREATE TABLE gtfs.trips (
trip_id text NOT NULL,
onestop_feed_id text NOT NULL,
trip_id text NOT NULL,
attempt_id text NOT NULL,
route_id text NOT NULL,
service_id text NOT NULL,
Expand All @@ -205,9 +216,23 @@ CREATE TABLE gtfs.trips (
bikes_allowed smallint NOT NULL,
chateau text NOT NULL,
frequencies trip_frequency[],
has_frequencies boolean NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, trip_id)
);

-- One row per frequencies.txt entry of a trip (GTFS frequency-based service).
-- Keyed by feed + ingest attempt + trip, plus the entry's position in the list.
CREATE TABLE gtfs.trip_frequencies (
onestop_feed_id text NOT NULL,
trip_id text NOT NULL,
attempt_id text NOT NULL,
-- zero-or-one-based position within the trip's frequency list
-- NOTE(review): confirm the origin index convention with the ingestion code
index smallint NOT NULL,
-- OID is used as an unsigned 32-bit integer (seconds-since-midnight / headway)
-- NOTE(review): presumably chosen so diesel maps these to u32 -- confirm
start_time OID NOT NULL,
end_time OID NOT NULL,
headway_secs OID NOT NULL,
-- GTFS exact_times: false = 0 / FrequencyBased, true = 1 / ScheduleBased
exact_times boolean NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, trip_id, index)
);

CREATE TABLE gtfs.f_test (
trip_id text NOT NULL PRIMARY KEY,
f trip_frequency[]
Expand All @@ -229,7 +254,7 @@ CREATE TABLE gtfs.stops (
parent_station text,
zone_id text,
url text,
point GEOMETRY(POINT, 4326) NOT NULL,
point GEOMETRY(POINT, 4326),
timezone text,
wheelchair_boarding int,
primary_route_type text,
Expand All @@ -256,23 +281,26 @@ CREATE TABLE gtfs.stoptimes (
attempt_id text NOT NULL,
trip_id text NOT NULL,
stop_sequence int NOT NULL,
arrival_time bigint,
departure_time bigint,
arrival_time OID,
departure_time OID,
stop_id text NOT NULL,
stop_headsign text,
stop_headsign_translations jsonb,
pickup_type int,
drop_off_type int,
shape_dist_traveled double precision,
timepoint int,
continuous_pickup smallint,
continuous_drop_off smallint,
point GEOMETRY(POINT, 4326) NOT NULL,
route_id text,
pickup_type smallint NOT NULL,
drop_off_type smallint NOT NULL,
shape_dist_traveled float4,
-- true is 1, false is 0
timepoint bool NOT NULL,
continuous_pickup smallint NOT NULL,
continuous_drop_off smallint NOT NULL,
point GEOMETRY(POINT, 4326),
route_id text NOT NULL,
chateau text NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, trip_id, stop_sequence)
);

-- Fix: the index is named for stoptimes but was created ON gtfs.stops.
-- gtfs.stoptimes declares `chateau text NOT NULL` and the sibling chateau
-- indexes (calendar_dates_chateau, calendar_chateau) each index their own
-- table, so this should target gtfs.stoptimes.
CREATE INDEX stoptimes_chateau_idx ON gtfs.stoptimes (chateau);

CREATE TABLE gtfs.gtfs_errors (
onestop_feed_id text NOT NULL,
error text NOT NULL,
Expand Down Expand Up @@ -304,9 +332,12 @@ CREATE TABLE gtfs.calendar_dates (
service_id text NOT NULL,
gtfs_date date NOT NULL,
exception_type smallint NOT NULL,
chateau text NOT NULL,
PRIMARY KEY (onestop_feed_id, service_id, gtfs_date)
);

CREATE INDEX calendar_dates_chateau ON gtfs.calendar_dates (chateau);

CREATE TABLE gtfs.calendar (
onestop_feed_id text NOT NULL,
attempt_id text NOT NULL,
Expand All @@ -320,7 +351,10 @@ CREATE TABLE gtfs.calendar (
sunday boolean NOT NULL,
gtfs_start_date date NOT NULL,
gtfs_end_date date NOT NULL,
chateau text NOT NULL,
PRIMARY KEY (onestop_feed_id, attempt_id, service_id)
);

CREATE INDEX calendar_chateau ON gtfs.calendar (chateau);

-- translations does not need a table, values should be directly inserted into the data structure
2 changes: 1 addition & 1 deletion src/custom_pg_types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use diesel::sql_types::*;

#[derive(Clone, Debug, SqlType)]
#[diesel(postgres_type(name = "trip_frequency"))]
#[diesel(postgres_type(name = "trip_frequency", schema = "public"))]
pub struct TripFrequency {
pub start_time: Int4,
pub end_time: Int4,
Expand Down
79 changes: 79 additions & 0 deletions src/maple/cleanup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use catenary::postgres_tools::CatenaryPostgresPool;
use std::error::Error;
use diesel::query_dsl::methods::FilterDsl;
use diesel::ExpressionMethods;
use std::sync::Arc;
use diesel::BoolExpressionMethods;
use diesel_async::RunQueryDsl;

pub async fn delete_attempt_objects(feed_id: &str, attempt_id: &str, pool: Arc<CatenaryPostgresPool>) -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
use catenary::schema::gtfs::trips::dsl::trips as trips_table;
use catenary::schema::gtfs::trips;

let conn_pool = pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

let _ = diesel::delete(trips_table.filter(trips::dsl::onestop_feed_id.eq(&feed_id).and(trips::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::stoptimes::dsl::stoptimes as stop_times_table;
use catenary::schema::gtfs::stoptimes;

let _ = diesel::delete(stop_times_table.filter(stoptimes::dsl::onestop_feed_id.eq(&feed_id).and(stoptimes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

//cleanup trip freq

use catenary::schema::gtfs::trip_frequencies::dsl::trip_frequencies as frequencies_table;
use catenary::schema::gtfs::trip_frequencies;
let _ = diesel::delete(frequencies_table.filter(trip_frequencies::dsl::onestop_feed_id.eq(&feed_id).and(trip_frequencies::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::agencies::dsl::agencies as agencies_table;
use catenary::schema::gtfs::agencies;

let _ = diesel::delete(agencies_table.filter(agencies::dsl::static_onestop_id.eq(&feed_id).and(agencies::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::calendar_dates::dsl::calendar_dates as calendar_dates_table;
use catenary::schema::gtfs::calendar_dates;

let _ = diesel::delete(calendar_dates_table.filter(calendar_dates::dsl::onestop_feed_id.eq(&feed_id).and(calendar_dates::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::calendar::dsl::calendar as calendar_table;
use catenary::schema::gtfs::calendar;

let _ = diesel::delete(calendar_table.filter(calendar::dsl::onestop_feed_id.eq(&feed_id).and(calendar::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::routes::dsl::routes as routes_table;
use catenary::schema::gtfs::routes;

let _ = diesel::delete(routes_table.filter(routes::dsl::onestop_feed_id.eq(&feed_id).and(routes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::shapes::dsl::shapes as shapes_table;
use catenary::schema::gtfs::shapes;

let _ = diesel::delete(shapes_table.filter(shapes::dsl::onestop_feed_id.eq(&feed_id).and(shapes::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

use catenary::schema::gtfs::stops::dsl::stops as stops_table;
use catenary::schema::gtfs::stops;

let _ = diesel::delete(stops_table.filter(stops::dsl::onestop_feed_id.eq(&feed_id).and(stops::dsl::attempt_id.eq(&attempt_id))))
.execute(conn)
.await;

Ok(())
}
Loading

0 comments on commit 2856621

Please sign in to comment.