# Patch summary. These lines sit before the first "diff --git" header and are
# not part of any hunk.
#
# 1. posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py
#    (new file) declares `operations` with two run_sql_with_exceptions(...)
#    entries, in this order:
#      a. KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL() -- called, because the
#         name is bound to a lambda in sql.py.
#      b. PERSON_DISTINCT_ID_OVERRIDES_MV_SQL -- passed as-is, because it is a
#         string that is already formatted when sql.py is imported.
#    The view's FROM clause names kafka_{table_name}, the same table name that
#    (a) formats in, so (a) is listed before (b).
#
# 2. posthog/clickhouse/schema.py adds the same two names to two existing
#    listings. The hunks show only the neighbouring entries, not the enclosing
#    statements.
#
# 3. posthog/models/person/sql.py defines both names:
#    - KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL formats
#      PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL with the table name
#      "kafka_" + PERSON_DISTINCT_ID_OVERRIDES_TABLE, an engine built by
#      kafka_engine(KAFKA_PERSON_DISTINCT_ID,
#      group="clickhouse-person-distinct-id-overrides"), and empty extra_fields.
#    - PERSON_DISTINCT_ID_OVERRIDES_MV_SQL creates {table_name}_mv, which copies
#      eight columns from {database}.kafka_{table_name} into
#      {database}.{table_name} and keeps only rows where version > 0.
#
# NOTE(review): the view name `{table_name}_mv` is not database-qualified, while
#   its TO and FROM targets are -- confirm this is intended.
# NOTE(review): kafka_engine receives KAFKA_PERSON_DISTINCT_ID rather than an
#   overrides-specific name, together with its own consumer group; presumably
#   this reads the existing distinct-id stream independently -- confirm.
# NOTE(review): line breaks and 4-space indentation below were reconstructed
#   from a whitespace-collapsed copy of this patch; verify context lines against
#   the target files before applying.
diff --git a/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py
new file mode 100644
index 0000000000000..d9e2552d9b572
--- /dev/null
+++ b/posthog/clickhouse/migrations/0054_add_person_distinct_id_overrides_consumer.py
@@ -0,0 +1,10 @@
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.models.person.sql import (
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
+)
+
+operations = [
+    run_sql_with_exceptions(KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL()),
+    run_sql_with_exceptions(PERSON_DISTINCT_ID_OVERRIDES_MV_SQL),
+]
diff --git a/posthog/clickhouse/schema.py b/posthog/clickhouse/schema.py
index b40934a427a8a..a2f0db7eaede3 100644
--- a/posthog/clickhouse/schema.py
+++ b/posthog/clickhouse/schema.py
@@ -85,6 +85,7 @@
     KAFKA_PERSON_OVERRIDES_TABLE_SQL,
     KAFKA_PERSONS_DISTINCT_ID_TABLE_SQL,
     KAFKA_PERSON_DISTINCT_ID2_TABLE_SQL,
+    KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL,
     KAFKA_PLUGIN_LOG_ENTRIES_TABLE_SQL,
     KAFKA_SESSION_RECORDING_EVENTS_TABLE_SQL,
     KAFKA_INGESTION_WARNINGS_TABLE_SQL,
@@ -101,6 +102,7 @@
     PERSON_OVERRIDES_CREATE_MATERIALIZED_VIEW_SQL,
     PERSONS_DISTINCT_ID_TABLE_MV_SQL,
     PERSON_DISTINCT_ID2_MV_SQL,
+    PERSON_DISTINCT_ID_OVERRIDES_MV_SQL,
     PLUGIN_LOG_ENTRIES_TABLE_MV_SQL,
     SESSION_RECORDING_EVENTS_TABLE_MV_SQL,
     INGESTION_WARNINGS_MV_TABLE_SQL,
diff --git a/posthog/models/person/sql.py b/posthog/models/person/sql.py
index fc70aa0d5a419..bc3565ab3d722 100644
--- a/posthog/models/person/sql.py
+++ b/posthog/models/person/sql.py
@@ -259,6 +259,33 @@
     """,
 )
 
+KAFKA_PERSON_DISTINCT_ID_OVERRIDES_TABLE_SQL = lambda: PERSON_DISTINCT_ID_OVERRIDES_TABLE_BASE_SQL.format(
+    table_name="kafka_" + PERSON_DISTINCT_ID_OVERRIDES_TABLE,
+    cluster=CLICKHOUSE_CLUSTER,
+    engine=kafka_engine(KAFKA_PERSON_DISTINCT_ID, group="clickhouse-person-distinct-id-overrides"),
+    extra_fields="",
+)
+
+PERSON_DISTINCT_ID_OVERRIDES_MV_SQL = """
+CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}_mv ON CLUSTER '{cluster}'
+TO {database}.{table_name}
+AS SELECT
+team_id,
+distinct_id,
+person_id,
+is_deleted,
+version,
+_timestamp,
+_offset,
+_partition
+FROM {database}.kafka_{table_name}
+WHERE version > 0 -- only store updated rows, not newly inserted ones
+""".format(
+    table_name=PERSON_DISTINCT_ID_OVERRIDES_TABLE,
+    cluster=CLICKHOUSE_CLUSTER,
+    database=CLICKHOUSE_DATABASE,
+)
+
 #
 # Static Cohort
 #