diff --git a/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py
new file mode 100644
index 0000000000000..d9e2552d9b572
--- /dev/null
+++ b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py
@@ -0,0 +1,10 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.person.sql import (
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
+)
+
+operations = [
+    run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL),
+]
diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py
index b40934a427a8a..a2f0db7eaede3 100644
--- a/posthog/clickhouse/schema.py
+++ b/posthog/clickhouse/schema.py
@@ -85,6 +85,7 @@
     KAFKA_PERSON_OVERRIDES_TABLE_SQL,
     KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
     KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
     KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
     KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
     KAFKA_INGESTION_WARNINGS_TABLE_SQL,
@@ -101,6 +102,7 @@
     PERSON_OVERRIDES_CREATE_MATERIALIZED_VIEW_SQL,
     PERSONS_DISTINCT_ID_TABLE_MV_SQL,
     PERSON_DISTINCT_ID2_MV_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
     PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
     SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
     INGESTION_WARNINGS_MV_TABLE_SQL,
diff --git a/posthog/clickhouse/test/__snapshots__/test_schema.ambr b/posthog/clickhouse/test/__snapshots__/test_schema.ambr
index 6188a647a214e..fa2c314d1b266 100644
--- a/posthog/clickhouse/test/__snapshots__/test_schema.ambr
+++ b/posthog/clickhouse/test/__snapshots__/test_schema.ambr
@@ -278,6 +278,21 @@
 
   '''
 # ---
+# name: test_create_kafka_table_with_different_kafka_host[kafka_person_distinct_id_overrides]
+  '''
+
+  CREATE TABLE IF NOT EXISTS kafka_person_distinct_id_overrides ON CLUSTER 'posthog'
+  (
+      team_id Int64,
+      distinct_id VARCHAR,
+      person_id UUID,
+      is_deleted Int8,
+      version Int64
+
+  ) ENGINE = Kafka('test.kafka.broker:9092', 'clickhouse_person_distinct_id_test', 'clickhouse-person-distinct-id-overrides', 'JSONEachRow')
+
+  '''
+# ---
 # name: test_create_kafka_table_with_different_kafka_host[kafka_person_overrides]
   '''
 
@@ -910,6 +925,21 @@
 
   '''
 # ---
+# name: test_create_table_query[kafka_person_distinct_id_overrides]
+  '''
+
+  CREATE TABLE IF NOT EXISTS kafka_person_distinct_id_overrides ON CLUSTER 'posthog'
+  (
+      team_id Int64,
+      distinct_id VARCHAR,
+      person_id UUID,
+      is_deleted Int8,
+      version Int64
+
+  ) ENGINE = Kafka('kafka:9092', 'clickhouse_person_distinct_id_test', 'clickhouse-person-distinct-id-overrides', 'JSONEachRow')
+
+  '''
+# ---
 # name: test_create_table_query[kafka_person_overrides]
   '''
 
@@ -1262,6 +1292,25 @@
 
   '''
 # ---
+# name: test_create_table_query[person_distinct_id_overrides_mv]
+  '''
+
+  CREATE MATERIALIZED VIEW IF NOT EXISTS person_distinct_id_overrides_mv ON CLUSTER 'posthog'
+  TO posthog_test.person_distinct_id_overrides
+  AS SELECT
+  team_id,
+  distinct_id,
+  person_id,
+  is_deleted,
+  version,
+  _timestamp,
+  _offset,
+  _partition
+  FROM posthog_test.kafka_person_distinct_id_overrides
+  WHERE version > 0 -- only store updated rows, not newly inserted ones
+
+  '''
+# ---
 # name: test_create_table_query[person_mv]
   '''
 
diff --git a/posthog/models/person/sql.py b/posthog/models/person/sql.py
index fc70aa0d5a419..bc3565ab3d722 100644
--- a/posthog/models/person/sql.py
+++ b/posthog/models/person/sql.py
@@ -259,6 +259,33 @@
     """,
 )
 
+KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = lambda: PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL.format(
+    table_name="kafka_" + PERSON_DISTINCT_ID_OVERRIDES_TABLE,
+    cluster=CLICKHOUSE_CLUSTER,
+    engine=kafka_engine(KAFKA_PERSON_DISTINCT_ID, group="clickhouse-person-distinct-id-overrides"),
+    extra_fields="",
+)
+
+PERSON_DISTINCT_ID_OVERRIDES_MV_SQL = """
+CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}'
+TO {database}.{table_name}
+AS SELECT
+team_id,
+distinct_id,
+person_id,
+is_deleted,
+version,
+_timestamp,
+_offset,
+_partition
+FROM {database}.kafka_{table_name}
+WHERE version > 0 -- only store updated rows, not newly inserted ones
+""".format(
+    table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE,
+    cluster=CLICKHOUSE_CLUSTER,
+    database=CLICKHOUSE_DATABASE,
+)
+
 #
 # Static Cohort
 #