Skip to content

Commit

Permalink
feat: working push to console (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
callicles authored Jan 23, 2024
1 parent 329bcd4 commit e444108
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 111 deletions.
108 changes: 2 additions & 106 deletions apps/igloo-kit-cli/src/cli/local_webserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::cli::routines::stop::StopLocalInfrastructure;
use crate::cli::routines::Routine;
use crate::cli::routines::RunMode;
use crate::framework::controller::RouteMeta;
use crate::infrastructure::olap;

use crate::infrastructure::olap::clickhouse::ConfiguredDBClient;
use crate::infrastructure::stream::redpanda;
use crate::infrastructure::stream::redpanda::ConfiguredProducer;

Expand All @@ -28,7 +26,6 @@ use rdkafka::producer::FutureRecord;
use rdkafka::util::Timeout;
use serde::Deserialize;
use serde::Serialize;
use serde_json::json;
use std::collections::HashMap;
use std::future::Future;
use std::net::SocketAddr;
Expand Down Expand Up @@ -64,34 +61,9 @@ impl Default for LocalWebserverConfig {
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RouteInfo {
pub route_path: String,
pub file_path: String,
pub table_name: String,
pub view_name: Option<String>,
}

impl RouteInfo {
pub fn new(
route_path: String,
file_path: String,
table_name: String,
view_name: Option<String>,
) -> Self {
Self {
route_path,
file_path,
table_name,
view_name,
}
}
}

struct RouteService {
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
configured_db_client: Arc<Mutex<ConfiguredDBClient>>,
}

impl Service<Request<Incoming>> for RouteService {
Expand All @@ -104,7 +76,6 @@ impl Service<Request<Incoming>> for RouteService {
req,
self.route_table.clone(),
self.configured_producer.clone(),
self.configured_db_client.clone(),
))
}
}
Expand Down Expand Up @@ -137,6 +108,7 @@ async fn ingest_route(
details: route.to_str().unwrap().to_string().to_string(),
}
);

if route_table.lock().await.contains_key(&route) {
let body = req.collect().await.unwrap().to_bytes().to_vec();

Expand Down Expand Up @@ -180,64 +152,10 @@ async fn ingest_route(
}
}

async fn console_route(
configured_db_client: Arc<Mutex<ConfiguredDBClient>>,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
show_message!(
MessageType::Info,
Message {
action: "GET".to_string(),
details: "Console API".to_string(),
}
);

let db_guard = configured_db_client.lock().await;
let producer_guard = configured_producer.lock().await;
let route_table_guard = route_table.lock().await;

let tables = olap::clickhouse::fetch_all_tables(&db_guard).await.unwrap();
let topics = redpanda::fetch_topics(&producer_guard.config)
.await
.unwrap();
let routes_table: Vec<RouteInfo> = route_table_guard
.clone()
.iter()
.map(|(k, v)| {
RouteInfo::new(
k.to_str().unwrap().to_string(),
v.original_file_path.to_str().unwrap().to_string(),
v.table_name.clone(),
v.view_name.clone(),
)
})
.collect();

let response = Response::builder()
.status(StatusCode::OK)
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Methods", "GET")
.header(
"Access-Control-Allow-Headers",
"Content-Type, Baggage, Sentry-Trace",
)
.body(Full::new(Bytes::from(
json!({
"tables": tables,
"topics": topics,
"routes": routes_table
})
.to_string(),
)))?;
Ok(response)
}

