Skip to content

Commit

Permalink
use explicit data table instead of inner table in materialized view (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
phiSgr authored Feb 12, 2024
1 parent a5e7102 commit 2196d9b
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 117 deletions.
8 changes: 4 additions & 4 deletions apps/framework-cli/src/cli/routines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl RoutineController {
}

// Starts the file watcher and the webserver
pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
pub async fn start_development_mode(project: &Project) -> anyhow::Result<()> {
show_message!(
MessageType::Success,
Message {
Expand All @@ -231,11 +231,11 @@ pub async fn start_development_mode(project: &Project) -> Result<(), Error> {
}
);

// TODO: Explore using a RWLock instead of a Mutex to ensure concurrent reads without locks
let mut route_table = HashMap::<PathBuf, RouteMeta>::new();

info!("Initializing project state");
initialize_project_state(project.schemas_dir(), project, &mut route_table).await?;

let route_table: &'static RwLock<HashMap<PathBuf, RouteMeta>> =
Box::leak(Box::new(RwLock::new(route_table)));

Expand All @@ -258,7 +258,7 @@ async fn initialize_project_state(
schema_dir: PathBuf,
project: &Project,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let configured_client = olap::clickhouse::create_client(project.clickhouse_config.clone());
let producer = redpanda::create_producer(project.redpanda_config.clone());

Expand Down Expand Up @@ -294,7 +294,7 @@ async fn process_schemas_in_dir(
project: &Project,
configured_client: &ConfiguredDBClient,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
if schema_dir.is_dir() {
for entry in std::fs::read_dir(schema_dir)? {
let entry = entry?;
Expand Down
4 changes: 2 additions & 2 deletions apps/framework-cli/src/cli/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn process_event(
event: notify::Event,
route_table: &RwLock<HashMap<PathBuf, RouteMeta>>,
configured_client: &ConfiguredDBClient,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let mut route_table = route_table.write().await;

debug!(
Expand Down Expand Up @@ -93,7 +93,7 @@ async fn create_framework_objects_from_schema_file_path(
schema_file_path: &Path,
route_table: &mut HashMap<PathBuf, RouteMeta>,
configured_client: &ConfiguredDBClient,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
//! Creates the route, topics and tables from a path to the schema file

if let Some(ext) = schema_file_path.extension() {
Expand Down
67 changes: 26 additions & 41 deletions apps/framework-cli/src/framework/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ use crate::infrastructure::olap;

use std::io::ErrorKind;

use crate::infrastructure::olap::clickhouse::ClickhouseView;
use crate::infrastructure::olap::clickhouse::ClickhouseKafkaTrigger;

use std::io::Error;

use crate::infrastructure::olap::clickhouse::ConfiguredDBClient;

use super::schema::parse_schema_file;
use super::schema::DataModel;
use super::schema::MatViewOps;
use super::schema::TableOps;
use super::typescript::TypescriptInterface;

#[derive(Debug, Clone)]
Expand All @@ -51,7 +49,7 @@ pub fn framework_object_mapper(s: DataModel) -> FrameworkObject {
let clickhouse_table = olap::clickhouse::mapper::std_table_to_clickhouse_table(s.to_table());
FrameworkObject {
data_model: s.clone(),
table: clickhouse_table.clone(),
table: clickhouse_table,
topic: s.name.clone(),
ts_interface: framework::typescript::mapper::std_table_to_typescript_interface(
s.to_table(),
Expand Down Expand Up @@ -98,12 +96,17 @@ pub fn get_framework_objects_from_schema_file(path: &Path) -> Result<Vec<Framewo
Ok(framework_objects)
}

pub(crate) async fn create_or_replace_view(
pub(crate) async fn create_or_replace_kafka_trigger(
fo: &FrameworkObject,
view_name: String,
configured_client: &ConfiguredDBClient,
) -> Result<(), Error> {
let view = ClickhouseView::new(fo.table.db_name.clone(), view_name, fo.table.clone());
let view = ClickhouseKafkaTrigger::new(
fo.table.db_name.clone(),
view_name,
fo.table.kafka_table_name(),
fo.table.name.clone(),
);
let create_view_query = view.create_materialized_view_query().map_err(|e| {
Error::new(
ErrorKind::Other,
Expand Down Expand Up @@ -137,23 +140,17 @@ pub(crate) async fn create_or_replace_view(
Ok(())
}

pub(crate) async fn create_or_replace_table(
pub(crate) async fn create_or_replace_tables(
fo: &FrameworkObject,
configured_client: &ConfiguredDBClient,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
info!("Creating table: {:?}", fo.table.name);
let create_table_query = fo.table.create_table_query().map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to get clickhouse query: {:?}", e),
)
})?;
let drop_table_query = fo.table.drop_table_query().map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to get clickhouse query: {:?}", e),
)
})?;

let drop_data_table_query = fo.table.drop_kafka_table_query()?;
let drop_kafka_table_query = fo.table.drop_data_table_query()?;

let create_data_table_query = fo.table.create_data_table_query()?;
let create_kafka_table_query = fo.table.create_kafka_table_query()?;

olap::clickhouse::check_ready(configured_client)
.await
Expand All @@ -165,22 +162,10 @@ pub(crate) async fn create_or_replace_table(
})?;

// Clickhouse doesn't support dropping a view if it doesn't exist so we need to drop it first in case the schema has changed
olap::clickhouse::run_query(&drop_table_query, configured_client)
.await
.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to drop table in clickhouse: {}", e),
)
})?;
olap::clickhouse::run_query(&create_table_query, configured_client)
.await
.map_err(|e| {
Error::new(
ErrorKind::Other,
format!("Failed to create table in clickhouse: {}", e),
)
})?;
olap::clickhouse::run_query(&drop_data_table_query, configured_client).await?;
olap::clickhouse::run_query(&drop_kafka_table_query, configured_client).await?;
olap::clickhouse::run_query(&create_data_table_query, configured_client).await?;
olap::clickhouse::run_query(&create_kafka_table_query, configured_client).await?;
Ok(())
}

Expand Down Expand Up @@ -245,7 +230,7 @@ pub async fn remove_table_and_topics_from_schema_file_path(
schema_file_path: &Path,
route_table: &mut HashMap<PathBuf, RouteMeta>,
configured_client: &ConfiguredDBClient,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
//need to get the path of the file, scan the route table and remove all the files that need to be deleted.
// This doesn't have to be as fast as the scanning for routes in the web server so we're ok with the scan here.

Expand Down Expand Up @@ -299,7 +284,7 @@ pub async fn process_objects(
configured_client: &ConfiguredDBClient,
compilable_objects: &mut Vec<TypescriptObjects>, // Objects that require compilation after processing
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
for fo in framework_objects {
let ingest_route = schema_file_path_to_ingest_route(
project.app_dir().clone(),
Expand All @@ -310,10 +295,10 @@ pub async fn process_objects(

debug!("Creating table & view: {:?}", fo.table.name);

let view_name = format!("{}_view", fo.table.name);
let view_name = format!("{}_trigger", fo.table.name);

create_or_replace_table(&fo, configured_client).await?;
create_or_replace_view(&fo, view_name.clone(), configured_client).await?;
create_or_replace_tables(&fo, configured_client).await?;
create_or_replace_kafka_trigger(&fo, view_name.clone(), configured_client).await?;

debug!("Table created: {:?}", fo.table.name);

Expand Down
12 changes: 10 additions & 2 deletions apps/framework-cli/src/framework/schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::{Display, Formatter};
use std::{
collections::HashMap,
fmt,
io::Error,
path::{Path, PathBuf},
};

Expand Down Expand Up @@ -38,6 +38,14 @@ pub struct UnsupportedDataTypeError {
pub type_name: String,
}

/// Human-readable rendering: `UnsupportedDataTypeError: <type_name>`.
impl Display for UnsupportedDataTypeError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Write the fixed prefix, then the offending type name — identical
        // output to a single `write!` with a format string, without going
        // through format_args! machinery.
        f.write_str("UnsupportedDataTypeError: ")?;
        f.write_str(&self.type_name)
    }
}

// Marker impl so the type can travel through `anyhow::Result` / `Box<dyn Error>`.
impl std::error::Error for UnsupportedDataTypeError {}

/// A function that maps an input type to an output type
type MapperFunc<I, O> = fn(i: I) -> O;

Expand Down Expand Up @@ -307,7 +315,7 @@ pub async fn process_schema_file(
project: &Project,
configured_client: &ConfiguredDBClient,
route_table: &mut HashMap<PathBuf, RouteMeta>,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
let framework_objects = get_framework_objects_from_schema_file(schema_file_path)?;
let mut compilable_objects: Vec<TypescriptObjects> = Vec::new();
process_objects(
Expand Down
59 changes: 42 additions & 17 deletions apps/framework-cli/src/infrastructure/olap/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use log::debug;
use serde::{Deserialize, Serialize};

use crate::{
framework::schema::{FieldArity, MatViewOps, TableOps, UnsupportedDataTypeError},
framework::schema::{FieldArity, UnsupportedDataTypeError},
utilities::constants::REDPANDA_CONTAINER_NAME,
};

use self::{
config::ClickhouseConfig,
queries::{
CreateMaterializedViewQuery, CreateTableQuery, DropMaterializedViewQuery, DropTableQuery,
CreateKafkaTriggerViewQuery, CreateTableQuery, DropMaterializedViewQuery, DropTableQuery,
},
};

Expand Down Expand Up @@ -158,45 +158,70 @@ impl ClickhouseTable {
}
}

impl TableOps for ClickhouseTable {
fn create_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::build(
self.clone(),
impl ClickhouseTable {
pub fn kafka_table_name(&self) -> String {
format!("{}_kafka", self.name)
}

fn kafka_table(&self) -> ClickhouseTable {
ClickhouseTable {
name: self.kafka_table_name(),
..self.clone()
}
}

pub fn create_kafka_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::kafka(
self.kafka_table(),
REDPANDA_CONTAINER_NAME.to_string(),
9092,
self.name.clone(),
)
}
pub fn create_data_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::build(self.clone(), "Memory".to_string())
}

pub fn drop_kafka_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
DropTableQuery::build(self.kafka_table())
}

fn drop_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
pub fn drop_data_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
DropTableQuery::build(self.clone())
}
}

#[derive(Debug, Clone)]
pub struct ClickhouseView {
pub struct ClickhouseKafkaTrigger {
pub db_name: String,
pub name: String,
pub source_table: ClickhouseTable,
pub source_table_name: String,
pub dest_table_name: String,
}

impl ClickhouseView {
pub fn new(db_name: String, name: String, source_table: ClickhouseTable) -> ClickhouseView {
ClickhouseView {
impl ClickhouseKafkaTrigger {
pub fn new(
db_name: String,
name: String,
source_table_name: String,
dest_table_name: String,
) -> ClickhouseKafkaTrigger {
ClickhouseKafkaTrigger {
db_name,
name,
source_table,
source_table_name,
dest_table_name,
}
}
}

pub type QueryString = String;

impl MatViewOps for ClickhouseView {
fn create_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
CreateMaterializedViewQuery::build(self.clone())
impl ClickhouseKafkaTrigger {
pub fn create_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
Ok(CreateKafkaTriggerViewQuery::build(self.clone()))
}
fn drop_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
pub fn drop_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
DropMaterializedViewQuery::build(self.clone())
}
}
Expand Down
Loading

0 comments on commit 2196d9b

Please sign in to comment.