From 2ccdfd502dfdc0311aec28f19febf462ad1eabad Mon Sep 17 00:00:00 2001 From: Kyler Chin <7539174+kylerchin@users.noreply.github.com> Date: Tue, 26 Mar 2024 04:59:31 -0700 Subject: [PATCH] Rust fmt --- src/birch/server.rs | 2 +- .../shapes_into_postgres.rs | 132 +++++++++--------- src/maple/main.rs | 72 +++++----- src/maple/transitland_download.rs | 6 +- src/postgres_tools/mod.rs | 33 +++-- 5 files changed, 130 insertions(+), 115 deletions(-) diff --git a/src/birch/server.rs b/src/birch/server.rs index fbedd920..e7a11a96 100644 --- a/src/birch/server.rs +++ b/src/birch/server.rs @@ -1,3 +1,3 @@ fn main() { println!("Hello, world!"); -} \ No newline at end of file +} diff --git a/src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs b/src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs index f5bb7f26..0aa7af16 100644 --- a/src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs +++ b/src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs @@ -1,10 +1,10 @@ +use diesel_async::RunQueryDsl; use postgis::ewkb; use rgb::RGB; use std::collections::HashMap; use std::collections::HashSet; use std::error::Error; use std::sync::Arc; -use diesel_async::RunQueryDsl; use crate::gtfs_handlers::colour_correction; use crate::gtfs_handlers::enum_to_int::route_type_to_int; @@ -24,8 +24,8 @@ pub async fn shapes_into_postgres( attempt_id: &str, ) -> Result<(), Box> { let conn_pool = arc_conn_pool.as_ref(); - let conn_pre = conn_pool.get().await; - let conn = &mut conn_pre?; + let conn_pre = conn_pool.get().await; + let conn = &mut conn_pre?; for (shape_id, shape) in gtfs.shapes.iter() { let mut route_ids: HashSet = gtfs @@ -89,73 +89,79 @@ pub async fn shapes_into_postgres( //Lines are only valid in postgres if they contain 2 or more points if preshape.len() >= 2 { - let linestring:postgis_diesel::types::LineString = postgis_diesel::types::LineString { - srid: Some(4326), - points: preshape - .iter() - .map(|point| postgis_diesel::types::Point { - x: point.longitude, - y: point.latitude, - srid: Some(4326), - }) - .collect(), + let linestring: postgis_diesel::types::LineString = + postgis_diesel::types::LineString { + srid: Some(4326), + points: preshape + .iter() + .map(|point| postgis_diesel::types::Point { + x: point.longitude, + y: point.latitude, + srid: Some(4326), + }) + .collect(), + }; + + let text_color = match shape_to_text_color_lookup.get(shape_id) { + Some(color) => format!("{:02x}{:02x}{:02x}", color.r, color.g, color.b), + None => String::from("000000"), }; - - - let text_color = match shape_to_text_color_lookup.get(shape_id) { - Some(color) => format!("{:02x}{:02x}{:02x}", color.r, color.g, color.b), - None => String::from("000000"), - }; - //creates a text label for the shape to be displayed with on the map - //todo! change this with i18n - let route_label: String = route_ids - .iter() - .map(|route_id| { - let route = gtfs.routes.get(route_id); - match route { - Some(route) => match route.short_name.is_some() { - true => route.short_name.to_owned(), - false => match route.long_name.is_some() { - true => route.long_name.to_owned(), - false => None, + //creates a text label for the shape to be displayed with on the map + //todo! change this with i18n + let route_label: String = route_ids + .iter() + .map(|route_id| { + let route = gtfs.routes.get(route_id); + match route { + Some(route) => match route.short_name.is_some() { + true => route.short_name.to_owned(), + false => match route.long_name.is_some() { + true => route.long_name.to_owned(), + false => None, + }, }, - }, - _ => None, - } - }) - .filter(|route_label| route_label.is_some()) - .map(|route_label| rename_route_string(route_label.as_ref().unwrap().to_owned())) - .collect::>() - .join(",") - .as_str() - .replace("Orange County", "OC") - .replace("Inland Empire", "IE") - .to_string(); - - let shape_value: catenary::models::Shape = catenary::models::Shape { - onestop_feed_id: feed_id.to_string(), - attempt_id: attempt_id.to_string(), - shape_id: shape_id.clone(), - chateau: chateau_id.to_string(), - linestring:linestring, - color: Some(color_to_upload), - routes: Some(route_ids.iter().map(|route_id| Some(route_id.to_string())).collect()), - route_type: route_type_number, - route_label: Some(route_label), - route_label_translations: None, - text_color: Some(text_color), - }; + _ => None, + } + }) + .filter(|route_label| route_label.is_some()) + .map(|route_label| rename_route_string(route_label.as_ref().unwrap().to_owned())) + .collect::>() + .join(",") + .as_str() + .replace("Orange County", "OC") + .replace("Inland Empire", "IE") + .to_string(); + + let shape_value: catenary::models::Shape = catenary::models::Shape { + onestop_feed_id: feed_id.to_string(), + attempt_id: attempt_id.to_string(), + shape_id: shape_id.clone(), + chateau: chateau_id.to_string(), + linestring: linestring, + color: Some(color_to_upload), + routes: Some( + route_ids + .iter() + .map(|route_id| Some(route_id.to_string())) + .collect(), + ), + route_type: route_type_number, + route_label: Some(route_label), + route_label_translations: None, + text_color: Some(text_color), + }; - { - use catenary::schema::gtfs::shapes::dsl::*; + { + use catenary::schema::gtfs::shapes::dsl::*; - diesel::insert_into(shapes) - .values(shape_value) - .execute(conn).await?; + diesel::insert_into(shapes) + .values(shape_value) + .execute(conn) + .await?; + } } } - } Ok(()) } diff --git a/src/maple/main.rs b/src/maple/main.rs index 9a73f5e5..dafa121a 100644 --- a/src/maple/main.rs +++ b/src/maple/main.rs @@ -3,12 +3,12 @@ use catenary::postgres_tools::CatenaryPostgresPool; // Initial version 3 of ingest written by Kyler Chin // This was heavily inspired and copied from Emma Alexia, thank you Emma! // Removal of the attribution is not allowed, as covered under the AGPL license +use catenary::postgres_tools::get_connection_pool; use diesel::prelude::*; use dotenvy::dotenv; use service::quicli::prelude::info; use std::collections::HashSet; use std::env; -use catenary::postgres_tools::get_connection_pool; use std::error::Error; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; @@ -17,9 +17,9 @@ use std::time::Duration; use threadpool::ThreadPool; use tokio::runtime; +use diesel::insert_into; use futures::StreamExt; use git2::Repository; -use diesel::insert_into; mod gtfs_handlers; mod gtfs_ingestion_sequence; @@ -106,13 +106,13 @@ async fn run_ingest() -> Result<(), Box> { info!("Initializing database connection"); - // get connection pool from database pool - let conn_pool: CatenaryPostgresPool<'_> = get_connection_pool().await; - let arc_conn_pool: Arc> = Arc::new(conn_pool); - - let conn_pool = arc_conn_pool.as_ref(); - let conn_pre = conn_pool.get().await; - let conn = &mut conn_pre?; + // get connection pool from database pool + let conn_pool: CatenaryPostgresPool<'_> = get_connection_pool().await; + let arc_conn_pool: Arc> = Arc::new(conn_pool); + + let conn_pool = arc_conn_pool.as_ref(); + let conn_pre = conn_pool.get().await; + let conn = &mut conn_pre?; // reads a transitland directory and returns a hashmap of all the data feeds (urls) associated with their correct operator and vise versa // See https://github.com/catenarytransit/dmfr-folder-reader @@ -182,32 +182,32 @@ async fn run_ingest() -> Result<(), Box> { // (todo) hand off to routing algorithm preprocessing engine Prarie (needs further research and development) // 2. update metadata - futures::stream::iter(eligible_feeds.iter() - .map(|eligible_feed| - { - async move { - /* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts - (onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ", - eligible_feed.feed_id, - eligible_feed.url, - match eligible_feed.hash { - Some(hash) => Some(format!("{}", hash)), - None => None - }, - eligible_feed.download_timestamp_ms as i64, - false, - !eligible_feed.operation_success, - eligible_feed.http_response_code, - false, - MAPLE_INGESTION_VERSION - ); - - let _ = insert_into()*/ - } + futures::stream::iter(eligible_feeds.iter().map(|eligible_feed| { + async move { + /* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts + (onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ", + eligible_feed.feed_id, + eligible_feed.url, + match eligible_feed.hash { + Some(hash) => Some(format!("{}", hash)), + None => None + }, + eligible_feed.download_timestamp_ms as i64, + false, + !eligible_feed.operation_success, + eligible_feed.http_response_code, + false, + MAPLE_INGESTION_VERSION + ); + + let _ = insert_into()*/ } - )).buffer_unordered(100).collect::>().await; + })) + .buffer_unordered(100) + .collect::>() + .await; // 3. Assign Attempt IDs to each feed_id that is ready to ingest @@ -216,7 +216,6 @@ async fn run_ingest() -> Result<(), Box> { &chateau_result, Arc::clone(&arc_conn_pool), ); - // 4. Unzip folders let unzip_feeds: Vec<(String, bool)> = @@ -270,7 +269,8 @@ async fn run_ingest() -> Result<(), Box> { let conn_pool = arc_conn_pool.as_ref(); let conn_pre = conn_pool.get().await; let conn = &mut conn_pre.unwrap(); - let gtfs_process_result = gtfs_process_feed(&feed_id, Arc::clone(&arc_conn_pool), "", "").await; + let gtfs_process_result = + gtfs_process_feed(&feed_id, Arc::clone(&arc_conn_pool), "", "").await; if gtfs_process_result.is_ok() { // at the end, UPDATE gtfs.static_download_attempts where onstop_feed_id and download_unix_time_ms match as ingested diff --git a/src/maple/transitland_download.rs b/src/maple/transitland_download.rs index 01037063..e56f6f98 100644 --- a/src/maple/transitland_download.rs +++ b/src/maple/transitland_download.rs @@ -1,12 +1,13 @@ use catenary::models::StaticDownloadAttempt; use catenary::schema::gtfs::static_download_attempts; +use diesel::prelude::*; +use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl}; use dmfr_folder_reader::ReturnDmfrAnalysis; use futures; -use diesel::prelude::*; -use diesel_async::{RunQueryDsl, AsyncConnection, AsyncPgConnection}; use reqwest::Client as ReqwestClient; use reqwest::Request; +use crate::CatenaryPostgresPool; use reqwest::RequestBuilder; use std::collections::HashSet; use std::fs; @@ -17,7 +18,6 @@ use std::sync::Arc; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use crate::CatenaryPostgresPool; use crate::gtfs_handlers::MAPLE_INGESTION_VERSION; diff --git a/src/postgres_tools/mod.rs b/src/postgres_tools/mod.rs index 2563ec5a..476d86e5 100644 --- a/src/postgres_tools/mod.rs +++ b/src/postgres_tools/mod.rs @@ -1,25 +1,34 @@ +use bb8::Pool; +use db_pool::r#async::ConnectionPool; +use db_pool::r#async::DatabasePool; +use db_pool::r#async::DatabasePoolBuilderTrait; +use db_pool::r#async::DieselAsyncPostgresBackend; +use db_pool::r#async::DieselBb8; +use db_pool::r#async::Reusable; +use db_pool::PrivilegedPostgresConfig; use diesel::prelude::*; use diesel::prelude::*; use dotenvy::dotenv; -use bb8::Pool; use std::env; use std::error::Error; use std::sync::Arc; use std::thread; use tokio::sync::OnceCell; -use db_pool::r#async::ConnectionPool; -use db_pool::r#async::DieselAsyncPostgresBackend; -use db_pool::r#async::Reusable; -use db_pool::r#async::DatabasePool; -use db_pool::r#async::DieselBb8; -use db_pool::PrivilegedPostgresConfig; -use db_pool::r#async::DatabasePoolBuilderTrait; -pub type CatenaryPostgresPool<'a> = db_pool::r#async::Reusable<'a, db_pool::r#async::ConnectionPool>>; -pub type CatenaryPostgresConnection<'b> = &'b mut bb8::PooledConnection<'b, diesel_async::pooled_connection::AsyncDieselConnectionManager>; +pub type CatenaryPostgresPool<'a> = db_pool::r#async::Reusable< + 'a, + db_pool::r#async::ConnectionPool< + db_pool::r#async::DieselAsyncPostgresBackend, + >, +>; +pub type CatenaryPostgresConnection<'b> = &'b mut bb8::PooledConnection< + 'b, + diesel_async::pooled_connection::AsyncDieselConnectionManager< + diesel_async::pg::AsyncPgConnection, + >, +>; -pub async fn get_connection_pool( -) -> CatenaryPostgresPool<'static> { +pub async fn get_connection_pool() -> CatenaryPostgresPool<'static> { static POOL: OnceCell>> = OnceCell::const_new();