Skip to content

Commit

Permalink
Create GTFS processor worker
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Mar 21, 2024
1 parent 8817513 commit d4a8dbe
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ r2d2 = "0.8.10"
r2d2_postgres = "0.18.1"
bb8-postgres = "0.8.1"
bb8 = "0.8.1"
threadpool = {version = "1.8.1"}
threadpool = "1.8.1"
tokio-threadpool = "0.1.18"
qstring = "0.7.2"
rand = "0.8.5"
Expand Down
16 changes: 16 additions & 0 deletions src/maple/gtfs_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::sync::Arc;
use std::error::Error;

// Initial version 3 of ingest written by Kyler Chin
// Removal of the attribution is not allowed, as covered under the AGPL license

// take a feed id and throw it into postgres
pub async fn gtfs_process_feed(feed_id: &str, pool: Arc<sqlx::Pool<sqlx::Postgres>>) -> Result<(), Box<dyn Error>> {
let path = format!("gtfs_uncompressed/{}", feed_id);

let gtfs = gtfs_structures::Gtfs::new(path.as_str())?;



Ok(())
}
26 changes: 26 additions & 0 deletions src/maple/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use service::quicli::prelude::info;
use sqlx::migrate::MigrateDatabase;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use sqlx::postgres::PgPoolOptions;
use sqlx::query;
use sqlx::{Connection, PgConnection, PgPool, Postgres};
Expand All @@ -12,12 +14,17 @@ use std::time::Duration;
mod database;
use std::collections::HashSet;
use std::sync::Arc;
use threadpool::ThreadPool;
use tokio::runtime;

mod gtfs_handlers;

mod chateau_postprocess;
mod refresh_metadata_tables;
mod transitland_download;
mod gtfs_process;

use gtfs_process::gtfs_process_feed;

use chateau::chateau;
use dmfr_folder_reader::read_folders;
Expand Down Expand Up @@ -220,6 +227,25 @@ async fn run_ingest() -> Result<(), Box<dyn Error>> {

// create thread pool
// process GTFS and insert into system

// This will spawn a work-stealing runtime with 4 worker threads.
let rt = runtime::Builder::new_multi_thread()
.worker_threads(4)
.thread_name_fn(|| {
static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0);
let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
format!("catenary-maple-ingest-{}", id)
})
.build()
.unwrap();

let pool = Arc::new(pool);

rt.spawn(async move {
for (feed_id, _) in unzip_feeds.iter().filter(|unzipped_feed| unzipped_feed.1 == true) {
gtfs_process_feed(&feed_id, pool);
}
});
}

//determine if the old one should be deleted, if so, delete it
Expand Down

0 comments on commit d4a8dbe

Please sign in to comment.