diff --git a/apps/igloo-kit-cli/src/cli/local_webserver.rs b/apps/igloo-kit-cli/src/cli/local_webserver.rs
index c50dbf1a0..e40b0884d 100644
--- a/apps/igloo-kit-cli/src/cli/local_webserver.rs
+++ b/apps/igloo-kit-cli/src/cli/local_webserver.rs
@@ -32,10 +32,9 @@ use std::future::Future;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::pin::Pin;
-use std::sync::Arc;
 use std::time::Duration;
 use tokio::net::TcpListener;
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
 
 #[derive(Serialize, Deserialize, Debug, Clone)]
 pub struct LocalWebserverConfig {
@@ -62,9 +61,10 @@ impl Default for LocalWebserverConfig {
     }
 }
 
+#[derive(Clone)]
 struct RouteService {
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
-    configured_producer: Arc<Mutex<ConfiguredProducer>>,
+    route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
+    configured_producer: ConfiguredProducer,
     console_config: ConsoleConfig,
 }
 
@@ -76,7 +76,7 @@ impl Service<Request<Incoming>> for RouteService {
     fn call(&self, req: Request<Incoming>) -> Self::Future {
         Box::pin(router(
             req,
-            self.route_table.clone(),
+            self.route_table,
             self.configured_producer.clone(),
             self.console_config.clone(),
         ))
     }
 }
