feat: create events table to store last 7 days of data (#26239)
Daesgar authored Nov 20, 2024
1 parent 39a721d commit 4ff32c5
Showing 3 changed files with 99 additions and 1 deletion.
2 changes: 2 additions & 0 deletions posthog/clickhouse/kafka_engine.py
@@ -35,6 +35,8 @@
, _partition UInt64
"""

KAFKA_TIMESTAMP_MS_COLUMN = "_timestamp_ms DateTime64"


def kafka_engine(topic: str, kafka_host: str | None = None, group="group1", serialization="JSONEachRow") -> str:
if kafka_host is None:
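The new KAFKA_TIMESTAMP_MS_COLUMN constant declares a DateTime64 column for the Kafka engine's millisecond-precision virtual timestamp, letting destination tables record when the broker received each message. A minimal sketch of how the helper might be exercised; the topic name and broker address are illustrative assumptions, not values from this commit:

from posthog.clickhouse.kafka_engine import kafka_engine

# Render the ENGINE clause for a Kafka consumer table. A dedicated consumer
# group gives this pipeline its own offset tracking on the shared topic.
engine_clause = kafka_engine(
    topic="clickhouse_events_json",  # assumed value of KAFKA_EVENTS_JSON
    kafka_host="kafka:9092",         # hypothetical broker address
    group="group1_recent",
)
# Expected shape, per ClickHouse's Kafka engine syntax (an assumption about
# this helper's exact output):
#   Kafka('kafka:9092', 'clickhouse_events_json', 'group1_recent', 'JSONEachRow')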
14 changes: 14 additions & 0 deletions posthog/clickhouse/migrations/0086_events_recent_table.py
@@ -0,0 +1,14 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.event.sql import (
EVENTS_RECENT_TABLE_JSON_MV_SQL,
EVENTS_RECENT_TABLE_SQL,
KAFKA_EVENTS_RECENT_TABLE_JSON_SQL,
DISTRIBUTED_EVENTS_RECENT_TABLE_SQL,
)

operations = [
run_sql_with_exceptions(EVENTS_RECENT_TABLE_SQL()),
run_sql_with_exceptions(KAFKA_EVENTS_RECENT_TABLE_JSON_SQL()),
run_sql_with_exceptions(EVENTS_RECENT_TABLE_JSON_MV_SQL()),
run_sql_with_exceptions(DISTRIBUTED_EVENTS_RECENT_TABLE_SQL()),
]
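The four statements run in dependency order: the storage table first, then the Kafka consumer table, then the materialized view that links the two (its target and source must already exist), and finally the Distributed facade. A comment-only sketch of what this migration executes conceptually; the rendered DDL depends on cluster settings:

# 1. CREATE TABLE events_recent ...                  -- ReplacingMergeTree storage, 7-day TTL
# 2. CREATE TABLE kafka_events_recent_json ...       -- Kafka engine consumer table
# 3. CREATE MATERIALIZED VIEW events_recent_json_mv  -- pipes (2) into (1)
# 4. CREATE TABLE distributed_events_recent ...      -- Distributed facade over (1)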
84 changes: 83 additions & 1 deletion posthog/models/event/sql.py
@@ -4,6 +4,8 @@
from posthog.clickhouse.indexes import index_by_kafka_timestamp
from posthog.clickhouse.kafka_engine import (
KAFKA_COLUMNS,
KAFKA_COLUMNS_WITH_PARTITION,
KAFKA_TIMESTAMP_MS_COLUMN,
STORAGE_POLICY,
kafka_engine,
trim_quotes_expr,
@@ -18,7 +20,7 @@

EVENTS_DATA_TABLE = lambda: "sharded_events"
WRITABLE_EVENTS_DATA_TABLE = lambda: "writable_events"

EVENTS_RECENT_DATA_TABLE = lambda: "events_recent"
TRUNCATE_EVENTS_TABLE_SQL = (
lambda: f"TRUNCATE TABLE IF EXISTS {EVENTS_DATA_TABLE()} ON CLUSTER '{settings.CLICKHOUSE_CLUSTER}'"
)
@@ -185,6 +187,86 @@
)
)


KAFKA_EVENTS_RECENT_TABLE_JSON_SQL = lambda: (
EVENTS_TABLE_BASE_SQL
+ """
SETTINGS kafka_skip_broken_messages = 100
"""
).format(
table_name="kafka_events_recent_json",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=kafka_engine(topic=KAFKA_EVENTS_JSON, group="group1_recent"),
extra_fields="",
materialized_columns="",
indexes="",
)
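This consumer table reads the same KAFKA_EVENTS_JSON topic as the main events pipeline, but under the separate consumer group group1_recent, so its offsets advance independently of the existing consumers; kafka_skip_broken_messages = 100 lets each consumed block tolerate up to 100 unparseable messages rather than stalling the pipeline. One quick way to inspect the rendered DDL, assuming a configured Django settings module:

from posthog.models.event.sql import KAFKA_EVENTS_RECENT_TABLE_JSON_SQL

# The table definitions are lambdas, so the CREATE TABLE statement is
# rendered from settings at call time.
print(KAFKA_EVENTS_RECENT_TABLE_JSON_SQL())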

EVENTS_RECENT_TABLE_JSON_MV_SQL = (
lambda: """
CREATE MATERIALIZED VIEW IF NOT EXISTS events_recent_json_mv ON CLUSTER '{cluster}'
TO {database}.{target_table}
AS SELECT
uuid,
event,
properties,
timestamp,
team_id,
distinct_id,
elements_chain,
created_at,
person_id,
person_created_at,
person_properties,
group0_properties,
group1_properties,
group2_properties,
group3_properties,
group4_properties,
group0_created_at,
group1_created_at,
group2_created_at,
group3_created_at,
group4_created_at,
person_mode,
_timestamp,
_timestamp_ms,
_offset,
_partition
FROM {database}.kafka_events_recent_json
""".format(
target_table=EVENTS_RECENT_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
database=settings.CLICKHOUSE_DATABASE,
)
)
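The materialized view is the glue of the pipeline: each block consumed from kafka_events_recent_json passes through this SELECT and is inserted into events_recent, with the Kafka virtual columns (_timestamp, _timestamp_ms, _offset, _partition) persisted as ordinary columns. A hedged post-migration sanity check; the table and column names come from this diff, the time window is arbitrary:

from posthog.clickhouse.client import sync_execute  # client used elsewhere in posthog

rows = sync_execute(
    """
    SELECT count()
    FROM events_recent
    WHERE _timestamp > now() - INTERVAL 5 MINUTE
    """
)
# A non-zero count suggests the Kafka -> materialized view -> table flow is live.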

EVENTS_RECENT_TABLE_SQL = lambda: (
EVENTS_TABLE_BASE_SQL
+ """PARTITION BY toStartOfHour(_timestamp)
ORDER BY (team_id, toStartOfHour(_timestamp), event, cityHash64(distinct_id), cityHash64(uuid))
TTL _timestamp + INTERVAL 7 DAY
{storage_policy}
"""
).format(
table_name=EVENTS_RECENT_DATA_TABLE(),
cluster=settings.CLICKHOUSE_CLUSTER,
engine=ReplacingMergeTree(EVENTS_RECENT_DATA_TABLE(), ver="_timestamp"),
extra_fields=KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN + f", {KAFKA_TIMESTAMP_MS_COLUMN}",
materialized_columns="",
indexes="",
storage_policy=STORAGE_POLICY(),
)
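Two design points worth noting. Partitioning by toStartOfHour(_timestamp) keeps parts small and hour-aligned, so the TTL _timestamp + INTERVAL 7 DAY (the "last 7 days" of the commit title) can drop whole expired parts rather than rewriting them. And ReplacingMergeTree with ver="_timestamp" collapses rows that share the full ORDER BY key down to the one with the greatest _timestamp, but only during background merges; an illustrative read that forces deduplication at query time (the team_id value is arbitrary):

# Sketch only: FINAL deduplicates at read time, at a query cost.
DEDUPED_RECENT_EVENTS_SQL = """
SELECT uuid, event, timestamp
FROM events_recent FINAL
WHERE team_id = 1 AND timestamp > now() - INTERVAL 1 DAY
"""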

DISTRIBUTED_EVENTS_RECENT_TABLE_SQL = lambda: EVENTS_TABLE_BASE_SQL.format(
table_name="distributed_events_recent",
cluster=settings.CLICKHOUSE_CLUSTER,
engine=Distributed(data_table=EVENTS_RECENT_DATA_TABLE(), sharding_key="sipHash64(distinct_id)"),
extra_fields=KAFKA_COLUMNS_WITH_PARTITION + INSERTED_AT_COLUMN + f", {KAFKA_TIMESTAMP_MS_COLUMN}",
materialized_columns="",
indexes="",
)
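distributed_events_recent carries the same schema (including the Kafka metadata columns in extra_fields) but stores no data itself; it routes writes and fans out reads by sipHash64(distinct_id). A conceptual illustration; the shard count is cluster configuration, not part of this commit:

# Conceptually, the Distributed engine picks a shard per inserted row via:
#   shard_index = sipHash64(distinct_id) % total_shard_weight
# so all recent events for one distinct_id land on a single shard, and reads
# through distributed_events_recent fan out across shards and merge results.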

# Distributed engine tables are only created if CLICKHOUSE_REPLICATED

# This table is responsible for writing to sharded_events based on a sharding key.
