Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a debug table for clickhouse_events_json #23377

Merged
merged 23 commits into master from debug_events
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
b101422
feat: Add a debug table for clickhouse_events_json
fuziontech Jul 1, 2024
24215ed
get_create_table => get_create_view
fuziontech Jul 1, 2024
11a752c
use DateTime for _timestamp for TTL
fuziontech Jul 2, 2024
ff61bee
fix type issue
fuziontech Jul 2, 2024
d257ad2
commas
fuziontech Jul 2, 2024
03d036c
not nullable
fuziontech Jul 2, 2024
e7c14d9
reorder tables
fuziontech Jul 2, 2024
ba0740f
update sequencing for migrations
fuziontech Jul 2, 2024
32cc9b0
Merge branch 'master' into debug_events
fuziontech Jul 2, 2024
dc5195b
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
afbbaac
Update query snapshots
github-actions[bot] Jul 2, 2024
f3a062c
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
dfc4269
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
e18bbb9
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
652fb4c
fix(multi project flags): remove flag id from URL when switching proj…
jurajmajerik Jul 2, 2024
d47c704
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
9337ecd
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
78a6afc
Update UI snapshots for `chromium` (2)
github-actions[bot] Jul 2, 2024
57052ad
remove version and don't skip bad messages
fuziontech Jul 2, 2024
a9a7c98
Add _error and _raw_message and set kafka_handle_error_mode=stream
fuziontech Jul 2, 2024
2d1b3c3
Merge branch 'master' into debug_events
fuziontech Jul 2, 2024
675388e
Update UI snapshots for `chromium` (1)
github-actions[bot] Jul 2, 2024
46cedae
typos
fuziontech Jul 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions posthog/clickhouse/migrations/0067_event_kafka_debug_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
KafkaDebugKafkaTable,
KafkaDebugTable,
KafkaDebugMaterializedView,
)
from posthog.settings.data_stores import KAFKA_HOSTS


# The three objects that make up the debug pipeline for the events JSON topic:
# a storage table, a Kafka-engine consumer table, and the materialized view
# that copies rows from the latter into the former.
debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(topic=KAFKA_EVENTS_JSON, brokers=KAFKA_HOSTS)
materialized_view = KafkaDebugMaterializedView(from_table=kafka_table, to_table=debug_table)


# Statements are listed in execution order; the view comes last because its
# definition references both tables.
operations = [
    run_sql_with_exceptions(statement)
    for statement in (
        debug_table.get_create_table_sql(),
        kafka_table.get_create_table_sql(),
        materialized_view.get_create_view_sql(),
    )
]
Empty file.
86 changes: 86 additions & 0 deletions posthog/models/kafka_debug/sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from dataclasses import dataclass
from posthog.clickhouse.kafka_engine import kafka_engine
from posthog.clickhouse.table_engines import MergeTreeEngine
from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE


