Skip to content

Commit

Permalink
feat(persons-on-events): Add ClickHouse table for tracking distinct I…
Browse files Browse the repository at this point in the history
…D overrides (without Kafka ingestion) (#20326)
  • Loading branch information
tkaemming authored Feb 14, 2024
1 parent ca20bac commit 9f838f9
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL

# Single migration step: run the statement rendered by
# PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL through run_sql_with_exceptions.
operations = [run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL())]
1 change: 1 addition & 0 deletions posthog/clickhouse/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
PERSON_OVERRIDES_CREATE_TABLE_SQL,
PERSONS_DISTINCT_ID_TABLE_SQL,
PERSON_DISTINCT_ID2_TABLE_SQL,
PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
PLUGIN_LOG_ENTRIES_TABLE_SQL,
SESSION_RECORDING_EVENTS_TABLE_SQL,
INGESTION_WARNINGS_DATA_TABLE_SQL,
Expand Down
50 changes: 50 additions & 0 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,31 @@

'''
# ---
# name: test_create_table_query[person_distinct_id_overrides]
'''

CREATE TABLE IF NOT EXISTS person_distinct_id_overrides ON CLUSTER 'posthog'
(
team_id Int64,
distinct_id VARCHAR,
person_id UUID,
is_deleted Int8,
version Int64


, _timestamp DateTime
, _offset UInt64
, _partition UInt64

, INDEX kafka_timestamp_minmax_person_distinct_id_overrides _timestamp TYPE minmax GRANULARITY 3

) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person_distinct_id_overrides', '{replica}-{shard}', version)

ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 512

'''
# ---
# name: test_create_table_query[person_mv]
'''

Expand Down Expand Up @@ -2106,6 +2131,31 @@

'''
# ---
# name: test_create_table_query_replicated_and_storage[person_distinct_id_overrides]
'''

CREATE TABLE IF NOT EXISTS person_distinct_id_overrides ON CLUSTER 'posthog'
(
team_id Int64,
distinct_id VARCHAR,
person_id UUID,
is_deleted Int8,
version Int64


, _timestamp DateTime
, _offset UInt64
, _partition UInt64

, INDEX kafka_timestamp_minmax_person_distinct_id_overrides _timestamp TYPE minmax GRANULARITY 3

) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person_distinct_id_overrides', '{replica}-{shard}', version)

ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 512

'''
# ---
# name: test_create_table_query_replicated_and_storage[person_overrides]
'''

Expand Down
33 changes: 32 additions & 1 deletion posthog/models/person/sql.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL
from posthog.clickhouse.indexes import index_by_kafka_timestamp
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine
from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, KAFKA_COLUMNS_WITH_PARTITION, STORAGE_POLICY, kafka_engine
from posthog.clickhouse.table_engines import CollapsingMergeTree, ReplacingMergeTree
from posthog.kafka_client.topics import (
KAFKA_PERSON,
Expand Down Expand Up @@ -170,6 +170,7 @@

PERSON_DISTINCT_ID2_TABLE = "person_distinct_id2"

# NOTE: This table base SQL is also used for distinct ID overrides!
PERSON_DISTINCT_ID2_TABLE_BASE_SQL = """
CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}'
(
Expand Down Expand Up @@ -228,6 +229,36 @@
database=CLICKHOUSE_DATABASE,
)

#
# person_distinct_id_overrides: This table contains rows for all (team_id,
# distinct_id) pairs where the person_id has changed and those updates have not
# yet been integrated back into the events table via squashing.
#

PERSON_DISTINCT_ID_OVERRIDES_TABLE = "person_distinct_id_overrides"

# The overrides table reuses the person_distinct_id2 CREATE TABLE template as-is.
PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL = PERSON_DISTINCT_ID2_TABLE_BASE_SQL


def PERSON_DISTINCT_ID_OVERRIDES_TABLE_ENGINE():
    """Build the ReplacingMergeTree engine for the overrides table.

    The engine is constructed on every call (not once at import time) and is
    given the ``version`` column as its ``ver`` argument.
    """
    return ReplacingMergeTree(PERSON_DISTINCT_ID_OVERRIDES_TABLE, ver="version")

def PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL():
    """Render the CREATE TABLE statement for the distinct ID overrides table.

    The shared base template is extended with the sorting key and index
    granularity setting, then formatted with the table name, cluster, engine
    and the extra Kafka metadata columns plus a Kafka-timestamp index.
    """
    template = (
        PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL
        + """
ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 512
"""
    )
    # Engine is built before the extra fields, mirroring the keyword order of
    # the format call.
    table_engine = PERSON_DISTINCT_ID_OVERRIDES_TABLE_ENGINE()
    kafka_fields = f"""
{KAFKA_COLUMNS_WITH_PARTITION}
, {index_by_kafka_timestamp(PERSON_DISTINCT_ID_OVERRIDES_TABLE)}
"""
    return template.format(
        table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE,
        cluster=CLICKHOUSE_CLUSTER,
        engine=table_engine,
        extra_fields=kafka_fields,
    )

#
# Static Cohort
#
Expand Down

0 comments on commit 9f838f9

Please sign in to comment.