Skip to content

Commit

Permalink
Move hash function to lib, half way done worker assignment
Browse files Browse the repository at this point in the history
  • Loading branch information
kylerchin committed Apr 12, 2024
1 parent 7478ef9 commit 23a29b9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 64 deletions.
40 changes: 28 additions & 12 deletions src/alpenrose/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::thread;
use std::time::Duration;
use tokio_zookeeper::*;
use uuid::Uuid;
use catenary::fast_hash;

// gtfs unix timestamps
struct LastDataFetched {
Expand All @@ -35,6 +36,22 @@ struct LastDataFetched {
realtime_alerts: Option<u64>,
}

/// Fetch instructions for a single GTFS-realtime feed: the endpoints,
/// credentials, and polling cadence a worker needs in order to poll it.
#[derive(Serialize, Clone, Deserialize, Debug, Hash, PartialEq, Eq)]
pub struct RealtimeFeedFetch {
/// Identifier of the feed these instructions apply to.
pub feed_id: String,
// presumably the vehicle-positions endpoint URL; None when the feed has none — TODO confirm
pub realtime_vehicle_positions: Option<String>,
// presumably the trip-updates endpoint URL; None when the feed has none — TODO confirm
pub realtime_trip_updates: Option<String>,
// presumably the service-alerts endpoint URL; None when the feed has none — TODO confirm
pub realtime_alerts: Option<String>,
// NOTE(review): KeyFormat is declared elsewhere in the project — verify its
// semantics (likely how API keys are embedded in requests) against its definition.
pub key_formats: Vec<KeyFormat>,
/// Credentials for the feed, when authentication is required.
pub passwords: Option<Vec<PasswordInfo>>,
/// Polling interval in milliseconds; behavior when None is decided by the caller.
pub fetch_interval_ms: Option<i32>,
}

/// Work assignment message for a single worker node.
///
/// NOTE(review): the field is named `feeds` (plural) but holds exactly one
/// `RealtimeFeedFetch` — confirm whether this should be `Vec<RealtimeFeedFetch>`
/// once worker assignment is finished (the commit message says it is half done).
/// The field name and type are kept as-is for serde wire compatibility.
#[derive(Serialize, Deserialize, Clone, Debug, Hash, PartialEq, Eq)]
struct InstructionsPerWorker {
    // single fetch instruction assigned to this worker
    feeds: RealtimeFeedFetch,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
let this_worker_id = Arc::new(Uuid::new_v4().to_string());
Expand Down Expand Up @@ -134,11 +151,19 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {

let fast_hash_of_worker_nodes = fast_hash(&worker_nodes);

if last_set_of_active_nodes_hash != Some(fast_hash_of_worker_nodes) && last_updated_feeds_hash != Some(fast_hash_of_feeds) {
// either the list of workers
if last_set_of_active_nodes_hash != Some(fast_hash_of_worker_nodes) || last_updated_feeds_hash != Some(fast_hash_of_feeds) {
// divide feeds between worker nodes

let mut assignments: BTreeMap<String, RealtimeFeedFetch> = BTreesMap::new();
// feed id -> List of realtime fetch instructions
let mut assignments: BTreeMap<String, Vec<RealtimeFeedFetch>> = BTreeMap::new();

for (index, (feed_id, realtime_instructions)) in feeds_map.iter().enumerate() {
let node_to_assign = &worker_nodes[index % worker_nodes.len()];

let _ = assignments.entry(node_to_assign.to_string());
//append to list
}
// look at current assignments, delete old assignments

// assign feeds to worker nodes
Expand All @@ -158,16 +183,7 @@ async fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
}
}

// NOTE(review): in this diff view this definition is byte-identical to the
// `RealtimeFeedFetch` declared earlier in the hunk — the commit appears to move
// the struct above `main`; this is the old location being removed.
/// Fetch instructions for a single GTFS-realtime feed: endpoints, credentials,
/// and polling cadence.
#[derive(Serialize, Clone, Deserialize, Debug, Hash, PartialEq, Eq)]
pub struct RealtimeFeedFetch {
/// Identifier of the feed these instructions apply to.
pub feed_id: String,
// presumably the vehicle-positions endpoint URL — TODO confirm
pub realtime_vehicle_positions: Option<String>,
// presumably the trip-updates endpoint URL — TODO confirm
pub realtime_trip_updates: Option<String>,
// presumably the service-alerts endpoint URL — TODO confirm
pub realtime_alerts: Option<String>,
// NOTE(review): KeyFormat is declared elsewhere in the project — verify semantics there.
pub key_formats: Vec<KeyFormat>,
/// Credentials for the feed, when authentication is required.
pub passwords: Option<Vec<PasswordInfo>>,
/// Polling interval in milliseconds; behavior when None is decided by the caller.
pub fetch_interval_ms: Option<i32>,
}


async fn get_feed_metadata(
arc_conn_pool: Arc<CatenaryPostgresPool>,
Expand Down
8 changes: 5 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ pub mod postgis_to_diesel;
pub mod postgres_tools;
pub mod schema;

use std::hash::Hash;
use fasthash::MetroHasher;
use std::hash::Hasher;

pub const WGS_84_SRID: u32 = 4326;

pub mod gtfs_schedule_protobuf {
Expand Down Expand Up @@ -73,9 +77,7 @@ pub mod gtfs_schedule_protobuf {
}
}



fn fast_hash<T: Hash>(t: &T) -> u64 {
pub fn fast_hash<T: Hash>(t: &T) -> u64 {
let mut s: MetroHasher = Default::default();
t.hash(&mut s);
s.finish()
Expand Down
53 changes: 4 additions & 49 deletions src/postgres_tools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,18 @@ use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use std::env;

/// This type alias is the pool, which can be queried for connections.
/// It is typically wrapped in Arc to allow thread safe cloning to the same pool
pub type CatenaryPostgresPool =
bb8::Pool<AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>>;

/// Type alias to the pooled connection
/// This must be used in a single thread, since it is mutable
/// (an exclusive `&mut` borrow of a connection checked out from the bb8 pool).
pub type CatenaryConn<'a> = &'a mut bb8::PooledConnection<
'a,
diesel_async::pooled_connection::AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>,
>;

/// This type alias is the pool, which can be queried for connections.
/// It is typically wrapped in Arc to allow thread safe cloning to the same pool
/*pub type CatenaryPostgresPool<'a> = db_pool::r#async::Reusable<
'a,
db_pool::r#async::ConnectionPool<
db_pool::r#async::DieselAsyncPostgresBackend<db_pool::r#async::DieselBb8>,
>,
>;*/

/// Type alias to the pooled connection
/// This must be used in a single thread, since it is mutable
/*pub type CatenaryPostgresConnection<'b> = &'b mut bb8::PooledConnection<
'b,
diesel_async::pooled_connection::AsyncDieselConnectionManager<
diesel_async::pg::AsyncPgConnection,
>,
>;
*/
// This is very broken; I think db-pool is the problem.
// This returns a pool with a specified lifetime.
/*
pub async fn get_connection_pool<'pool_lifespan>() -> CatenaryPostgresPool<'pool_lifespan> {
static POOL: OnceCell<DatabasePool<DieselAsyncPostgresBackend<DieselBb8>>> =
OnceCell::const_new();
let db_pool: &DatabasePool<DieselAsyncPostgresBackend<DieselBb8>> = POOL
.get_or_init(|| async {
dotenv().ok();
let config = PrivilegedPostgresConfig::new()
.password(Some("potgres".to_owned()));
let backend = DieselAsyncPostgresBackend::new(
config,
|| Pool::builder().max_size(10),
|| Pool::builder().max_size(2),
move |mut conn| Box::pin(async { conn }),
)
.await
.unwrap();
backend.create_database_pool().await.unwrap()
})
.await;
db_pool.pull().await
}*/

pub async fn make_async_pool() -> Result<
bb8::Pool<AsyncDieselConnectionManager<diesel_async::AsyncPgConnection>>,
Box<dyn std::error::Error + Sync + Send>,
Expand Down

0 comments on commit 23a29b9

Please sign in to comment.