@dataclass
class KafkaDebugKafkaTable:
    """SQL builder for a Kafka-engine table that consumes one topic for debugging.

    The generated table declares a single ``payload`` String column and reads
    ``topic`` from ``brokers`` using ``consumer_group``.
    """

    # Kafka bootstrap servers; joined with "," when rendering the engine clause.
    brokers: list[str]
    # Topic to consume; also used to derive the table name.
    topic: str
    # Consumer group passed to the Kafka engine.
    consumer_group: str = "debug"

    @property
    def table_name(self) -> str:
        """Return the unquoted table name, ``kafka_<topic>_debug``."""
        return f"kafka_{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        """Return the ``CREATE TABLE IF NOT EXISTS ... ON CLUSTER`` statement.

        The table name is backtick-quoted because it embeds the topic name, and
        Kafka topics may contain characters (``-``, ``.``) that are not valid in
        a bare ClickHouse identifier.
        """
        # kafka_engine is a project helper; its return value is interpolated
        # verbatim as the ENGINE clause.
        engine = kafka_engine(
            kafka_host=",".join(self.brokers),
            topic=self.topic,
            group=self.consumer_group,
        )
        # NOTE(review): per the ClickHouse Kafka engine docs,
        # kafka_handle_error_mode='stream' reports parse failures through the
        # _error / _raw_message virtual columns rather than failing the read;
        # the debug materialized view in this module selects both.
        return f"""
            CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.`{self.table_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}'
            (
                payload String
            )
            ENGINE={engine}
            SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream'
        """

    def get_drop_table_sql(self) -> str:
        """Return the ``DROP TABLE IF EXISTS ... ON CLUSTER`` statement."""
        return f"""
            DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.`{self.table_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        """


@dataclass
class KafkaDebugTable:
    """SQL builder for the storage table that holds debug copies of Kafka messages.

    Each row stores the message ``payload`` plus the Kafka metadata columns
    (``_timestamp``, ``_timestamp_ms``, ``_partition``, ``_offset``, ``_error``,
    ``_raw_message``).
    """

    # Topic whose messages are stored; also used to derive the table name.
    topic: str

    @property
    def table_name(self) -> str:
        """Return the unquoted table name, ``<topic>_debug``."""
        return f"{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        """Return the ``CREATE TABLE IF NOT EXISTS ... ON CLUSTER`` statement.

        Rows are partitioned by the hour of ``_timestamp``, ordered by
        ``(_partition, _offset)`` and expire 14 days after ``_timestamp``.
        The table name is backtick-quoted because it embeds the topic name,
        which may contain characters not valid in a bare ClickHouse identifier.
        """
        # MergeTreeEngine is a project helper; its string form is interpolated
        # verbatim as the ENGINE clause.
        engine = MergeTreeEngine(self.table_name)
        return f"""
            CREATE TABLE IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.`{self.table_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}' (
                payload String,
                _timestamp DateTime,
                _timestamp_ms Nullable(DateTime64(3)),
                _partition UInt64,
                _offset UInt64,
                _error String,
                _raw_message String
            )
            ENGINE = {engine}
            PARTITION BY toStartOfHour(_timestamp)
            ORDER BY (_partition, _offset)
            TTL _timestamp + INTERVAL 14 DAY
        """

    def get_drop_table_sql(self) -> str:
        """Return the ``DROP TABLE IF EXISTS ... ON CLUSTER`` statement.

        Provided for parity with the Kafka table and materialized view
        builders, which both expose a drop statement.
        """
        return f"""
            DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.`{self.table_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}'
        """


@dataclass
class KafkaDebugMaterializedView:
    """SQL builder for the materialized view that copies rows from the Kafka
    debug table into the debug storage table.
    """

    # Destination table the view writes into.
    to_table: KafkaDebugTable
    # Kafka-engine table the view reads from.
    from_table: KafkaDebugKafkaTable

    @property
    def view_name(self) -> str:
        """Return the unquoted view name, ``<to_table.table_name>_mv``."""
        return f"{self.to_table.table_name}_mv"

    def get_create_view_sql(self) -> str:
        """Return the ``CREATE MATERIALIZED VIEW IF NOT EXISTS ... TO ...`` statement.

        The view, its ``TO`` target and its ``FROM`` source are all qualified
        with ``CLICKHOUSE_DATABASE`` so the statement does not depend on the
        session's current database. Names are backtick-quoted because they
        embed the topic name, which may contain characters not valid in a bare
        ClickHouse identifier.
        """
        return f"""
            CREATE MATERIALIZED VIEW IF NOT EXISTS `{CLICKHOUSE_DATABASE}`.`{self.view_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}' TO `{CLICKHOUSE_DATABASE}`.`{self.to_table.table_name}`
            AS SELECT
                payload,
                _timestamp,
                _timestamp_ms,
                _partition,
                _offset,
                _error,
                _raw_message
            FROM `{CLICKHOUSE_DATABASE}`.`{self.from_table.table_name}`
        """

    def get_drop_view_sql(self) -> str:
        """Return the ``DROP TABLE IF EXISTS ... ON CLUSTER ... SYNC`` statement for the view."""
        return f"""
            DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.`{self.view_name}` ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
        """
Loading