diff --git a/Cargo.toml b/Cargo.toml index b2fea7e5..915f5021 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,7 @@ r2d2 = "0.8.10" r2d2_postgres = "0.18.1" bb8-postgres = "0.8.1" bb8 = "0.8.1" -threadpool = {version = "1.8.1"} +threadpool = "1.8.1" tokio-threadpool = "0.1.18" qstring = "0.7.2" rand = "0.8.5" diff --git a/src/maple/gtfs_process.rs b/src/maple/gtfs_process.rs new file mode 100644 index 00000000..9b81205e --- /dev/null +++ b/src/maple/gtfs_process.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; +use std::error::Error; + +// Initial version 3 of ingest written by Kyler Chin +// Removal of the attribution is not allowed, as covered under the AGPL license + +// take a feed id and throw it into postgres +pub async fn gtfs_process_feed(feed_id: &str, pool: Arc>) -> Result<(), Box> { + let path = format!("gtfs_uncompressed/{}", feed_id); + + let gtfs = gtfs_structures::Gtfs::new(path.as_str())?; + + + + Ok(()) +} \ No newline at end of file diff --git a/src/maple/main.rs b/src/maple/main.rs index 966cf102..1b731194 100644 --- a/src/maple/main.rs +++ b/src/maple/main.rs @@ -4,6 +4,8 @@ use service::quicli::prelude::info; use sqlx::migrate::MigrateDatabase; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use sqlx::postgres::PgPoolOptions; use sqlx::query; use sqlx::{Connection, PgConnection, PgPool, Postgres}; @@ -12,12 +14,17 @@ use std::time::Duration; mod database; use std::collections::HashSet; use std::sync::Arc; +use threadpool::ThreadPool; +use tokio::runtime; mod gtfs_handlers; mod chateau_postprocess; mod refresh_metadata_tables; mod transitland_download; +mod gtfs_process; + +use gtfs_process::gtfs_process_feed; use chateau::chateau; use dmfr_folder_reader::read_folders; @@ -220,6 +227,25 @@ async fn run_ingest() -> Result<(), Box> { // create thread pool // process GTFS and insert into system + + // This will spawn a work-stealing runtime with 4 worker threads. + let rt = runtime::Builder::new_multi_thread() + .worker_threads(4) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("catenary-maple-ingest-{}", id) + }) + .build() + .unwrap(); + + let pool = Arc::new(pool); + + rt.spawn(async move { + for (feed_id, _) in unzip_feeds.iter().filter(|unzipped_feed| unzipped_feed.1 == true) { + gtfs_process_feed(&feed_id, pool); + } + }); } //determine if the old one should be deleted, if so, delete it