From fe8526e8f54f70b96cd8fe6daa5d15caa8ae92ad Mon Sep 17 00:00:00 2001
From: Kyler Chin <7539174+kylerchin@users.noreply.github.com>
Date: Thu, 18 Apr 2024 01:36:13 -0700
Subject: [PATCH] Create leader election system for aspen

---
 src/alpenrose/main.rs              |  95 ++++++++++-
 src/alpenrose/single_fetch_time.rs |  20 ++-
 src/aspen/leader_thread.rs         | 250 +++++++++++++++++++++++++++++
 src/aspen/lib.rs                   |  24 +++
 src/aspen/main.rs                  |  98 +++++++----
 src/birch/server.rs                |   8 +
 src/lib.rs                         |  94 ++++++++---
 7 files changed, 529 insertions(+), 60 deletions(-)
 create mode 100644 src/aspen/leader_thread.rs

diff --git a/src/alpenrose/main.rs b/src/alpenrose/main.rs
index e6f3325a..694db776 100644
--- a/src/alpenrose/main.rs
+++ b/src/alpenrose/main.rs
@@ -30,7 +30,6 @@
 // https://en.wikipedia.org/wiki/Rhododendron_ferrugineum
 use catenary::agency_secret::*;
 use catenary::fast_hash;
-use std::time::Instant;
 use catenary::postgres_tools::CatenaryConn;
 use catenary::postgres_tools::{make_async_pool, CatenaryPostgresPool};
 use catenary::schema::gtfs::admin_credentials::last_updated_ms;
@@ -53,6 +52,7 @@ use std::error::Error;
 use std::sync::Arc;
 use std::thread;
 use std::time::Duration;
+use std::time::Instant;
 use tokio::sync::Mutex;
 use tokio::sync::RwLock;
 use tokio_zookeeper::*;
@@ -123,7 +123,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
     loop {
         //create parent node for workers
-        let workers = zk
+        let _ = zk
            .create(
                "/alpenrose_workers",
                vec![],
@@ -166,14 +166,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
            .await
            .unwrap();

-        let leader_exists = zk.watch().exists("/alpenrose_leader").await.unwrap();
+        let leader_exists = zk.exists("/alpenrose_leader").await.unwrap();

        if leader_exists.is_none() {
            //attempt to become leader
            let leader = zk
                .create(
                    "/alpenrose_leader",
-                    this_worker_id.as_bytes().to_vec(),
+                    bincode::serialize(&this_worker_id).unwrap(),
                    Acl::open_unsafe(),
                    CreateMode::Ephemeral,
                )
                .await
@@ -188,7 +188,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
        let leader = zk.watch().get_data("/alpenrose_leader").await.unwrap();

        if let Some((leader_str_bytes, leader_stats)) = leader {
-            let leader_id = String::from_utf8(leader_str_bytes).unwrap();
+            let leader_id: String = bincode::deserialize(&leader_str_bytes).unwrap();

            if &leader_id == this_worker_id.as_ref() {
                //I am the leader!
@@ -282,9 +282,50 @@ async fn main() -> Result<(), Box<dyn Error>> {
                                .as_str(),
                                bincode::serialize(&realtime_instruction).unwrap(),
                                Acl::open_unsafe(),
-                                CreateMode::Ephemeral,
+                                CreateMode::Persistent,
                            )
                            .await?;
+
+                        match assignment {
+                            Ok(_) => {
+                                println!(
+                                    "Assigned feed {} to worker {}",
+                                    feed_id_str, worker_id
+                                );
+                            }
+                            Err(error::Create::NodeExists) => {
+                                let set_assignment = zk
+                                    .set_data(
+                                        format!(
+                                            "/alpenrose_assignments/{}/{}",
+                                            worker_id, feed_id_str
+                                        )
+                                        .as_str(),
+                                        None,
+                                        bincode::serialize(&realtime_instruction)
+                                            .unwrap(),
+                                    )
+                                    .await?;
+
+                                match set_assignment {
+                                    Ok(_) => {
+                                        println!(
+                                            "Reassigned feed {} to worker {}",
+                                            feed_id_str, worker_id
+                                        );
+                                    }
+                                    Err(err) => {
+                                        eprintln!("Error reassigning feed {} to worker {}: {:?}", feed_id_str, worker_id, err);
+                                    }
+                                }
+                            }
+                            Err(err) => {
+                                eprintln!(
+                                    "Error assigning feed {} to worker {}: {:?}",
+                                    feed_id_str, worker_id, err
+                                );
+                            }
+                        }
                    }

                    //update the worker's last updated time
@@ -297,9 +338,49 @@ async fn main() -> Result<(), Box<dyn Error>> {
                        ))
                        .unwrap(),
                        Acl::open_unsafe(),
-                        CreateMode::Ephemeral,
+                        CreateMode::Persistent,
                    )
                    .await?;
+
+                match worker_assignment_metadata {
+                    Ok(_) => {
+                        println!("Updated worker assignment metadata");
+                    }
+                    Err(error::Create::NodeExists) => {
+                        let set_worker_assignment_metadata = zk
+                            .set_data(
+                                format!(
+                                    "/alpenrose_assignments/{}",
+                                    this_worker_id
+                                )
+                                .as_str(),
+                                None,
+                                bincode::serialize(&Some(
+                                    catenary::duration_since_unix_epoch()
+                                        .as_millis(),
+                                ))
+                                .unwrap(),
+                            )
+                            .await?;
+
+                        match set_worker_assignment_metadata {
+                            Ok(_) => {
+                                println!(
+                                    "Updated existing worker assignment metadata"
+                                );
+                            }
+                            Err(err) => {
+                                eprintln!("Error updating worker assignment metadata via set_data: {:?}", err);
+                            }
+                        }
+                    }
+                    Err(err) => {
+                        eprintln!(
+                            "Error updating worker assignment metadata: {:?}",
+                            err
+                        );
+                    }
+                }
            }
        }
    }
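The create-then-handle-NodeExists dance above appears twice in this file. Reduced to a helper, the pattern looks like the sketch below. This is illustrative only: create_or_set is a hypothetical name, and it assumes the same tokio_zookeeper API the patch itself calls (nested Result, set_data with None meaning an unconditional, version-unchecked write).

use tokio_zookeeper::{error, Acl, CreateMode, ZooKeeper};

// Hypothetical helper: create a znode, or overwrite its data if it already exists.
pub async fn create_or_set(zk: &ZooKeeper, path: &str, data: Vec<u8>) {
    let created = zk
        .create(path, data.clone(), Acl::open_unsafe(), CreateMode::Persistent)
        .await
        .unwrap();
    match created {
        Ok(_) => println!("Created {}", path),
        Err(error::Create::NodeExists) => {
            // None = no version check: overwrite whatever is currently stored.
            let _ = zk.set_data(path, None, data).await.unwrap();
            println!("Updated {}", path);
        }
        Err(err) => eprintln!("Error creating {}: {:?}", path, err),
    }
}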
diff --git a/src/alpenrose/single_fetch_time.rs b/src/alpenrose/single_fetch_time.rs
index c5c306bf..09ef68f5 100644
--- a/src/alpenrose/single_fetch_time.rs
+++ b/src/alpenrose/single_fetch_time.rs
@@ -1,6 +1,8 @@
 use crate::KeyFormat;
 use crate::RealtimeFeedFetch;
+use catenary::aspen::lib::ChateausLeaderHashMap;
 use catenary::postgres_tools::CatenaryPostgresPool;
+use dashmap::DashMap;
 use futures::StreamExt;
 use rand::seq::SliceRandom;
 use reqwest::Response;
@@ -8,7 +10,6 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::RwLock;
-use dashmap::DashMap;

 pub async fn single_fetch_time(
     client: reqwest::Client,
@@ -58,6 +59,21 @@ pub async fn single_fetch_time(

            //send the data to aspen via tarpc

+            let vehicle_positions_http_status = match &vehicle_positions_data {
+                Some(Ok(response)) => Some(response.status().as_u16()),
+                _ => None,
+            };
+
+            let trip_updates_http_status = match &trip_updates_data {
+                Some(Ok(response)) => Some(response.status().as_u16()),
+                _ => None,
+            };
+
+            let alerts_http_status = match &alerts_data {
+                Some(Ok(response)) => Some(response.status().as_u16()),
+                _ => None,
+            };
+
            let duration = start.elapsed();
            let duration = duration.as_secs_f64();
            println!("{}: {:.2?}", feed_id, duration);
@@ -80,7 +96,7 @@ async fn run_optional_req(
    }
 }

-enum UrlType {
+pub enum UrlType {
     VehiclePositions,
     TripUpdates,
     Alerts,
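The three status-extraction matches above are identical except for the variable they inspect. If the surrounding types are what the match arms imply (Option<Result<reqwest::Response, reqwest::Error>>), they fold into a single helper — a sketch, not part of the patch:

// Sketch: collapse the repeated match blocks into one helper.
// Assumes the fetched data is Option<Result<Response, reqwest::Error>>.
fn http_status(data: &Option<Result<reqwest::Response, reqwest::Error>>) -> Option<u16> {
    match data {
        Some(Ok(response)) => Some(response.status().as_u16()),
        _ => None,
    }
}

// Usage: let vehicle_positions_http_status = http_status(&vehicle_positions_data);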
diff --git a/src/aspen/leader_thread.rs b/src/aspen/leader_thread.rs
new file mode 100644
index 00000000..22c0985b
--- /dev/null
+++ b/src/aspen/leader_thread.rs
@@ -0,0 +1,250 @@
+use catenary::aspen::lib::ChateauMetadataZookeeper;
+use catenary::aspen::lib::ChateausLeaderHashMap;
+use catenary::aspen::lib::RealtimeFeedMetadataZookeeper;
+use catenary::postgres_tools::CatenaryPostgresPool;
+use catenary::ChateauDataNoGeometry;
+use diesel::query_dsl::methods::FilterDsl;
+use diesel::query_dsl::select_dsl::SelectDsl;
+use diesel::sql_types::{Float, Integer};
+use diesel::ExpressionMethods;
+use diesel::Selectable;
+use diesel::SelectableHelper;
+use diesel_async::pooled_connection::bb8::PooledConnection;
+use diesel_async::RunQueryDsl;
+use std::collections::BTreeMap;
+use std::net::IpAddr;
+use std::sync::Arc;
+use tokio::sync::Mutex;
+use tokio_threadpool::Worker;
+use tokio_zookeeper::ZooKeeper;
+use tokio_zookeeper::{Acl, CreateMode};
+
+pub async fn aspen_leader_thread(
+    workers_nodes: Arc<Mutex<Vec<String>>>,
+    feeds_list: Arc<Mutex<Option<ChateausLeaderHashMap>>>,
+    this_worker_id: Arc<String>,
+    tailscale_ip: Arc<IpAddr>,
+    arc_conn_pool: Arc<CatenaryPostgresPool>,
+) {
+    let conn_pool = arc_conn_pool.as_ref();
+    let conn_pre = conn_pool.get().await;
+    let conn = &mut conn_pre.unwrap();
+    loop {
+        let (zk, default_watcher) = ZooKeeper::connect(&"127.0.0.1:2181".parse().unwrap())
+            .await
+            .unwrap();
+
+        let _ = zk
+            .create(
+                "/aspen_workers",
+                vec![],
+                Acl::open_unsafe(),
+                CreateMode::Persistent,
+            )
+            .await
+            .unwrap();
+
+        //register that the worker exists
+        let _ = zk
+            .create(
+                format!("/aspen_workers/{}", this_worker_id).as_str(),
+                bincode::serialize(&tailscale_ip).unwrap(),
+                Acl::open_unsafe(),
+                CreateMode::Ephemeral,
+            )
+            .await
+            .unwrap();
+
+        let _ = zk
+            .create(
+                "/aspen_assigned_chateaus",
+                vec![],
+                Acl::open_unsafe(),
+                CreateMode::Persistent,
+            )
+            .await
+            .unwrap();
+
+        let _ = zk
+            .create(
+                "/aspen_assigned_realtime_feed_ids",
+                vec![],
+                Acl::open_unsafe(),
+                CreateMode::Persistent,
+            )
+            .await
+            .unwrap();
+
+        //attempt to check if the system is leaderless, if so, become the leader
+        if zk.exists("/aspen_leader").await.unwrap().is_none() {
+            let _ = zk
+                .create(
+                    "/aspen_leader",
+                    bincode::serialize(&this_worker_id).unwrap(),
+                    Acl::open_unsafe(),
+                    CreateMode::Ephemeral,
+                )
+                .await
+                .unwrap();
+        }
+
+        //if the current leader is this worker, perform the leader tasks:
+        //read the DMFR dataset, divide it into chunks, and assign it to workers
+
+        let current_leader = zk.watch().get_data("/aspen_leader").await.unwrap();
+
+        if let Some((leader_str_bytes, leader_stats)) = current_leader {
+            let leader_id: String = bincode::deserialize(&leader_str_bytes).unwrap();
+
+            if leader_id == *this_worker_id {
+                //leader tasks
+                let mut workers_nodes_lock = workers_nodes.lock().await;
+                let mut chateau_list_lock = feeds_list.lock().await;
+
+                //make a hashmap of workers and their tailscale ips
+                let mut workers_ips = BTreeMap::new();
+                let workers_nodes = zk.get_children("/aspen_workers").await.unwrap();
+                if let Some(workers_nodes) = workers_nodes {
+                    for worker_node in workers_nodes {
+                        let worker_ip_data = zk
+                            .get_data(format!("/aspen_workers/{}", worker_node).as_str())
+                            .await
+                            .unwrap();
+
+                        if let Some((worker_ip_bytes, _)) = worker_ip_data {
+                            let worker_ip: IpAddr = bincode::deserialize(&worker_ip_bytes).unwrap();
+                            workers_ips.insert(worker_node, worker_ip);
+                        }
+                    }
+                }
+
+                // read out from postgres
+                let chateaus_pg_query = catenary::schema::gtfs::chateaus::table
+                    .select(catenary::models::Chateau::as_select())
+                    .load::<catenary::models::Chateau>(conn)
+                    .await;
+
+                if let Ok(chateaus) = chateaus_pg_query {
+                    //read into a btree
+
+                    let chateau_cache_for_aspen_leader = ChateausLeaderHashMap {
+                        chateaus: {
+                            let mut chateaus_btree: BTreeMap<String, ChateauDataNoGeometry> =
+                                BTreeMap::new();
+                            for chateau in chateaus {
+                                chateaus_btree.insert(
+                                    chateau.chateau.clone(),
+                                    ChateauDataNoGeometry {
+                                        chateau_id: chateau.chateau.clone(),
+                                        static_feeds: chateau
+                                            .static_feeds
+                                            .clone()
+                                            .into_iter()
+                                            .flatten()
+                                            .collect(),
+                                        realtime_feeds: chateau
+                                            .realtime_feeds
+                                            .clone()
+                                            .into_iter()
+                                            .flatten()
+                                            .collect(),
+                                    },
+                                );
+                            }
+                            chateaus_btree
+                        },
+                    };
+
+                    let mut chateau_data_changed = false;
+
+                    if chateau_list_lock.is_none() {
+                        *chateau_list_lock = Some(chateau_cache_for_aspen_leader);
+                        chateau_data_changed = true;
+                    } else {
+                        let chateau_list = chateau_list_lock.as_mut().unwrap();
+                        if chateau_list != &chateau_cache_for_aspen_leader {
+                            *chateau_list_lock = Some(chateau_cache_for_aspen_leader);
+                            chateau_data_changed = true;
+                        }
+                    }
+
+                    let available_aspen_workers = zk.get_children("/aspen_workers").await.unwrap();
+
+                    let mut aspen_workers_list_changed = false;
+
+                    if let Some(available_aspen_workers) = available_aspen_workers {
+                        let mut available_aspen_workers = available_aspen_workers;
+                        available_aspen_workers.sort();
+
+                        if available_aspen_workers != *workers_nodes_lock {
+                            *workers_nodes_lock = available_aspen_workers;
+                            aspen_workers_list_changed = true;
+                        }
+                    }
+
+                    if chateau_data_changed || aspen_workers_list_changed {
+                        // divide it into chunks
+                        if let Some(chateau_list_lock) = chateau_list_lock.as_ref() {
+                            for (index, (chateau_id, chateau)) in
+                                chateau_list_lock.chateaus.iter().enumerate()
+                            {
+                                let selected_aspen_worker_to_assign =
+                                    workers_nodes_lock[index % workers_nodes_lock.len()].clone();
+
+                                let assigned_chateau_data = ChateauMetadataZookeeper {
+                                    worker_id: selected_aspen_worker_to_assign.clone(),
+                                    tailscale_ip: workers_ips
+                                        .get(&selected_aspen_worker_to_assign)
+                                        .unwrap()
+                                        .clone(),
+                                };
+
+                                let _ = zk
+                                    .create(
+                                        format!("/aspen_assigned_chateaus/{}", chateau_id).as_str(),
+                                        bincode::serialize(&assigned_chateau_data).unwrap(),
+                                        Acl::open_unsafe(),
+                                        CreateMode::Persistent,
+                                    )
+                                    .await
+                                    .unwrap();
+
+                                for realtime_feed_id in chateau.realtime_feeds.iter() {
+                                    let assigned_realtime_feed_data =
+                                        RealtimeFeedMetadataZookeeper {
+                                            worker_id: selected_aspen_worker_to_assign.clone(),
+                                            tailscale_ip: workers_ips
+                                                .get(&selected_aspen_worker_to_assign)
+                                                .unwrap()
+                                                .clone(),
+                                            chateau_id: chateau_id.clone(),
+                                        };
+
+                                    let _ = zk
+                                        .create(
+                                            format!(
+                                                "/aspen_assigned_realtime_feed_ids/{}",
+                                                realtime_feed_id
+                                            )
+                                            .as_str(),
+                                            bincode::serialize(&assigned_realtime_feed_data)
+                                                .unwrap(),
+                                            Acl::open_unsafe(),
+                                            CreateMode::Persistent,
+                                        )
+                                        .await
+                                        .unwrap();
+                                }
+                            }
+                        }
+                    }
+
+                    std::mem::drop(workers_nodes_lock);
+                    std::mem::drop(chateau_list_lock);
+                }
+            }
+        }
+
+        tokio::time::sleep(std::time::Duration::from_secs(10)).await;
+    }
+}
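The assignment loop above shards chateaus across workers by round robin: chateau i goes to worker (i mod n) over the sorted children of /aspen_workers. The rule in isolation, with plain types — a sketch; note it assumes at least one registered worker, which holds above because the leader registers itself before assigning:

// Round-robin sharding as used by the leader. Panics if workers is empty,
// so callers must register at least one worker first.
fn assign_round_robin(chateaus: &[String], workers: &[String]) -> Vec<(String, String)> {
    chateaus
        .iter()
        .enumerate()
        .map(|(i, chateau)| (chateau.clone(), workers[i % workers.len()].clone()))
        .collect()
}

fn main() {
    let chateaus = vec!["a".to_string(), "b".to_string(), "c".to_string()];
    let workers = vec!["w1".to_string(), "w2".to_string()];
    // a -> w1, b -> w2, c -> w1
    println!("{:?}", assign_round_robin(&chateaus, &workers));
}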
diff --git a/src/aspen/lib.rs b/src/aspen/lib.rs
index 905aa2bf..7d314654 100644
--- a/src/aspen/lib.rs
+++ b/src/aspen/lib.rs
@@ -4,6 +4,7 @@
 /// This is the service definition. It looks a lot like a trait definition.
 /// It defines one RPC, hello, which takes one arg, name, and returns a String.
+
 #[tarpc::service]
 pub trait AspenRpc {
     /// Returns a greeting for name.
@@ -23,3 +24,26 @@ pub trait AspenRpc {
         alerts_response_code: Option<u16>,
     ) -> bool;
 }
+
+use crate::ChateauDataNoGeometry;
+use serde::{Deserialize, Serialize};
+use std::collections::BTreeMap;
+use std::net::IpAddr;
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct ChateauMetadataZookeeper {
+    pub worker_id: String,
+    pub tailscale_ip: IpAddr,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct RealtimeFeedMetadataZookeeper {
+    pub worker_id: String,
+    pub tailscale_ip: IpAddr,
+    pub chateau_id: String,
+}
+
+#[derive(Debug, Hash, Clone, Eq, PartialEq)]
+pub struct ChateausLeaderHashMap {
+    pub chateaus: BTreeMap<String, ChateauDataNoGeometry>,
+}
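These structs are what actually lands in the znode data: the leader writes them with bincode::serialize and readers recover them with bincode::deserialize, mirroring the calls in leader_thread.rs. A round-trip sketch with a made-up worker id and address:

use catenary::aspen::lib::ChateauMetadataZookeeper;
use std::net::{IpAddr, Ipv6Addr};

// Round-trip sketch for the znode payload format used in this patch.
// The worker id and IP here are placeholders.
fn main() {
    let meta = ChateauMetadataZookeeper {
        worker_id: "aspen-worker-example".to_string(),
        tailscale_ip: IpAddr::V6(Ipv6Addr::LOCALHOST),
    };

    let bytes = bincode::serialize(&meta).unwrap();
    let decoded: ChateauMetadataZookeeper = bincode::deserialize(&bytes).unwrap();

    assert_eq!(decoded.worker_id, meta.worker_id);
    assert_eq!(decoded.tailscale_ip, meta.tailscale_ip);
}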
diff --git a/src/aspen/main.rs b/src/aspen/main.rs
index 556c3084..02339dbb 100644
--- a/src/aspen/main.rs
+++ b/src/aspen/main.rs
@@ -25,14 +25,16 @@
     clippy::iter_cloned_collect
 )]

-mod lib;
+use catenary::aspen::lib::*;
+use catenary::postgres_tools::make_async_pool;
 use clap::Parser;
+use dashmap::DashMap;
 use futures::{future, prelude::*};
-use lib::AspenRpc;
 use rand::{
     distributions::{Distribution, Uniform},
     thread_rng,
 };
+use std::sync::Arc;
 use std::{
     net::{IpAddr, Ipv6Addr, SocketAddr},
     time::Duration,
@@ -42,9 +44,13 @@ use tarpc::{
     server::{self, incoming::Incoming, Channel},
     tokio_serde::formats::Json,
 };
+use tokio::sync::Mutex;
 use tokio::time;
-
+use uuid::Uuid;
+mod leader_thread;
+use leader_thread::aspen_leader_thread;
 mod import_kactus;
+use catenary::postgres_tools::CatenaryPostgresPool;

 #[derive(Parser)]
 struct Flags {
@@ -56,9 +62,12 @@ struct Flags {
 // This is the type that implements the generated World trait. It is the business logic
 // and is used to start the server.
 #[derive(Clone)]
-struct AspenServer {
-    addr: SocketAddr,
-    this_tailscale_ip: IpAddr,
+pub struct AspenServer {
+    pub addr: SocketAddr,
+    pub this_tailscale_ip: IpAddr,
+    pub worker_id: Arc<String>, // Worker Id for this instance of Aspen
+    pub authoritative_data_store: Arc<DashMap<String, catenary::AspenDataset::AspenisedData>>,
+    pub conn_pool: Arc<CatenaryPostgresPool>,
 }

 impl AspenRpc for AspenServer {
@@ -66,7 +75,7 @@ impl AspenRpc for AspenServer {
         let sleep_time =
             Duration::from_millis(Uniform::new_inclusive(1, 10).sample(&mut thread_rng()));
         time::sleep(sleep_time).await;
-        format!("Hello, {name}! You are connected from {}", self.0)
+        format!("Hello, {name}! You are connected from {}", self.addr)
     }

     async fn new_rt_kactus(
@@ -94,35 +103,70 @@ async fn spawn(fut: impl Future<Output = ()> + Send + 'static) {

 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
+    // Worker Id for this instance of Aspen
+    let this_worker_id = Arc::new(Uuid::new_v4().to_string());
+
+    //connect to postgres
+    let conn_pool: CatenaryPostgresPool = make_async_pool().await.unwrap();
+    let arc_conn_pool: Arc<CatenaryPostgresPool> = Arc::new(conn_pool);
+
     let flags = Flags::parse();
     //init_tracing("Tarpc Example Server")?;

-    let server_addr = (IpAddr::V6(Ipv6Addr::LOCALHOST), flags.port);
+    let tailscale_ip = catenary::tailscale::interface().expect("no tailscale interface found");
+
+    let server_addr = (tailscale_ip, flags.port);

     // JSON transport is provided by the json_transport tarpc module. It makes it easy
     // to start up a serde-powered json serialization strategy over TCP.
     let mut listener = tarpc::serde_transport::tcp::listen(&server_addr, Json::default).await?;
     //tracing::info!("Listening on port {}", listener.local_addr().port());
     listener.config_mut().max_frame_length(usize::MAX);
-    listener
-        // Ignore accept errors.
-        .filter_map(|r| future::ready(r.ok()))
-        .map(server::BaseChannel::with_defaults)
-        // Limit channels to 1 per IP.
-        .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
-        // serve is generated by the service attribute. It takes as input any type implementing
-        // the generated World trait.
-        .map(|channel| {
-            let server = AspenServer {
-                addr: channel.transport().peer_addr().unwrap(),
-                this_tailscale_ip: catenary::tailscale::interface().unwrap()
-            };
-            channel.execute(server.serve()).for_each(spawn)
-        })
-        // Max 10 channels.
-        .buffer_unordered(10)
-        .for_each(|_| async {})
-        .await;
+
+    let workers_nodes: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
+    let chateau_list: Arc<Mutex<Option<ChateausLeaderHashMap>>> = Arc::new(Mutex::new(None));
+
+    //run both the leader and the listener simultaneously
+    futures::join!(
+        {
+            let workers_nodes = Arc::clone(&workers_nodes);
+            let chateau_list = Arc::clone(&chateau_list);
+            let this_worker_id = Arc::clone(&this_worker_id);
+            let tailscale_ip = Arc::new(tailscale_ip.clone());
+            let arc_conn_pool = Arc::clone(&arc_conn_pool);
+            async {
+                aspen_leader_thread(
+                    workers_nodes,
+                    chateau_list,
+                    this_worker_id,
+                    tailscale_ip,
+                    arc_conn_pool,
+                )
+                .await;
+            }
+        },
+        listener
+            // Ignore accept errors.
+            .filter_map(|r| future::ready(r.ok()))
+            .map(server::BaseChannel::with_defaults)
+            // Limit channels to 1 per IP.
+            .max_channels_per_key(1, |t| t.transport().peer_addr().unwrap().ip())
+            // serve is generated by the service attribute. It takes as input any type implementing
+            // the generated World trait.
+            .map(|channel| {
+                let server = AspenServer {
+                    addr: channel.transport().peer_addr().unwrap(),
+                    this_tailscale_ip: tailscale_ip,
+                    worker_id: Arc::clone(&this_worker_id),
+                    authoritative_data_store: Arc::new(DashMap::new()),
+                    conn_pool: Arc::clone(&arc_conn_pool),
+                };
+                channel.execute(server.serve()).for_each(spawn)
+            })
+            // Max n channels.
+            .buffer_unordered(32)
+            .for_each(|_| async {})
+    );

     Ok(())
 }
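The futures::join! above runs the leader loop and the tarpc accept loop concurrently on the same runtime; neither side is expected to return. The same shape in miniature, with toy tasks standing in for the leader thread and the listener chain:

use std::time::Duration;

// Toy version of the concurrency shape above: a periodic background task
// joined with a foreground task. In the real server both loops run for the
// life of the process; here they stop after a few ticks.
#[tokio::main]
async fn main() {
    let leader_like = async {
        for tick in 0..3 {
            println!("leader tick {}", tick);
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };

    let listener_like = async {
        println!("accepting connections here");
    };

    futures::join!(leader_like, listener_like);
}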
diff --git a/src/birch/server.rs b/src/birch/server.rs
index 30eeadb4..5f8c3e4f 100644
--- a/src/birch/server.rs
+++ b/src/birch/server.rs
@@ -406,6 +406,10 @@ pub async fn shapes_not_bus(
 ) -> impl Responder {
     let (z, x, y) = path.into_inner();

+    if z < 2 {
+        return HttpResponse::BadRequest().body("Zoom level too low");
+    }
+
     // let grid = tile_grid::Grid::wgs84();
     //let bbox = grid.tile_extent(x, y, z);

@@ -488,6 +492,10 @@ pub async fn shapes_bus(
 ) -> impl Responder {
     let (z, x, y) = path.into_inner();

+    if z < 4 {
+        return HttpResponse::BadRequest().body("Zoom level too low");
+    }
+
     // let grid = tile_grid::Grid::wgs84();
     // let bbox = grid.tile_extent(x, y, z);
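Context for the two zoom guards above: at zoom z a slippy-map tile spans 360 / 2^z degrees of longitude, so a bus-shape request below the cutoff would sweep a continent's worth of geometry into one query. Worked out:

// Width, in degrees of longitude, of one z/x/y tile.
fn tile_width_degrees(z: u32) -> f64 {
    360.0 / f64::from(1u32 << z)
}

fn main() {
    assert_eq!(tile_width_degrees(1), 180.0); // rejected by shapes_not_bus (z < 2)
    assert_eq!(tile_width_degrees(3), 45.0); // rejected by shapes_bus (z < 4)
    assert_eq!(tile_width_degrees(4), 22.5); // smallest zoom shapes_bus will serve
}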
diff --git a/src/lib.rs b/src/lib.rs
index e59bf350..0555d1e3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -42,11 +42,19 @@ pub mod postgres_tools;
 pub mod schema;

 use fasthash::MetroHasher;
+use gtfs_rt::VehicleDescriptor;
 use std::hash::Hash;
 use std::hash::Hasher;
 use std::time::Duration;
 use std::time::{SystemTime, UNIX_EPOCH};

+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct ChateauDataNoGeometry {
+    pub chateau_id: String,
+    pub static_feeds: Vec<String>,
+    pub realtime_feeds: Vec<String>,
+}
+
 pub const WGS_84_SRID: u32 = 4326;

 pub mod gtfs_schedule_protobuf {
@@ -114,33 +122,71 @@ pub fn duration_since_unix_epoch() -> Duration {
 }

 pub mod tailscale {
-//stolen from tailscale-rs
-//significantly adapted by Kyler Chin to use ipv6 addressing
-extern crate ipnetwork;
-extern crate pnet;
+    //stolen from tailscale-rs
+    //significantly adapted by Kyler Chin to use ipv6 addressing
+    extern crate ipnetwork;
+    extern crate pnet;

-use ipnetwork::IpNetwork;
-use pnet::datalink;
-use std::net::IpAddr;
+    use ipnetwork::IpNetwork;
+    use pnet::datalink;
+    use std::net::IpAddr;

-fn maybe_tailscale(s: &str) -> bool {
+    fn maybe_tailscale(s: &str) -> bool {
         s.starts_with("tailscale")
+    }
+
+    /// Retrieve the IP address of the current machine's Tailscale interface, if any.
+    /// ```
+    /// let iface = tailscale::interface().expect("no tailscale interface found");
+    /// ```
+    pub fn interface() -> Option<IpAddr> {
+        let ifaces = datalink::interfaces();
+        let netmask: IpNetwork = "100.64.0.0/10".parse().unwrap();
+        ifaces
+            .iter()
+            .filter(|iface| maybe_tailscale(&iface.name))
+            .flat_map(|iface| iface.ips.clone())
+            .filter(|ipnet| ipnet.is_ipv6() && netmask.contains(ipnet.network()))
+            .map(|ipnet| ipnet.ip())
+            .next()
+    }
 }

-/// Retrieve the IP address of the current machine's Tailscale interface, if any.
-/// ```
-/// let iface = tailscale::interface().expect("no tailscale interface found");
-/// ```
-pub fn interface() -> Option<IpAddr> {
-    let ifaces = datalink::interfaces();
-    let netmask: IpNetwork = "100.64.0.0/10".parse().unwrap();
-    ifaces
-        .iter()
-        .filter(|iface| maybe_tailscale(&iface.name))
-        .map(|iface| iface.ips.clone())
-        .flatten()
-        .filter(|ipnet| ipnet.is_ipv6() && netmask.contains(ipnet.network()))
-        .map(|ipnet| ipnet.ip())
-        .next()
+
+pub mod AspenDataset {
+    use gtfs_rt::TripUpdate;
+    use gtfs_rt::VehicleDescriptor;
+    use std::{collections::HashMap, hash::Hash};
+
+    pub struct AspenisedData {
+        pub vehicle_positions: Vec<AspenisedVehiclePosition>,
+        pub vehicle_routes_cache: HashMap<String, AspenisedVehicleRouteCache>,
+        //id to trip update
+        pub trip_updates: HashMap<String, TripUpdate>,
+        pub trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap<String, Vec<String>>,
+        pub raw_alerts: Option<HashMap<String, gtfs_rt::Alert>>,
+        pub impacted_routes_alerts: Option<HashMap<String, Vec<String>>>,
+        pub impacted_stops_alerts: Option<HashMap<String, Vec<String>>>,
+        pub impacted_routes_stops_alerts: Option<HashMap<String, Vec<String>>>,
+    }
+
+    pub struct AspenisedVehiclePosition {
+        pub trip: Option<AspenisedVehicleTripInfo>,
+        pub vehicle: Option<VehicleDescriptor>,
+    }
+
+    pub struct AspenisedVehicleTripInfo {
+        pub trip_id: Option<String>,
+        pub trip_headsign: Option<String>,
+        pub route_id: Option<String>,
+        pub trip_short_name: Option<String>,
+    }
+
+    pub struct AspenisedVehicleRouteCache {
+        route_short_name: Option<String>,
+        route_long_name: Option<String>,
+        //route_short_name_langs: Option<HashMap<String, String>>,
+        //route_long_name_langs: Option<HashMap<String, String>>,
+        route_colour: Option<String>,
+        route_text_colour: Option<String>,
+    }
+}
-}
\ No newline at end of file
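For orientation, the new authoritative_data_store field on AspenServer is a DashMap keyed by string — presumably one AspenisedData entry per chateau, though this patch does not yet show a write to it. A hypothetical insert, with placeholder key and empty fields:

use catenary::AspenDataset::AspenisedData;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical usage of the per-chateau store; the key and all field values
// are placeholders, not taken from the patch.
fn main() {
    let store: Arc<DashMap<String, AspenisedData>> = Arc::new(DashMap::new());

    store.insert(
        "example-chateau".to_string(),
        AspenisedData {
            vehicle_positions: Vec::new(),
            vehicle_routes_cache: HashMap::new(),
            trip_updates: HashMap::new(),
            trip_updates_lookup_by_trip_id_to_trip_update_ids: HashMap::new(),
            raw_alerts: None,
            impacted_routes_alerts: None,
            impacted_stops_alerts: None,
            impacted_routes_stops_alerts: None,
        },
    );

    assert!(store.contains_key("example-chateau"));
}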