Skip to content

Commit

Permalink
Rust fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Mar 26, 2024
1 parent a8be968 commit 2ccdfd5
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 115 deletions.
2 changes: 1 addition & 1 deletion src/birch/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
/// Placeholder entry point for the birch server binary — currently just
/// prints a greeting; real server logic has not been implemented yet.
fn main() {
println!("Hello, world!");
}
}
132 changes: 69 additions & 63 deletions src/maple/gtfs_ingestion_sequence/shapes_into_postgres.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use diesel_async::RunQueryDsl;
use postgis::ewkb;
use rgb::RGB;
use std::collections::HashMap;
use std::collections::HashSet;
use std::error::Error;
use std::sync::Arc;
use diesel_async::RunQueryDsl;

use crate::gtfs_handlers::colour_correction;
use crate::gtfs_handlers::enum_to_int::route_type_to_int;
Expand All @@ -24,8 +24,8 @@ pub async fn shapes_into_postgres(
attempt_id: &str,
) -> Result<(), Box<dyn Error>> {
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

for (shape_id, shape) in gtfs.shapes.iter() {
let mut route_ids: HashSet<String> = gtfs
Expand Down Expand Up @@ -89,73 +89,79 @@ pub async fn shapes_into_postgres(

//Lines are only valid in postgres if they contain 2 or more points
if preshape.len() >= 2 {
let linestring:postgis_diesel::types::LineString<postgis_diesel::types::Point> = postgis_diesel::types::LineString {
srid: Some(4326),
points: preshape
.iter()
.map(|point| postgis_diesel::types::Point {
x: point.longitude,
y: point.latitude,
srid: Some(4326),
})
.collect(),
let linestring: postgis_diesel::types::LineString<postgis_diesel::types::Point> =
postgis_diesel::types::LineString {
srid: Some(4326),
points: preshape
.iter()
.map(|point| postgis_diesel::types::Point {
x: point.longitude,
y: point.latitude,
srid: Some(4326),
})
.collect(),
};

let text_color = match shape_to_text_color_lookup.get(shape_id) {
Some(color) => format!("{:02x}{:02x}{:02x}", color.r, color.g, color.b),
None => String::from("000000"),
};


let text_color = match shape_to_text_color_lookup.get(shape_id) {
Some(color) => format!("{:02x}{:02x}{:02x}", color.r, color.g, color.b),
None => String::from("000000"),
};

//creates a text label for the shape to be displayed with on the map
//todo! change this with i18n
let route_label: String = route_ids
.iter()
.map(|route_id| {
let route = gtfs.routes.get(route_id);
match route {
Some(route) => match route.short_name.is_some() {
true => route.short_name.to_owned(),
false => match route.long_name.is_some() {
true => route.long_name.to_owned(),
false => None,
//creates a text label for the shape to be displayed with on the map
//todo! change this with i18n
let route_label: String = route_ids
.iter()
.map(|route_id| {
let route = gtfs.routes.get(route_id);
match route {
Some(route) => match route.short_name.is_some() {
true => route.short_name.to_owned(),
false => match route.long_name.is_some() {
true => route.long_name.to_owned(),
false => None,
},
},
},
_ => None,
}
})
.filter(|route_label| route_label.is_some())
.map(|route_label| rename_route_string(route_label.as_ref().unwrap().to_owned()))
.collect::<Vec<String>>()
.join(",")
.as_str()
.replace("Orange County", "OC")
.replace("Inland Empire", "IE")
.to_string();

let shape_value: catenary::models::Shape = catenary::models::Shape {
onestop_feed_id: feed_id.to_string(),
attempt_id: attempt_id.to_string(),
shape_id: shape_id.clone(),
chateau: chateau_id.to_string(),
linestring:linestring,
color: Some(color_to_upload),
routes: Some(route_ids.iter().map(|route_id| Some(route_id.to_string())).collect()),
route_type: route_type_number,
route_label: Some(route_label),
route_label_translations: None,
text_color: Some(text_color),
};
_ => None,
}
})
.filter(|route_label| route_label.is_some())
.map(|route_label| rename_route_string(route_label.as_ref().unwrap().to_owned()))
.collect::<Vec<String>>()
.join(",")
.as_str()
.replace("Orange County", "OC")
.replace("Inland Empire", "IE")
.to_string();

let shape_value: catenary::models::Shape = catenary::models::Shape {
onestop_feed_id: feed_id.to_string(),
attempt_id: attempt_id.to_string(),
shape_id: shape_id.clone(),
chateau: chateau_id.to_string(),
linestring: linestring,
color: Some(color_to_upload),
routes: Some(
route_ids
.iter()
.map(|route_id| Some(route_id.to_string()))
.collect(),
),
route_type: route_type_number,
route_label: Some(route_label),
route_label_translations: None,
text_color: Some(text_color),
};

{
use catenary::schema::gtfs::shapes::dsl::*;
{
use catenary::schema::gtfs::shapes::dsl::*;

diesel::insert_into(shapes)
.values(shape_value)
.execute(conn).await?;
diesel::insert_into(shapes)
.values(shape_value)
.execute(conn)
.await?;
}
}
}
}

Ok(())
}
72 changes: 36 additions & 36 deletions src/maple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use catenary::postgres_tools::CatenaryPostgresPool;
// Initial version 3 of ingest written by Kyler Chin
// This was heavily inspired and copied from Emma Alexia, thank you Emma!
// Removal of the attribution is not allowed, as covered under the AGPL license
use catenary::postgres_tools::get_connection_pool;
use diesel::prelude::*;
use dotenvy::dotenv;
use service::quicli::prelude::info;
use std::collections::HashSet;
use std::env;
use catenary::postgres_tools::get_connection_pool;
use std::error::Error;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
Expand All @@ -17,9 +17,9 @@ use std::time::Duration;
use threadpool::ThreadPool;
use tokio::runtime;

use diesel::insert_into;
use futures::StreamExt;
use git2::Repository;
use diesel::insert_into;

mod gtfs_handlers;
mod gtfs_ingestion_sequence;
Expand Down Expand Up @@ -106,13 +106,13 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {

info!("Initializing database connection");

// get connection pool from database pool
let conn_pool: CatenaryPostgresPool<'_> = get_connection_pool().await;
let arc_conn_pool: Arc<CatenaryPostgresPool<'_>> = Arc::new(conn_pool);
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;
// get connection pool from database pool
let conn_pool: CatenaryPostgresPool<'_> = get_connection_pool().await;
let arc_conn_pool: Arc<CatenaryPostgresPool<'_>> = Arc::new(conn_pool);

let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre?;

// reads a transitland directory and returns a hashmap of all the data feeds (urls) associated with their correct operator and vice versa
// See https://github.com/catenarytransit/dmfr-folder-reader
Expand Down Expand Up @@ -182,32 +182,32 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
// (todo) hand off to routing algorithm preprocessing engine Prarie (needs further research and development)

// 2. update metadata
futures::stream::iter(eligible_feeds.iter()
.map(|eligible_feed|
{
async move {
/* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts
(onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
",
eligible_feed.feed_id,
eligible_feed.url,
match eligible_feed.hash {
Some(hash) => Some(format!("{}", hash)),
None => None
},
eligible_feed.download_timestamp_ms as i64,
false,
!eligible_feed.operation_success,
eligible_feed.http_response_code,
false,
MAPLE_INGESTION_VERSION
);
let _ = insert_into()*/
}
futures::stream::iter(eligible_feeds.iter().map(|eligible_feed| {
async move {
/* let sql_query = sqlx::query!("INSERT INTO gtfs.static_download_attempts
(onestop_feed_id, url, file_hash, downloaded_unix_time_ms, ingested, failed, http_response_code, mark_for_redo, ingestion_version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
",
eligible_feed.feed_id,
eligible_feed.url,
match eligible_feed.hash {
Some(hash) => Some(format!("{}", hash)),
None => None
},
eligible_feed.download_timestamp_ms as i64,
false,
!eligible_feed.operation_success,
eligible_feed.http_response_code,
false,
MAPLE_INGESTION_VERSION
);
let _ = insert_into()*/
}
)).buffer_unordered(100).collect::<Vec<_>>().await;
}))
.buffer_unordered(100)
.collect::<Vec<_>>()
.await;

// 3. Assign Attempt IDs to each feed_id that is ready to ingest

Expand All @@ -216,7 +216,6 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
&chateau_result,
Arc::clone(&arc_conn_pool),
);


// 4. Unzip folders
let unzip_feeds: Vec<(String, bool)> =
Expand Down Expand Up @@ -270,7 +269,8 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {
let conn_pool = arc_conn_pool.as_ref();
let conn_pre = conn_pool.get().await;
let conn = &mut conn_pre.unwrap();
let gtfs_process_result = gtfs_process_feed(&feed_id, Arc::clone(&arc_conn_pool), "", "").await;
let gtfs_process_result =
gtfs_process_feed(&feed_id, Arc::clone(&arc_conn_pool), "", "").await;

if gtfs_process_result.is_ok() {
// at the end, UPDATE gtfs.static_download_attempts where onstop_feed_id and download_unix_time_ms match as ingested
Expand Down
6 changes: 3 additions & 3 deletions src/maple/transitland_download.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use catenary::models::StaticDownloadAttempt;
use catenary::schema::gtfs::static_download_attempts;
use diesel::prelude::*;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use dmfr_folder_reader::ReturnDmfrAnalysis;
use futures;
use diesel::prelude::*;
use diesel_async::{RunQueryDsl, AsyncConnection, AsyncPgConnection};
use reqwest::Client as ReqwestClient;
use reqwest::Request;

use crate::CatenaryPostgresPool;
use reqwest::RequestBuilder;
use std::collections::HashSet;
use std::fs;
Expand All @@ -17,7 +18,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use crate::CatenaryPostgresPool;

use crate::gtfs_handlers::MAPLE_INGESTION_VERSION;

Expand Down
33 changes: 21 additions & 12 deletions src/postgres_tools/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
use bb8::Pool;
use db_pool::r#async::ConnectionPool;
use db_pool::r#async::DatabasePool;
use db_pool::r#async::DatabasePoolBuilderTrait;
use db_pool::r#async::DieselAsyncPostgresBackend;
use db_pool::r#async::DieselBb8;
use db_pool::r#async::Reusable;
use db_pool::PrivilegedPostgresConfig;
use diesel::prelude::*;
use diesel::prelude::*;
use dotenvy::dotenv;
use bb8::Pool;
use std::env;
use std::error::Error;
use std::sync::Arc;
use std::thread;
use tokio::sync::OnceCell;
use db_pool::r#async::ConnectionPool;
use db_pool::r#async::DieselAsyncPostgresBackend;
use db_pool::r#async::Reusable;
use db_pool::r#async::DatabasePool;
use db_pool::r#async::DieselBb8;
use db_pool::PrivilegedPostgresConfig;
use db_pool::r#async::DatabasePoolBuilderTrait;

pub type CatenaryPostgresPool<'a> = db_pool::r#async::Reusable<'a, db_pool::r#async::ConnectionPool<db_pool::r#async::DieselAsyncPostgresBackend<db_pool::r#async::DieselBb8>>>;
pub type CatenaryPostgresConnection<'b> = &'b mut bb8::PooledConnection<'b, diesel_async::pooled_connection::AsyncDieselConnectionManager<diesel_async::pg::AsyncPgConnection>>;
/// Reusable handle to the shared database pool: a `db_pool` async
/// `ConnectionPool` backed by diesel-async Postgres over bb8.
/// NOTE(review): presumably obtained via `get_connection_pool()` below —
/// confirm all call sites go through that accessor.
pub type CatenaryPostgresPool<'a> = db_pool::r#async::Reusable<
'a,
db_pool::r#async::ConnectionPool<
db_pool::r#async::DieselAsyncPostgresBackend<db_pool::r#async::DieselBb8>,
>,
>;
/// Mutable borrow of a single checked-out bb8 pooled connection
/// (diesel-async `AsyncPgConnection`), tied to the checkout lifetime `'b`.
pub type CatenaryPostgresConnection<'b> = &'b mut bb8::PooledConnection<
'b,
diesel_async::pooled_connection::AsyncDieselConnectionManager<
diesel_async::pg::AsyncPgConnection,
>,
>;

pub async fn get_connection_pool(
) -> CatenaryPostgresPool<'static> {
pub async fn get_connection_pool() -> CatenaryPostgresPool<'static> {
static POOL: OnceCell<DatabasePool<DieselAsyncPostgresBackend<DieselBb8>>> =
OnceCell::const_new();

Expand Down

0 comments on commit 2ccdfd5

Please sign in to comment.