simplify RouteService (#198)
phiSgr authored Jan 24, 2024
1 parent a37c2c4 commit c7cd0e6
Showing 5 changed files with 106 additions and 120 deletions.
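The change replaces the `Arc<Mutex<…>>` wrappers around the route table with a `&'static tokio::sync::RwLock<…>` obtained by leaking a one-time allocation, so every handler shares a plain reference and lookups take a read lock instead of serializing on a mutex. Below is a minimal sketch of that pattern, not the repository's code; it assumes a tokio runtime with the `sync` and `macros` features and uses illustrative names.

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Leak the allocation once at startup; the &'static reference is Copy and can be
// handed to any number of tasks without Arc or clone().
fn leak_table() -> &'static RwLock<HashMap<String, String>> {
    Box::leak(Box::new(RwLock::new(HashMap::new())))
}

#[tokio::main]
async fn main() {
    let table = leak_table();
    table.write().await.insert("ingest/users".into(), "users_topic".into());

    // Read locks can be held by many tasks at the same time.
    let topic = table.read().await.get("ingest/users").cloned();
    println!("{:?}", topic);
}
```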
136 changes: 66 additions & 70 deletions apps/igloo-kit-cli/src/cli/local_webserver.rs
@@ -32,10 +32,9 @@ use std::future::Future;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LocalWebserverConfig {
@@ -62,9 +61,10 @@ impl Default for LocalWebserverConfig {
}
}

#[derive(Clone)]
struct RouteService {
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
configured_producer: ConfiguredProducer,
console_config: ConsoleConfig,
}
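A note on the `#[derive(Clone)]` added above: with the `Arc`s gone, the clone stays cheap because a `&'static` reference is `Copy`, and a Kafka `FutureProducer` (which, judging from the `FutureRecord`/`Timeout` usage below, is what `ConfiguredProducer` wraps) is reference-counted internally. A hedged sketch with stand-in field types:

```rust
// Illustrative only; these are not the repository's types. Cloning this struct
// copies one pointer and bumps a reference count; no table or producer data is duplicated.
#[derive(Clone)]
struct SketchService {
    table: &'static tokio::sync::RwLock<std::collections::HashMap<std::path::PathBuf, String>>,
    producer: rdkafka::producer::FutureProducer, // documented as cheap to clone
}
```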

@@ -76,7 +76,7 @@ impl Service<Request<Incoming>> for RouteService {
fn call(&self, req: Request<Incoming>) -> Self::Future {
Box::pin(router(
req,
self.route_table.clone(),
self.route_table,
self.configured_producer.clone(),
self.console_config.clone(),
))
@@ -101,8 +101,8 @@ fn options_route() -> Result<Response<Full<Bytes>>, hyper::http::Error> {
async fn ingest_route(
req: Request<hyper::body::Incoming>,
route: PathBuf,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
configured_producer: ConfiguredProducer,
route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
console_config: ConsoleConfig,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
show_message!(
@@ -113,67 +113,65 @@ async fn ingest_route(
}
);

if route_table.lock().await.contains_key(&route) {
let is_curl = req.headers().get("User-Agent").map_or_else(
|| false,
|user_agent| {
user_agent
.to_str()
.map_or_else(|_| false, |s| s.starts_with("curl"))
},
);

let body = req.collect().await.unwrap().to_bytes().to_vec();

let guard = route_table.lock().await;
let topic_name = &guard.get(&route).unwrap().table_name;

let res = configured_producer
.lock()
.await
.producer
.send(
FutureRecord::to(topic_name)
.key(topic_name) // This should probably be generated by the client that pushes data to the API
.payload(&body),
Timeout::After(Duration::from_secs(1)),
)
.await;

match res {
Ok(_) => {
show_message!(
MessageType::Success,
Message {
action: "SUCCESS".to_string(),
details: route.to_str().unwrap().to_string(),
}
);
let response_bytes = if is_curl {
Bytes::from(format!("Success! Go to http://localhost:{}/infrastructure/views to view your data!", console_config.host_port))
} else {
Bytes::from("SUCCESS")
};
Ok(Response::new(Full::new(response_bytes)))
}
Err(e) => {
println!("Error: {:?}", e);
Ok(Response::new(Full::new(Bytes::from("Error"))))
match route_table.read().await.get(&route) {
Some(route_meta) => {
let is_curl = req.headers().get("User-Agent").map_or_else(
|| false,
|user_agent| {
user_agent
.to_str()
.map_or_else(|_| false, |s| s.starts_with("curl"))
},
);

let body = req.collect().await.unwrap().to_bytes().to_vec();

let topic_name = &route_meta.table_name;

let res = configured_producer
.producer
.send(
FutureRecord::to(topic_name)
.key(topic_name) // This should probably be generated by the client that pushes data to the API
.payload(&body),
Timeout::After(Duration::from_secs(1)),
)
.await;

match res {
Ok(_) => {
show_message!(
MessageType::Success,
Message {
action: "SUCCESS".to_string(),
details: route.to_str().unwrap().to_string(),
}
);
let response_bytes = if is_curl {
Bytes::from(format!("Success! Go to http://localhost:{}/infrastructure/views to view your data!", console_config.host_port))
} else {
Bytes::from("SUCCESS")
};
Ok(Response::new(Full::new(response_bytes)))
}
Err(e) => {
println!("Error: {:?}", e);
Ok(Response::new(Full::new(Bytes::from("Error"))))
}
}
}
} else {
Response::builder()
None => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::new(Bytes::from(
"Please visit /console to view your routes",
)))
))),
}
}
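One detail of the new `match route_table.read().await.get(&route)` shape: the temporary read guard created in the scrutinee lives until the end of the `match`, so the borrowed `route_meta` stays valid inside the `Some` arm while other tasks can still take their own read locks. A small standalone sketch of that borrow pattern (illustrative names only):

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

async fn lookup(table: &RwLock<HashMap<String, String>>, key: &str) -> Option<String> {
    // The read guard produced here is kept alive for the whole match expression.
    match table.read().await.get(key) {
        Some(meta) => Some(meta.clone()), // still borrowing through the guard
        None => None,
    }
}
```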

async fn router(
req: Request<hyper::body::Incoming>,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
configured_producer: ConfiguredProducer,
console_config: ConsoleConfig,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
debug!(
@@ -228,7 +226,7 @@ impl Webserver {

pub async fn start(
&self,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>,
project: &Project,
) {
//! Starts the local webserver
@@ -237,9 +235,7 @@ impl Webserver {
// We create a TcpListener and bind it to 127.0.0.1:3000
let listener = TcpListener::bind(socket).await.unwrap();

let producer = Arc::new(Mutex::new(redpanda::create_producer(
project.redpanda_config.clone(),
)));
let producer = redpanda::create_producer(project.redpanda_config.clone());

show_message!(
MessageType::Info,
@@ -254,6 +250,12 @@ impl Webserver {
let mut sigint =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt()).unwrap();

let route_service = RouteService {
route_table,
configured_producer: producer,
console_config: project.console_config.clone(),
};

loop {
tokio::select! {
_ = sigint.recv() => {
@@ -272,20 +274,14 @@ impl Webserver {
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);

let route_table = route_table.clone();
let producer = producer.clone();
let console_config = project.console_config.clone();
let route_service = route_service.clone();

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Run this server for... forever!
if let Err(e) = auto::Builder::new(TokioExecutor::new()).serve_connection(
io,
RouteService {
route_table,
configured_producer: producer,
console_config,
},
route_service,
).await {
error!("server error: {}", e);
}
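The accept loop above builds the `RouteService` once, then clones it per connection inside a `tokio::select!` that also listens for SIGINT. A stripped-down sketch of that select/shutdown shape, leaving hyper out entirely (names and logging are illustrative; assumes tokio's `signal` and `net` features on a Unix target):

```rust
use tokio::net::TcpListener;
use tokio::signal::unix::{signal, SignalKind};

async fn run(listener: TcpListener) {
    let mut sigint = signal(SignalKind::interrupt()).unwrap();
    loop {
        tokio::select! {
            // Break out of the loop on Ctrl-C instead of accepting more connections.
            _ = sigint.recv() => break,
            res = listener.accept() => {
                let (_stream, peer) = res.unwrap();
                // In the real server this is where the connection task is spawned.
                println!("accepted connection from {peer}");
            }
        }
    }
}
```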
34 changes: 14 additions & 20 deletions apps/igloo-kit-cli/src/cli/routines.rs
@@ -81,11 +81,10 @@

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::{io::Error, path::PathBuf};

use log::debug;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

use super::local_webserver::Webserver;
use super::watcher::FileWatcher;
@@ -233,48 +232,44 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
);

// TODO: Explore using a RWLock instead of a Mutex to ensure concurrent reads without locks
let route_table = Arc::new(Mutex::new(HashMap::<PathBuf, RouteMeta>::new()));
let mut route_table = HashMap::<PathBuf, RouteMeta>::new();

info!("Initializing project state");
initialize_project_state(project.schemas_dir(), project, Arc::clone(&route_table)).await?;
initialize_project_state(project.schemas_dir(), project, &mut route_table).await?;
let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> =
Box::leak(Box::new(RwLock::new(route_table)));

let web_server = Webserver::new(
project.local_webserver_config.host.clone(),
project.local_webserver_config.port,
);
let file_watcher = FileWatcher::new();

file_watcher.start(project, Arc::clone(&route_table))?;
file_watcher.start(project, route_table)?;

info!("Starting web server...");

web_server.start(Arc::clone(&route_table), project).await;
web_server.start(route_table, project).await;

Ok(())
}
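The startup flow now builds the route table as a plain `HashMap` behind `&mut`, and only wraps and leaks it once initialization is finished, so none of the setup code needs an async lock. A minimal sketch of that build-then-freeze sequence (types and names are placeholders):

```rust
use std::collections::HashMap;
use tokio::sync::RwLock;

// Setup helpers can take &mut HashMap directly; no locking is involved yet.
fn seed(table: &mut HashMap<String, &'static str>) {
    table.insert("ingest/users".to_string(), "users_topic");
}

// Once seeded, the map is frozen behind a leaked RwLock for the rest of the process.
fn freeze(table: HashMap<String, &'static str>) -> &'static RwLock<HashMap<String, &'static str>> {
    Box::leak(Box::new(RwLock::new(table)))
}

#[tokio::main]
async fn main() {
    let mut table = HashMap::new();
    seed(&mut table);
    let table = freeze(table);
    assert!(table.read().await.contains_key("ingest/users"));
}
```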

async fn initialize_project_state(
schema_dir: PathBuf,
project: &Project,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
let producer = redpanda::create_producer(project.redpanda_config.clone());

info!("Starting schema directory crawl...");
let crawl_result = crawl_schema_project_dir(
&schema_dir,
project,
&configured_client,
route_table.clone(),
)
.await;
let crawl_result =
crawl_schema_project_dir(&schema_dir, project, &configured_client, route_table).await;

let route_table_clone = route_table.clone();
let _ = post_current_state_to_console(
&configured_client,
&producer,
route_table_clone,
route_table.clone(),
project.console_config.clone(),
)
.await;
@@ -297,19 +292,18 @@ async fn crawl_schema_project_dir(
schema_dir: &Path,
project: &Project,
configured_client: &ConfiguredDBClient,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
if schema_dir.is_dir() {
for entry in std::fs::read_dir(schema_dir)? {
let entry = entry?;
let path = entry.path();
if path.is_dir() {
debug!("Processing directory: {:?}", path);
crawl_schema_project_dir(&path, project, configured_client, route_table.clone())
.await?;
crawl_schema_project_dir(&path, project, configured_client, route_table).await?;
} else {
debug!("Processing file: {:?}", path);
process_schema_file(&path, project, configured_client, route_table.clone()).await?
process_schema_file(&path, project, configured_client, route_table).await?
}
}
}
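In the recursive `crawl_schema_project_dir` call above, the `&mut HashMap` no longer needs `.clone()`: the mutable reference is reborrowed for each nested call and remains usable afterwards. A tiny synchronous sketch of that reborrowing pattern (not the repository's function):

```rust
use std::io;
use std::path::{Path, PathBuf};

fn visit(dir: &Path, out: &mut Vec<PathBuf>) -> io::Result<()> {
    for entry in std::fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            visit(&path, out)?; // `out` is reborrowed, then usable again below
        } else {
            out.push(path);
        }
    }
    Ok(())
}
```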
