Skip to content

Commit

Permalink
Tim/pol 89 create view automatically from table creation (#65)
Browse files Browse the repository at this point in the history
* views are working

* removing temp fiels

* removing the client code in favor of new approacj
  • Loading branch information
tg339 authored Oct 31, 2023
1 parent 292a135 commit 834fad8
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 77 deletions.
4 changes: 2 additions & 2 deletions apps/igloo-kit-cli/src/cli/routines/validate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::io::{self, Write, Error, ErrorKind};
use crate::{cli::{display::Message, DebugStatus}, utilities::docker};
use crate::{cli::{display::Message, DebugStatus}, utilities::docker, infrastructure::PANDA_NETWORK};
use super::{Routine, RoutineSuccess, RoutineFailure};

pub struct ValidateClickhouseRun(DebugStatus);
Expand Down Expand Up @@ -73,7 +73,7 @@ impl Routine for ValidatePandaHouseNetwork {

let string_output = String::from_utf8(output.stdout).unwrap();

if string_output.contains("panda_house") {
if string_output.contains(PANDA_NETWORK) {
Ok(RoutineSuccess::success(Message::new("Successfully".to_string(), "validated panda house docker network".to_string())))
} else {
Err(RoutineFailure::new(Message::new("Failed".to_string(), "to validate panda house docker network".to_string()), Error::new(ErrorKind::Other, "Failed to validate panda house network exists")))
Expand Down
35 changes: 24 additions & 11 deletions apps/igloo-kit-cli/src/cli/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@ use std::{sync::{Arc, RwLock}, collections::HashMap, path::PathBuf, io::{Error,
use notify::{RecommendedWatcher, Config, RecursiveMode, Watcher, event::ModifyKind};
use tokio::sync::Mutex;

use crate::{framework::{directories::get_app_directory, schema::{parse_schema_file, OpsTable}}, cli::display::show_message, infrastructure::{stream, olap::{self, clickhouse::{ConfiguredClient, mapper, ClickhouseTable, config::ClickhouseConfig}}}};
use crate::{framework::{directories::get_app_directory, schema::{parse_schema_file, TableOps, MatViewOps}}, cli::display::show_message, infrastructure::{stream, olap::{self, clickhouse::{ConfiguredClient, mapper, ClickhouseTable, config::ClickhouseConfig, ClickhouseView}}}};

use super::{CommandTerminal, display::{MessageType, Message}};

// fn route_to_topic_name(route: PathBuf) -> String {
// let route = route.to_str().unwrap().to_string();
// let route = route.replace("/", ".");
// route
// }

fn dataframe_path_to_ingest_route(project_dir: PathBuf, path: PathBuf, table_name: String) -> PathBuf {
let dataframe_path = project_dir.join("dataframes");
let mut route = path.strip_prefix(dataframe_path).unwrap().to_path_buf();
Expand Down Expand Up @@ -54,6 +48,7 @@ async fn process_event(project_dir: PathBuf, event: notify::Event, route_table:
pub struct RouteMeta {
pub original_file_path: PathBuf,
pub table_name: String,
pub view_name: Option<String>,
}

async fn create_table_and_topics_from_dataframe_route(route: &PathBuf, project_dir: PathBuf, route_table: &mut tokio::sync::MutexGuard<'_, HashMap::<PathBuf, RouteMeta>>, configured_client: &ConfiguredClient) -> Result<(), Error> {
Expand All @@ -64,12 +59,25 @@ async fn create_table_and_topics_from_dataframe_route(route: &PathBuf, project_d

for table in tables {
let ingest_route = dataframe_path_to_ingest_route(project_dir.clone(), route.clone(), table.name.clone());
route_table.insert(ingest_route, RouteMeta { original_file_path: route.clone(), table_name: table.name.clone() });

stream::redpanda::create_topic_from_name(table.name.clone());
let query = table.create_table_query()
let table_query = table.create_table_query()
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to get clickhouse query: {:?}", e)))?;

olap::clickhouse::run_query(table_query, configured_client).await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?;

let view = ClickhouseView::new(
table.db_name.clone(),
format!("{}_view", table.name),
table.clone());
let view_query = view.create_materialized_view_query()
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to get clickhouse query: {:?}", e)))?;
olap::clickhouse::run_query(query, configured_client).await

olap::clickhouse::run_query(view_query, configured_client).await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?;

route_table.insert(ingest_route, RouteMeta { original_file_path: route.clone(), table_name: table.name.clone(), view_name: Some(view.name.clone()) });
};
}
} else {
Expand All @@ -85,9 +93,14 @@ async fn remove_table_and_topics_from_dataframe_route(route: &PathBuf, route_tab
if meta.original_file_path == route.clone() {
stream::redpanda::delete_topic(meta.table_name.clone());

olap::clickhouse::delete_table(meta.table_name, configured_client).await
olap::clickhouse::delete_table_or_view(meta.table_name, configured_client).await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?;

if let Some(view_name) = meta.view_name {
olap::clickhouse::delete_table_or_view(view_name, configured_client).await
.map_err(|e| Error::new(ErrorKind::Other, format!("Failed to create table in clickhouse: {}", e)))?;
}

route_table.remove(&k);
}
}
Expand Down
7 changes: 6 additions & 1 deletion apps/igloo-kit-cli/src/framework/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,16 @@ pub struct Table {
pub columns: Vec<Column>,
}

pub trait OpsTable {
pub trait TableOps {
fn create_table_query(&self) -> Result<String, UnsupportedDataTypeError>;
fn drop_table_query(&self) -> Result<String, UnsupportedDataTypeError>;
}

pub trait MatViewOps {
fn create_materialized_view_query(&self) -> Result<String, UnsupportedDataTypeError>;
fn drop_materialized_view_query(&self) -> Result<String, UnsupportedDataTypeError>;
}

#[derive(Debug, Clone)]
pub struct Column {
pub name: String,
Expand Down
63 changes: 41 additions & 22 deletions apps/igloo-kit-cli/src/infrastructure/olap/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ pub mod config;
pub mod mapper;
mod queries;

use std::fmt;
use std::fmt::{self};

use clickhouse::Client;
use reqwest::Url;
use schema_ast::ast::FieldArity;

use crate::framework::schema::{OpsTable, UnsupportedDataTypeError};
use crate::framework::schema::{TableOps, UnsupportedDataTypeError, MatViewOps};

use self::{queries::{CreateTableQuery, DropTableQuery}, config::ClickhouseConfig};
use self::{queries::{CreateTableQuery, DropTableQuery, CreateMaterializedViewQuery, DropMaterializedViewQuery}, config::ClickhouseConfig};

#[derive(Debug, Clone)]
pub enum ClickhouseTableType {
Expand All @@ -20,6 +20,12 @@ pub enum ClickhouseTableType {
Unsupported
}

impl fmt::Display for ClickhouseTableType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self)
}
}

#[derive(Debug, Clone)]
pub enum ClickhouseColumnType {
String,
Expand Down Expand Up @@ -107,7 +113,7 @@ impl ClickhouseTable {
}
}

impl OpsTable for ClickhouseTable {
impl TableOps for ClickhouseTable {
fn create_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::new(self.clone(), "redpanda-1".to_string(), 9092, self.name.clone())
}
Expand All @@ -117,12 +123,39 @@ impl OpsTable for ClickhouseTable {
}
}

#[derive(Debug, Clone)]
pub struct ClickhouseView {
pub db_name: String,
pub name: String,
pub source_table: ClickhouseTable,
}

impl ClickhouseView {
pub fn new(db_name: String, name: String, source_table: ClickhouseTable) -> ClickhouseView {
ClickhouseView {
db_name,
name,
source_table,
}
}
}

pub type QueryString = String;

impl MatViewOps for ClickhouseView {
fn create_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
CreateMaterializedViewQuery::new(self.clone())
}
fn drop_materialized_view_query(&self) -> Result<QueryString, UnsupportedDataTypeError> {
DropMaterializedViewQuery::new(self.clone())
}
}

pub struct ConfiguredClient {
pub client: Client,
pub config: ClickhouseConfig,
}


pub fn create_client(clickhouse_config: ClickhouseConfig) -> ConfiguredClient {
ConfiguredClient {
client: Client::default()
Expand All @@ -135,28 +168,14 @@ pub fn create_client(clickhouse_config: ClickhouseConfig) -> ConfiguredClient {
}

// Run an arbitrary clickhouse query
pub async fn run_query(query: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> {
pub async fn run_query(query: QueryString, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> {
let client = &configured_client.client;
client.query(query.as_str()).execute().await
}

// Creates a table in clickhouse from a file name. this table should have a single field that accepts a json blob
pub async fn create_table(table_name: String, topic: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> {
let client = &configured_client.client;
let config = &configured_client.config;
let db_name = &config.db_name;
let cluster_network = &config.cluster_network;
let kafka_port = &config.kafka_port;

// If you want to change the settings when doing a query you can do it as follows: SETTINGS allow_experimental_object_type = 1;
client.query(format!("CREATE TABLE IF NOT EXISTS {db_name}.{table_name}
(data String) ENGINE = Kafka('{cluster_network}:{kafka_port}', '{topic}', 'clickhouse-group', 'JSONEachRow') ;")
.as_str() ).execute().await
}

pub async fn delete_table(table_name: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> {
pub async fn delete_table_or_view(table_or_view_name: String, configured_client: &ConfiguredClient) -> Result<(), clickhouse::error::Error> {
let client = &configured_client.client;
let db_name = &configured_client.config.db_name;

client.query(format!("DROP TABLE {db_name}.{table_name}").as_str()).execute().await
client.query(format!("DROP TABLE {db_name}.{table_or_view_name}").as_str()).execute().await
}
Loading

0 comments on commit 834fad8

Please sign in to comment.