Ingest realtime feeds
kylerchin committed Jan 29, 2024
1 parent 6463d94 commit c1e8d64
Showing 3 changed files with 28 additions and 7 deletions.
migrations/20240110110356_init.sql (4 changes: 0 additions & 4 deletions)
@@ -24,10 +24,6 @@ CREATE TABLE gtfs.static_feeds (
operators_to_gtfs_ids JSONB NOT NULL,
realtime_onestop_ids text[] NOT NULL,
realtime_onestop_ids_to_gtfs_ids JSONB NOT NULL,
-max_lat double precision NOT NULL,
-max_lon double precision NOT NULL,
-min_lat double precision NOT NULL,
-min_lon double precision NOT NULL,
hull GEOMETRY(POLYGON,4326)
);

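This migration drops the per-feed max_lat/max_lon/min_lat/min_lon columns, leaving the hull polygon as the only spatial summary in gtfs.static_feeds. A bounding box can still be derived from the hull on demand. The sketch below is illustrative only, not part of the commit: it uses a runtime sqlx query rather than the query! macro, and assumes a hypothetical onestop_feed_id key column and a populated hull.

// A minimal sketch, not part of the commit: recover a feed's bounding box from
// its hull now that the min/max columns are gone. Assumes PostGIS is available,
// that gtfs.static_feeds is keyed by a hypothetical onestop_feed_id column, and
// that hull is populated for the feed in question.
use sqlx::PgPool;

pub async fn feed_bounding_box(
    pool: &PgPool,
    onestop_feed_id: &str,
) -> Result<Option<(f64, f64, f64, f64)>, sqlx::Error> {
    // ST_XMin/ST_XMax/ST_YMin/ST_YMax are standard PostGIS accessors.
    sqlx::query_as::<_, (f64, f64, f64, f64)>(
        "SELECT ST_XMin(hull), ST_XMax(hull), ST_YMin(hull), ST_YMax(hull)
         FROM gtfs.static_feeds WHERE onestop_feed_id = $1",
    )
    .bind(onestop_feed_id)
    .fetch_optional(pool)
    .await
}
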
src/next_db/get_feeds_meta.rs (4 changes: 2 additions & 2 deletions)
@@ -7,8 +7,8 @@ use std::sync::Arc;

#[derive(Debug, Clone)]
pub struct OperatorPairInfo {
-operator_id: String,
-gtfs_agency_id: Option<String>,
+pub operator_id: String,
+pub gtfs_agency_id: Option<String>,
}

pub type FeedId = String;
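
The only change here makes the two fields of OperatorPairInfo public, which is what lets the new code in refresh_metadata_tables.rs read operator_id and gtfs_agency_id directly. A minimal illustration of what the visibility change enables, with hypothetical values:

// Illustration only, not part of the commit: with the fields now pub,
// sibling modules can construct and read OperatorPairInfo directly.
use crate::get_feeds_meta::OperatorPairInfo;

fn demo_operator_pair() {
    let pair = OperatorPairInfo {
        operator_id: "o-example~operator".to_string(), // hypothetical Onestop operator id
        gtfs_agency_id: Some("1".to_string()),         // hypothetical GTFS agency_id
    };
    // Field access outside get_feeds_meta.rs now compiles.
    println!("{} -> {:?}", pair.operator_id, pair.gtfs_agency_id);
}
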
src/next_db/refresh_metadata_tables.rs (27 changes: 26 additions & 1 deletion)
@@ -1,8 +1,8 @@
use crate::dmfr;
use crate::get_feeds_meta;
use crate::get_feeds_meta::OperatorPairInfo;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

// Written by Kyler Chin at Catenary Transit Initiatives
// https://github.com/CatenaryTransit/catenary-backend
//You are required under the APGL license to retain this annotation as is
@@ -53,6 +53,18 @@ pub async fn refresh_feed_meta(
.feedhashmap
.get((&x.feed_onestop_id).as_ref().unwrap())
.unwrap();
+let bruhitfailed: Vec<OperatorPairInfo> = vec![];
+let listofoperatorpairs = transitland_meta
+.feed_to_operator_pairs_hashmap
+.get(&feed.id)
+.unwrap_or_else(|| &bruhitfailed)
+.to_owned();
+let mut operator_pairs_hashmap: HashMap<String, Option<String>> =
+HashMap::new();
+for operator_pair in listofoperatorpairs {
+operator_pairs_hashmap
+.insert(operator_pair.operator_id, operator_pair.gtfs_agency_id);
+}
match feed.spec {
dmfr::FeedSpec::Gtfs => {
if !feeds_to_discard
@@ -69,6 +81,19 @@
gtfs_realtime_feeds
.insert(x.feed_onestop_id.to_owned().unwrap(), x.gtfs_agency_id);
simplified_array_realtime.push(x.feed_onestop_id.to_owned().unwrap());

+let operators_owned = operator_pairs_hashmap.iter().map(|(a,b)| a.clone()).collect::<Vec<String>>();

+let _ = sqlx::query!("INSERT INTO gtfs.realtime_feeds (onestop_feed_id, name, operators, operators_to_gtfs_ids)
+VALUES ($1, $2, $3, $4) ON CONFLICT (onestop_feed_id) DO UPDATE SET name = $2, operators = $3, operators_to_gtfs_ids = $4;",
+&feed.id,
+&"",
+&operators_owned.as_slice(),
+&serde_json::json!(serde_json::map::Map::from_iter(operator_pairs_hashmap.iter().map(|(key,value)| {
+(key.clone(), serde_json::json!(value.clone()))
+})
+))
+).execute(pool).await;
}
_ => {
//do nothing
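
The new block collects each feed's operator pairs into a HashMap<String, Option<String>> and upserts a row into gtfs.realtime_feeds keyed by onestop_feed_id, storing the operator Onestop IDs as an array and the operator-to-agency mapping as JSONB (name is left as an empty string for now). The sketch below, using hypothetical operator IDs, shows the JSON shape the serde_json::map::Map::from_iter expression produces; a None agency id becomes JSON null.

// Sketch, not part of the commit: the JSONB payload built for operators_to_gtfs_ids.
use std::collections::HashMap;

fn operators_to_gtfs_ids_json(pairs: &HashMap<String, Option<String>>) -> serde_json::Value {
    // Mirrors the expression in the diff: a serde_json Map keyed by operator id,
    // with the optional GTFS agency_id as the value.
    serde_json::json!(serde_json::map::Map::from_iter(
        pairs
            .iter()
            .map(|(key, value)| (key.clone(), serde_json::json!(value.clone())))
    ))
}

fn main() {
    let mut pairs: HashMap<String, Option<String>> = HashMap::new();
    pairs.insert("o-example~one".to_string(), Some("1".to_string()));
    pairs.insert("o-example~two".to_string(), None);
    // Prints something like: {"o-example~one":"1","o-example~two":null}
    println!("{}", operators_to_gtfs_ids_json(&pairs));
}

As a side note on the lookup at the top of the block, .get(&feed.id).cloned().unwrap_or_default() would express the same fallback as the bruhitfailed placeholder vector.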
