Skip to content

Commit

Permalink
skip broken messages and best effort DateTime parsing for Kafka (#394)
Browse files Browse the repository at this point in the history
  • Loading branch information
phiSgr authored Feb 15, 2024
1 parent 5499848 commit 7875d51
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ stream_like_engine_allow_direct_select = 1;

pub struct CreateTableQuery;

static KAFKA_SETTINGS: &str =
"kafka_skip_broken_messages = 1, date_time_input_format = 'best_effort'";

impl CreateTableQuery {
pub fn kafka(
table: ClickhouseTable,
Expand All @@ -43,8 +46,8 @@ impl CreateTableQuery {
CreateTableQuery::build(
table,
format!(
"Kafka('{}:{}', '{}', 'clickhouse-group', 'JSONEachRow')",
kafka_host, kafka_port, topic,
"Kafka('{}:{}', '{}', 'clickhouse-group', 'JSONEachRow') SETTINGS {}",
kafka_host, kafka_port, topic, KAFKA_SETTINGS,
),
)
}
Expand Down

0 comments on commit 7875d51

Please sign in to comment.