from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
    KafkaDebugKafkaTable,
    KafkaDebugMaterializedView,
    KafkaDebugTable,
)
from posthog.settings.data_stores import KAFKA_HOSTS

# Debug pipeline for the events topic: a Kafka-engine table tails the raw
# messages, and a materialized view copies every row (payload plus Kafka
# metadata columns) into a plain MergeTree table for inspection.
debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON)
materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table)

# Creation order matters: the sink table must exist before the Kafka source
# and the materialized view that wires them together.
operations = [
    run_sql_with_exceptions(debug_table.get_create_table_sql()),
    run_sql_with_exceptions(kafka_table.get_create_table_sql()),
    run_sql_with_exceptions(materialized_view.get_create_view_sql()),
]
from dataclasses import dataclass

from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import MergeTreeEngine
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE


@dataclass
class KafkaDebugKafkaTable:
    """Kafka-engine source table that tails `topic` as raw String payloads.

    Attributes:
        brokers: Kafka bootstrap hosts, joined into a comma-separated list.
        topic: topic to consume; also used to derive the table name.
        consumer_group: consumer group id (defaults to "debug" so the debug
            consumer never interferes with production consumer offsets).
    """

    brokers: list[str]
    topic: str
    consumer_group: str = "debug"

    @property
    def table_name(self) -> str:
        return f"kafka_{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        # kafka_handle_error_mode='stream' makes malformed messages flow
        # through the virtual _error/_raw_message columns instead of
        # blocking the consumer.
        return f"""
        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        (
            payload String
        )
        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
        SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream'
        """

    def get_drop_table_sql(self) -> str:
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        """


@dataclass
class KafkaDebugTable:
    """MergeTree sink table storing each consumed payload plus the Kafka
    virtual columns (_timestamp, _partition, _offset, _error, _raw_message),
    partitioned hourly and expired after 14 days via TTL."""

    topic: str

    @property
    def table_name(self) -> str:
        return f"{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        engine = MergeTreeEngine(self.table_name)
        return f"""
        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
            payload String,
            _timestamp DateTime,
            _timestamp_ms Nullable(DateTime64(3)),
            _partition UInt64,
            _offset UInt64,
            _error String,
            _raw_message String
        )
        ENGINE = {engine}
        PARTITION BY toStartOfHour(_timestamp)
        ORDER BY (_partition, _offset)
        TTL _timestamp + INTERVAL 14 DAY
        """

    def get_drop_table_sql(self) -> str:
        # Added for parity with KafkaDebugKafkaTable and
        # KafkaDebugMaterializedView: every piece of the debug pipeline now
        # exposes a matching drop statement so teardown mirrors creation.
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        """


@dataclass
class KafkaDebugMaterializedView:
    """Materialized view that copies rows from the Kafka source table into
    the MergeTree debug table, forwarding the Kafka virtual columns."""

    to_table: KafkaDebugTable
    from_table: KafkaDebugKafkaTable

    @property
    def view_name(self) -> str:
        return f"{self.to_table.table_name}_mv"

    def get_create_view_sql(self) -> str:
        # Fix: qualify the TO table with the database, matching every other
        # table reference in this module. Unqualified, the view would write
        # into the session's default database, which is not necessarily
        # CLICKHOUSE_DATABASE.
        return f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' TO `{CLICKHOUSE_DATABASE}`.{self.to_table.table_name}
        AS SELECT
            payload,
            _timestamp,
            _timestamp_ms,
            _partition,
            _offset,
            _error,
            _raw_message
        FROM `{CLICKHOUSE_DATABASE}`.{self.from_table.table_name}
        """

    def get_drop_view_sql(self) -> str:
        # SYNC ensures the drop completes on all replicas before returning.
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
        """