feat: Add a debug table for clickhouse_events_json (#23377)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Juraj Majerik <[email protected]>
3 people authored Jul 2, 2024
1 parent 26ea95f commit 3813df9
Showing 3 changed files with 106 additions and 0 deletions.
20 changes: 20 additions & 0 deletions posthog/clickhouse/migrations/0067_event_kafka_debug_table.py
@@ -0,0 +1,20 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
    KafkaDebugKafkaTable,
    KafkaDebugTable,
    KafkaDebugMaterializedView,
)
from posthog.settings.data_stores import KAFKA_HOSTS


debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON)
materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table)


operations = [
    run_sql_with_exceptions(debug_table.get_create_table_sql()),
    run_sql_with_exceptions(kafka_table.get_create_table_sql()),
    run_sql_with_exceptions(materialized_view.get_create_view_sql()),
]
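
Since the Kafka engine table attaches an extra consumer group ("debug") to the full events topic, the pipeline is presumably meant to be temporary. The drop helpers defined in posthog/models/kafka_debug/sql.py (below) would let a follow-up migration detach it again; a hedged sketch of such a hypothetical migration is shown here. Note that only the Kafka engine table and the materialized view expose drop helpers in this commit, so the MergeTree debug table and its stored data would stay in place.

# Hedged sketch: a hypothetical follow-up migration that detaches the debug consumer.
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
    KafkaDebugKafkaTable,
    KafkaDebugTable,
    KafkaDebugMaterializedView,
)
from posthog.settings.data_stores import KAFKA_HOSTS

debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON)
materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table)

operations = [
    # Drop the view first so it stops reading before the Kafka engine table disappears.
    run_sql_with_exceptions(materialized_view.get_drop_view_sql()),
    run_sql_with_exceptions(kafka_table.get_drop_table_sql()),
    # The MergeTree debug table has no drop helper in this commit; it would need a manual DROP.
]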
Empty file.
86 changes: 86 additions & 0 deletions posthog/models/kafka_debug/sql.py
@@ -0,0 +1,86 @@
from dataclasses import dataclass
from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import MergeTreeEngine
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE


@dataclass
class KafkaDebugKafkaTable:
    brokers: list[str]
    topic: str
    consumer_group: str = "debug"

    @property
    def table_name(self) -> str:
        return f"kafka_{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        return f"""
        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        (
            payload String
        )
        ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
        SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream'
        """

    def get_drop_table_sql(self) -> str:
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        """


@dataclass
class KafkaDebugTable:
    topic: str

    @property
    def table_name(self) -> str:
        return f"{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        engine = MergeTreeEngine(self.table_name)
        return f"""
        CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
            payload String,
            _timestamp DateTime,
            _timestamp_ms Nullable(DateTime64(3)),
            _partition UInt64,
            _offset UInt64,
            _error String,
            _raw_message String
        )
        ENGINE = {engine}
        PARTITION BY toStartOfHour(_timestamp)
        ORDER BY (_partition, _offset)
        TTL _timestamp + INTERVAL 14 DAY
        """


@dataclass
class KafkaDebugMaterializedView:
    to_table: KafkaDebugTable
    from_table: KafkaDebugKafkaTable

    @property
    def view_name(self) -> str:
        return f"{self.to_table.table_name}_mv"

    def get_create_view_sql(self) -> str:
        return f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' TO {self.to_table.table_name}
        AS SELECT
            payload,
            _timestamp,
            _timestamp_ms,
            _partition,
            _offset,
            _error,
            _raw_message
        FROM `{CLICKHOUSE_DATABASE}`.{self.from_table.table_name}
        """

    def get_drop_view_sql(self) -> str:
        return f"""
        DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.view_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
        """
