diff --git a/posthog/clickhouse/migrations/0057_events_person_mode.py b/posthog/clickhouse/migrations/0057_events_person_mode.py
new file mode 100644
index 00000000000000..75ca199536da47
--- /dev/null
+++ b/posthog/clickhouse/migrations/0057_events_person_mode.py
@@ -0,0 +1,30 @@
+from infi.clickhouse_orm import migrations
+
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+from posthog.client import sync_execute
+from posthog.models.event.sql import (
+    EVENTS_TABLE_JSON_MV_SQL,
+    KAFKA_EVENTS_TABLE_JSON_SQL,
+)
+from posthog.settings import CLICKHOUSE_CLUSTER
+
+
+ADD_COLUMNS_BASE_SQL = """
+ALTER TABLE {table} ON CLUSTER {cluster}
+ADD COLUMN IF NOT EXISTS person_mode Enum8('full' = 0, 'propertyless' = 1) DEFAULT 'full'
+"""
+
+
+def add_columns_to_required_tables(_):
+    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="events", cluster=CLICKHOUSE_CLUSTER))
+    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="writable_events", cluster=CLICKHOUSE_CLUSTER))
+    sync_execute(ADD_COLUMNS_BASE_SQL.format(table="sharded_events", cluster=CLICKHOUSE_CLUSTER))
+
+
+operations = [
+    run_sql_with_exceptions(f"DROP TABLE IF EXISTS events_json_mv ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
+    run_sql_with_exceptions(f"DROP TABLE IF EXISTS kafka_events_json ON CLUSTER '{CLICKHOUSE_CLUSTER}'"),
+    migrations.RunPython(add_columns_to_required_tables),
+    run_sql_with_exceptions(KAFKA_EVENTS_TABLE_JSON_SQL()),
+    run_sql_with_exceptions(EVENTS_TABLE_JSON_MV_SQL()),
+]
diff --git a/posthog/models/event/sql.py b/posthog/models/event/sql.py
index 8214ac90fdce0d..8c20bb6d9f7e01 100644
--- a/posthog/models/event/sql.py
+++ b/posthog/models/event/sql.py
@@ -48,7 +48,8 @@
     group1_created_at DateTime64,
     group2_created_at DateTime64,
     group3_created_at DateTime64,
-    group4_created_at DateTime64
+    group4_created_at DateTime64,
+    person_mode Enum8('full' = 0, 'propertyless' = 1) DEFAULT 'full'
     {materialized_columns}
     {extra_fields}
     {indexes}
@@ -162,6 +163,7 @@
 group2_created_at,
 group3_created_at,
 group4_created_at,
+person_mode,
 NOW64() AS inserted_at,
 _timestamp,
 _offset
@@ -219,6 +221,7 @@
     group2_created_at,
     group3_created_at,
     group4_created_at,
+    person_mode,
     created_at,
     _timestamp,
     _offset
@@ -245,6 +248,7 @@
     %(group2_created_at)s,
     %(group3_created_at)s,
     %(group4_created_at)s,
+    %(person_mode)s,
     %(created_at)s,
     now(),
     0
@@ -276,6 +280,7 @@
     group2_created_at,
     group3_created_at,
     group4_created_at,
+    person_mode,
     created_at,
     _timestamp,
     _offset
@@ -415,5 +420,5 @@
     table_name=WRITABLE_EVENTS_DATA_TABLE(),
     columns_except_team_id="""uuid, event, properties, timestamp, distinct_id, elements_chain, created_at, person_id, person_created_at, person_properties,
     group0_properties, group1_properties, group2_properties, group3_properties, group4_properties,
-    group0_created_at, group1_created_at, group2_created_at, group3_created_at, group4_created_at""",
+    group0_created_at, group1_created_at, group2_created_at, group3_created_at, group4_created_at, person_mode""",
 )
diff --git a/posthog/models/event/util.py b/posthog/models/event/util.py
index 9fe98305d693ac..a5cb4be15c9e56 100644
--- a/posthog/models/event/util.py
+++ b/posthog/models/event/util.py
@@ -79,6 +79,7 @@ def create_event(
         "group2_created_at": format_clickhouse_timestamp(group2_created_at, ZERO_DATE),
         "group3_created_at": format_clickhouse_timestamp(group3_created_at, ZERO_DATE),
         "group4_created_at": format_clickhouse_timestamp(group4_created_at, ZERO_DATE),
+        "person_mode": "full",
     }
     p = ClickhouseProducer()
     p.produce(topic=KAFKA_EVENTS_JSON, sql=INSERT_EVENT_SQL(), data=data)
@@ -153,6 +154,7 @@ def bulk_create_events(events: List[Dict[str, Any]], person_mapping: Optional[Di
     %(group2_created_at_{i})s,
     %(group3_created_at_{i})s,
     %(group4_created_at_{i})s,
+    %(person_mode_{i})s,
     %(created_at_{i})s,
     now(),
     0
@@ -249,6 +251,7 @@ def bulk_create_events(events: List[Dict[str, Any]], person_mapping: Optional[Di
             "group4_created_at": event["group4_created_at"]
             if event.get("group4_created_at")
             else datetime64_default_timestamp,
+            "person_mode": "full",
         }
         params = {
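
Note on the migration's ordering: ClickHouse Kafka engine tables and materialized views fix their column lists at creation time, so the migration drops events_json_mv and kafka_events_json first, ALTERs the distributed, writable, and sharded events tables to add person_mode, and only then recreates the Kafka table and view from the updated SQL definitions, so newly ingested events carry the column.

As a minimal sketch (not part of the change) of reading the new column back, using the posthog.client.sync_execute helper the migration itself imports; the team_id value is a placeholder:

    from posthog.client import sync_execute

    # Enum8 values come back as their string labels, so rows look like
    # ("full", 1234) or ("propertyless", 56).
    rows = sync_execute(
        """
        SELECT person_mode, count()
        FROM events
        WHERE team_id = %(team_id)s
        GROUP BY person_mode
        """,
        {"team_id": 1},
    )

Rows written before the ALTER are backfilled implicitly: DEFAULT 'full' makes existing events read as 'full' without rewriting any parts.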