diff --git a/posthog/clickhouse/migrations/0053_add_person_distinct_id_overrides_table.py b/posthog/clickhouse/migrations/0053_add_person_distinct_id_overrides_table.py new file mode 100644 index 0000000000000..1889e4305d6ae --- /dev/null +++ b/posthog/clickhouse/migrations/0053_add_person_distinct_id_overrides_table.py @@ -0,0 +1,6 @@ +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions +from posthog.models.person.sql import PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL + +operations = [ + run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()), +] diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py index e6ed91af60c97..b40934a427a8a 100644 --- a/posthog/clickhouse/schema.py +++ b/posthog/clickhouse/schema.py @@ -56,6 +56,7 @@ PERSON_OVERRIDES_CREATE_TABLE_SQL, PERSONS_DISTINCT_ID_TABLE_SQL, PERSON_DISTINCT_ID2_TABLE_SQL, + PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL, PLUGIN_LOG_ENTRIES_TABLE_SQL, SESSION_RECORDING_EVENTS_TABLE_SQL, INGESTION_WARNINGS_DATA_TABLE_SQL, diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr index c445d3dbbe5e5..6188a647a214e 100644 --- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr +++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr @@ -1237,6 +1237,31 @@ ''' # --- +# name: test_create_table_query[person_distinct_id_overrides] + ''' + + CREATE TABLE IF NOT EXISTS person_distinct_id_overrides ON CLUSTER 'posthog' + ( + team_id Int64, + distinct_id VARCHAR, + person_id UUID, + is_deleted Int8, + version Int64 + + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + , INDEX kafka_timestamp_minmax_person_distinct_id_overrides _timestamp TYPE minmax GRANULARITY 3 + + ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person_distinct_id_overrides', '{replica}-{shard}', version) + + ORDER BY (team_id, distinct_id) + SETTINGS index_granularity = 512 + + ''' +# --- # name: test_create_table_query[person_mv] ''' @@ -2106,6 +2131,31 @@ ''' # --- +# name: test_create_table_query_replicated_and_storage[person_distinct_id_overrides] + ''' + + CREATE TABLE IF NOT EXISTS person_distinct_id_overrides ON CLUSTER 'posthog' + ( + team_id Int64, + distinct_id VARCHAR, + person_id UUID, + is_deleted Int8, + version Int64 + + + , _timestamp DateTime + , _offset UInt64 + , _partition UInt64 + + , INDEX kafka_timestamp_minmax_person_distinct_id_overrides _timestamp TYPE minmax GRANULARITY 3 + + ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/77f1df52-4b43-11e9-910f-b8ca3a9b9f3e_noshard/posthog.person_distinct_id_overrides', '{replica}-{shard}', version) + + ORDER BY (team_id, distinct_id) + SETTINGS index_granularity = 512 + + ''' +# --- # name: test_create_table_query_replicated_and_storage[person_overrides] ''' diff --git a/posthog/models/person/sql.py b/posthog/models/person/sql.py index 41cd2e36a54f6..fc70aa0d5a419 100644 --- a/posthog/models/person/sql.py +++ b/posthog/models/person/sql.py @@ -1,6 +1,6 @@ from posthog.clickhouse.base_sql import COPY_ROWS_BETWEEN_TEAMS_BASE_SQL from posthog.clickhouse.indexes import index_by_kafka_timestamp -from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, STORAGE_POLICY, kafka_engine +from posthog.clickhouse.kafka_engine import KAFKA_COLUMNS, KAFKA_COLUMNS_WITH_PARTITION, STORAGE_POLICY, kafka_engine from posthog.clickhouse.table_engines import CollapsingMergeTree, ReplacingMergeTree from posthog.kafka_client.topics import ( KAFKA_PERSON, @@ -170,6 +170,7 @@ PERSON_DISTINCT_ID2_TABLE = "person_distinct_id2" +# NOTE: This table base SQL is also used for distinct ID overrides! PERSON_DISTINCT_ID2_TABLE_BASE_SQL = """ CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER '{cluster}' ( @@ -228,6 +229,36 @@ database=CLICKHOUSE_DATABASE, ) +# +# person_distinct_id_overrides: This table contains rows for all (team_id, +# distinct_id) pairs where the person_id has changed and those updates have not +# yet been integrated back into the events table via squashing. +# + +PERSON_DISTINCT_ID_OVERRIDES_TABLE = "person_distinct_id_overrides" + +PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL = PERSON_DISTINCT_ID2_TABLE_BASE_SQL + +PERSON_DISTINCT_ID_OVERRIDES_TABLE_ENGINE = lambda: ReplacingMergeTree( + PERSON_DISTINCT_ID_OVERRIDES_TABLE, ver="version" +) + +PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = lambda: ( + PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL + + """ + ORDER BY (team_id, distinct_id) + SETTINGS index_granularity = 512 + """ +).format( + table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE, + cluster=CLICKHOUSE_CLUSTER, + engine=PERSON_DISTINCT_ID_OVERRIDES_TABLE_ENGINE(), + extra_fields=f""" + {KAFKA_COLUMNS_WITH_PARTITION} + , {index_by_kafka_timestamp(PERSON_DISTINCT_ID_OVERRIDES_TABLE)} + """, +) + # # Static Cohort #