Skip to content

Commit

Permalink
feat(persons-on-events): Add Kafka table and materialized view for di…
Browse files Browse the repository at this point in the history
…stinct ID overrides
  • Loading branch information
tkaemming committed Feb 14, 2024
1 parent 8d467d2 commit cbb3039
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.person.sql import (
KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
)

operations = [
run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()),
run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL),
]
2 changes: 2 additions & 0 deletions posthog/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
KAFKA_PERSON_OVERRIDES_TABLE_SQL,
KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
KAFKA_INGESTION_WARNINGS_TABLE_SQL,
Expand All @@ -101,6 +102,7 @@
PERSON_OVERRIDES_CREATE_MATERIALIZED_VIEW_SQL,
PERSONS_DISTINCT_ID_TABLE_MV_SQL,
PERSON_DISTINCT_ID2_MV_SQL,
PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
INGESTION_WARNINGS_MV_TABLE_SQL,
Expand Down
27 changes: 27 additions & 0 deletions posthog/models/person/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,33 @@
""",
)

KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = lambda: PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL.format(
table_name="kafka_" + PERSON_DISTINCT_ID_OVERRIDES_TABLE,
cluster=CLICKHOUSE_CLUSTER,
engine=kafka_engine(KAFKA_PERSON_DISTINCT_ID, group="clickhouse-person-distinct-id-overrides"),
extra_fields="",
)

PERSON_DISTINCT_ID_OVERRIDES_MV_SQL = """
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}'
TO {database}.{table_name}
AS SELECT
team_id,
distinct_id,
person_id,
is_deleted,
version,
_timestamp,
_offset,
_partition
FROM {database}.kafka_{table_name}
WHERE version > 0 -- only store updated rows, not newly inserted ones
""".format(
table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE,
cluster=CLICKHOUSE_CLUSTER,
database=CLICKHOUSE_DATABASE,
)

#
# Static Cohort
#
Expand Down

0 comments on commit cbb3039

Please sign in to comment.