@@ -101,8 +101,8 @@ fn options_route() -> Result<Response<Full<Bytes>>, hyper::http::Error> {
 async fn ingest_route(
     req: Request<Incoming>,
     route: PathBuf,
-    configured_producer: Arc<Mutex<ConfiguredProducer>>,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    configured_producer: ConfiguredProducer,
+    route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
     console_config: ConsoleConfig,
 ) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
     show_message!(
@@ -113,67 +113,65 @@ async fn ingest_route(
         }
     );
 
-    if route_table.lock().await.contains_key(&route) {
-        let is_curl = req.headers().get("User-Agent").map_or_else(
-            || false,
-            |user_agent| {
-                user_agent
-                    .to_str()
-                    .map_or_else(|_| false, |s| s.starts_with("curl"))
-            },
-        );
-
-        let body = req.collect().await.unwrap().to_bytes().to_vec();
-
-        let guard = route_table.lock().await;
-        let topic_name = &guard.get(&route).unwrap().table_name;
-
-        let res = configured_producer
-            .lock()
-            .await
-            .producer
-            .send(
-                FutureRecord::to(topic_name)
-                    .key(topic_name) // This should probably be generated by the client that pushes data to the API
-                    .payload(&body),
-                Timeout::After(Duration::from_secs(1)),
-            )
-            .await;
-
-        match res {
-            Ok(_) => {
-                show_message!(
-                    MessageType::Success,
-                    Message {
-                        action: "SUCCESS".to_string(),
-                        details: route.to_str().unwrap().to_string(),
-                    }
-                );
-                let response_bytes = if is_curl {
-                    Bytes::from(format!("Success! Go to http://localhost:{}/infrastructure/views to view your data!", console_config.host_port))
-                } else {
-                    Bytes::from("SUCCESS")
-                };
-                Ok(Response::new(Full::new(response_bytes)))
-            }
-            Err(e) => {
-                println!("Error: {:?}", e);
-                Ok(Response::new(Full::new(Bytes::from("Error"))))
+    match route_table.read().await.get(&route) {
+        Some(route_meta) => {
+            let is_curl = req.headers().get("User-Agent").map_or_else(
+                || false,
+                |user_agent| {
+                    user_agent
+                        .to_str()
+                        .map_or_else(|_| false, |s| s.starts_with("curl"))
+                },
+            );
+
+            let body = req.collect().await.unwrap().to_bytes().to_vec();
+
+            let topic_name = &route_meta.table_name;
+
+            let res = configured_producer
+                .producer
+                .send(
+                    FutureRecord::to(topic_name)
+                        .key(topic_name) // This should probably be generated by the client that pushes data to the API
+                        .payload(&body),
+                    Timeout::After(Duration::from_secs(1)),
+                )
+                .await;
+
+            match res {
+                Ok(_) => {
+                    show_message!(
+                        MessageType::Success,
+                        Message {
+                            action: "SUCCESS".to_string(),
+                            details: route.to_str().unwrap().to_string(),
+                        }
+                    );
+                    let response_bytes = if is_curl {
+                        Bytes::from(format!("Success! Go to http://localhost:{}/infrastructure/views to view your data!", console_config.host_port))
+                    } else {
+                        Bytes::from("SUCCESS")
+                    };
+                    Ok(Response::new(Full::new(response_bytes)))
+                }
+                Err(e) => {
+                    println!("Error: {:?}", e);
+                    Ok(Response::new(Full::new(Bytes::from("Error"))))
+                }
             }
         }
-    } else {
-        Response::builder()
+        None => Response::builder()
             .status(StatusCode::NOT_FOUND)
             .body(Full::new(Bytes::from(
                 "Please visit /console to view your routes",
-            )))
+            ))),
     }
 }
 
 async fn router(
     req: Request<Incoming>,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
-    configured_producer: Arc<Mutex<ConfiguredProducer>>,
+    route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
+    configured_producer: ConfiguredProducer,
     console_config: ConsoleConfig,
 ) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
     debug!(
@@ -228,7 +226,7 @@ impl Webserver {
 
     pub async fn start(
         &self,
-        route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+        route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
         project: &Project,
     ) {
         //! Starts the local webserver
@@ -237,9 +235,7 @@ impl Webserver {
         // We create a TcpListener and bind it to 127.0.0.1:3000
         let listener = TcpListener::bind(socket).await.unwrap();
 
-        let producer = Arc::new(Mutex::new(redpanda::create_producer(
-            project.redpanda_config.clone(),
-        )));
+        let producer = redpanda::create_producer(project.redpanda_config.clone());
 
         show_message!(
             MessageType::Info,
@@ -254,6 +250,12 @@ impl Webserver {
         let mut sigint =
             tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap();
 
+        let route_service = RouteService {
+            route_table,
+            configured_producer: producer,
+            console_config: project.console_config.clone(),
+        };
+
         loop {
             tokio::select! {
                 _ = sigint.recv() => {
@@ -272,20 +274,14 @@ impl Webserver {
                     // `hyper::rt` IO traits.
                     let io = TokioIo::new(stream);
 
-                    let route_table = route_table.clone();
-                    let producer = producer.clone();
-                    let console_config = project.console_config.clone();
+                    let route_service = route_service.clone();
 
                     // Spawn a tokio task to serve multiple connections concurrently
                    tokio::task::spawn(async move {
                         // Run this server for... forever!
                         if let Err(e) = auto::Builder::new(TokioExecutor::new()).serve_connection(
                             io,
-                            RouteService {
-                                route_table,
-                                configured_producer: producer,
-                                console_config,
-                            },
+                            route_service,
                         ).await {
                             error!("server error: {}", e);
                         }
diff --git a/apps/igloo-kit-cli/src/cli/routines.rs b/apps/igloo-kit-cli/src/cli/routines.rs
index cffc57ca3..7272207e8 100644
--- a/apps/igloo-kit-cli/src/cli/routines.rs
+++ b/apps/igloo-kit-cli/src/cli/routines.rs
@@ -81,11 +81,10 @@ use std::collections::HashMap;
 use std::path::Path;
-use std::sync::Arc;
 use std::{io::Error, path::PathBuf};
 
 use log::debug;
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
 
 use super::local_webserver::Webserver;
 use super::watcher::FileWatcher;
@@ -233,10 +232,12 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
     );
 
     // TODO: Explore using a RWLock instead of a Mutex to ensure concurrent reads without locks
-    let route_table = Arc::new(Mutex::new(HashMap::<PathBuf, RouteMeta>::new()));
+    let mut route_table = HashMap::<PathBuf, RouteMeta>::new();
 
     info!("Initializing project state");
-    initialize_project_state(project.schemas_dir(), project, Arc::clone(&route_table)).await?;
+    initialize_project_state(project.schemas_dir(), project, &mut route_table).await?;
+    let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> =
+        Box::leak(Box::new(RwLock::new(route_table)));
 
     let web_server = Webserver::new(
         project.local_webserver_config.host.clone(),
@@ -244,11 +245,11 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
     );
 
     let file_watcher = FileWatcher::new();
-    file_watcher.start(project, Arc::clone(&route_table))?;
+    file_watcher.start(project, route_table)?;
 
     info!("Starting web server...");
 
-    web_server.start(Arc::clone(&route_table), project).await;
+    web_server.start(route_table, project).await;
 
     Ok(())
 }
@@ -256,25 +257,19 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
 async fn initialize_project_state(
     schema_dir: PathBuf,
     project: &Project,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
 ) -> Result<(), Error> {
     let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
     let producer = redpanda::create_producer(project.redpanda_config.clone());
 
     info!("Starting schema directory crawl...");
-    let crawl_result = crawl_schema_project_dir(
-        &schema_dir,
-        project,
-        &configured_client,
-        route_table.clone(),
-    )
-    .await;
+    let crawl_result =
+        crawl_schema_project_dir(&schema_dir, project, &configured_client, route_table).await;
 
-    let route_table_clone = route_table.clone();
     let _ = post_current_state_to_console(
         &configured_client,
         &producer,
-        route_table_clone,
+        route_table.clone(),
         project.console_config.clone(),
     )
     .await;
@@ -297,7 +292,7 @@ async fn crawl_schema_project_dir(
     schema_dir: &Path,
     project: &Project,
     configured_client: &ConfiguredDBClient,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
 ) -> Result<(), Error> {
     if schema_dir.is_dir() {
         for entry in std::fs::read_dir(schema_dir)? {
@@ -305,11 +300,10 @@ async fn crawl_schema_project_dir(
             let path = entry.path();
             if path.is_dir() {
                 debug!("Processing directory: {:?}", path);
-                crawl_schema_project_dir(&path, project, configured_client, route_table.clone())
-                    .await?;
+                crawl_schema_project_dir(&path, project, configured_client, route_table).await?;
             } else {
                 debug!("Processing file: {:?}", path);
-                process_schema_file(&path, project, configured_client, route_table.clone()).await?
+                process_schema_file(&path, project, configured_client, route_table).await?
             }
         }
     }
diff --git a/apps/igloo-kit-cli/src/cli/watcher.rs b/apps/igloo-kit-cli/src/cli/watcher.rs
index ded3ec354..e03b5a074 100644
--- a/apps/igloo-kit-cli/src/cli/watcher.rs
+++ b/apps/igloo-kit-cli/src/cli/watcher.rs
@@ -2,11 +2,10 @@ use std::{
     collections::HashMap,
     io::{Error, ErrorKind},
     path::{Path, PathBuf},
-    sync::Arc,
 };
 
 use notify::{event::ModifyKind, Config, RecommendedWatcher, RecursiveMode, Watcher};
-use tokio::sync::Mutex;
+use tokio::sync::RwLock;
 
 use super::display::{Message, MessageType};
 use crate::infrastructure::console::post_current_state_to_console;
@@ -46,9 +45,11 @@ fn schema_file_path_to_ingest_route(app_dir: PathBuf, path: &Path, table_name: S
 async fn process_event(
     project: Project,
     event: notify::Event,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
     configured_client: &ConfiguredDBClient,
 ) -> Result<(), Error> {
+    let mut route_table = route_table.write().await;
+
     debug!(
         "File Watcher Event Received: {:?}, with Route Table {:?}",
         event, route_table
@@ -62,7 +63,7 @@ async fn process_event(
             create_framework_objects_from_schema_file_path(
                 &project,
                 &route,
-                route_table,
+                &mut route_table,
                 configured_client,
             )
             .await
@@ -75,14 +76,14 @@ async fn process_event(
                 create_framework_objects_from_schema_file_path(
                     &project,
                     &route,
-                    route_table,
+                    &mut route_table,
                     configured_client,
                 )
                 .await
             } else {
                 remove_table_and_topics_from_schema_file_path(
                     &route,
-                    route_table,
+                    &mut route_table,
                     configured_client,
                 )
                 .await
@@ -94,7 +95,7 @@ async fn process_event(
             create_framework_objects_from_schema_file_path(
                 &project,
                 &route,
-                route_table,
+                &mut route_table,
                 configured_client,
             )
             .await?
@@ -112,7 +113,7 @@ async fn process_event(
 async fn create_framework_objects_from_schema_file_path(
     project: &Project,
     schema_file_path: &Path,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
     configured_client: &ConfiguredDBClient,
 ) -> Result<(), Error> {
     //! Creates the route, topics and tables from a path to the schema file
@@ -131,7 +132,7 @@ pub async fn process_schema_file(
     schema_file_path: &Path,
     project: &Project,
     configured_client: &ConfiguredDBClient,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
 ) -> Result<(), Error> {
     let framework_objects = get_framework_objects(schema_file_path)?;
     let mut compilable_objects: Vec<TypescriptObjects> = Vec::new();
@@ -159,10 +160,8 @@ async fn process_objects(
     schema_file_path: &Path,
     configured_client: &ConfiguredDBClient,
     compilable_objects: &mut Vec<TypescriptObjects>, // Objects that require compilation after processing
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
 ) -> Result<(), Error> {
-    let mut route_table = route_table.lock().await;
-
     for fo in framework_objects {
         let ingest_route = schema_file_path_to_ingest_route(
             project.app_dir().clone(),
@@ -197,7 +196,7 @@ async fn process_objects(
 
 async fn watch(
     project: &Project,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
 ) -> Result<(), Error> {
     let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
     let configured_producer = redpanda::create_producer(project.redpanda_config.clone());
@@ -221,7 +220,7 @@ async fn watch(
                 process_event(
                     project.clone(),
                     event.clone(),
-                    Arc::clone(&route_table),
+                    route_table,
                     &configured_client,
                 )
                 .await
@@ -229,10 +228,15 @@ async fn watch(
                     Error::new(ErrorKind::Other, format!("Processing error occured: {}", e))
                 })?;
 
+                let route_table_snapshot = {
+                    let read_lock = route_table.read().await;
+                    (*read_lock).clone()
+                };
+
                 let _ = post_current_state_to_console(
                     &configured_client,
                     &configured_producer,
-                    route_table.clone(),
+                    route_table_snapshot,
                     project.console_config.clone(),
                 )
                 .await;
@@ -259,7 +263,7 @@ impl FileWatcher {
     pub fn start(
         &self,
         project: &Project,
-        route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+        route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
     ) -> Result<(), Error> {
         show_message!(MessageType::Info, {
             Message {
@@ -270,7 +274,7 @@ impl FileWatcher {
 
         let project = project.clone();
         tokio::spawn(async move {
-            if let Err(error) = watch(&project, Arc::clone(&route_table)).await {
+            if let Err(error) = watch(&project, route_table).await {
                 println!("Error: {error:?}");
             }
         });
diff --git a/apps/igloo-kit-cli/src/framework/controller.rs b/apps/igloo-kit-cli/src/framework/controller.rs
index 4e1d1a793..5745930be 100644
--- a/apps/igloo-kit-cli/src/framework/controller.rs
+++ b/apps/igloo-kit-cli/src/framework/controller.rs
@@ -4,7 +4,6 @@ use crate::infrastructure::stream;
 
 use std::collections::HashMap;
 use std::path::Path;
-use std::sync::Arc;
 
 use crate::framework::languages::SupportedLanguages;
 
@@ -12,7 +11,6 @@ use crate::framework;
 
 use log::debug;
 use log::info;
-use tokio::sync::Mutex;
 
 use crate::framework::typescript::get_typescript_models_dir;
 
@@ -217,16 +215,15 @@ pub(crate) fn create_language_objects(
 }
 
 pub async fn remove_table_and_topics_from_schema_file_path(
-    shcema_file_path: &Path,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    schema_file_path: &Path,
+    route_table: &mut HashMap<PathBuf, RouteMeta>,
     configured_client: &ConfiguredDBClient,
 ) -> Result<(), Error> {
     //need to get the path of the file, scan the route table and remove all the files that need to be deleted.
     // This doesn't have to be as fast as the scanning for routes in the web server so we're ok with the scan here.
 
-    let mut route_table = route_table.lock().await;
     for (k, meta) in route_table.clone().into_iter() {
-        if meta.original_file_path == shcema_file_path {
+        if meta.original_file_path == schema_file_path {
             stream::redpanda::delete_topic(meta.table_name.clone())?;
 
             olap::clickhouse::delete_table_or_view(meta.table_name, configured_client)
@@ -249,7 +246,7 @@ pub async fn remove_table_and_topics_from_schema_file_path(
                 })?;
             }
-            route_table.remove(&k);
+            (*route_table).remove(&k);
         }
     }
     Ok(())
 }
diff --git a/apps/igloo-kit-cli/src/infrastructure/console.rs b/apps/igloo-kit-cli/src/infrastructure/console.rs
index bbb1e9543..a60bd9c0d 100644
--- a/apps/igloo-kit-cli/src/infrastructure/console.rs
+++ b/apps/igloo-kit-cli/src/infrastructure/console.rs
@@ -14,9 +14,7 @@ use serde::{Deserialize, Serialize};
 use serde_json::json;
 use std::collections::HashMap;
 use std::path::PathBuf;
-use std::sync::Arc;
 use tokio::net::TcpStream;
-use tokio::sync::Mutex;
 
 use std::str;
 
@@ -34,7 +32,7 @@ impl Default for ConsoleConfig {
 pub async fn post_current_state_to_console(
     configured_db_client: &ConfiguredDBClient,
     configured_producer: &ConfiguredProducer,
-    route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
+    route_table: HashMap<PathBuf, RouteMeta>,
     console_config: ConsoleConfig,
 ) -> Result<(), anyhow::Error> {
     let tables = olap::clickhouse::fetch_all_tables(configured_db_client)
         .await
         .unwrap();
 
     let routes_table: Vec<RouteInfo> = route_table
-        .lock()
-        .await
-        .clone()
         .iter()
         .map(|(k, v)| {
             RouteInfo::new(