Skip to content

Commit

Permalink
chore: change the serialization format from JSONEachRow to LineAsString for debug table (#23421)
Browse files Browse the repository at this point in the history
  • Loading branch information
fuziontech authored Jul 2, 2024
1 parent 39a4d27 commit 49c79c0
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 3 deletions.
4 changes: 2 additions & 2 deletions posthog/clickhouse/kafka_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
"""


def kafka_engine(topic: str, kafka_host: str | None = None, group="group1", serialization="JSONEachRow") -> str:
    """Build a ClickHouse Kafka table ENGINE clause for the given topic.

    Args:
        topic: Kafka topic the engine table consumes from.
        kafka_host: Comma-joined broker list; defaults to
            ``settings.KAFKA_HOSTS_FOR_CLICKHOUSE`` when not provided.
        group: Kafka consumer group name (defaults to "group1").
        serialization: ClickHouse input format for the topic payload
            (e.g. "JSONEachRow" or "LineAsString"). Defaults to
            "JSONEachRow" to stay backward compatible with existing callers.

    Returns:
        The formatted ENGINE clause (interpolated into CREATE TABLE SQL).
    """
    if kafka_host is None:
        kafka_host = ",".join(settings.KAFKA_HOSTS_FOR_CLICKHOUSE)
    return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization=serialization)


def ttl_period(field: str = "created_at", amount: int = 3, unit: Literal["DAY", "WEEK"] = "WEEK") -> str:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.kafka_client.topics import KAFKA_EVENTS_JSON
from posthog.models.kafka_debug.sql import (
KafkaDebugTable,
KafkaDebugKafkaTable,
KafkaDebugMaterializedView,
)
from posthog.settings.data_stores import KAFKA_HOSTS


# Wire up the debug pipeline objects: raw events topic -> Kafka engine
# table -> materialized view -> storage table.
debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON)
kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON)
materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table)


# Drop and recreate only the Kafka engine table and the materialized view so
# they pick up the corrected serialization format (LineAsString instead of
# JSONEachRow); the backing storage table is left untouched.
operations = [
    run_sql_with_exceptions(statement)
    for statement in (
        kafka_table.get_drop_table_sql(),
        kafka_table.get_create_table_sql(),
        materialized_view.get_drop_view_sql(),
        materialized_view.get_create_view_sql(),
    )
]
13 changes: 12 additions & 1 deletion posthog/models/kafka_debug/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class KafkaDebugKafkaTable:
brokers: list[str]
topic: str
consumer_group: str = "debug"
serialization: str = "LineAsString"

@property
def table_name(self) -> str:
Expand All @@ -20,7 +21,12 @@ def get_create_table_sql(self) -> str:
(
payload String
)
ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
ENGINE={kafka_engine(
kafka_host=",".join(self.brokers),
topic=self.topic,
group=self.consumer_group,
serialization=self.serialization
)}
SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream'
"""

Expand Down Expand Up @@ -56,6 +62,11 @@ def get_create_table_sql(self) -> str:
TTL _timestamp + INTERVAL 14 DAY
"""

def get_drop_table_sql(self) -> str:
return f"""
DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC
"""


@dataclass
class KafkaDebugMaterializedView:
Expand Down

0 comments on commit 49c79c0

Please sign in to comment.