Skip to content

Commit

Permalink
Use merge tree engine for data table (#451)
Browse files Browse the repository at this point in the history
We go from not supporting primary key to requiring primary key.
  • Loading branch information
phiSgr authored Feb 21, 2024
1 parent bfcb91b commit 54db112
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 21 deletions.
3 changes: 2 additions & 1 deletion apps/framework-cli/src/infrastructure/olap/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use log::debug;

use serde::{Deserialize, Serialize};

use crate::infrastructure::olap::clickhouse::queries::ClickhouseEngine;
use crate::{
framework::schema::{FieldArity, UnsupportedDataTypeError},
utilities::constants::REDPANDA_CONTAINER_NAME,
Expand Down Expand Up @@ -182,7 +183,7 @@ impl ClickhouseTable {
)
}
pub fn create_data_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::build(self.clone(), "Memory".to_string())
CreateTableQuery::build(self.clone(), ClickhouseEngine::MergeTree)
}

pub fn drop_kafka_table_query(&self) -> Result<String, UnsupportedDataTypeError> {
Expand Down
41 changes: 29 additions & 12 deletions apps/framework-cli/src/infrastructure/olap/clickhouse/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ pub struct CreateTableQuery;
static KAFKA_SETTINGS: &str =
"kafka_skip_broken_messages = 1, date_time_input_format = 'best_effort'";

pub enum ClickhouseEngine {
MergeTree,
Kafka(String, u16, String),
}

impl CreateTableQuery {
pub fn kafka(
table: ClickhouseTable,
Expand All @@ -45,16 +50,13 @@ impl CreateTableQuery {
) -> Result<String, UnsupportedDataTypeError> {
CreateTableQuery::build(
table,
format!(
"Kafka('{}:{}', '{}', 'clickhouse-group', 'JSONEachRow') SETTINGS {}",
kafka_host, kafka_port, topic, KAFKA_SETTINGS,
),
ClickhouseEngine::Kafka(kafka_host, kafka_port, topic),
)
}

pub fn build(
table: ClickhouseTable,
engine: String,
engine: ClickhouseEngine,
) -> Result<String, UnsupportedDataTypeError> {
let mut tt = TinyTemplate::new();
tt.set_default_formatter(&format_unescaped); // by default it formats HTML-escaped and messes up single quotes
Expand All @@ -78,14 +80,29 @@ struct CreateTableContext {
impl CreateTableContext {
fn new(
table: ClickhouseTable,
engine: String,
engine: ClickhouseEngine,
) -> Result<CreateTableContext, UnsupportedDataTypeError> {
let primary_key = table
.columns
.iter()
.filter(|column| column.primary_key)
.map(|column| column.name.clone())
.collect::<Vec<String>>();
let (engine, ignore_primary_key) = match engine {
ClickhouseEngine::MergeTree => ("MergeTree".to_string(), false),
ClickhouseEngine::Kafka(kafka_host, kafka_port, topic) => (
format!(
"Kafka('{}:{}', '{}', 'clickhouse-group', 'JSONEachRow') SETTINGS {}",
kafka_host, kafka_port, topic, KAFKA_SETTINGS,
),
true,
),
};

let primary_key = if ignore_primary_key {
Vec::new()
} else {
table
.columns
.iter()
.filter(|column| column.primary_key)
.map(|column| column.name.clone())
.collect::<Vec<String>>()
};

Ok(CreateTableContext {
db_name: table.db_name,
Expand Down
8 changes: 4 additions & 4 deletions apps/framework-cli/tests/psl/event.prisma
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
model Awesome {
id Int
name String
description String
}
id Int @id
name String
description String
}
8 changes: 4 additions & 4 deletions apps/framework-cli/tests/psl/simple.prisma
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
model User {
id Int
email String
name String?
}
id Int @id
email String
name String?
}

0 comments on commit 54db112

Please sign in to comment.