diff --git a/apps/framework-cli/src/cli/routines.rs b/apps/framework-cli/src/cli/routines.rs
index 0768292bb..a5fa05b75 100644
--- a/apps/framework-cli/src/cli/routines.rs
+++ b/apps/framework-cli/src/cli/routines.rs
@@ -222,7 +222,7 @@ impl RoutineController {
 }
 
 // Starts the file watcher and the webserver
-pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
+pub async fn start_development_mode(project: &Project) -> anyhow::Result<()> {
     show_message!(
         MessageType::Success,
         Message {
@@ -231,11 +231,11 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
         }
     );
 
-    // TODO: Explore using a RWLock instead of a Mutex to ensure concurrent reads without locks
     let mut route_table = HashMap::<PathBuf, RouteMeta>::new();
 
     info!("Initializing project state");
     initialize_project_state(project.schemas_dir(), project, &mut route_table).await?;
+    let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> = Box::leak(Box::new(RwLock::new(route_table)));
 
@@ -258,7 +258,7 @@ async fn initialize_project_state(
     schema_dir: PathBuf,
     project: &Project,
     route_table: &mut HashMap<PathBuf, RouteMeta>,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
     let producer = redpanda::create_producer(project.redpanda_config.clone());
 
@@ -294,7 +294,7 @@ async fn process_schemas_in_dir(
     project: &Project,
     configured_client: &ConfiguredDBClient,
     route_table: &mut HashMap<PathBuf, RouteMeta>,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     if schema_dir.is_dir() {
         for entry in std::fs::read_dir(schema_dir)? {
             let entry = entry?;
diff --git a/apps/framework-cli/src/cli/watcher.rs b/apps/framework-cli/src/cli/watcher.rs
index f65e227f2..22df3cf07 100644
--- a/apps/framework-cli/src/cli/watcher.rs
+++ b/apps/framework-cli/src/cli/watcher.rs
@@ -25,7 +25,7 @@ async fn process_event(
     event: notify::Event,
     route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
     configured_client: &ConfiguredDBClient,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     let mut route_table = route_table.write().await;
 
     debug!(
@@ -93,7 +93,7 @@ async fn create_framework_objects_from_schema_file_path(
     schema_file_path: &Path,
     route_table: &mut HashMap<PathBuf, RouteMeta>,
     configured_client: &ConfiguredDBClient,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     //! Creates the route, topics and tables from a path to the schema file
     if let Some(ext) = schema_file_path.extension() {
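Both halves of this change hinge on the leaked route table: dev mode builds the map once, then hands a &'static reference to the watcher and the webserver. A minimal, self-contained sketch of the Box::leak pattern (RouteMeta here is a stand-in for the framework's real metadata type):

    use std::collections::HashMap;
    use std::path::PathBuf;
    use tokio::sync::RwLock;

    #[derive(Debug)]
    struct RouteMeta; // stand-in for the framework's route metadata

    #[tokio::main]
    async fn main() {
        let route_table = HashMap::<PathBuf, RouteMeta>::new();

        // Leaking the box trades one never-freed allocation for a &'static
        // reference that can be shared across spawned tasks without Arc.
        let shared: &'static RwLock<HashMap<PathBuf, RouteMeta>> =
            Box::leak(Box::new(RwLock::new(route_table)));

        // Readers and the file watcher contend on an RwLock rather than a
        // Mutex, so concurrent reads proceed without serializing.
        assert!(shared.read().await.is_empty());
    }
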
diff --git a/apps/framework-cli/src/framework/controller.rs b/apps/framework-cli/src/framework/controller.rs
index e8a6fab8f..2e5ab70d5 100644
--- a/apps/framework-cli/src/framework/controller.rs
+++ b/apps/framework-cli/src/framework/controller.rs
@@ -27,7 +27,7 @@ use crate::infrastructure::olap;
 
 use std::io::ErrorKind;
 
-use crate::infrastructure::olap::clickhouse::ClickhouseView;
+use crate::infrastructure::olap::clickhouse::ClickhouseKafkaTrigger;
 
 use std::io::Error;
 
@@ -35,8 +35,6 @@ use crate::infrastructure::olap::clickhouse::ConfiguredDBClient;
 
 use super::schema::parse_schema_file;
 use super::schema::DataModel;
-use super::schema::MatViewOps;
-use super::schema::TableOps;
 use super::typescript::TypescriptInterface;
 
 #[derive(Debug, Clone)]
@@ -51,7 +49,7 @@ pub fn framework_object_mapper(s: DataModel) -> FrameworkObject {
     let clickhouse_table = olap::clickhouse::mapper::std_table_to_clickhouse_table(s.to_table());
     FrameworkObject {
         data_model: s.clone(),
-        table: clickhouse_table.clone(),
+        table: clickhouse_table,
         topic: s.name.clone(),
         ts_interface: framework::typescript::mapper::std_table_to_typescript_interface(
            s.to_table(),
@@ -98,12 +96,17 @@ pub fn get_framework_objects_from_schema_file(path: &Path) -> Result<Vec<FrameworkObject>, Error> {
 
-pub(crate) async fn create_or_replace_view(
+pub(crate) async fn create_or_replace_kafka_trigger(
     fo: &FrameworkObject,
     view_name: String,
     configured_client: &ConfiguredDBClient,
 ) -> Result<(), Error> {
-    let view = ClickhouseView::new(fo.table.db_name.clone(), view_name, fo.table.clone());
+    let view = ClickhouseKafkaTrigger::new(
+        fo.table.db_name.clone(),
+        view_name,
+        fo.table.kafka_table_name(),
+        fo.table.name.clone(),
+    );
     let create_view_query = view.create_materialized_view_query().map_err(|e| {
         Error::new(
             ErrorKind::Other,
@@ -137,23 +140,17 @@ pub(crate) async fn create_or_replace_view(
     Ok(())
 }
 
-pub(crate) async fn create_or_replace_table(
+pub(crate) async fn create_or_replace_tables(
     fo: &FrameworkObject,
     configured_client: &ConfiguredDBClient,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     info!("Creating table: {:?}", fo.table.name);
-    let create_table_query = fo.table.create_table_query().map_err(|e| {
-        Error::new(
-            ErrorKind::Other,
-            format!("Failed to get clickhouse query: {:?}", e),
-        )
-    })?;
-    let drop_table_query = fo.table.drop_table_query().map_err(|e| {
-        Error::new(
-            ErrorKind::Other,
-            format!("Failed to get clickhouse query: {:?}", e),
-        )
-    })?;
+
+    let drop_data_table_query = fo.table.drop_data_table_query()?;
+    let drop_kafka_table_query = fo.table.drop_kafka_table_query()?;
+
+    let create_data_table_query = fo.table.create_data_table_query()?;
+    let create_kafka_table_query = fo.table.create_kafka_table_query()?;
 
     olap::clickhouse::check_ready(configured_client)
         .await
         .map_err(|e| {
@@ -165,22 +162,10 @@ pub(crate) async fn create_or_replace_table(
         })?;
 
     // Clickhouse doesn't support dropping a view if it doesn't exist so we need to drop it first in case the schema has changed
-    olap::clickhouse::run_query(&drop_table_query, configured_client)
-        .await
-        .map_err(|e| {
-            Error::new(
-                ErrorKind::Other,
-                format!("Failed to drop table in clickhouse: {}", e),
-            )
-        })?;
-    olap::clickhouse::run_query(&create_table_query, configured_client)
-        .await
-        .map_err(|e| {
-            Error::new(
-                ErrorKind::Other,
-                format!("Failed to create table in clickhouse: {}", e),
-            )
-        })?;
+    olap::clickhouse::run_query(&drop_data_table_query, configured_client).await?;
+    olap::clickhouse::run_query(&drop_kafka_table_query, configured_client).await?;
+    olap::clickhouse::run_query(&create_data_table_query, configured_client).await?;
+    olap::clickhouse::run_query(&create_kafka_table_query, configured_client).await?;
 
     Ok(())
 }
@@ -245,7 +230,7 @@ pub async fn remove_table_and_topics_from_schema_file_path(
     schema_file_path: &Path,
     route_table: &mut HashMap<PathBuf, RouteMeta>,
     configured_client: &ConfiguredDBClient,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     //need to get the path of the file, scan the route table and remove all the files that need to be deleted.
     // This doesn't have to be as fast as the scanning for routes in the web server so we're ok with the scan here.
 
@@ -299,7 +284,7 @@ pub async fn process_objects(
     configured_client: &ConfiguredDBClient,
     compilable_objects: &mut Vec<TypescriptInterface>, // Objects that require compilation after processing
     route_table: &mut HashMap<PathBuf, RouteMeta>,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     for fo in framework_objects {
         let ingest_route = schema_file_path_to_ingest_route(
             project.app_dir().clone(),
@@ -310,10 +295,10 @@ pub async fn process_objects(
 
         debug!("Creating table & view: {:?}", fo.table.name);
 
-        let view_name = format!("{}_view", fo.table.name);
+        let view_name = format!("{}_trigger", fo.table.name);
 
-        create_or_replace_table(&fo, configured_client).await?;
-        create_or_replace_view(&fo, view_name.clone(), configured_client).await?;
+        create_or_replace_tables(&fo, configured_client).await?;
+        create_or_replace_kafka_trigger(&fo, view_name.clone(), configured_client).await?;
 
         debug!("Table created: {:?}", fo.table.name);
 
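Taken together, create_or_replace_tables and create_or_replace_kafka_trigger leave three objects in ClickHouse per data model: a Kafka-engine source table, a Memory-engine data table, and a materialized view pumping rows from one to the other. A sketch of the rendered SQL for a hypothetical UserActivity model in a database named local (host, database, and column names are illustrative; the actual templates are in queries.rs below):

    CREATE TABLE IF NOT EXISTS local.UserActivity_kafka
    ( eventId String )
    ENGINE = Kafka('redpanda:9092', 'UserActivity', 'clickhouse-group', 'JSONEachRow');

    CREATE TABLE IF NOT EXISTS local.UserActivity
    ( eventId String )
    ENGINE = Memory;

    CREATE MATERIALIZED VIEW IF NOT EXISTS local.UserActivity_trigger TO local.UserActivity
    AS
    SELECT * FROM local.UserActivity_kafka
    SETTINGS
    stream_like_engine_allow_direct_select = 1;
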
diff --git a/apps/framework-cli/src/framework/schema.rs b/apps/framework-cli/src/framework/schema.rs
index b493e7236..a406b5801 100644
--- a/apps/framework-cli/src/framework/schema.rs
+++ b/apps/framework-cli/src/framework/schema.rs
@@ -1,7 +1,7 @@
+use std::fmt::{Display, Formatter};
 use std::{
     collections::HashMap,
     fmt,
-    io::Error,
     path::{Path, PathBuf},
 };
 
@@ -38,6 +38,14 @@ pub struct UnsupportedDataTypeError {
     pub type_name: String,
 }
 
+impl Display for UnsupportedDataTypeError {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "UnsupportedDataTypeError: {}", self.type_name)
+    }
+}
+
+impl std::error::Error for UnsupportedDataTypeError {}
+
 /// A function that maps an input type to an output type
 type MapperFunc<I, O> = fn(i: I) -> O;
 
@@ -307,7 +315,7 @@ pub async fn process_schema_file(
     project: &Project,
     configured_client: &ConfiguredDBClient,
     route_table: &mut HashMap<PathBuf, RouteMeta>,
-) -> Result<(), Error> {
+) -> anyhow::Result<()> {
     let framework_objects = get_framework_objects_from_schema_file(schema_file_path)?;
     let mut compilable_objects: Vec<TypescriptInterface> = Vec::new();
     process_objects(
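The new Display and std::error::Error impls are what let UnsupportedDataTypeError flow through the anyhow::Result signatures introduced above: anyhow::Error has a blanket From conversion for any type implementing std::error::Error. A self-contained sketch of the mechanism (the struct is re-declared here for the example; "Decimal256" is an arbitrary type name):

    use std::fmt::{self, Display, Formatter};

    #[derive(Debug)]
    struct UnsupportedDataTypeError {
        type_name: String,
    }

    impl Display for UnsupportedDataTypeError {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "UnsupportedDataTypeError: {}", self.type_name)
        }
    }

    impl std::error::Error for UnsupportedDataTypeError {}

    fn to_clickhouse_type(type_name: &str) -> Result<(), UnsupportedDataTypeError> {
        Err(UnsupportedDataTypeError { type_name: type_name.to_string() })
    }

    // `?` converts the concrete error into anyhow::Error automatically.
    fn process() -> anyhow::Result<()> {
        to_clickhouse_type("Decimal256")?;
        Ok(())
    }

    fn main() {
        println!("{:#}", process().unwrap_err());
    }
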
diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs
index 9c4582195..e32249663 100644
--- a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs
+++ b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs
@@ -10,14 +10,14 @@ use log::debug;
 use serde::{Deserialize, Serialize};
 
 use crate::{
-    framework::schema::{FieldArity, MatViewOps, TableOps, UnsupportedDataTypeError},
+    framework::schema::{FieldArity, UnsupportedDataTypeError},
     utilities::constants::REDPANDA_CONTAINER_NAME,
 };
 
 use self::{
     config::ClickhouseConfig,
     queries::{
-        CreateMaterializedViewQuery, CreateTableQuery, DropMaterializedViewQuery, DropTableQuery,
+        CreateKafkaTriggerViewQuery, CreateTableQuery, DropMaterializedViewQuery, DropTableQuery,
     },
 };
 
@@ -158,45 +158,70 @@ impl ClickhouseTable {
     }
 }
 
-impl TableOps for ClickhouseTable {
-    fn create_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
-        CreateTableQuery::build(
-            self.clone(),
+impl ClickhouseTable {
+    pub fn kafka_table_name(&self) -> String {
+        format!("{}_kafka", self.name)
+    }
+
+    fn kafka_table(&self) -> ClickhouseTable {
+        ClickhouseTable {
+            name: self.kafka_table_name(),
+            ..self.clone()
+        }
+    }
+
+    pub fn create_kafka_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
+        CreateTableQuery::kafka(
+            self.kafka_table(),
             REDPANDA_CONTAINER_NAME.to_string(),
             9092,
             self.name.clone(),
         )
     }
+    pub fn create_data_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
+        CreateTableQuery::build(self.clone(), "Memory".to_string())
+    }
+
+    pub fn drop_kafka_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
+        DropTableQuery::build(self.kafka_table())
+    }
 
-    fn drop_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
+    pub fn drop_data_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
         DropTableQuery::build(self.clone())
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct ClickhouseView {
+pub struct ClickhouseKafkaTrigger {
     pub db_name: String,
     pub name: String,
-    pub source_table: ClickhouseTable,
+    pub source_table_name: String,
+    pub dest_table_name: String,
 }
 
-impl ClickhouseView {
-    pub fn new(db_name: String, name: String, source_table: ClickhouseTable) -> ClickhouseView {
-        ClickhouseView {
+impl ClickhouseKafkaTrigger {
+    pub fn new(
+        db_name: String,
+        name: String,
+        source_table_name: String,
+        dest_table_name: String,
+    ) -> ClickhouseKafkaTrigger {
+        ClickhouseKafkaTrigger {
             db_name,
             name,
-            source_table,
+            source_table_name,
+            dest_table_name,
         }
     }
 }
 
 pub type QueryString = String;
 
-impl MatViewOps for ClickhouseView {
-    fn create_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
-        CreateMaterializedViewQuery::build(self.clone())
+impl ClickhouseKafkaTrigger {
+    pub fn create_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
+        Ok(CreateKafkaTriggerViewQuery::build(self.clone()))
     }
 
-    fn drop_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
+    pub fn drop_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
         DropMaterializedViewQuery::build(self.clone())
     }
 }
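With the table/trigger split, one data model fans out into two tables plus the view that connects them. A sketch of how the new ClickhouseTable and ClickhouseKafkaTrigger APIs compose, written as if inside the crate (plan_queries is a hypothetical helper, not part of this PR):

    use crate::infrastructure::olap::clickhouse::{ClickhouseKafkaTrigger, ClickhouseTable};

    // Collects the CREATE statements in the order the dev loop issues them.
    fn plan_queries(table: &ClickhouseTable) -> anyhow::Result<Vec<String>> {
        let trigger = ClickhouseKafkaTrigger::new(
            table.db_name.clone(),
            format!("{}_trigger", table.name),
            table.kafka_table_name(), // source: the `<name>_kafka` Kafka-engine table
            table.name.clone(),       // destination: the Memory-engine data table
        );
        Ok(vec![
            table.create_kafka_table_query()?,
            table.create_data_table_query()?,
            trigger.create_materialized_view_query()?,
        ])
    }
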
diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs
index 8e9925a85..4315d9391 100644
--- a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs
+++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs
@@ -1,5 +1,5 @@
 use serde::Serialize;
-use tinytemplate::TinyTemplate;
+use tinytemplate::{format_unescaped, TinyTemplate};
 
 use crate::{
     framework::schema::{FieldArity, UnsupportedDataTypeError},
@@ -8,10 +8,10 @@ use crate::{
     },
 };
 
-use super::ClickhouseView;
+use super::ClickhouseKafkaTrigger;
 
-// TODO: Add column comment capability to the schemna and template
-pub static CREATE_TABLE_TEMPLATE: &str = r#"
+// TODO: Add column comment capability to the schema and template
+static CREATE_TABLE_TEMPLATE: &str = r#"
 CREATE TABLE IF NOT EXISTS {db_name}.{table_name}
 (
 {{for field in fields}}{field.field_name} {field.field_type} {field.field_arity},
@@ -20,22 +20,44 @@ CREATE TABLE IF NOT EXISTS {db_name}.{table_name}
 PRIMARY KEY ({primary_key_string})
 {{endif}}
 )
-ENGINE = Kafka('{kafka_host}:{kafka_port}', '{topic}', 'clickhouse-group', 'JSONEachRow');
+ENGINE = {engine};
+"#;
+
+static CREATE_KAFKA_TRIGGER_TEMPLATE: &str = r#"
+CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}.{view_name} TO {db_name}.{dest_table_name}
+AS
+SELECT * FROM {db_name}.{source_table_name}
+SETTINGS
+stream_like_engine_allow_direct_select = 1;
 "#;
 
 pub struct CreateTableQuery;
 
 impl CreateTableQuery {
-    pub fn build(
+    pub fn kafka(
         table: ClickhouseTable,
         kafka_host: String,
         kafka_port: u16,
         topic: String,
+    ) -> Result<String, UnsupportedDataTypeError> {
+        CreateTableQuery::build(
+            table,
+            format!(
+                "Kafka('{}:{}', '{}', 'clickhouse-group', 'JSONEachRow')",
+                kafka_host, kafka_port, topic,
+            ),
+        )
+    }
+
+    pub fn build(
+        table: ClickhouseTable,
+        engine: String,
     ) -> Result<String, UnsupportedDataTypeError> {
         let mut tt = TinyTemplate::new();
+        tt.set_default_formatter(&format_unescaped); // by default it formats HTML-escaped and messes up single quotes
         tt.add_template("create_table", CREATE_TABLE_TEMPLATE)
             .unwrap();
-        let context = CreateTableContext::new(table, kafka_host, kafka_port, topic)?;
+        let context = CreateTableContext::new(table, engine)?;
         let rendered = tt.render("create_table", &context).unwrap();
         Ok(rendered)
     }
@@ -47,17 +69,13 @@ struct CreateTableContext {
     table_name: String,
     fields: Vec<CreateTableFieldContext>,
     primary_key_string: Option<String>,
-    kafka_host: String,
-    kafka_port: u16,
-    topic: String,
+    engine: String,
 }
 
 impl CreateTableContext {
     fn new(
         table: ClickhouseTable,
-        kafka_host: String,
-        kafka_port: u16,
-        topic: String,
+        engine: String,
     ) -> Result<CreateTableContext, UnsupportedDataTypeError> {
         let primary_key = table
             .columns
@@ -79,9 +97,7 @@ impl CreateTableContext {
             } else {
                 None
             },
-            kafka_host,
-            kafka_port,
-            topic,
+            engine,
         })
     }
 }
@@ -130,41 +146,28 @@ impl DropTableContext {
     }
 }
 
-pub static CREATE_MATERIALIZED_VIEW_TEMPLATE: &str = r#"
-CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}.{view_name}
-ENGINE = Memory
-AS
-SELECT * FROM {db_name}.{source_table_name}
-SETTINGS
-stream_like_engine_allow_direct_select = 1;
-"#;
-
-pub struct CreateMaterializedViewQuery;
+pub struct CreateKafkaTriggerViewQuery;
 
-impl CreateMaterializedViewQuery {
-    pub fn build(view: ClickhouseView) -> Result<String, UnsupportedDataTypeError> {
+impl CreateKafkaTriggerViewQuery {
+    pub fn build(view: ClickhouseKafkaTrigger) -> String {
         let mut tt = TinyTemplate::new();
-        tt.add_template(
-            "create_materialized_view",
-            CREATE_MATERIALIZED_VIEW_TEMPLATE,
-        )
-        .unwrap();
-        let context = CreateMaterializedViewContext::new(view)?;
-        let rendered = tt.render("create_materialized_view", &context).unwrap();
-        Ok(rendered)
+        tt.add_template("create_materialized_view", CREATE_KAFKA_TRIGGER_TEMPLATE)
+            .unwrap();
+        let context = CreateKafkaTriggerContext::new(view);
+        tt.render("create_materialized_view", &context).unwrap()
     }
 }
 
-pub static DROP_MATERIALIZED_VIEW_TEMPLATE: &str = r#"
-DROP TABLE IF EXISTS {db_name}.{view_name};
+pub static DROP_VIEW_TEMPLATE: &str = r#"
+DROP VIEW IF EXISTS {db_name}.{view_name};
 "#;
 
 pub struct DropMaterializedViewQuery;
 
 impl DropMaterializedViewQuery {
-    pub fn build(table: ClickhouseView) -> Result<String, UnsupportedDataTypeError> {
+    pub fn build(table: ClickhouseKafkaTrigger) -> Result<String, UnsupportedDataTypeError> {
         let mut tt = TinyTemplate::new();
-        tt.add_template("drop_materialized_view", DROP_MATERIALIZED_VIEW_TEMPLATE)
+        tt.add_template("drop_materialized_view", DROP_VIEW_TEMPLATE)
             .unwrap();
         let context = DropMaterializedViewContext::new(table)?;
         let rendered = tt.render("drop_materialized_view", &context).unwrap();
@@ -179,7 +182,9 @@ struct DropMaterializedViewContext {
 }
 
 impl DropMaterializedViewContext {
-    fn new(view: ClickhouseView) -> Result<DropMaterializedViewContext, UnsupportedDataTypeError> {
+    fn new(
+        view: ClickhouseKafkaTrigger,
+    ) -> Result<DropMaterializedViewContext, UnsupportedDataTypeError> {
         Ok(DropMaterializedViewContext {
             db_name: view.db_name,
             view_name: format!("{}_view", view.name),
@@ -188,21 +193,21 @@
     }
 }
 
 #[derive(Serialize)]
-struct CreateMaterializedViewContext {
+struct CreateKafkaTriggerContext {
     db_name: String,
     view_name: String,
     source_table_name: String,
+    dest_table_name: String,
 }
 
-impl CreateMaterializedViewContext {
-    fn new(
-        view: ClickhouseView,
-    ) -> Result<CreateMaterializedViewContext, UnsupportedDataTypeError> {
-        Ok(CreateMaterializedViewContext {
+impl CreateKafkaTriggerContext {
+    fn new(view: ClickhouseKafkaTrigger) -> CreateKafkaTriggerContext {
+        CreateKafkaTriggerContext {
             db_name: view.db_name,
             view_name: view.name,
-            source_table_name: view.source_table.name,
-        })
+            source_table_name: view.source_table_name,
+            dest_table_name: view.dest_table_name,
+        }
     }
 }
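The set_default_formatter(&format_unescaped) call added above is load-bearing: TinyTemplate's default formatter HTML-escapes interpolated values, so the single quotes in the generated ENGINE clause would come out as &#39;. A self-contained sketch of the difference:

    use serde::Serialize;
    use tinytemplate::{format_unescaped, TinyTemplate};

    #[derive(Serialize)]
    struct Ctx {
        engine: String,
    }

    fn main() {
        let ctx = Ctx {
            engine: "Kafka('redpanda:9092', 'UserActivity', 'clickhouse-group', 'JSONEachRow')".to_string(),
        };

        let mut escaped = TinyTemplate::new();
        escaped.add_template("t", "ENGINE = {engine};").unwrap();
        // Default formatter: quotes are HTML-escaped, producing invalid SQL.
        println!("{}", escaped.render("t", &ctx).unwrap());

        let mut raw = TinyTemplate::new();
        raw.set_default_formatter(&format_unescaped);
        raw.add_template("t", "ENGINE = {engine};").unwrap();
        // Unescaped formatter: the engine string survives verbatim.
        println!("{}", raw.render("t", &ctx).unwrap());
    }
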
diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs
index c06798172..0bf3b6a37 100644
--- a/apps/framework-cli/src/project.rs
+++ b/apps/framework-cli/src/project.rs
@@ -175,8 +175,8 @@ impl Project {
         let config_file = PackageJsonFile {
             name: self.name.clone(),
             version: "0.0".to_string(),
-            // For local development of the CLI
-            // change `moose-cli` to `/apps/moose-kit-cli/target/debug/moose-cli`
+            // For local development of the CLI,
+            // change `moose-cli` to `/apps/framework-cli/target/debug/moose-cli`
             scripts: HashMap::from([("dev".to_string(), "moose-cli dev".to_string())]),
             dependencies: HashMap::new(),
             dev_dependencies: HashMap::from([(