async fn router(
req: Request<hyper::body::Incoming>,
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
configured_producer: Arc<Mutex<ConfiguredProducer>>,
configured_db_client: Arc<Mutex<ConfiguredDBClient>>,
) -> Result<Response<Full<Bytes>>, hyper::http::Error> {
debug!(
"HTTP Request Received: {:?}, with Route Table {:?}",
Expand All @@ -263,23 +181,6 @@ async fn router(
ingest_route(req, route, configured_producer, route_table).await
}

(&hyper::Method::GET, ["console"]) => {
console_route(configured_db_client, configured_producer, route_table).await
}
(&hyper::Method::GET, ["console", "routes"]) => {
todo!("get all routes");
}
(&hyper::Method::GET, ["console", "routes", _route_id]) => {
todo!("get specific route");
}

(&hyper::Method::GET, ["console", "tables"]) => {
todo!("get all tables");
}
(&hyper::Method::GET, ["console", "tables", _table_name]) => {
todo!("get specific table");
}

(&hyper::Method::OPTIONS, _) => options_route(),
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
Expand Down Expand Up @@ -320,9 +221,6 @@ impl Webserver {
let producer = Arc::new(Mutex::new(redpanda::create_producer(
project.redpanda_config.clone(),
)));
let db_client = Arc::new(Mutex::new(olap::clickhouse::create_client(
project.clickhouse_config.clone(),
)));

show_message!(
MessageType::Info,
Expand Down Expand Up @@ -357,7 +255,6 @@ impl Webserver {

let route_table = route_table.clone();
let producer = producer.clone();
let db_client = db_client.clone();

// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
Expand All @@ -366,8 +263,7 @@ impl Webserver {
io,
RouteService {
route_table,
configured_producer: producer,
configured_db_client: db_client,
configured_producer: producer
},
).await {
error!("server error: {}", e);
Expand Down
3 changes: 2 additions & 1 deletion apps/igloo-kit-cli/src/cli/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ pub fn setup_logging(settings: LoggerSettings) -> Result<(), fern::InitError> {
let file_config = fern::Dispatch::new()
.format(move |out, message, record| {
out.finish(format_args!(
"[{} {} {}] {}",
"[{} {} {} - {}] {}",
humantime::format_rfc3339_seconds(SystemTime::now()),
record.level(),
&session_id,
record.target(),
message
))
})
Expand Down
15 changes: 13 additions & 2 deletions apps/igloo-kit-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ use super::{Message, MessageType};
use crate::cli::watcher::process_schema_file;

use crate::framework::controller::RouteMeta;
use crate::infrastructure::console::post_current_state_to_console;
use crate::infrastructure::olap;
use crate::infrastructure::olap::clickhouse::ConfiguredDBClient;
use crate::infrastructure::stream::redpanda;
use crate::project::Project;
use log::info;

Expand Down Expand Up @@ -257,10 +259,19 @@ async fn initialize_project_state(
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
) -> Result<(), Error> {
let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
let producer = redpanda::create_producer(project.redpanda_config.clone());

info!("Starting schema directory crawl...");
let crawl_result =
crawl_schema_project_dir(&schema_dir, project, &configured_client, route_table).await;
let crawl_result = crawl_schema_project_dir(
&schema_dir,
project,
&configured_client,
route_table.clone(),
)
.await;

let route_table_clone = route_table.clone();
let _ = post_current_state_to_console(&configured_client, &producer, route_table_clone).await;

match crawl_result {
Ok(_) => {
Expand Down
13 changes: 11 additions & 2 deletions apps/igloo-kit-cli/src/cli/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ use std::{
use notify::{event::ModifyKind, Config, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::Mutex;

use super::display::{Message, MessageType};
use crate::infrastructure::console::post_current_state_to_console;
use crate::infrastructure::stream::redpanda;
use crate::{
framework::{
controller::{
Expand All @@ -25,8 +28,6 @@ use crate::{
utilities::constants::SCHEMAS_DIR,
utilities::package_managers,
};

use super::display::{Message, MessageType};
use log::{debug, info};

fn schema_file_path_to_ingest_route(app_dir: PathBuf, path: &Path, table_name: String) -> PathBuf {
Expand Down Expand Up @@ -199,6 +200,7 @@ async fn watch(
route_table: Arc<Mutex<HashMap<PathBuf, RouteMeta>>>,
) -> Result<(), Error> {
let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
let configured_producer = redpanda::create_producer(project.redpanda_config.clone());

let (tx, rx) = std::sync::mpsc::channel();

Expand Down Expand Up @@ -226,6 +228,13 @@ async fn watch(
.map_err(|e| {
Error::new(ErrorKind::Other, format!("Processing error occured: {}", e))
})?;

let _ = post_current_state_to_console(
&configured_client,
&configured_producer,
route_table.clone(),
)
.await;
}
Err(error) => {
return Err(Error::new(
Expand Down
Loading

0 comments on commit e444108

Please sign in to comment.