diff --git a/apps/igloo-kit-cli/src/cli/routines/validate.rs b/apps/igloo-kit-cli/src/cli/routines/validate.rs index d8e81d140..a0b5956b3 100644 --- a/apps/igloo-kit-cli/src/cli/routines/validate.rs +++ b/apps/igloo-kit-cli/src/cli/routines/validate.rs @@ -1,5 +1,5 @@ use std::io::{self, Write, Error, ErrorKind}; -use crate::{cli::{display::Message, DebugStatus}, utilities::docker}; +use crate::{cli::{display::Message, DebugStatus}, utilities::docker, infrastructure::PANDA_NETWORK}; use super::{Routine, RoutineSuccess, RoutineFailure}; pub struct ValidateClickhouseRun(DebugStatus); @@ -73,7 +73,7 @@ impl Routine for ValidatePandaHouseNetwork { let string_output = String::from_utf8(output.stdout).unwrap(); - if string_output.contains("panda_house") { + if string_output.contains(PANDA_NETWORK) { Ok(RoutineSuccess::success(Message::new("Successfully".to_string(), "validated panda house docker network".to_string()))) } else { Err(RoutineFailure::new(Message::new("Failed".to_string(), "to validate panda house docker network".to_string()), Error::new(ErrorKind::Other, "Failed to validate panda house network exists"))) diff --git a/apps/igloo-kit-cli/src/cli/watcher.rs b/apps/igloo-kit-cli/src/cli/watcher.rs index 4b8b89482..f619f4db8 100644 --- a/apps/igloo-kit-cli/src/cli/watcher.rs +++ b/apps/igloo-kit-cli/src/cli/watcher.rs @@ -3,16 +3,10 @@ use std::{sync::{Arc, RwLock}, collections::HashMap, path::PathBuf, io::{Error, use notify::{RecommendedWatcher, Config, RecursiveMode, Watcher, event::ModifyKind}; use tokio::sync::Mutex; -use crate::{framework::{directories::get_app_directory, schema::{parse_schema_file, OpsTable}}, cli::display::show_message, infrastructure::{stream, olap::{self, clickhouse::{ConfiguredClient, mapper, ClickhouseTable, config::ClickhouseConfig}}}}; +use crate::{framework::{directories::get_app_directory, schema::{parse_schema_file, TableOps, MatViewOps}}, cli::display::show_message, infrastructure::{stream, olap::{self, clickhouse::{ConfiguredClient, mapper, ClickhouseTable, config::ClickhouseConfig, ClickhouseView}}}}; use super::{CommandTerminal, display::{MessageType, Message}}; -// fn route_to_topic_name(route: PathBuf) -> String { -// let route = route.to_str().unwrap().to_string(); -// let route = route.replace("/", "."); -// route -// } - fn dataframe_path_to_ingest_route(project_dir: PathBuf, path: PathBuf, table_name: String) -> PathBuf { let dataframe_path = project_dir.join("dataframes"); let mut route = path.strip_prefix(dataframe_path).unwrap().to_path_buf(); @@ -54,6 +48,7 @@ async fn process_event(project_dir: PathBuf, event: notify::Event, route_table: pub struct RouteMeta { pub original_file_path: PathBuf, pub table_name: String, + pub view_name: Option, } async fn create_table_and_topics_from_dataframe_route(route: &PathBuf, project_dir: PathBuf, route_table: &mut tokio::sync::MutexGuard<'_, HashMap::>, configured_client: &ConfiguredClient) -> Result<(), Error> { @@ -64,12 +59,25 @@ async fn create_table_and_topics_from_dataframe_route(route: &PathBuf, project_d for table in tables { let ingest_route = dataframe_path_to_ingest_route(project_dir.clone(), route.clone(), table.name.clone()); - route_table.insert(ingest_route, RouteMeta { original_file_path: route.clone(), table_name: table.name.clone() }); + stream::redpanda::create_topic_from_name(table.name.clone()); - let query = table.create_table_query() + let table_query = table.create_table_query() + .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to get clickhouse query: {:?}", e)))?; + + olap::clickhouse::run_query(table_query, configured_client).await + .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?; + + let view = ClickhouseView::new( + table.db_name.clone(), + format!("{}_view", table.name), + table.clone()); + let view_query = view.create_materialized_view_query() .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to get clickhouse query: {:?}", e)))?; - olap::clickhouse::run_query(query, configured_client).await + + olap::clickhouse::run_query(view_query, configured_client).await .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?; + + route_table.insert(ingest_route, RouteMeta { original_file_path: route.clone(), table_name: table.name.clone(), view_name: Some(view.name.clone()) }); }; } } else { @@ -85,9 +93,14 @@ async fn remove_table_and_topics_from_dataframe_route(route: &PathBuf, route_tab if meta.original_file_path == route.clone() { stream::redpanda::delete_topic(meta.table_name.clone()); - olap::clickhouse::delete_table(meta.table_name, configured_client).await + olap::clickhouse::delete_table_or_view(meta.table_name, configured_client).await .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?; + if let Some(view_name) = meta.view_name { + olap::clickhouse::delete_table_or_view(view_name, configured_client).await + .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?; + } + route_table.remove(&k); } } diff --git a/apps/igloo-kit-cli/src/framework/schema.rs b/apps/igloo-kit-cli/src/framework/schema.rs index 322be8148..ff9385d42 100644 --- a/apps/igloo-kit-cli/src/framework/schema.rs +++ b/apps/igloo-kit-cli/src/framework/schema.rs @@ -50,11 +50,16 @@ pub struct Table { pub columns: Vec, } -pub trait OpsTable { +pub trait TableOps { fn create_table_query(&self) -> Result; fn drop_table_query(&self) -> Result; } +pub trait MatViewOps { + fn create_materialized_view_query(&self) -> Result; + fn drop_materialized_view_query(&self) -> Result; +} + #[derive(Debug, Clone)] pub struct Column { pub name: String, diff --git a/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse.rs b/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse.rs index a0acce7e5..45f982c20 100644 --- a/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse.rs +++ b/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse.rs @@ -2,15 +2,15 @@ pub mod config; pub mod mapper; mod queries; -use std::fmt; +use std::fmt::{self}; use clickhouse::Client; use reqwest::Url; use schema_ast::ast::FieldArity; -use crate::framework::schema::{OpsTable, UnsupportedDataTypeError}; +use crate::framework::schema::{TableOps, UnsupportedDataTypeError, MatViewOps}; -use self::{queries::{CreateTableQuery, DropTableQuery}, config::ClickhouseConfig}; +use self::{queries::{CreateTableQuery, DropTableQuery, CreateMaterializedViewQuery, DropMaterializedViewQuery}, config::ClickhouseConfig}; #[derive(Debug, Clone)] pub enum ClickhouseTableType { @@ -20,6 +20,12 @@ pub enum ClickhouseTableType { Unsupported } +impl fmt::Display for ClickhouseTableType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + #[derive(Debug, Clone)] pub enum ClickhouseColumnType { String, @@ -107,7 +113,7 @@ impl ClickhouseTable { } } -impl OpsTable for ClickhouseTable { +impl TableOps for ClickhouseTable { fn create_table_query(&self) -> Result { CreateTableQuery::new(self.clone(), "redpanda-1".to_string(), 9092, self.name.clone()) } @@ -117,12 +123,39 @@ impl OpsTable for ClickhouseTable { } } +#[derive(Debug, Clone)] +pub struct ClickhouseView { + pub db_name: String, + pub name: String, + pub source_table: ClickhouseTable, +} + +impl ClickhouseView { + pub fn new(db_name: String, name: String, source_table: ClickhouseTable) -> ClickhouseView { + ClickhouseView { + db_name, + name, + source_table, + } + } +} + +pub type QueryString = String; + +impl MatViewOps for ClickhouseView { + fn create_materialized_view_query(&self) -> Result { + CreateMaterializedViewQuery::new(self.clone()) + } + fn drop_materialized_view_query(&self) -> Result { + DropMaterializedViewQuery::new(self.clone()) + } +} + pub struct ConfiguredClient { pub client: Client, pub config: ClickhouseConfig, } - pub fn create_client(clickhouse_config: ClickhouseConfig) -> ConfiguredClient { ConfiguredClient { client: Client::default() @@ -135,28 +168,14 @@ pub fn create_client(clickhouse_config: ClickhouseConfig) -> ConfiguredClient { } // Run an arbitrary clickhouse query -pub async fn run_query(query: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> { +pub async fn run_query(query: QueryString, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> { let client = &configured_client.client; client.query(query.as_str()).execute().await } -// Creates a table in clickhouse from a file name. this table should have a single field that accepts a json blob -pub async fn create_table(table_name: String, topic: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> { - let client = &configured_client.client; - let config = &configured_client.config; - let db_name = &config.db_name; - let cluster_network = &config.cluster_network; - let kafka_port = &config.kafka_port; - - // If you want to change the settings when doing a query you can do it as follows: SETTINGS allow_experimental_object_type = 1; - client.query(format!("CREATE TABLE IF NOT EXISTS {db_name}.{table_name} - (data String) ENGINE = Kafka('{cluster_network}:{kafka_port}', '{topic}', 'clickhouse-group', 'JSONEachRow') ;") - .as_str() ).execute().await -} - -pub async fn delete_table(table_name: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> { +pub async fn delete_table_or_view(table_or_view_name: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> { let client = &configured_client.client; let db_name = &configured_client.config.db_name; - client.query(format!("DROP TABLE {db_name}.{table_name}").as_str()).execute().await + client.query(format!("DROP TABLE {db_name}.{table_or_view_name}").as_str()).execute().await } diff --git a/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse/queries.rs index 106eb30c1..293574003 100644 --- a/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse/queries.rs +++ b/apps/igloo-kit-cli/src/infrastructure/olap/clickhouse/queries.rs @@ -4,6 +4,8 @@ use tinytemplate::TinyTemplate; use crate::{framework::schema::UnsupportedDataTypeError, infrastructure::olap::clickhouse::{ClickhouseColumn, ClickhouseColumnType, ClickhouseTable, ClickhouseInt, ClickhouseFloat}}; +use super::{ClickhouseTableType, ClickhouseView}; + // TODO: Add column comment capability to the schemna and template pub static CREATE_TABLE_TEMPLATE: &str = r#" CREATE TABLE IF NOT EXISTS {db_name}.{table_name} @@ -48,7 +50,9 @@ impl CreateTableContext { Ok(CreateTableContext { db_name: table.db_name, table_name: table.name, - fields: table.columns.into_iter().map(|column| CreateTableFieldContext::new(column)).collect::, UnsupportedDataTypeError>>()?, + fields: table.columns.into_iter() + .map(|column| CreateTableFieldContext::new(column)) + .collect::, UnsupportedDataTypeError>>()?, primary_key_string: if primary_key.len() > 0 { Some(primary_key.join(", ")) } else { None }, cluster_network, kafka_port, @@ -58,7 +62,6 @@ impl CreateTableContext { } } - #[derive(Serialize)] struct CreateTableFieldContext { field_name: String, @@ -72,6 +75,107 @@ impl CreateTableFieldContext { } } +pub static DROP_TABLE_TEMPLATE: &str = r#" +DROP TABLE IF EXISTS {db_name}.{table_name}; +"#; + +pub struct DropTableQuery; + +impl DropTableQuery { + pub fn new(table: ClickhouseTable) -> Result { + let mut tt = TinyTemplate::new(); + tt.add_template("drop_table", DROP_TABLE_TEMPLATE).unwrap(); + let context = DropTableContext::new(table)?; + let rendered = tt.render("drop_table", &context).unwrap(); + Ok(rendered) + } +} + +#[derive(Serialize)] +struct DropTableContext { + db_name: String, + table_name: String, +} + +impl DropTableContext { + fn new(table: ClickhouseTable) -> Result { + Ok(DropTableContext { + db_name: table.db_name, + table_name: table.name, + }) + } +} + + +pub static CREATE_MATERIALIZED_VIEW_TEMPLATE: &str = r#" +CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}.{view_name} +ENGINE = Memory +AS +SELECT * FROM {db_name}.{source_table_name} +SETTINGS +stream_like_engine_allow_direct_select = 1; +"#; + +pub struct CreateMaterializedViewQuery; + +impl CreateMaterializedViewQuery { + pub fn new(view: ClickhouseView) -> Result { + let mut tt = TinyTemplate::new(); + tt.add_template("create_materialized_view", CREATE_MATERIALIZED_VIEW_TEMPLATE).unwrap(); + let context = CreateMaterializedViewContext::new(view)?; + let rendered = tt.render("create_materialized_view", &context).unwrap(); + Ok(rendered) + } +} + +pub static DROP_MATERIALIZED_VIEW_TEMPLATE: &str = r#" +DROP TABLE IF EXISTS {db_name}.{view_name}; +"#; + +pub struct DropMaterializedViewQuery; + +impl DropMaterializedViewQuery { + pub fn new(table: ClickhouseView) -> Result { + let mut tt = TinyTemplate::new(); + tt.add_template("drop_materialized_view", DROP_MATERIALIZED_VIEW_TEMPLATE).unwrap(); + let context = DropMaterializedViewContext::new(table)?; + let rendered = tt.render("drop_materialized_view", &context).unwrap(); + Ok(rendered) + } +} + +#[derive(Serialize)] +struct DropMaterializedViewContext { + db_name: String, + view_name: String, +} + +impl DropMaterializedViewContext { + fn new(view: ClickhouseView) -> Result { + Ok(DropMaterializedViewContext { + db_name: view.db_name, + view_name: format!("{}_view", view.name), + }) + } +} + +#[derive(Serialize)] +struct CreateMaterializedViewContext { + db_name: String, + view_name: String, + source_table_name: String, +} + +impl CreateMaterializedViewContext { + fn new(view: ClickhouseView) -> Result { + Ok(CreateMaterializedViewContext { + db_name: view.db_name, + view_name: view.name, + source_table_name: view.source_table.name, + }) + } +} + fn field_type_to_string(field_type: ClickhouseColumnType) -> Result { // Blowing out match statements here in case we need to customize the output string for some types. match field_type { @@ -119,42 +223,4 @@ fn clickhouse_column_to_create_table_field_context(column: ClickhouseColumn) -> field_arity: if column.arity.is_required() { "NOT NULL".to_string() } else { "NULL".to_string() }, }) } -} - -pub static DROP_TABLE_TEMPLATE: &str = r#" -DROP TABLE IF EXISTS {db_name}.{table_name}; -"#; - -pub struct DropTableQuery; - -impl DropTableQuery { - pub fn new(table: ClickhouseTable) -> Result { - let mut tt = TinyTemplate::new(); - tt.add_template("drop_table", DROP_TABLE_TEMPLATE).unwrap(); - let context = DropTableContext::new(table)?; - let rendered = tt.render("drop_table", &context).unwrap(); - Ok(rendered) - } -} - -#[derive(Serialize)] -struct DropTableContext { - db_name: String, - table_name: String, -} - -impl DropTableContext { - fn new(table: ClickhouseTable) -> Result { - Ok(DropTableContext { - db_name: table.db_name, - table_name: table.name, - }) - } -} - - - -// TODO: Implement view creation for clickhouse -// static CREATE_VIEW_TEMPLATE: &str = r#" - -// "#; \ No newline at end of file +} \ No newline at end of file