diff --git a/posthog/clickhouse/kafka_engine.py b/posthog/clickhouse/kafka_engine.py index 68a3282d6cbc3..abfe456bf6a9a 100644 --- a/posthog/clickhouse/kafka_engine.py +++ b/posthog/clickhouse/kafka_engine.py @@ -36,10 +36,10 @@ """ -def kafka_engine(topic: str, kafka_host: str | None = None, group="group1") -> str: +def kafka_engine(topic: str, kafka_host: str | None = None, group="group1", serialization="JSONEachRow") -> str: if kafka_host is None: kafka_host = ",".join(settings.KAFKA_HOSTS_FOR_CLICKHOUSE) - return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization="JSONEachRow") + return KAFKA_ENGINE.format(topic=topic, kafka_host=kafka_host, group=group, serialization=serialization) def ttl_period(field: str = "created_at", amount: int = 3, unit: Literal["DAY", "WEEK"] = "WEEK") -> str: diff --git a/posthog/clickhouse/migrations/0067_event_kafka_debug_table.py b/posthog/clickhouse/migrations/0067_event_kafka_debug_table copy.py similarity index 100% rename from posthog/clickhouse/migrations/0067_event_kafka_debug_table.py rename to posthog/clickhouse/migrations/0067_event_kafka_debug_table copy.py diff --git a/posthog/clickhouse/migrations/0068_event_kafka_debug_line_as_string.py b/posthog/clickhouse/migrations/0068_event_kafka_debug_line_as_string.py new file mode 100644 index 0000000000000..05987132159d9 --- /dev/null +++ b/posthog/clickhouse/migrations/0068_event_kafka_debug_line_as_string.py @@ -0,0 +1,23 @@ +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.kafka_client.topics import KAFKA_EVENTS_JSON +from posthog.models.kafka_debug.sql import ( + KafkaDebugTable, + KafkaDebugKafkaTable, + KafkaDebugMaterializedView, +) +from posthog.settings.data_stores import KAFKA_HOSTS + + +debug_table = KafkaDebugTable(topic=KAFKA_EVENTS_JSON) +kafka_table = KafkaDebugKafkaTable(brokers=KAFKA_HOSTS, topic=KAFKA_EVENTS_JSON) +materialized_view = KafkaDebugMaterializedView(to_table=debug_table, from_table=kafka_table) + + +operations = [ + # We just need to drop and recreate the kafka and MV tables here to + # correct the serialization type (LineAsString from JSONEachRow) + run_sql_with_exceptions(kafka_table.get_drop_table_sql()), + run_sql_with_exceptions(kafka_table.get_create_table_sql()), + run_sql_with_exceptions(materialized_view.get_drop_view_sql()), + run_sql_with_exceptions(materialized_view.get_create_view_sql()), +] diff --git a/posthog/models/kafka_debug/sql.py b/posthog/models/kafka_debug/sql.py index 7b5e8750e0413..b87a881063de3 100644 --- a/posthog/models/kafka_debug/sql.py +++ b/posthog/models/kafka_debug/sql.py @@ -9,6 +9,7 @@ class KafkaDebugKafkaTable: brokers: list[str] topic: str consumer_group: str = "debug" + serialization: str = "LineAsString" @property def table_name(self) -> str: @@ -20,7 +21,12 @@ def get_create_table_sql(self) -> str: ( payload String ) - ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)} + ENGINE={kafka_engine( + kafka_host=",".join(self.brokers), + topic=self.topic, + group=self.consumer_group, + serialization=self.serialization + )} SETTINGS input_format_values_interpret_expressions=0, kafka_handle_error_mode='stream' """ @@ -56,6 +62,11 @@ def get_create_table_sql(self) -> str: TTL _timestamp + INTERVAL 14 DAY """ + def get_drop_table_sql(self) -> str: + return f""" + DROP TABLE IF EXISTS `{CLICKHOUSE_DATABASE}`.{self.table_name} ON CLUSTER '{CLICKHOUSE_CLUSTER}' SYNC + """ + @dataclass class KafkaDebugMaterializedView: