Skip to content

Commit

Permalink
Save data before appending to job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 24, 2024
1 parent 4ff7657 commit d3c77e5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 9 deletions.
6 changes: 3 additions & 3 deletions src/aspen/import_alpenrose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub async fn new_rt_data(

let vehicles_gtfs_rt = match vehicles_response_code {
Some(200) => match vehicles {
Some(v) => match parse_gtfs_rt_message(&v.as_slice()) {
Some(v) => match parse_gtfs_rt_message(v.as_slice()) {
Ok(v) => Some(v),
Err(e) => {
println!("Error decoding vehicles: {}", e);
Expand All @@ -75,7 +75,7 @@ pub async fn new_rt_data(

let trips_gtfs_rt = match trips_response_code {
Some(200) => match trips {
Some(t) => match parse_gtfs_rt_message(&t.as_slice()) {
Some(t) => match parse_gtfs_rt_message(t.as_slice()) {
Ok(t) => Some(t),
Err(e) => {
println!("Error decoding trips: {}", e);
Expand All @@ -89,7 +89,7 @@ pub async fn new_rt_data(

let alerts_gtfs_rt = match alerts_response_code {
Some(200) => match alerts {
Some(a) => match parse_gtfs_rt_message(&a.as_slice()) {
Some(a) => match parse_gtfs_rt_message(a.as_slice()) {
Ok(a) => Some(a),
Err(e) => {
println!("Error decoding alerts: {}", e);
Expand Down
3 changes: 0 additions & 3 deletions src/aspen/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ pub struct ChateausLeaderHashMap {
pub struct ProcessAlpenroseData {
pub chateau_id: String,
pub realtime_feed_id: String,
pub vehicles: Option<Vec<u8>>,
pub trips: Option<Vec<u8>>,
pub alerts: Option<Vec<u8>>,
pub has_vehicles: bool,
pub has_trips: bool,
pub has_alerts: bool,
Expand Down
73 changes: 70 additions & 3 deletions src/aspen/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,79 @@ impl AspenRpc for AspenServer {
alerts_response_code: Option<u16>,
time_of_submission_ms: u64,
) -> bool {

let vehicles_gtfs_rt = match vehicles_response_code {
Some(200) => match vehicles {
Some(v) => match parse_gtfs_rt_message(v.as_slice()) {
Ok(v) => Some(v),
Err(e) => {
println!("Error decoding vehicles: {}", e);
None
}
},
None => None,
},
_ => None,
};

let trips_gtfs_rt = match trips_response_code {
Some(200) => match trips {
Some(t) => match parse_gtfs_rt_message(t.as_slice()) {
Ok(t) => Some(t),
Err(e) => {
println!("Error decoding trips: {}", e);
None
}
},
None => None,
},
_ => None,
};

let alerts_gtfs_rt = match alerts_response_code {
Some(200) => match alerts {
Some(a) => match parse_gtfs_rt_message(a.as_slice()) {
Ok(a) => Some(a),
Err(e) => {
println!("Error decoding alerts: {}", e);
None
}
},
None => None,
},
_ => None,
};

//get and update raw gtfs_rt data

println!("Parsed FeedMessages for {}", realtime_feed_id);

if let Some(vehicles_gtfs_rt) = &vehicles_gtfs_rt {
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::VehiclePositions))
.and_modify(|gtfs_data| *gtfs_data = vehicles_gtfs_rt.clone())
.or_insert(vehicles_gtfs_rt.clone());
}

if let Some(trip_gtfs_rt) = &trips_gtfs_rt {
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::TripUpdates))
.and_modify(|gtfs_data| *gtfs_data = trip_gtfs_rt.clone())
.or_insert(trip_gtfs_rt.clone());
}

if let Some(alerts_gtfs_rt) = &alerts_gtfs_rt {
authoritative_gtfs_rt
.entry((realtime_feed_id.clone(), GtfsRtType::Alerts))
.and_modify(|gtfs_data| *gtfs_data = alerts_gtfs_rt.clone())
.or_insert(alerts_gtfs_rt.clone());
}

println!("Saved FeedMessages for {}", realtime_feed_id);

self.alpenrose_to_process_queue.push(ProcessAlpenroseData {
chateau_id,
realtime_feed_id,
vehicles,
trips,
alerts,
has_vehicles,
has_trips,
has_alerts,
Expand Down

0 comments on commit d3c77e5

Please sign in to comment.