Skip to content

Commit

Permalink
Cull out feeds that are not valid, using the presence of stops.txt as the check
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 3, 2024
1 parent 286a639 commit 6d8d286
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 28 deletions.
65 changes: 40 additions & 25 deletions src/birch/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,26 +109,31 @@ pub async fn nanotime(req: HttpRequest) -> impl Responder {
pub async fn bus_stops_meta(req: HttpRequest) -> impl Responder {
let mut fields = std::collections::BTreeMap::new();


fields.insert(String::from("onestop_feed_id"), String::from("text"));
fields.insert(String::from("attempt_id"), String::from("text"));
fields.insert(String::from("gtfs_id"), String::from("text"));
fields.insert(String::from("name"), String::from("text"));
fields.insert(String::from("displayname"), String::from("text"));
fields.insert(String::from("code"), String::from("text"));
fields.insert(String::from("gtfs_desc"), String::from("text"));
fields.insert(String::from("location_type"), String::from("smallint"));
fields.insert(String::from("parent_station"), String::from("text"));
fields.insert(String::from("zone_id"), String::from("text"));
fields.insert(String::from("url"), String::from("text"));
fields.insert(String::from("timezone"), String::from("text"));
fields.insert(String::from("wheelchair_boarding"), String::from("smallint"));
fields.insert(String::from("level_id"), String::from("text"));
fields.insert(String::from("platform_code"), String::from("text"));
fields.insert(String::from("routes"), String::from("text[]"));
fields.insert(String::from("route_types"), String::from("smallint[]"));
fields.insert(String::from("children_ids"), String::from("text[]"));
fields.insert(String::from("children_route_types"), String::from("smallint[]"));
fields.insert(String::from("onestop_feed_id"), String::from("text"));
fields.insert(String::from("attempt_id"), String::from("text"));
fields.insert(String::from("gtfs_id"), String::from("text"));
fields.insert(String::from("name"), String::from("text"));
fields.insert(String::from("displayname"), String::from("text"));
fields.insert(String::from("code"), String::from("text"));
fields.insert(String::from("gtfs_desc"), String::from("text"));
fields.insert(String::from("location_type"), String::from("smallint"));
fields.insert(String::from("parent_station"), String::from("text"));
fields.insert(String::from("zone_id"), String::from("text"));
fields.insert(String::from("url"), String::from("text"));
fields.insert(String::from("timezone"), String::from("text"));
fields.insert(
String::from("wheelchair_boarding"),
String::from("smallint"),
);
fields.insert(String::from("level_id"), String::from("text"));
fields.insert(String::from("platform_code"), String::from("text"));
fields.insert(String::from("routes"), String::from("text[]"));
fields.insert(String::from("route_types"), String::from("smallint[]"));
fields.insert(String::from("children_ids"), String::from("text[]"));
fields.insert(
String::from("children_route_types"),
String::from("smallint[]"),
);

let fields = tilejson::VectorLayer::new(String::from("data"), fields);

Expand Down Expand Up @@ -171,7 +176,7 @@ pub async fn bus_stops(

//let grid = tile_grid::Grid::wgs84();

// let bbox = grid.tile_extent(x, y, z);
// let bbox = grid.tile_extent(x, y, z);

let sqlx_pool_ref = sqlx_pool.as_ref().as_ref();

Expand Down Expand Up @@ -223,7 +228,7 @@ FROM (
Err(err) => {
eprintln!("{:?}", err);
HttpResponse::InternalServerError().body("Failed to fetch from postgres!")
},
}
}
}

Expand All @@ -236,7 +241,7 @@ pub async fn shapes_not_bus(
) -> impl Responder {
let (z, x, y) = path.into_inner();

// let grid = tile_grid::Grid::wgs84();
// let grid = tile_grid::Grid::wgs84();

//let bbox = grid.tile_extent(x, y, z);

Expand Down Expand Up @@ -343,8 +348,8 @@ pub async fn shapes_bus(
) -> impl Responder {
let (z, x, y) = path.into_inner();

// let grid = tile_grid::Grid::wgs84();
// let bbox = grid.tile_extent(x, y, z);
// let grid = tile_grid::Grid::wgs84();
// let bbox = grid.tile_extent(x, y, z);

let sqlx_pool_ref = sqlx_pool.as_ref().as_ref();

Expand Down Expand Up @@ -500,6 +505,8 @@ pub async fn irvinevehproxy(req: HttpRequest) -> impl Responder {
reqwest::get("https://passio3.com/irvine/passioTransit/gtfs/realtime/vehiclePositions")
.await;

let qs = QString::from(req.query_string());

match raw_data {
Ok(raw_data) => {
//println!("Raw data successfully downloaded");
Expand All @@ -510,6 +517,14 @@ pub async fn irvinevehproxy(req: HttpRequest) -> impl Responder {
Ok(raw_bytes) => {
let hashofresult = fasthash::metro::hash64(raw_bytes.as_ref());

if let Some(hashofbodyclient) = qs.get("bodyhash") {
if let Ok(clienthash) = hashofbodyclient.parse::<u64>() {
if clienthash == hashofresult {
return HttpResponse::NoContent().body("");
}
}
}

HttpResponse::Ok()
.insert_header(("Content-Type", "application/x-protobuf"))
.insert_header(("hash", hashofresult))
Expand Down
33 changes: 30 additions & 3 deletions src/maple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,38 @@ async fn run_ingest() -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {
unzip_feeds.len()
);

let check_for_stops_ids = unzip_feeds.into_iter().filter(|x| x.1 == true)
.map(|(feed_id, _)| feed_id)
.map(|feed_id| {

let path_str = format!("gtfs_uncompressed/{}/stops.txt", &feed_id);

let has_stop_table =
std::path::Path::new(&path_str).exists();

(feed_id, has_stop_table)
}
).collect::<Vec<(String, bool)>>();

let feeds_with_stop_table_len = check_for_stops_ids
.iter()
.map(|x| x.1 == true)
.collect::<Vec<bool>>()
.len();

let feeds_without_stop_table_len = check_for_stops_ids
.iter()
.map(|x| x.1 == false)
.collect::<Vec<bool>>()
.len();

println!("{} deleted because does not contain stops.txt {} remaining",feeds_without_stop_table_len, feeds_with_stop_table_len);

// todo! perform additional checks to ensure feed is not a zip bomb

let attempt_ids: HashMap<String, String> = {
let mut attempt_ids = HashMap::new();
for (feed_id, _) in unzip_feeds.iter() {
for (feed_id, _) in check_for_stops_ids.iter() {
let attempt_id =
format!("{}-{}", feed_id, chrono::Utc::now().timestamp_millis());
attempt_ids.insert(feed_id.clone(), attempt_id);
Expand All @@ -312,15 +339,15 @@ async fn run_ingest() -> Result<(), Box<dyn Error + std::marker::Send + Sync>> {

let attempt_ids = Arc::new(attempt_ids);

let unzip_feeds_clone = unzip_feeds.clone();
let unzip_feeds_clone = check_for_stops_ids.clone();

// 5. Process GTFS feeds

//Stream the feeds into the processing function

let ingest_progress: Arc<std::sync::Mutex<u16>> = Arc::new(std::sync::Mutex::new(0));

let feeds_to_process: Vec<(String, String, String)> = unzip_feeds_clone
let feeds_to_process: Vec<(String, String, String)> = check_for_stops_ids
.into_iter()
.filter(|unzipped_feed| unzipped_feed.1 == true)
.map(|(feed_id, _)| (feed_id.clone(), attempt_ids.get(&feed_id).unwrap().clone()))
Expand Down

0 comments on commit 6d8d286

Please sign in to comment.