From 276dbc1e485d669d5930c48dde20727e86f6a334 Mon Sep 17 00:00:00 2001 From: George Leung Date: Tue, 27 Feb 2024 00:15:57 -0500 Subject: [PATCH] versioned tables, version sync, and rewritten watcher logic (#469) --- apps/framework-cli/Cargo.lock | 9 +- apps/framework-cli/Cargo.toml | 1 + apps/framework-cli/src/cli/local_webserver.rs | 16 +- apps/framework-cli/src/cli/routines.rs | 143 +++++--- apps/framework-cli/src/cli/watcher.rs | 329 ++++++++++++------ .../framework-cli/src/framework/controller.rs | 294 ++++++++++++---- apps/framework-cli/src/framework/schema.rs | 155 ++++----- apps/framework-cli/src/framework/sdks.rs | 10 +- .../src/framework/typescript/mapper.rs | 4 +- .../src/framework/typescript/templates.rs | 7 +- .../src/infrastructure/console.rs | 59 ++-- .../src/infrastructure/olap/clickhouse.rs | 80 ++++- .../infrastructure/olap/clickhouse/queries.rs | 65 +++- .../src/infrastructure/stream/redpanda.rs | 2 +- apps/framework-cli/src/project.rs | 16 +- apps/framework-cli/src/utilities/constants.rs | 3 +- .../tables/[tableId]/table-tabs.tsx | 4 +- apps/moose-console/src/lib/snippets.ts | 2 +- apps/moose-console/src/lib/utils.ts | 17 +- 19 files changed, 832 insertions(+), 384 deletions(-) diff --git a/apps/framework-cli/Cargo.lock b/apps/framework-cli/Cargo.lock index fe7d3ea66..53c35e9ad 100644 --- a/apps/framework-cli/Cargo.lock +++ b/apps/framework-cli/Cargo.lock @@ -1313,6 +1313,7 @@ dependencies = [ "openssl", "predicates", "rdkafka", + "regex", "reqwest", "schema-ast", "sentry", @@ -1775,9 +1776,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", @@ -1787,9 +1788,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", diff --git a/apps/framework-cli/Cargo.toml b/apps/framework-cli/Cargo.toml index f72877d52..146ca5929 100644 --- a/apps/framework-cli/Cargo.toml +++ b/apps/framework-cli/Cargo.toml @@ -38,6 +38,7 @@ lazy_static = "1.4.0" anyhow = "1.0" spinners = "4.1.1" git2 = { version = "0.18.1", features = ["vendored-libgit2"] } +regex = "1.10.3" [dev-dependencies] clickhouse = { version = "0.11.5", features = ["uuid", "test-util"] } diff --git a/apps/framework-cli/src/cli/local_webserver.rs b/apps/framework-cli/src/cli/local_webserver.rs index 2fc566964..7e40a2961 100644 --- a/apps/framework-cli/src/cli/local_webserver.rs +++ b/apps/framework-cli/src/cli/local_webserver.rs @@ -67,6 +67,7 @@ struct RouteService { route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>, configured_producer: ConfiguredProducer, console_config: ConsoleConfig, + current_version: String, } impl Service<Request<Incoming>> for RouteService { @@ -77,6 +78,7 @@ impl Service<Request<Incoming>> for RouteService { fn call(&self, req: Request<Incoming>) -> Self::Future { Box::pin(router( req, + self.current_version.clone(), self.route_table, self.configured_producer.clone(), self.console_config.clone(), @@ -189,6 +191,7 @@ async fn ingest_route( async fn router( req: Request<Incoming>, + current_version: String, route_table: &RwLock<HashMap<PathBuf, RouteMeta>>, configured_producer: ConfiguredProducer, console_config: ConsoleConfig, @@ -211,9 +214,19 @@ async fn router( ); let route_split = route.to_str().unwrap().split('/').collect::<Vec<&str>>(); - match (req.method(), &route_split[..]) { (&hyper::Method::POST, ["ingest", _]) => { + ingest_route( + req, + // without explicit version, go to current project version + route.join(current_version), + configured_producer, + route_table, + console_config, + ) + .await + } + (&hyper::Method::POST, ["ingest", _, _]) => { ingest_route(req, route, configured_producer, route_table, console_config).await } @@ -283,6 +296,7 @@ impl Webserver { let route_service = RouteService { route_table, + current_version: project.version().to_string(), configured_producer: producer, console_config: project.console_config.clone(), };
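A quick sketch of the two ingest route shapes the router now accepts (the model name and versions below are hypothetical, not taken from this patch): a two-segment `POST /ingest/UserActivity` matches the `["ingest", _]` arm and is rewritten to the project's current version, while a three-segment `POST /ingest/UserActivity/0.3.0` matches `["ingest", _, _]` and is looked up as-is.

    // Minimal, self-contained sketch of the version-defaulting rewrite.
    fn main() {
        let current_version = "0.3.1"; // hypothetical package.json version
        let route = std::path::PathBuf::from("ingest/UserActivity");
        // `route.join(current_version)` is exactly what the `["ingest", _]` arm does.
        assert_eq!(
            route.join(current_version),
            std::path::PathBuf::from("ingest/UserActivity/0.3.1")
        );
    }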
diff --git a/apps/framework-cli/src/cli/routines.rs b/apps/framework-cli/src/cli/routines.rs index 13e77c872..e3d22abff 100644 --- a/apps/framework-cli/src/cli/routines.rs +++ b/apps/framework-cli/src/cli/routines.rs @@ -80,27 +80,28 @@ //! use std::collections::HashMap; -use std::path::Path; use std::sync::Arc; use std::{io::Error, path::PathBuf}; use log::debug; +use log::info; use tokio::sync::RwLock; -use super::display::with_spinner_async; -use super::local_webserver::Webserver; -use super::watcher::FileWatcher; -use super::{Message, MessageType}; -use crate::framework::controller::RouteMeta; -use crate::framework::schema::process_schema_file; +use crate::framework::controller::{ + create_or_replace_version_sync, get_all_framework_objects, get_all_version_syncs, + process_objects, FrameworkObject, FrameworkObjectVersions, RouteMeta, SchemaVersion, +}; +use crate::framework::sdks::generate_ts_sdk; use crate::infrastructure::console::post_current_state_to_console; use crate::infrastructure::olap; -use crate::infrastructure::olap::clickhouse::ConfiguredDBClient; use crate::infrastructure::stream::redpanda; use crate::project::Project; -use log::info; +use crate::utilities::package_managers; -use async_recursion::async_recursion; +use super::display::with_spinner_async; +use super::local_webserver::Webserver; +use super::watcher::FileWatcher; +use super::{Message, MessageType}; pub mod clean; pub mod initialize; @@ -237,7 +238,8 @@ pub async fn start_development_mode(project: Arc<Project>) -> anyhow::Result<()> let mut route_table = HashMap::<PathBuf, RouteMeta>::new(); info!("Initializing project state"); - initialize_project_state(project.clone(), &mut route_table).await?; + let framework_object_versions = + initialize_project_state(project.clone(), &mut route_table).await?; let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> = Box::leak(Box::new(RwLock::new(route_table))); @@ -246,7 +248,7 @@ pub async fn start_development_mode(project: Arc<Project>) -> anyhow::Result<()> let web_server = Webserver::new(server_config.host.clone(), server_config.port); let file_watcher = FileWatcher::new(); - file_watcher.start(project.clone(), route_table)?; + file_watcher.start(project.clone(), framework_object_versions, route_table)?; info!("Starting web server..."); @@ -258,31 +260,101 @@ async fn initialize_project_state( project: Arc<Project>, route_table: &mut HashMap<PathBuf, RouteMeta>, -) -> anyhow::Result<()> { +) -> anyhow::Result<FrameworkObjectVersions> { + let mut old_version_dir = project.internal_dir()?; + old_version_dir.push("versions"); + let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone()); let producer = redpanda::create_producer(project.redpanda_config.clone()); + info!("Checking for old version directories..."); + + let mut framework_object_versions = FrameworkObjectVersions::new(project.version().to_string(), project.schemas_dir().clone()); + match std::fs::read_dir(&old_version_dir) { + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + debug!("No old version directories found"); + } + Ok(read_dir) => { + for entry in read_dir { + let entry = entry?; + let path = entry.path(); + if path.is_dir() { + let version = path.file_name().unwrap().to_str().unwrap().to_string(); + + debug!("Processing old version directory: {:?}", path); + + let mut framework_objects = HashMap::new(); + get_all_framework_objects(&mut framework_objects, &path, &version)?; + + let mut compilable_objects = HashMap::new(); + process_objects( + &framework_objects, + project.clone(), + &path, + &configured_client, + &mut compilable_objects, + route_table, + &version, + ) + .await?; + + framework_object_versions.previous_version_models.insert( + version, + SchemaVersion { + base_path: path, + models: framework_objects, + typescript_objects: compilable_objects, + }, + ); + } + } + } + Err(e) => Err(e)?, + }; + let schema_dir = project.schemas_dir(); info!("Starting schema directory crawl..."); with_spinner_async("Processing schema file", async { let crawl_result = process_schemas_in_dir( schema_dir.as_path(), + let mut framework_objects: HashMap<String, FrameworkObject> = HashMap::new(); + get_all_framework_objects(&mut framework_objects, &schema_dir, project.version())?; + + let mut compilable_objects = HashMap::new(); + + let result = process_objects( + &framework_objects, project.clone(), + &schema_dir, &configured_client, + &mut compilable_objects, route_table, + project.version(), ) .await; + // TODO: add old versions to SDK + let sdk_location = generate_ts_sdk(project.clone(), &compilable_objects)?; + let package_manager = package_managers::PackageManager::Npm; + package_managers::install_packages(&sdk_location, &package_manager)?; + package_managers::run_build(&sdk_location, &package_manager)?; + package_managers::link_sdk(&sdk_location, None, &package_manager)?; + + framework_object_versions.current_models = SchemaVersion { + base_path: schema_dir.clone(), + models: framework_objects.clone(), + typescript_objects: compilable_objects, + }; + + olap::clickhouse::check_ready(&configured_client).await?; let _ = post_current_state_to_console( - project, + project.clone(), &configured_client, &producer, - route_table.clone(), + &framework_object_versions, ) .await; - match crawl_result { + match result { Ok(_) => { info!("Schema directory crawl completed successfully"); Ok(()) @@ -295,29 +367,18 @@ async fn initialize_project_state( } }) .await?; - Ok(()) -} -#[async_recursion] -async fn process_schemas_in_dir( - schema_dir: &Path, - project: Arc<Project>, - configured_client: &ConfiguredDBClient, - route_table: &mut HashMap<PathBuf, RouteMeta>, -) -> anyhow::Result<()> { - if schema_dir.is_dir() { - for entry in std::fs::read_dir(schema_dir)? { - let entry = entry?; - let path = entry.path(); - if path.is_dir() { - debug!("Processing directory: {:?}", path); - process_schemas_in_dir(&path, project.clone(), configured_client, route_table) - .await?; - } else { - debug!("Processing file: {:?}", path); - process_schema_file(&path, project.clone(), configured_client, route_table).await? + info!("Crawling version syncs"); + with_spinner_async::<_, anyhow::Result<()>>("Setting up version syncs", { + async { + let version_syncs = get_all_version_syncs(&project, &framework_object_versions)?; + for vs in version_syncs { + create_or_replace_version_sync(vs, &configured_client).await?; } + Ok(()) } - } - } - Ok(()) + }) + .await?; + + Ok(framework_object_versions) }
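Illustration of the version-tag extraction in the crawl above — each directory name under `.moose/versions` is treated as a version string (the path here is hypothetical):

    use std::path::Path;
    fn main() {
        let dir = Path::new(".moose/versions/0.3.0"); // hypothetical layout
        // same `file_name` dance as the directory walk above
        let version = dir.file_name().unwrap().to_str().unwrap().to_string();
        assert_eq!(version, "0.3.0");
    }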
diff --git a/apps/framework-cli/src/cli/watcher.rs b/apps/framework-cli/src/cli/watcher.rs index 59f013145..aa15430fc 100644 --- a/apps/framework-cli/src/cli/watcher.rs +++ b/apps/framework-cli/src/cli/watcher.rs @@ -1,138 +1,216 @@ +use std::collections::HashSet; +use std::sync::mpsc::TryRecvError; use std::sync::Arc; use std::{ collections::HashMap, io::{Error, ErrorKind}, - path::{Path, PathBuf}, + path::PathBuf, }; -use notify::{event::ModifyKind, Config, RecommendedWatcher, RecursiveMode, Watcher}; +use log::{debug, warn}; +use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher}; use tokio::sync::RwLock; -use super::display::{with_spinner_async, Message, MessageType}; +use crate::framework::controller::{ + create_language_objects, create_or_replace_kafka_trigger, create_or_replace_tables, + drop_kafka_trigger, drop_tables, get_framework_objects_from_schema_file, + schema_file_path_to_ingest_route, FrameworkObjectVersions, +}; +use crate::framework::schema::{is_prisma_file, DuplicateModelError}; +use crate::framework::sdks::generate_ts_sdk; +use crate::infrastructure::console::post_current_state_to_console; +use crate::infrastructure::olap::clickhouse::ClickhouseKafkaTrigger; use crate::infrastructure::stream::redpanda; +use crate::utilities::package_managers; use crate::{ - framework::controller::{remove_table_and_topics_from_schema_file_path, RouteMeta}, + framework::controller::RouteMeta, infrastructure::olap::{self, clickhouse::ConfiguredDBClient}, project::Project, - utilities::constants::SCHEMAS_DIR, -}; -use crate::{ - framework::schema::process_schema_file, infrastructure::console::post_current_state_to_console, }; -use log::{debug, info}; -async fn process_event( +use super::display::{with_spinner_async, Message, MessageType}; + +async fn process_events( project: Arc<Project>, - event: notify::Event, + events: Vec<notify::Event>, + framework_object_versions: &mut FrameworkObjectVersions, route_table: &RwLock<HashMap<PathBuf, RouteMeta>>, configured_client: &ConfiguredDBClient, ) -> anyhow::Result<()> { - let mut route_table = route_table.write().await; - debug!( "File Watcher Event Received: {:?}, with Route Table {:?}", - event, route_table + events, route_table ); - let route = event.paths[0].clone(); - - match event.kind { - notify::EventKind::Create(_) => { - // Only create tables and topics from prisma files in the datamodels directory - create_framework_objects_from_schema_file_path( - project, - &route, - &mut route_table, - configured_client, - ) - .await - } - notify::EventKind::Modify(mk) => { - match mk { - ModifyKind::Name(_) => { - // remove the file from the routes if they don't exist in the file directory - if route.exists() { - create_framework_objects_from_schema_file_path( - project, - &route, - &mut route_table, - configured_client, - ) - .await - } else { - remove_table_and_topics_from_schema_file_path( - &project.name(), - &route, - &mut route_table, - configured_client, - ) - .await - } - } + let paths = events + .into_iter() + .flat_map(|e| e.paths) + .filter(|p| is_prisma_file(p)) + .collect::<HashSet<PathBuf>>(); + + let mut new_objects = HashMap::new(); + let mut deleted_objects = HashMap::new(); + let mut changed_objects = HashMap::new(); + let mut moved_objects = HashMap::new(); + + let mut extra_models = HashMap::new(); - ModifyKind::Data(_) => { - if route.exists() { - create_framework_objects_from_schema_file_path( - project, - &route, - &mut route_table, - configured_client, - ) - .await? + let old_objects = &framework_object_versions.current_models.models; + + for path in paths { + // This is O(mn) but m and n are both small, so it should be fine + let mut removed_old_objects_in_file = old_objects + .iter() + .filter(|(_, fo)| fo.original_file_path == path) + .collect::<HashMap<_, _>>(); + + if path.exists() { + let obj_in_new_file = get_framework_objects_from_schema_file(&path, project.version())?; + for obj in obj_in_new_file { + removed_old_objects_in_file.remove(&obj.data_model.name); + + match old_objects.get(&obj.data_model.name) { + Some(changed) if changed.data_model != obj.data_model => { + if changed.original_file_path != obj.original_file_path + && deleted_objects.remove(&obj.data_model.name).is_none() + { + DuplicateModelError::try_insert(&mut extra_models, obj.clone(), &path)?; + }; + DuplicateModelError::try_insert(&mut changed_objects, obj, &path)?; + } + Some(moved) if moved.original_file_path != obj.original_file_path => { + if deleted_objects.remove(&obj.data_model.name).is_none() { + DuplicateModelError::try_insert(&mut extra_models, obj.clone(), &path)?; + }; + DuplicateModelError::try_insert(&mut moved_objects, obj, &path)?; + } + Some(_unchanged) => { + debug!("No changes to object: {:?}", obj.data_model.name); + } + None => { + DuplicateModelError::try_insert(&mut new_objects, obj, &path)?; } - Ok(()) } - _ => Ok(()), } } - notify::EventKind::Remove(_) => Ok(()), - _ => Ok(()), - } -} -async fn create_framework_objects_from_schema_file_path( - project: Arc<Project>, - schema_file_path: &Path, - route_table: &mut HashMap<PathBuf, RouteMeta>, - configured_client: &ConfiguredDBClient, -) -> anyhow::Result<()> { - //! Creates the route, topics and tables from a path to the schema file - - if let Some(ext) = schema_file_path.extension() { - if ext == "prisma" && schema_file_path.to_str().unwrap().contains(SCHEMAS_DIR) { - with_spinner_async("Processing schema file", async { - let result = - process_schema_file(schema_file_path, project, configured_client, route_table) - .await; - match result { - Ok(_) => { - show_message!(MessageType::Info, { - Message { - action: "Schema".to_string(), - details: "file processed".to_string(), - } - }); - } - Err(e) => { - show_message!(MessageType::Error, { - Message { - action: "Schema".to_string(), - details: format!("file failed to process: {}", e), - } - }); - } + removed_old_objects_in_file + .into_iter() + .for_each(|(name, old)| { + debug!("Found {name:?} removed from file {path:?}"); + if let Some(moved) = new_objects.remove(&old.data_model.name) { + changed_objects.insert(old.data_model.name.clone(), moved); + } else if !changed_objects.contains_key(&old.data_model.name) + && !moved_objects.contains_key(&old.data_model.name) + { + deleted_objects.insert(old.data_model.name.clone(), old.clone()); + } else { + extra_models.remove(name.as_str()); } - }) - .await + }); + } + + if let Some((model_name, extra)) = extra_models.into_iter().next() { + let other_file_path = old_objects + .get(&model_name) + .unwrap() + .original_file_path + .clone(); + return Err(DuplicateModelError { + model_name, + file_path: extra.original_file_path, + other_file_path, } - } else { - info!("No primsa extension found. Likely created unsupported file type") + .into()); + } + + debug!("new_objects = {new_objects:?}\ndeleted_objects = {deleted_objects:?}\nchanged_objects = {changed_objects:?}\nmoved_objects = {moved_objects:?}"); + + // grab the lock to prevent HTTP requests from being processed while we update the route table + let mut route_table = route_table.write().await; + for (_, fo) in deleted_objects { + drop_tables(&fo, configured_client).await?; + drop_kafka_trigger( + &ClickhouseKafkaTrigger::from_clickhouse_table(&fo.table), + configured_client, + ) + .await?; + route_table.remove(&schema_file_path_to_ingest_route( + &framework_object_versions.current_models.base_path, + &fo.original_file_path, + fo.data_model.name.clone(), + &framework_object_versions.current_version, + )); + redpanda::delete_topic(&project.name(), &fo.data_model.name)?; + + framework_object_versions + .current_models + .models + .remove(&fo.data_model.name); + + framework_object_versions + .current_models + .typescript_objects + .remove(&fo.data_model.name); + } + for (_, fo) in changed_objects.iter().chain(new_objects.iter()) { + create_or_replace_tables(&project.name(), fo, configured_client).await?; + let view = ClickhouseKafkaTrigger::from_clickhouse_table(&fo.table); + create_or_replace_kafka_trigger(&view, configured_client).await?; + + let ingest_route = schema_file_path_to_ingest_route( + &framework_object_versions.current_models.base_path, + &fo.original_file_path, + fo.data_model.name.clone(), + &framework_object_versions.current_version, + ); + + framework_object_versions + .current_models + .typescript_objects + .insert( + fo.data_model.name.clone(), + create_language_objects(fo, &ingest_route, project.clone())?, + ); + + route_table.insert( + ingest_route, + RouteMeta { + original_file_path: fo.original_file_path.clone(), + table_name: fo.table.name.clone(), + view_name: Some(view.name), + }, + ); + redpanda::create_topic_from_name(&project.name(), fo.topic.clone())?; + + framework_object_versions + .current_models + .models + .insert(fo.data_model.name.clone(), fo.clone()); + } + + for (_, fo) in moved_objects { + framework_object_versions + .current_models + .models + .insert(fo.data_model.name.clone(), fo); } + + let sdk_location = generate_ts_sdk( + project.clone(), + &framework_object_versions.current_models.typescript_objects, + )?; + let package_manager = package_managers::PackageManager::Npm; + package_managers::install_packages(&sdk_location, &package_manager)?; + package_managers::run_build(&sdk_location, &package_manager)?; + package_managers::link_sdk(&sdk_location, None, &package_manager)?; + Ok(()) } async fn watch( project: Arc<Project>, + framework_object_versions: &mut FrameworkObjectVersions, route_table: &RwLock<HashMap<PathBuf, RouteMeta>>, ) -> Result<(), Error> { let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone()); @@ -151,30 +229,49 @@ async fn watch( .watch(project.app_dir().as_ref(), RecursiveMode::Recursive) .map_err(|e| Error::new(ErrorKind::Other, format!("Failed to watch file: {}", e)))?; - for res in rx { + loop { + let res = rx.recv().unwrap(); match res { Ok(event) => { - process_event( - project.clone(), - event.clone(), - route_table, - &configured_client, + let mut events = vec![event]; + loop { + match rx.try_recv() { + Ok(Ok(event)) => events.push(event), + Ok(Err(e)) => { + warn!("File watcher event caused a failure: {}", e); + break; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + warn!("File watcher channel disconnected"); + break; + } + } + } + + with_spinner_async( + &format!("Processing {} events from file watcher", events.len()), + process_events( + project.clone(), + events, + framework_object_versions, + route_table, + &configured_client, + ), ) .await .map_err(|e| { - Error::new(ErrorKind::Other, format!("Processing error occured: {}", e)) + Error::new( + ErrorKind::Other, + format!("Processing error occurred: {}", e), + ) })?; - let route_table_snapshot = { - let read_lock = route_table.read().await; - (*read_lock).clone() - }; - let _ = post_current_state_to_console( project.clone(), &configured_client, &configured_producer, - route_table_snapshot, + framework_object_versions, ) .await; } @@ -186,7 +283,6 @@ async fn watch( } } } - Ok(()) } pub struct FileWatcher; @@ -199,6 +295,7 @@ impl FileWatcher { pub fn start( &self, project: Arc<Project>, + framework_object_versions: FrameworkObjectVersions, route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>>, ) -> Result<(), Error> { show_message!(MessageType::Info, { @@ -208,9 +305,11 @@ impl FileWatcher { } }); + let mut framework_object_versions = framework_object_versions; + tokio::spawn(async move { - if let Err(error) = watch(project, route_table).await { - debug!("Error: {error:?}"); + if let Err(error) = watch(project, &mut framework_object_versions, route_table).await { + panic!("Watcher error: {error:?}"); } });
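The rewritten watch loop blocks on one event, then drains whatever else is already queued and processes the whole batch in one pass. A minimal sketch of that drain-then-process shape, with plain integers standing in for `notify` events:

    use std::sync::mpsc;
    fn main() {
        let (tx, rx) = mpsc::channel();
        for i in 0..3 {
            tx.send(i).unwrap();
        }
        let first = rx.recv().unwrap(); // block for at least one event
        let mut batch = vec![first];
        while let Ok(next) = rx.try_recv() {
            batch.push(next); // drain the rest without blocking
        }
        assert_eq!(batch, vec![0, 1, 2]);
    }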
diff --git a/apps/framework-cli/src/framework/controller.rs b/apps/framework-cli/src/framework/controller.rs index 7f1c8dc90..df61bf653 100644 --- a/apps/framework-cli/src/framework/controller.rs +++ b/apps/framework-cli/src/framework/controller.rs @@ -15,15 +15,16 @@ use crate::framework::sdks::TypescriptObjects; use crate::framework::typescript::get_typescript_models_dir; use crate::framework::typescript::SendFunction; use crate::infrastructure::olap; -use crate::infrastructure::olap::clickhouse::ClickhouseKafkaTrigger; -use crate::infrastructure::olap::clickhouse::ClickhouseTable; use crate::infrastructure::olap::clickhouse::ConfiguredDBClient; +use crate::infrastructure::olap::clickhouse::{ClickhouseKafkaTrigger, VERSION_SYNC_REGEX}; +use crate::infrastructure::olap::clickhouse::{ClickhouseTable, VersionSync}; use crate::infrastructure::stream; use crate::project::Project; +#[cfg(test)] use crate::utilities::constants::SCHEMAS_DIR; -use super::schema::parse_schema_file; -use super::schema::DataModel; +use super::schema::{is_prisma_file, DataModel}; +use super::schema::{parse_schema_file, DuplicateModelError}; use super::typescript::TypescriptInterface; #[derive(Debug, Clone)] @@ -32,17 +33,53 @@ pub struct FrameworkObject { pub table: ClickhouseTable, pub topic: String, pub ts_interface: TypescriptInterface, + pub original_file_path: PathBuf, } -pub fn framework_object_mapper(s: DataModel) -> FrameworkObject { - let clickhouse_table = olap::clickhouse::mapper::std_table_to_clickhouse_table(s.to_table()); +pub fn framework_object_mapper( + s: DataModel, + original_file_path: &Path, + version: &str, +) -> FrameworkObject { + let clickhouse_table = + olap::clickhouse::mapper::std_table_to_clickhouse_table(s.to_table(version)); FrameworkObject { data_model: s.clone(), table: clickhouse_table, topic: s.name.clone(), ts_interface: framework::typescript::mapper::std_table_to_typescript_interface( - s.to_table(), + s.to_table(version), + s.name.as_str(), ), + original_file_path: original_file_path.to_path_buf(), + } +} + +#[derive(Debug, Clone)] +pub struct SchemaVersion { + pub base_path: PathBuf, + pub models: HashMap<String, FrameworkObject>, + pub typescript_objects: HashMap<String, TypescriptObjects>, +} + +#[derive(Debug, Clone)] +pub struct FrameworkObjectVersions { + pub current_version: String, + pub current_models: SchemaVersion, + pub previous_version_models: HashMap<String, SchemaVersion>, +} + +impl FrameworkObjectVersions { + pub fn new(current_version: String, current_schema_directory: PathBuf) -> Self { + FrameworkObjectVersions { + current_version, + current_models: SchemaVersion { + base_path: current_schema_directory, + models: HashMap::new(), + typescript_objects: HashMap::new(), + }, + previous_version_models: HashMap::new(), + } } } @@ -53,49 +90,131 @@ pub struct RouteMeta { pub view_name: Option<String>, } +pub fn get_all_version_syncs( + project: &Project, + framework_object_versions: &FrameworkObjectVersions, +) -> anyhow::Result<Vec<VersionSync>> { + let flows_dir = project.flows_dir(); + let mut version_syncs = vec![]; + for entry in std::fs::read_dir(flows_dir)? { + let entry = entry?; + let path = entry.path(); + if !path.is_dir() { + if let Some(captures) = + VERSION_SYNC_REGEX.captures(entry.file_name().to_string_lossy().as_ref()) + { + let from_table_name = captures.get(1).unwrap().as_str(); + let to_table_name = captures.get(4).map_or(from_table_name, |m| m.as_str()); + + let from_version = captures.get(2).unwrap().as_str().replace('_', "."); + let to_version = captures.get(5).unwrap().as_str().replace('_', "."); + + let from_version_models = framework_object_versions + .previous_version_models + .get(&from_version); + let to_version_models = if to_version == framework_object_versions.current_version { + Some(&framework_object_versions.current_models) + } else { + framework_object_versions + .previous_version_models + .get(&to_version) + }; + + match (from_version_models, to_version_models) { + (Some(from_version_models), Some(to_version_models)) => { + let from_table = from_version_models.models.get(from_table_name); + let to_table = to_version_models.models.get(to_table_name); + match (from_table, to_table) { + (Some(from_table), Some(to_table)) => { + let version_sync = VersionSync { + db_name: from_table.table.db_name.clone(), + model_name: from_table.data_model.name.clone(), + source_version: from_version.clone(), + source_table: from_table.table.clone(), + dest_version: to_version.clone(), + dest_table: to_table.table.clone(), + migration_function: std::fs::read_to_string(&path)?, + }; + version_syncs.push(version_sync); + } + _ => { + return Err(anyhow::anyhow!( + "Failed to find tables in versions {:?}", + path.file_name() + )); + } + } + } + _ => { + debug!( + "Version unavailable for version sync {:?}. from: {:?} to: {:?}", + path.file_name(), + from_version_models, + to_version_models, + ); + } + } + }; + + debug!("Processing version sync: {:?}.", path); + } + } + Ok(version_syncs) +} + pub fn get_all_framework_objects( - framework_objects: &mut Vec<FrameworkObject>, + framework_objects: &mut HashMap<String, FrameworkObject>, schema_dir: &Path, -) -> Result<Vec<FrameworkObject>, Error> { + version: &str, +) -> anyhow::Result<()> { if schema_dir.is_dir() { for entry in std::fs::read_dir(schema_dir)? { let entry = entry?; let path = entry.path(); if path.is_dir() { debug!("Processing directory: {:?}", path); - get_all_framework_objects(framework_objects, &path)?; - } else { + get_all_framework_objects(framework_objects, &path, version)?; + } else if is_prisma_file(&path) { debug!("Processing file: {:?}", path); - let mut objects = get_framework_objects_from_schema_file(&path)?; - framework_objects.append(&mut objects) + let objects = get_framework_objects_from_schema_file(&path, version)?; + for fo in objects { + DuplicateModelError::try_insert(framework_objects, fo, &path)?; + } } } } - Ok(framework_objects.to_vec()) + Ok(()) } -pub fn get_framework_objects_from_schema_file(path: &Path) -> Result<Vec<FrameworkObject>, Error> { - let framework_objects = parse_schema_file::<FrameworkObject>(path, framework_object_mapper) - .map_err(|e| { - Error::new( - ErrorKind::Other, - format!("Failed to parse schema file. Error {}", e), - ) - })?; +pub fn get_framework_objects_from_schema_file( + path: &Path, + version: &str, +) -> Result<Vec<FrameworkObject>, Error> { + let framework_objects = + parse_schema_file::<FrameworkObject>(path, version, framework_object_mapper).map_err( + |e| { + Error::new( + ErrorKind::Other, + format!("Failed to parse schema file. Error {}", e), + ) + }, + )?; Ok(framework_objects) } +pub async fn drop_kafka_trigger( + view: &ClickhouseKafkaTrigger, + configured_client: &ConfiguredDBClient, +) -> anyhow::Result<()> { + let drop_view_query = view.drop_materialized_view_query()?; + olap::clickhouse::run_query(&drop_view_query, configured_client).await?; + Ok(()) +} + pub(crate) async fn create_or_replace_kafka_trigger( - fo: &FrameworkObject, - view_name: String, + view: &ClickhouseKafkaTrigger, configured_client: &ConfiguredDBClient, -) -> Result<(), Error> { - let view = ClickhouseKafkaTrigger::new( - fo.table.db_name.clone(), - view_name, - fo.table.kafka_table_name(), - fo.table.name.clone(), - ); +) -> anyhow::Result<()> { let create_view_query = view.create_materialized_view_query().map_err(|e| { Error::new( ErrorKind::Other, format!("Failed to get clickhouse query: {:?}", e), ) })?; - // Clickhouse doesn't support dropping a view if it doesn't exist so we need to drop it first in case the schema has changed - let drop_view_query = view.drop_materialized_view_query().map_err(|e| { - Error::new( - ErrorKind::Other, - format!("Failed to get clickhouse query: {:?}", e), - ) - })?; - olap::clickhouse::run_query(&drop_view_query, configured_client) - .await - .map_err(|e| { - Error::new( - ErrorKind::Other, - format!("Failed to drop view in clickhouse: {}", e), - ) - })?; + // Clickhouse doesn't support dropping a view if it doesn't exist, so we need to drop it first in case the schema has changed + drop_kafka_trigger(view, configured_client).await?; olap::clickhouse::run_query(&create_view_query, configured_client) .await .map_err(|e| { @@ -129,16 +235,43 @@ Ok(()) } -pub(crate) async fn create_or_replace_tables( - project_name: &str, +pub async fn create_or_replace_version_sync( + version_sync: VersionSync, + configured_client: &ConfiguredDBClient, +) -> anyhow::Result<()> { + let drop_function_query = version_sync.drop_function_query(); + let drop_trigger_query = version_sync.drop_trigger_query(); + let create_function_query = version_sync.create_function_query(); + let create_trigger_query = version_sync.create_trigger_query(); + + olap::clickhouse::run_query(&drop_function_query, configured_client).await?; + olap::clickhouse::run_query(&drop_trigger_query, configured_client).await?; + olap::clickhouse::run_query(&create_function_query, configured_client).await?; + olap::clickhouse::run_query(&create_trigger_query, configured_client).await?; + Ok(()) +} + +pub(crate) async fn drop_tables( fo: &FrameworkObject, configured_client: &ConfiguredDBClient, ) -> anyhow::Result<()> { - info!("Creating table: {:?}", fo.table.name); + info!("Dropping tables for: {:?}", fo.table.name); let drop_data_table_query = fo.table.drop_data_table_query()?; let drop_kafka_table_query = fo.table.drop_kafka_table_query()?; + olap::clickhouse::run_query(&drop_data_table_query, configured_client).await?; + olap::clickhouse::run_query(&drop_kafka_table_query, configured_client).await?; + Ok(()) +} + +pub(crate) async fn create_or_replace_tables( + project_name: &str, + fo: &FrameworkObject, + configured_client: &ConfiguredDBClient, +) -> anyhow::Result<()> { + info!("Creating table: {:?}", fo.table.name); + let create_data_table_query = fo.table.create_data_table_query()?; let create_kafka_table_query = fo.table.create_kafka_table_query(project_name)?; @@ -151,9 +284,9 @@ ) })?; - // Clickhouse doesn't support dropping a view if it doesn't exist so we need to drop it first in case the schema has changed - olap::clickhouse::run_query(&drop_data_table_query, configured_client).await?; - olap::clickhouse::run_query(&drop_kafka_table_query, configured_client).await?; + // Clickhouse doesn't support dropping a view if it doesn't exist, so we need to drop it first in case the schema has changed + drop_tables(fo, configured_client).await?; + olap::clickhouse::run_query(&create_data_table_query, configured_client).await?; olap::clickhouse::run_query(&create_kafka_table_query, configured_client).await?; Ok(()) @@ -228,7 +361,7 @@ pub async fn remove_table_and_topics_from_schema_file_path( for (k, meta) in route_table.clone().into_iter() { if meta.original_file_path == schema_file_path { - stream::redpanda::delete_topic(project_name, meta.table_name.clone())?; + stream::redpanda::delete_topic(project_name, &meta.table_name)?; olap::clickhouse::delete_table_or_view(meta.table_name, configured_client) .await @@ -256,54 +389,61 @@ Ok(()) } -fn schema_file_path_to_ingest_route(app_dir: PathBuf, path: &Path, table_name: String) -> PathBuf { - let data_model_path = app_dir.join(SCHEMAS_DIR); - debug!("got data model path: {:?}", data_model_path); +pub fn schema_file_path_to_ingest_route( + base_path: &Path, + path: &Path, + data_model_name: String, + version: &str, +) -> PathBuf { + debug!("got data model path: {:?}", base_path); debug!("processing schema file into route: {:?}", path); - let mut route = path.strip_prefix(data_model_path).unwrap().to_path_buf(); - route.set_file_name(table_name); + // E.g. `model Foo` in `app/datamodels/inner/bar.prisma` will have route + // `ingest/inner/Foo/{version}` + let mut route = path.strip_prefix(base_path).unwrap().to_path_buf(); + route.set_file_name(data_model_name); debug!("route: {:?}", route); - PathBuf::from("ingest").join(route) + PathBuf::from("ingest").join(route).join(version) } pub async fn process_objects( - framework_objects: Vec<FrameworkObject>, + framework_objects: &HashMap<String, FrameworkObject>, project: Arc<Project>, - schema_file_path: &Path, + schema_dir: &Path, configured_client: &ConfiguredDBClient, - compilable_objects: &mut Vec<TypescriptObjects>, // Objects that require compilation after processing + compilable_objects: &mut HashMap<String, TypescriptObjects>, // Objects that require compilation after processing route_table: &mut HashMap<PathBuf, RouteMeta>, + version: &str, ) -> anyhow::Result<()> { - let app_dir = project.clone().app_dir(); - for fo in framework_objects { + for (name, fo) in framework_objects.iter() { let ingest_route = schema_file_path_to_ingest_route( - app_dir.clone(), - schema_file_path, - fo.table.name.clone(), + schema_dir, + &fo.original_file_path, + fo.data_model.name.clone(), + version, ); stream::redpanda::create_topic_from_name(&project.name(), fo.topic.clone())?; debug!("Creating table & view: {:?}", fo.table.name); - let view_name = format!("{}_trigger", fo.table.name); + create_or_replace_tables(&project.name(), fo, configured_client).await?; - create_or_replace_tables(&project.name(), &fo, configured_client).await?; - create_or_replace_kafka_trigger(&fo, view_name.clone(), configured_client).await?; + let view = ClickhouseKafkaTrigger::from_clickhouse_table(&fo.table); + create_or_replace_kafka_trigger(&view, configured_client).await?; debug!("Table created: {:?}", fo.table.name); - let typescript_objects = create_language_objects(&fo, &ingest_route, project.clone())?; - compilable_objects.push(typescript_objects); + let typescript_objects = create_language_objects(fo, &ingest_route, project.clone())?; + compilable_objects.insert(name.clone(), typescript_objects); route_table.insert( ingest_route, RouteMeta { - original_file_path: schema_file_path.to_path_buf(), + original_file_path: fo.original_file_path.clone(), table_name: fo.table.name.clone(), - view_name: Some(view_name), + view_name: Some(fo.table.view_name()), }, ); } @@ -319,9 +459,9 @@ mod tests { let schema_dir = PathBuf::from(manifest_location) .join("tests/test_project") .join(SCHEMAS_DIR); - let mut framework_objects = vec![]; - let result = get_all_framework_objects(&mut framework_objects, &schema_dir); + let mut framework_objects = HashMap::new(); + let result = get_all_framework_objects(&mut framework_objects, &schema_dir, "0.0"); assert!(result.is_ok()); - assert!(framework_objects.len() == 2); + assert_eq!(framework_objects.len(), 2); } }
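Worked example of `schema_file_path_to_ingest_route` (the file layout, model name, and version are hypothetical):

    use std::path::{Path, PathBuf};
    fn main() {
        let base = Path::new("app/datamodels");
        let file = Path::new("app/datamodels/inner/bar.prisma");
        let mut route = file.strip_prefix(base).unwrap().to_path_buf();
        route.set_file_name("Foo"); // the data model name replaces the file name
        let route = PathBuf::from("ingest").join(route).join("0.0");
        assert_eq!(route, PathBuf::from("ingest/inner/Foo/0.0"));
    }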
diff --git a/apps/framework-cli/src/framework/schema.rs b/apps/framework-cli/src/framework/schema.rs index 007de2424..aa535352d 100644 --- a/apps/framework-cli/src/framework/schema.rs +++ b/apps/framework-cli/src/framework/schema.rs @@ -14,36 +14,24 @@ //! //! We only implemented part of the prisma schema parsing. We only support models and fields. We don't support enums, relations, or anything else for the moment -pub mod templates; - +use std::collections::HashMap; use std::fmt::{Display, Formatter}; -use std::sync::Arc; use std::{ - collections::HashMap, fmt, path::{Path, PathBuf}, }; +use crate::framework::controller::FrameworkObject; use diagnostics::Diagnostics; - -use log::debug; use schema_ast::{ - ast::{Attribute, Field, SchemaAst, Top, WithAttributes, WithName}, + ast::{Attribute, Field, SchemaAst, Top, WithName}, parse_schema, }; use serde::Serialize; -use crate::{ - framework::{ - controller::{get_framework_objects_from_schema_file, process_objects}, - sdks::{generate_ts_sdk, TypescriptObjects}, - }, - infrastructure::olap::clickhouse::{mapper::arity_mapper, ConfiguredDBClient}, - project::Project, - utilities::package_managers, -}; +use crate::infrastructure::olap::clickhouse::mapper::arity_mapper; -use super::controller::RouteMeta; +pub mod templates; #[derive(Debug, Clone)] pub enum ParsingError { @@ -65,13 +53,54 @@ impl Display for UnsupportedDataTypeError { impl std::error::Error for UnsupportedDataTypeError {} -/// A function that maps an input type to an output type -type MapperFunc<I, O> = fn(i: I) -> O; +#[derive(Debug, Clone)] +pub struct DuplicateModelError { + pub model_name: String, + pub file_path: PathBuf, + pub other_file_path: PathBuf, +} + +impl DuplicateModelError { + pub fn try_insert( + map: &mut HashMap<String, FrameworkObject>, + fo: FrameworkObject, + current_path: &Path, + ) -> Result<(), Self> { + let maybe_existing = map.insert(fo.data_model.name.clone(), fo); + match maybe_existing { + None => Ok(()), + Some(other_fo) => Err(DuplicateModelError { + model_name: other_fo.data_model.name, + file_path: current_path.to_path_buf(), + other_file_path: other_fo.original_file_path, + }), + } + } +} + +impl Display for DuplicateModelError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "Duplicate model {} in files: {}, {}", + self.model_name, + self.file_path.display(), + self.other_file_path.display() + ) + } +} + +impl std::error::Error for DuplicateModelError {} + +pub fn is_prisma_file(path: &Path) -> bool { + path.extension().map(|e| e == "prisma").unwrap_or(false) +} // TODO: Make the parse schema file a variable and pass it into the function pub fn parse_schema_file<O>( path: &Path, - mapper: MapperFunc<DataModel, O>, + version: &str, + mapper: fn(DataModel, path: &Path, version: &str) -> O, ) -> Result<Vec<O>, ParsingError> { let schema_file = std::fs::read_to_string(path).map_err(|_| ParsingError::FileNotFound { path: path.to_path_buf(), @@ -81,41 +110,25 @@ let ast = parse_schema(&schema_file, &mut diagnostics); - Ok(ast_mapper(ast)?.into_iter().map(mapper).collect()) + Ok(ast_mapper(ast)? + .into_iter() + .map(|data_model| mapper(data_model, path, version)) + .collect()) } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Eq, PartialEq)] pub struct DataModel { pub db_name: String, pub columns: Vec<Column>, pub name: String, - pub version: i8, } impl DataModel { - pub fn new(db_name: String, columns: Vec<Column>, name: String, version: i8) -> DataModel { - DataModel { - db_name, - columns, - name, - version, - } - } - - pub fn to_table(&self) -> Table { + pub fn to_table(&self, version: &str) -> Table { Table { db_name: self.db_name.clone(), table_type: TableType::Table, - name: self.name.clone(), - columns: self.columns.clone(), - } - } - - pub fn to_view(&self) -> Table { - Table { - db_name: self.db_name.clone(), - table_type: TableType::View, - name: format!("{}_view", self.name), + name: format!("{}_{}", self.name, version.replace('.', "_")), columns: self.columns.clone(), } } @@ -167,7 +180,7 @@ impl FieldArity { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Eq, PartialEq)] pub struct Column { pub name: String, pub data_type: ColumnType, @@ -177,7 +190,7 @@ pub struct Column { pub default: Option<ColumnDefaults>, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Eq, PartialEq)] pub enum ColumnDefaults { AutoIncrement, CUID, @@ -191,7 +204,7 @@ impl fmt::Display for ParsingError { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Eq, PartialEq)] pub enum ColumnType { String, Boolean, @@ -284,28 +297,6 @@ fn top_to_schema(t: &Top) -> Result<DataModel, ParsingError> { match t { Top::Model(m) => { let schema_name = m.name().to_string(); - let mut version = 1; - - let attributes = m.attributes(); - - // Get the value of the version attribute in the ugliest way possible - let version_attribute = attributes.iter().find(|a| a.name() == "version"); - - if let Some(attribute) = version_attribute { - version = attribute - .arguments - .arguments - .first() - .map(|arg| { - arg.value - .as_numeric_value() - .unwrap() - .0 - .parse::<i8>() - .unwrap() - }) - .unwrap_or(version); - } let columns: Result<Vec<Column>, ParsingError> = m.iter_fields().map(|(_id, f)| field_to_column(f)).collect(); @@ -314,7 +305,6 @@ fn top_to_schema(t: &Top) -> Result<DataModel, ParsingError> { db_name: "local".to_string(), columns: columns?, name: schema_name, - version, }) } _ => Err(ParsingError::UnsupportedDataTypeError { @@ -328,30 +318,3 @@ pub fn ast_mapper(ast: SchemaAst) -> Result<Vec<DataModel>, ParsingError> { .map(|(_id, t)| top_to_schema(t)) .collect::<Result<Vec<DataModel>, ParsingError>>() } - -pub async fn process_schema_file( - schema_file_path: &Path, - project: Arc<Project>, - configured_client: &ConfiguredDBClient, - route_table: &mut HashMap<PathBuf, RouteMeta>, -) -> anyhow::Result<()> { - let framework_objects = get_framework_objects_from_schema_file(schema_file_path)?; - let mut compilable_objects: Vec<TypescriptObjects> = Vec::new(); - process_objects( - framework_objects, - project.clone(), - schema_file_path, - configured_client, - &mut compilable_objects, - route_table, - ) - .await?; - debug!("All objects created, generating sdk..."); - let sdk_location = generate_ts_sdk(project, compilable_objects)?; - - let package_manager = package_managers::PackageManager::Npm; - package_managers::install_packages(&sdk_location, &package_manager)?; - package_managers::run_build(&sdk_location, &package_manager)?; - package_managers::link_sdk(&sdk_location, None, &package_manager)?; - Ok(()) -}
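With the `version` attribute gone, the version string now comes from the project and is baked into the table name by `to_table`; for example (hypothetical model name and version):

    fn main() {
        let name = "UserActivity"; // hypothetical model name
        let version = "0.3.1";     // project version, e.g. from package.json
        // mirrors the `format!` in `DataModel::to_table`
        assert_eq!(
            format!("{}_{}", name, version.replace('.', "_")),
            "UserActivity_0_3_1"
        );
    }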
diff --git a/apps/framework-cli/src/framework/sdks.rs b/apps/framework-cli/src/framework/sdks.rs index 9fe0a7bdd..478210d26 100644 --- a/apps/framework-cli/src/framework/sdks.rs +++ b/apps/framework-cli/src/framework/sdks.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use std::{fs::File, io::Write, path::PathBuf}; @@ -57,7 +58,7 @@ fn write_config_to_file(path: PathBuf, code: String) -> Result<(), std::io::Error pub fn generate_ts_sdk( project: Arc<Project>, - ts_objects: Vec<TypescriptObjects>, + ts_objects: &HashMap<String, TypescriptObjects>, ) -> Result<PathBuf, std::io::Error> { //! Generates a Typescript SDK for the given project and returns the path where the SDK was generated. //! @@ -74,10 +75,13 @@ pub fn generate_ts_sdk( let package = TypescriptPackage::from_project(project); let package_json_code = PackageJsonTemplate::build(&package); let ts_config_code = TsConfigTemplate::build(); - let index_code = IndexTemplate::build(&ts_objects); + let index_code = IndexTemplate::build(ts_objects); // This needs to write to the root of the NPM folder... creating in the current project location for now let sdk_dir = internal_dir.join(package.name); + + std::fs::remove_dir_all(sdk_dir.clone())?; + std::fs::create_dir_all(sdk_dir.clone())?; write_config_to_file(sdk_dir.join("package.json"), package_json_code)?; @@ -89,7 +93,7 @@ index_code, )?; - for obj in ts_objects { + for obj in ts_objects.values() { let interface_code = obj.interface.create_code().map_err(|err| { std::io::Error::new( std::io::ErrorKind::Other, diff --git a/apps/framework-cli/src/framework/typescript/mapper.rs b/apps/framework-cli/src/framework/typescript/mapper.rs index d0e13e1c5..a199d78bb 100644 --- a/apps/framework-cli/src/framework/typescript/mapper.rs +++ b/apps/framework-cli/src/framework/typescript/mapper.rs @@ -15,7 +15,7 @@ pub fn std_field_type_to_typescript_field_mapper(field_type: ColumnType) -> InterfaceFieldType { } } -pub fn std_table_to_typescript_interface(table: Table) -> TypescriptInterface { +pub fn std_table_to_typescript_interface(table: Table, model_name: &str) -> TypescriptInterface { let fields = table .columns .into_iter() @@ -39,7 +39,7 @@ pub fn std_table_to_typescript_interface(table: Table) -> TypescriptInterface { .collect::<Vec<InterfaceField>>(); TypescriptInterface { - name: table.name, + name: model_name.to_string(), fields, } } diff --git a/apps/framework-cli/src/framework/typescript/templates.rs b/apps/framework-cli/src/framework/typescript/templates.rs index a7ef525aa..8caf0f9ac 100644 --- a/apps/framework-cli/src/framework/typescript/templates.rs +++ b/apps/framework-cli/src/framework/typescript/templates.rs @@ -1,4 +1,5 @@ use serde::Serialize; +use std::collections::HashMap; use tinytemplate::TinyTemplate; use crate::framework::sdks::TypescriptObjects; @@ -156,10 +157,10 @@ struct IndexContext { ts_objects: Vec<TypescriptObjectsContext>, } impl IndexContext { - fn new(ts_objects: &[TypescriptObjects]) -> IndexContext { + fn new(ts_objects: &HashMap<String, TypescriptObjects>) -> IndexContext { IndexContext { ts_objects: ts_objects - .iter() + .values() .map(TypescriptObjectsContext::new) .collect::<Vec<TypescriptObjectsContext>>(), } } @@ -168,7 +169,7 @@ impl IndexContext { pub struct IndexTemplate; impl IndexTemplate { - pub fn build(ts_objects: &[TypescriptObjects]) -> String { + pub fn build(ts_objects: &HashMap<String, TypescriptObjects>) -> String { let mut tt = TinyTemplate::new(); tt.add_template("index", INDEX_TEMPLATE).unwrap(); let context = IndexContext::new(ts_objects); diff --git a/apps/framework-cli/src/infrastructure/console.rs b/apps/framework-cli/src/infrastructure/console.rs index 6604fe5a3..00af900ed 100644 --- a/apps/framework-cli/src/infrastructure/console.rs +++ b/apps/framework-cli/src/infrastructure/console.rs @@ -1,12 +1,5 @@ -use crate::framework::controller::get_all_framework_objects; -use crate::framework::controller::FrameworkObject; -use crate::framework::controller::RouteMeta; -use crate::framework::schema::DataModel; -use crate::infrastructure::olap; -use crate::infrastructure::olap::clickhouse::ConfiguredDBClient; -use crate::infrastructure::stream::redpanda; -use crate::infrastructure::stream::redpanda::ConfiguredProducer; -use crate::project::Project; +use std::str; + use http_body_util::BodyExt; use http_body_util::Full; use hyper::body::Bytes; @@ -16,13 +9,17 @@ use hyper_util::rt::TokioIo; use log::debug; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::collections::HashMap; -use std::path::PathBuf; use std::sync::Arc; use tokio::net::TcpStream; -use std::str; +use crate::framework::controller::{schema_file_path_to_ingest_route, FrameworkObjectVersions}; +use crate::framework::schema::DataModel; +use crate::infrastructure::olap; +use crate::infrastructure::olap::clickhouse::ConfiguredDBClient; +use crate::infrastructure::stream::redpanda; +use crate::infrastructure::stream::redpanda::ConfiguredProducer; +use crate::project::Project; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct ConsoleConfig { @@ -39,14 +36,12 @@ pub async fn post_current_state_to_console( project: Arc<Project>, configured_db_client: &ConfiguredDBClient, configured_producer: &ConfiguredProducer, - route_table: HashMap<PathBuf, RouteMeta>, + framework_object_versions: &FrameworkObjectVersions, ) -> Result<(), anyhow::Error> { - let schema_dir = project.schemas_dir(); - let mut framework_objects: Vec<FrameworkObject> = Vec::new(); - get_all_framework_objects(&mut framework_objects, &schema_dir)?; - - let models: Vec<DataModel> = framework_objects - .iter() + let models: Vec<DataModel> = framework_object_versions + .current_models + .models + .values() .map(|fo| fo.data_model.clone()) .collect(); @@ -60,14 +55,26 @@ .await .unwrap(); - let routes_table: Vec<RouteInfo> = route_table - .iter() - .map(|(k, v)| { + // TODO: old versions of the models are not being sent to the console + let routes_table: Vec<RouteInfo> = framework_object_versions + .current_models + .models + .values() + .map(|fo| { + let route_path = schema_file_path_to_ingest_route( + &framework_object_versions.current_models.base_path, + &fo.original_file_path, + fo.data_model.name.clone(), + &framework_object_versions.current_version, + ) + .to_string_lossy() + .to_string(); + RouteInfo::new( - k.to_str().unwrap().to_string(), - v.original_file_path.to_str().unwrap().to_string(), - v.table_name.clone(), - v.view_name.clone(), + route_path, + fo.original_file_path.to_str().unwrap().to_string(), + fo.table.name.clone(), + Some(fo.table.view_name()), ) }) .collect();
diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs index d8d43d794..c267ac76c 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse.rs @@ -5,11 +5,14 @@ mod queries; use std::fmt::{self}; use clickhouse::Client; +use lazy_static::lazy_static; use log::debug; - +use regex::Regex; use serde::{Deserialize, Serialize}; use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine; +use crate::infrastructure::olap::clickhouse::queries::CreateVersionSyncTriggerQuery; + use crate::{ framework::schema::{FieldArity, UnsupportedDataTypeError}, utilities::constants::REDPANDA_CONTAINER_NAME, @@ -157,12 +160,13 @@ impl ClickhouseTable { table_type, } } -} -impl ClickhouseTable { pub fn kafka_table_name(&self) -> String { format!("{}_kafka", self.name) } + pub fn view_name(&self) -> String { + format!("{}_trigger", self.name) + } fn kafka_table(&self) -> ClickhouseTable { ClickhouseTable { @@ -217,6 +221,76 @@ impl ClickhouseKafkaTrigger { dest_table_name, } } + + pub fn from_clickhouse_table(table: &ClickhouseTable) -> ClickhouseKafkaTrigger { + ClickhouseKafkaTrigger { + db_name: table.db_name.clone(), + name: table.view_name(), + source_table_name: table.kafka_table_name(), + dest_table_name: table.name.clone(), + } + } +} + +#[derive(Debug, Clone)] +pub struct VersionSync { + pub db_name: String, + pub model_name: String, + pub source_version: String, + pub source_table: ClickhouseTable, + pub dest_version: String, + pub dest_table: ClickhouseTable, + pub migration_function: String, +} + +lazy_static! { + pub static ref VERSION_SYNC_REGEX: Regex = + // source_model_name, source_version, target_model_name, dest_version + Regex::new(r"^([a-zA-Z0-9_]+)_migrate__([0-9_]+)__(([a-zA-Z0-9_]+)__)?([0-9_]+).sql$") + .unwrap(); +} + +impl VersionSync { + fn migration_function_name(&self) -> String { + format!( + "{}_migrate__{}__{}", + self.model_name, + self.source_version.replace('.', "_"), + self.dest_version.replace('.', "_"), + ) + } + + fn migration_trigger_name(&self) -> String { + format!( + "{}_trigger__{}__{}", + self.model_name, + self.source_version.replace('.', "_"), + self.dest_version.replace('.', "_"), + ) + } + + pub fn create_function_query(&self) -> String { + format!( + "CREATE FUNCTION {} AS {}", + self.migration_function_name(), + self.migration_function + ) + } + pub fn drop_function_query(&self) -> String { + format!("DROP FUNCTION IF EXISTS {}", self.migration_function_name()) + } + + pub fn create_trigger_query(self) -> String { + CreateVersionSyncTriggerQuery::build(self) + } + + pub fn drop_trigger_query(&self) -> String { + format!( + "DROP VIEW IF EXISTS {}.{}", + self.db_name, + self.migration_trigger_name() + ) + } } pub type QueryString = String; diff --git a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs index 0adb85eaa..41dc1e915 100644 --- a/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs +++ b/apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs @@ -8,7 +8,7 @@ use crate::{ }, }; -use super::ClickhouseKafkaTrigger; +use super::{ClickhouseKafkaTrigger, QueryString, VersionSync}; // TODO: Add column comment capability to the schema and template static CREATE_TABLE_TEMPLATE: &str = r#" @@ -31,6 +31,18 @@ SETTINGS stream_like_engine_allow_direct_select = 1; "#; +static CREATE_VERSION_SYNC_TRIGGER_TEMPLATE: &str = r#" +CREATE MATERIALIZED VIEW IF NOT EXISTS {db_name}.{view_name} TO {db_name}.{dest_table_name} +AS +SELECT +{{for field in to_fields}} moose_migrate_tuple.({@index} + 1) AS {field}{{- if @last }}{{ else }}, {{ endif }} +{{endfor}} +FROM (select {migration_function_name}( +{{for field in from_fields}}{field}{{- if @last }}{{ else }}, {{ endif }} +{{endfor}} +) as moose_migrate_tuple FROM {db_name}.{source_table_name}) +"#; + pub struct CreateTableQuery; static KAFKA_SETTINGS: &str = @@ -195,6 +207,57 @@ impl DropMaterializedViewQuery { } } +pub struct CreateVersionSyncTriggerQuery; +impl CreateVersionSyncTriggerQuery { + pub fn build(view: VersionSync) -> QueryString { + let mut tt = TinyTemplate::new(); + tt.add_template( + "create_version_sync_trigger", + CREATE_VERSION_SYNC_TRIGGER_TEMPLATE, + ) + .unwrap(); + let context = CreateVersionSyncTriggerContext::new(view); + tt.render("create_version_sync_trigger", &context).unwrap() + } +} + +#[derive(Serialize)] +struct CreateVersionSyncTriggerContext { + db_name: String, + view_name: String, + migration_function_name: String, + source_table_name: String, + dest_table_name: String, + from_fields: Vec<String>, + to_fields: Vec<String>, +} + +impl CreateVersionSyncTriggerContext { + pub fn new(version_sync: VersionSync) -> CreateVersionSyncTriggerContext { + let trigger_name = version_sync.migration_trigger_name(); + let migration_function_name = version_sync.migration_function_name(); + CreateVersionSyncTriggerContext { + db_name: version_sync.db_name, + view_name: trigger_name, + migration_function_name, + source_table_name: version_sync.source_table.name, + dest_table_name: version_sync.dest_table.name, + from_fields: version_sync + .source_table + .columns + .into_iter() + .map(|column| column.name) + .collect(), + to_fields: version_sync + .dest_table + .columns + .into_iter() + .map(|column| column.name) + .collect(), + } + } +} + #[derive(Serialize)] struct DropMaterializedViewContext { db_name: String, diff --git a/apps/framework-cli/src/infrastructure/stream/redpanda.rs b/apps/framework-cli/src/infrastructure/stream/redpanda.rs index 4a3db1a5a..d19364342 100644 --- a/apps/framework-cli/src/infrastructure/stream/redpanda.rs +++ b/apps/framework-cli/src/infrastructure/stream/redpanda.rs @@ -22,7 +22,7 @@ pub fn create_topic_from_name(project_name: &str, topic_name: String) -> std::io::Result<String> } // Deletes a topic from a file name -pub fn delete_topic(project_name: &str, topic_name: String) -> std::io::Result<String> { +pub fn delete_topic(project_name: &str, topic_name: &str) -> std::io::Result<String> { info!("Deleting topic: {}", topic_name); let valid_topic_name = topic_name.to_lowercase(); docker::run_rpk_command( diff --git a/apps/framework-cli/src/project.rs b/apps/framework-cli/src/project.rs index 85ca8e1db..6d6c58370 100644 --- a/apps/framework-cli/src/project.rs +++ b/apps/framework-cli/src/project.rs @@ -32,8 +32,8 @@ use crate::infrastructure::olap::clickhouse::config::ClickhouseConfig; use crate::infrastructure::stream::redpanda::RedpandaConfig; use crate::project::typescript_project::TypescriptProject; -use crate::utilities::constants::PROJECT_CONFIG_FILE; use crate::utilities::constants::{APP_DIR, APP_DIR_LAYOUT, CLI_PROJECT_INTERNAL_DIR, SCHEMAS_DIR}; +use crate::utilities::constants::{FLOWS_DIR, PROJECT_CONFIG_FILE}; // We have explored using a Generic associated Types as well as // Dynamic Dispatch to handle the different types of projects @@ -176,6 +176,14 @@ impl Project { schemas_dir } + pub fn flows_dir(&self) -> PathBuf { + let mut flows_dir = self.app_dir(); + flows_dir.push(FLOWS_DIR); + + debug!("Flows dir: {:?}", flows_dir); + flows_dir + } + // This is a Result of io::Error because the caller // can be returning a Result of io::Error or a Routine Failure pub fn internal_dir(&self) -> std::io::Result<PathBuf> { @@ -202,4 +210,10 @@ Ok(internal_dir) } + + pub fn version(&self) -> &str { + match &self.language_project_config { + LanguageProjectConfig::Typescript(package_json) => &package_json.version, + } + } }
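How `VERSION_SYNC_REGEX` decomposes a flow file name (the file name below is hypothetical): capture 1 is the source model, 2 the source version, optional capture 4 a differently named destination model, and 5 the destination version.

    use regex::Regex; // the `regex = "1.10.3"` dependency added above
    fn main() {
        let re = Regex::new(
            r"^([a-zA-Z0-9_]+)_migrate__([0-9_]+)__(([a-zA-Z0-9_]+)__)?([0-9_]+).sql$",
        )
        .unwrap();
        let caps = re.captures("Foo_migrate__1_0__1_1.sql").unwrap();
        assert_eq!(&caps[1], "Foo");                  // source model
        assert_eq!(caps[2].replace('_', "."), "1.0"); // source version
        assert!(caps.get(4).is_none());               // dest model defaults to source
        assert_eq!(caps[5].replace('_', "."), "1.1"); // dest version
    }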
diff --git a/apps/framework-cli/src/utilities/constants.rs b/apps/framework-cli/src/utilities/constants.rs index e72253955..cc8effde5 100644 --- a/apps/framework-cli/src/utilities/constants.rs +++ b/apps/framework-cli/src/utilities/constants.rs @@ -8,6 +8,7 @@ pub const CLI_USER_DIRECTORY: &str = ".moose"; pub const CLI_PROJECT_INTERNAL_DIR: &str = ".moose"; pub const SCHEMAS_DIR: &str = "datamodels"; +pub const FLOWS_DIR: &str = "flows"; pub const CLICKHOUSE_CONTAINER_NAME: &str = "clickhousedb-1"; pub const CONSOLE_CONTAINER_NAME: &str = "console-1"; @@ -18,7 +19,7 @@ pub const REDPANDA_HOSTS: [&str; 2] = ["redpanda", "localhost"]; pub const APP_DIR: &str = "app"; pub const APP_DIR_LAYOUT: [&str; 5] = [ SCHEMAS_DIR, - "flows", + FLOWS_DIR, "insights", "insights/charts", "insights/metrics", diff --git a/apps/moose-console/src/app/infrastructure/databases/[databaseName]/tables/[tableId]/table-tabs.tsx b/apps/moose-console/src/app/infrastructure/databases/[databaseName]/tables/[tableId]/table-tabs.tsx index 5dcfdd7b5..6bf7aff49 100644 --- a/apps/moose-console/src/app/infrastructure/databases/[databaseName]/tables/[tableId]/table-tabs.tsx +++ b/apps/moose-console/src/app/infrastructure/databases/[databaseName]/tables/[tableId]/table-tabs.tsx @@ -70,7 +70,9 @@ export default function TableTabs({ const model = getModelFromTable(table, cliData); const infra = getRelatedInfra(model, cliData, table); const triggerTable = infra.tables.find( - (t) => t.name.includes(model.name) && t.engine === "MaterializedView", + (t) => + t.name === table.name.replace(/(_kafka)?$/, "_trigger") && + t.engine === "MaterializedView", ); const createTabQueryString = useCallback( diff --git a/apps/moose-console/src/lib/snippets.ts b/apps/moose-console/src/lib/snippets.ts index 5b91ca24d..039841950 100644 --- a/apps/moose-console/src/lib/snippets.ts +++ b/apps/moose-console/src/lib/snippets.ts @@ -28,7 +28,7 @@ export const jsSnippet = (data: CliData, model: DataModel) => { return `\ fetch('http://${data.project && data.project.http_server_config.host}:${data.project.http_server_config.port}/${ingestionPoint.route_path}', { -method: 'POST', + method: 'POST', headers: { 'Content-Type': 'application/json' }, diff --git a/apps/moose-console/src/lib/utils.ts b/apps/moose-console/src/lib/utils.ts index 225e88af4..d95f5c686 100644 --- a/apps/moose-console/src/lib/utils.ts +++ b/apps/moose-console/src/lib/utils.ts @@ -16,8 +16,10 @@ export function getIngestionPointFromModel( } export function getModelFromRoute(route: Route, cliData: CliData): DataModel { - const routeTail = route.route_path.split("/").at(-1); - return cliData.models.find((model) => model.name === routeTail); + const routeTail = route.route_path.split("/").at(-2); // -1 is now version number + const found = cliData.models.find((model) => model.name === routeTail); + if (found === undefined) throw new Error(`Model ${routeTail} not found`); + return found; } export function tableIsIngestionTable(table: Table): boolean { @@ -34,12 +36,13 @@ export function getQueueFromRoute(route: Route, cliData: CliData): string { } export function getModelFromTable(table: Table, cliData: CliData): DataModel { - if (table.engine === "MaterializedView") { - const parsedViewName = table.name.split("_").at(0); - return cliData.models.find((model) => model.name === parsedViewName); - } + // TODO: this breaks if the model name includes underscore(s) + // maybe include more information in `CliData`, so we don't have to look up by name + const table_name = table.name.split("_").at(0); - return cliData.models.find((model) => model.name === table.name); + const result = cliData.models.find((model) => model.name === table_name); + if (result === undefined) throw new Error(`Model ${table_name} not found`); + return result; } export function getRelatedInfra(