diff --git a/ee/benchmarks/helpers.py b/ee/benchmarks/helpers.py index 285a1dc97ee9f..8535e6adef47d 100644 --- a/ee/benchmarks/helpers.py +++ b/ee/benchmarks/helpers.py @@ -14,7 +14,7 @@ django.setup() -from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns # noqa: E402 +from ee.clickhouse.materialized_columns.columns import get_enabled_materialized_columns # noqa: E402 from posthog import client # noqa: E402 from posthog.clickhouse.query_tagging import reset_query_tags, tag_queries # noqa: E402 from posthog.models.utils import UUIDT # noqa: E402 diff --git a/ee/clickhouse/materialized_columns/analyze.py b/ee/clickhouse/materialized_columns/analyze.py index 43a1e83256912..bfae76ef2432c 100644 --- a/ee/clickhouse/materialized_columns/analyze.py +++ b/ee/clickhouse/materialized_columns/analyze.py @@ -171,6 +171,7 @@ def materialize_properties_task( backfill_period_days: int = MATERIALIZE_COLUMNS_BACKFILL_PERIOD_DAYS, dry_run: bool = False, team_id_to_analyze: Optional[int] = None, + is_nullable: bool = False, ) -> None: """ Creates materialized columns for event and person properties based off of slow queries @@ -203,7 +204,7 @@ def materialize_properties_task( logger.info(f"Materializing column. table={table}, property_name={property_name}") if not dry_run: - materialize(table, property_name, table_column=table_column) + materialize(table, property_name, table_column=table_column, is_nullable=is_nullable) properties[table].append((property_name, table_column)) if backfill_period_days > 0 and not dry_run: diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 592d17b1cb8f1..caa5cae1401c0 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -6,18 +6,18 @@ from copy import copy from dataclasses import dataclass, replace from datetime import timedelta -from typing import Any, Literal, NamedTuple, TypeVar, cast +from typing import Any, Literal, TypeVar, cast from clickhouse_driver import Client from django.utils.timezone import now +from posthog.cache_utils import cache_for from posthog.clickhouse.client.connection import default_client from posthog.clickhouse.cluster import ClickhouseCluster, ConnectionInfo, FuturesMap, HostInfo from posthog.clickhouse.kafka_engine import trim_quotes_expr from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns from posthog.client import sync_execute from posthog.models.event.sql import EVENTS_DATA_TABLE -from posthog.models.instance_setting import get_instance_setting from posthog.models.person.sql import PERSONS_TABLE from posthog.models.property import PropertyName, TableColumn, TableWithProperties from posthog.models.utils import generate_random_short_suffix @@ -30,8 +30,6 @@ DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties" -TRIM_AND_EXTRACT_PROPERTY = trim_quotes_expr("JSONExtractRaw({table_column}, %(property)s)") - SHORT_TABLE_COLUMN_NAME = { "properties": "p", "group_properties": "gp", @@ -44,15 +42,36 @@ } -class MaterializedColumn(NamedTuple): +@dataclass +class MaterializedColumn: name: ColumnName details: MaterializedColumnDetails + is_nullable: bool + + @property + def type(self) -> str: + if self.is_nullable: + return "Nullable(String)" + else: + return "String" + + def get_expression_and_parameters(self) -> tuple[str, dict[str, Any]]: + if self.is_nullable: + return ( + f"JSONExtract({self.details.table_column}, %(property_name)s, %(property_type)s)", + 
{"property_name": self.details.property_name, "property_type": self.type}, + ) + else: + return ( + trim_quotes_expr(f"JSONExtractRaw({self.details.table_column}, %(property)s)"), + {"property": self.details.property_name}, + ) @staticmethod def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn]: rows = sync_execute( """ - SELECT name, comment + SELECT name, comment, type like 'Nullable(%%)' as is_nullable FROM system.columns WHERE database = %(database)s AND table = %(table)s @@ -62,8 +81,8 @@ def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn {"database": CLICKHOUSE_DATABASE, "table": table}, ) - for name, comment in rows: - yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment)) + for name, comment, is_nullable in rows: + yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment), is_nullable) @staticmethod def get(table: TablesWithMaterializedColumns, column_name: ColumnName) -> MaterializedColumn: @@ -115,18 +134,20 @@ def from_column_comment(cls, comment: str) -> MaterializedColumnDetails: def get_materialized_columns( table: TablesWithMaterializedColumns, - exclude_disabled_columns: bool = False, -) -> dict[tuple[PropertyName, TableColumn], ColumnName]: - if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"): - return {} - +) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]: return { - (column.details.property_name, column.details.table_column): column.name + (column.details.property_name, column.details.table_column): column for column in MaterializedColumn.get_all(table) - if not (exclude_disabled_columns and column.details.is_disabled) } +@cache_for(timedelta(minutes=15)) +def get_enabled_materialized_columns( + table: TablesWithMaterializedColumns, +) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]: + return {k: column for k, column in get_materialized_columns(table).items() if not column.details.is_disabled} + + def get_cluster() -> ClickhouseCluster: extra_hosts = [] for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()): @@ -177,13 +198,10 @@ class CreateColumnOnDataNodesTask: add_column_comment: bool def execute(self, client: Client) -> None: + expression, parameters = self.column.get_expression_and_parameters() actions = [ - f""" - ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR - MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=self.column.details.table_column)} - """, + f"ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type} MATERIALIZED {expression}", ] - parameters = {"property": self.column.details.property_name} if self.add_column_comment: actions.append(f"COMMENT COLUMN {self.column.name} %(comment)s") @@ -209,7 +227,7 @@ def execute(self, client: Client) -> None: client.execute( f""" ALTER TABLE {self.table} - ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR, + ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type}, COMMENT COLUMN {self.column.name} %(comment)s """, {"comment": self.column.details.as_column_comment()}, @@ -223,6 +241,7 @@ def materialize( column_name: ColumnName | None = None, table_column: TableColumn = DEFAULT_TABLE_COLUMN, create_minmax_index=not TEST, + is_nullable: bool = False, ) -> ColumnName | None: if (property, table_column) in get_materialized_columns(table): if TEST: @@ -243,6 +262,7 @@ def materialize( property_name=property, is_disabled=False, ), + is_nullable=is_nullable, ) table_info.map_data_nodes( @@ -283,16 +303,12 @@ def 
update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: cluster = get_cluster() table_info = tables[table] + column = MaterializedColumn.get(table, column_name) + cluster.map_all_hosts( UpdateColumnCommentTask( table_info.read_table, - MaterializedColumn( - name=column_name, - details=replace( - MaterializedColumn.get(table, column_name).details, - is_disabled=is_disabled, - ), - ), + replace(column, details=replace(column.details, is_disabled=is_disabled)), ).execute ).result() @@ -388,12 +404,13 @@ def execute(self, client: Client) -> None: # Note that for this to work all inserts should list columns explicitly # Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved for column in self.columns: + expression, parameters = column.get_expression_and_parameters() client.execute( f""" ALTER TABLE {self.table} - MODIFY COLUMN {column.name} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=column.details.table_column)} + MODIFY COLUMN {column.name} {column.type} DEFAULT {expression} """, - {"property": column.details.property_name}, + parameters, settings=self.test_settings, ) @@ -463,10 +480,10 @@ def _materialized_column_name( prefix += f"{SHORT_TABLE_COLUMN_NAME[table_column]}_" property_str = re.sub("[^0-9a-zA-Z$]", "_", property) - existing_materialized_columns = set(get_materialized_columns(table).values()) + existing_materialized_column_names = {column.name for column in get_materialized_columns(table).values()} suffix = "" - while f"{prefix}{property_str}{suffix}" in existing_materialized_columns: + while f"{prefix}{property_str}{suffix}" in existing_materialized_column_names: suffix = "_" + generate_random_short_suffix() return f"{prefix}{property_str}{suffix}" diff --git a/ee/clickhouse/materialized_columns/test/test_analyze.py b/ee/clickhouse/materialized_columns/test/test_analyze.py index 6fdb0fb05cb0e..3b225ab670f92 100644 --- a/ee/clickhouse/materialized_columns/test/test_analyze.py +++ b/ee/clickhouse/materialized_columns/test/test_analyze.py @@ -49,9 +49,9 @@ def test_mat_columns(self, patch_backfill, patch_materialize): materialize_properties_task() patch_materialize.assert_has_calls( [ - call("events", "materialize_me", table_column="properties"), - call("events", "materialize_me2", table_column="properties"), - call("events", "materialize_person_prop", table_column="person_properties"), - call("events", "materialize_me3", table_column="properties"), + call("events", "materialize_me", table_column="properties", is_nullable=False), + call("events", "materialize_me2", table_column="properties", is_nullable=False), + call("events", "materialize_person_prop", table_column="person_properties", is_nullable=False), + call("events", "materialize_me3", table_column="properties", is_nullable=False), ] ) diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index 3978d010f7246..993c1a7aa2f65 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -11,11 +11,13 @@ MaterializedColumnDetails, backfill_materialized_columns, drop_column, + get_enabled_materialized_columns, get_materialized_columns, materialize, update_column_is_disabled, ) -from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns +from ee.tasks.materialized_columns import mark_all_materialized +from posthog.clickhouse.materialized_columns import 
TablesWithMaterializedColumns from posthog.client import sync_execute from posthog.conftest import create_clickhouse_tables from posthog.constants import GROUP_TYPES_LIMIT @@ -143,11 +145,11 @@ def test_materialized_column_naming(self, mock_choice): ("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY", ("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ", }, - get_materialized_columns("events"), + {k: column.name for k, column in get_materialized_columns("events").items()}, ) self.assertEqual( - get_materialized_columns("person"), + {k: column.name for k, column in get_materialized_columns("person").items()}, {("SoMePrOp", "properties"): "pmat_SoMePrOp"}, ) @@ -242,20 +244,26 @@ def test_backfilling_data(self): def test_column_types(self): materialize("events", "myprop", create_minmax_index=True) + materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True) - expr = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')" - self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop")) + expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')" + expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')" + self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop")) + self.assertEqual( + ("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable") + ) - backfill_materialized_columns("events", [("myprop", "properties")], timedelta(days=50)) - self.assertEqual(("DEFAULT", expr), self._get_column_types("mat_myprop")) + backfill_materialized_columns( + "events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50) + ) + self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop")) + self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable")) - try: - from ee.tasks.materialized_columns import mark_all_materialized - except ImportError: - pass - else: - mark_all_materialized() - self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop")) + mark_all_materialized() + self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop")) + self.assertEqual( + ("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable") + ) def _count_materialized_rows(self, column): return sync_execute( @@ -285,7 +293,7 @@ def _get_count_of_mutations_running(self) -> int: def _get_column_types(self, column: str): return sync_execute( """ - SELECT default_kind, default_expression + SELECT type, default_kind, default_expression FROM system.columns WHERE database = %(database)s AND table = %(table)s AND name = %(column)s """, @@ -307,34 +315,34 @@ def test_lifecycle(self): # ensure it exists everywhere key = (property, source_column) - assert get_materialized_columns(table)[key] == destination_column + assert get_materialized_columns(table)[key].name == destination_column assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( destination_column, MaterializedColumnDetails(source_column, property, is_disabled=False), + is_nullable=False, ) # disable it and ensure updates apply as needed update_column_is_disabled(table, destination_column, is_disabled=True) - assert get_materialized_columns(table)[key] == destination_column - assert key not in get_materialized_columns(table, 
exclude_disabled_columns=True) + assert get_materialized_columns(table)[key].name == destination_column assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( destination_column, MaterializedColumnDetails(source_column, property, is_disabled=True), + is_nullable=False, ) # re-enable it and ensure updates apply as needed update_column_is_disabled(table, destination_column, is_disabled=False) - assert get_materialized_columns(table, exclude_disabled_columns=False)[key] == destination_column - assert get_materialized_columns(table, exclude_disabled_columns=True)[key] == destination_column + assert get_materialized_columns(table)[key].name == destination_column assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( destination_column, MaterializedColumnDetails(source_column, property, is_disabled=False), + is_nullable=False, ) # drop it and ensure updates apply as needed drop_column(table, destination_column) - assert key not in get_materialized_columns(table, exclude_disabled_columns=False) - assert key not in get_materialized_columns(table, exclude_disabled_columns=True) + assert key not in get_materialized_columns(table) with self.assertRaises(ValueError): MaterializedColumn.get(table, destination_column) diff --git a/ee/clickhouse/queries/funnels/funnel_correlation.py b/ee/clickhouse/queries/funnels/funnel_correlation.py index 3e5b69005d689..0b909c84b398e 100644 --- a/ee/clickhouse/queries/funnels/funnel_correlation.py +++ b/ee/clickhouse/queries/funnels/funnel_correlation.py @@ -13,7 +13,7 @@ from ee.clickhouse.queries.column_optimizer import EnterpriseColumnOptimizer from ee.clickhouse.queries.groups_join_query import GroupsJoinQuery -from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns +from posthog.clickhouse.materialized_columns import get_materialized_column_for_property from posthog.constants import ( AUTOCAPTURE_EVENT, TREND_FILTER_TYPE_ACTIONS, @@ -156,8 +156,6 @@ def properties_to_include(self) -> list[str]: ): # When dealing with properties, make sure funnel response comes with properties # so we don't have to join on persons/groups to get these properties again - mat_event_cols = get_enabled_materialized_columns("events") - for property_name in cast(list, self._filter.correlation_property_names): if self._filter.aggregation_group_type_index is not None: continue # We don't support group properties on events at this time @@ -165,10 +163,11 @@ def properties_to_include(self) -> list[str]: if "$all" == property_name: return [f"person_properties"] - possible_mat_col = mat_event_cols.get((property_name, "person_properties")) - - if possible_mat_col is not None: - props_to_include.append(possible_mat_col) + possible_mat_col = get_materialized_column_for_property( + "events", "person_properties", property_name + ) + if possible_mat_col is not None and not possible_mat_col.is_nullable: + props_to_include.append(possible_mat_col.name) else: props_to_include.append(f"person_properties") diff --git a/ee/management/commands/materialize_columns.py b/ee/management/commands/materialize_columns.py index c1ca3b3fd2287..5ddbf55dea2b7 100644 --- a/ee/management/commands/materialize_columns.py +++ b/ee/management/commands/materialize_columns.py @@ -1,3 +1,4 @@ +import argparse import logging from django.core.management.base import BaseCommand @@ -69,8 +70,14 @@ def add_arguments(self, parser): default=MATERIALIZE_COLUMNS_MAX_AT_ONCE, help="Max number of columns to materialize via single invocation. 
Same as MATERIALIZE_COLUMNS_MAX_AT_ONCE env variable.", ) + parser.add_argument( + "--nullable", + action=argparse.BooleanOptionalAction, + default=True, + dest="is_nullable", + ) - def handle(self, *args, **options): + def handle(self, *, is_nullable: bool, **options): logger.setLevel(logging.INFO) if options["dry_run"]: @@ -90,6 +97,7 @@ def handle(self, *args, **options): ], backfill_period_days=options["backfill_period"], dry_run=options["dry_run"], + is_nullable=is_nullable, ) else: materialize_properties_task( @@ -99,4 +107,5 @@ def handle(self, *args, **options): backfill_period_days=options["backfill_period"], dry_run=options["dry_run"], team_id_to_analyze=options["analyze_team_id"], + is_nullable=is_nullable, ) diff --git a/ee/tasks/materialized_columns.py b/ee/tasks/materialized_columns.py index d05cdddc0b0ca..98091c3b1d00a 100644 --- a/ee/tasks/materialized_columns.py +++ b/ee/tasks/materialized_columns.py @@ -1,50 +1,49 @@ +from collections.abc import Iterator +from dataclasses import dataclass from celery.utils.log import get_task_logger +from clickhouse_driver import Client -from ee.clickhouse.materialized_columns.columns import ( - TRIM_AND_EXTRACT_PROPERTY, - get_materialized_columns, -) +from ee.clickhouse.materialized_columns.columns import MaterializedColumn, get_cluster, tables as table_infos from posthog.client import sync_execute -from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE +from posthog.settings import CLICKHOUSE_DATABASE from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns logger = get_task_logger(__name__) -def mark_all_materialized() -> None: - if any_ongoing_mutations(): - logger.info("There are running mutations, skipping marking as materialized") - return - - for ( - table, - property_name, - table_column, - column_name, - ) in get_materialized_columns_with_default_expression(): - updated_table = "sharded_events" if table == "events" else table - - # :TRICKY: On cloud, we ON CLUSTER updates to events/sharded_events but not to persons. Why? 
¯\_(ツ)_/¯ - execute_on_cluster = f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" - - sync_execute( - f""" - ALTER TABLE {updated_table} - {execute_on_cluster} - MODIFY COLUMN - {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} - """, - {"property": property_name}, +@dataclass +class MarkMaterializedTask: + table: str + column: MaterializedColumn + + def execute(self, client: Client) -> None: + expression, parameters = self.column.get_expression_and_parameters() + client.execute( + f"ALTER TABLE {self.table} MODIFY COLUMN {self.column.name} {self.column.type} MATERIALIZED {expression}", + parameters, ) -def get_materialized_columns_with_default_expression(): - tables: list[TablesWithMaterializedColumns] = ["events", "person"] - for table in tables: - materialized_columns = get_materialized_columns(table) - for (property_name, table_column), column_name in materialized_columns.items(): - if is_default_expression(table, column_name): - yield table, property_name, table_column, column_name +def mark_all_materialized() -> None: + cluster = get_cluster() + + for table_name, column in get_materialized_columns_with_default_expression(): + table_info = table_infos[table_name] + table_info.map_data_nodes( + cluster, + MarkMaterializedTask( + table_info.data_table, + column, + ).execute, + ).result() + + +def get_materialized_columns_with_default_expression() -> Iterator[tuple[str, MaterializedColumn]]: + table_names: list[TablesWithMaterializedColumns] = ["events", "person"] + for table_name in table_names: + for column in MaterializedColumn.get_all(table_name): + if is_default_expression(table_name, column.name): + yield table_name, column def any_ongoing_mutations() -> bool: diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 462352d8fe0bc..16979be89288b 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -322,7 +322,7 @@ ee/billing/billing_manager.py:0: error: Incompatible types in assignment (expres posthog/models/property/util.py:0: error: Incompatible type for lookup 'pk': (got "str | int | list[str]", expected "str | int") [misc] posthog/models/property/util.py:0: error: Argument 3 to "format_filter_query" has incompatible type "HogQLContext | None"; expected "HogQLContext" [arg-type] posthog/models/property/util.py:0: error: Argument 3 to "format_cohort_subquery" has incompatible type "HogQLContext | None"; expected "HogQLContext" [arg-type] -posthog/models/property/util.py:0: error: Invalid index type "tuple[str, str]" for "dict[tuple[str, Literal['properties', 'group_properties', 'person_properties']], str]"; expected type "tuple[str, Literal['properties', 'group_properties', 'person_properties']]" [index] +posthog/models/property/util.py:0: error: Argument 2 to "get_materialized_column_for_property" has incompatible type "str"; expected "Literal['properties', 'group_properties', 'person_properties']" [arg-type] posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type] posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type] posthog/models/property/util.py:0: error: Argument 1 to "append" of "list" has incompatible type "str | int"; expected "str" [arg-type] @@ -332,7 +332,7 @@ posthog/api/documentation.py:0: note: def run_validation(self, data: Any = ...) 
posthog/api/documentation.py:0: note: Subclass: posthog/api/documentation.py:0: note: def run_validation(self, data: Any) -> Any posthog/queries/trends/util.py:0: error: Argument 1 to "translate_hogql" has incompatible type "str | None"; expected "str" [arg-type] -posthog/queries/column_optimizer/foss_column_optimizer.py:0: error: Argument 1 to "get" of "dict" has incompatible type "tuple[str, str]"; expected "tuple[str, Literal['properties', 'group_properties', 'person_properties']]" [arg-type] +posthog/queries/column_optimizer/foss_column_optimizer.py:0: error: Argument 2 to "get_materialized_column_for_property" has incompatible type "str"; expected "Literal['properties', 'group_properties', 'person_properties']" [arg-type] posthog/hogql/property.py:0: error: Incompatible type for lookup 'id': (got "str | int | list[str]", expected "str | int") [misc] posthog/hogql/property.py:0: error: Incompatible type for lookup 'pk': (got "str | float", expected "str | int") [misc] posthog/api/utils.py:0: error: Incompatible types in assignment (expression has type "type[EventDefinition]", variable has type "type[EnterpriseEventDefinition]") [assignment] diff --git a/posthog/api/test/test_person.py b/posthog/api/test/test_person.py index 2c9694f6eda6d..29eb3990407d5 100644 --- a/posthog/api/test/test_person.py +++ b/posthog/api/test/test_person.py @@ -873,7 +873,7 @@ def test_pagination_limit(self): create_person(team_id=self.team.pk, version=0) returned_ids = [] - with self.assertNumQueries(10): + with self.assertNumQueries(9): response = self.client.get("/api/person/?limit=10").json() self.assertEqual(len(response["results"]), 9) returned_ids += [x["distinct_ids"][0] for x in response["results"]] diff --git a/posthog/clickhouse/materialized_columns.py b/posthog/clickhouse/materialized_columns.py index 2ff858274ab4d..09b2d8b24c6dc 100644 --- a/posthog/clickhouse/materialized_columns.py +++ b/posthog/clickhouse/materialized_columns.py @@ -1,6 +1,6 @@ -from datetime import timedelta +from typing import Protocol -from posthog.cache_utils import cache_for +from posthog.models.instance_setting import get_instance_setting from posthog.models.property import PropertyName, TableColumn, TableWithProperties from posthog.settings import EE_AVAILABLE @@ -8,19 +8,25 @@ ColumnName = str TablesWithMaterializedColumns = TableWithProperties + +class MaterializedColumn(Protocol): + name: ColumnName + is_nullable: bool + + if EE_AVAILABLE: - from ee.clickhouse.materialized_columns.columns import get_materialized_columns -else: + from ee.clickhouse.materialized_columns.columns import get_enabled_materialized_columns - def get_materialized_columns( - table: TablesWithMaterializedColumns, - exclude_disabled_columns: bool = False, - ) -> dict[tuple[PropertyName, TableColumn], ColumnName]: - return {} + def get_materialized_column_for_property( + table: TablesWithMaterializedColumns, table_column: TableColumn, property_name: PropertyName + ) -> MaterializedColumn | None: + if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"): + return None + return get_enabled_materialized_columns(table).get((property_name, table_column)) +else: -@cache_for(timedelta(minutes=15)) -def get_enabled_materialized_columns( - table: TablesWithMaterializedColumns, -) -> dict[tuple[PropertyName, TableColumn], ColumnName]: - return get_materialized_columns(table, exclude_disabled_columns=True) + def get_materialized_column_for_property( + table: TablesWithMaterializedColumns, table_column: TableColumn, property_name: PropertyName + ) -> 
MaterializedColumn | None: + return None diff --git a/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py b/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py index 7e7847c570bac..1be2a1c033c66 100644 --- a/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py +++ b/posthog/clickhouse/migrations/0026_fix_materialized_window_and_session_ids.py @@ -1,6 +1,6 @@ from infi.clickhouse_orm import migrations -from posthog.clickhouse.materialized_columns import get_materialized_columns +from posthog.clickhouse.materialized_columns import get_materialized_column_for_property from posthog.client import sync_execute from posthog.settings import CLICKHOUSE_CLUSTER @@ -45,9 +45,9 @@ def materialize_session_and_window_id(database): properties = ["$session_id", "$window_id"] for property_name in properties: - materialized_columns = get_materialized_columns("events") + current_materialized_column = get_materialized_column_for_property("events", "properties", property_name) # If the column is not materialized, materialize it - if (property_name, "properties") not in materialized_columns: + if current_materialized_column is None: materialize("events", property_name, property_name) # Now, we need to clean up any potentail inconsistencies with existing column names @@ -71,9 +71,8 @@ def materialize_session_and_window_id(database): # materialized the column or renamed the column, and then ran the 0004_... async migration # before this migration runs. possible_old_column_names = {"mat_" + property_name} - current_materialized_column_name = materialized_columns.get((property_name, "properties"), None) - if current_materialized_column_name is not None and current_materialized_column_name != property_name: - possible_old_column_names.add(current_materialized_column_name) + if current_materialized_column is not None and current_materialized_column.name != property_name: + possible_old_column_names.add(current_materialized_column.name) for possible_old_column_name in possible_old_column_names: ensure_only_new_column_exists(database, "sharded_events", possible_old_column_name, property_name) diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index 37fea932f2014..418e2f6354807 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -6,7 +6,11 @@ from typing import Literal, Optional, Union, cast from uuid import UUID -from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns +from posthog.clickhouse.materialized_columns import ( + MaterializedColumn, + TablesWithMaterializedColumns, + get_materialized_column_for_property, +) from posthog.clickhouse.property_groups import property_groups from posthog.hogql import ast from posthog.hogql.base import AST, _T_AST @@ -197,6 +201,7 @@ class JoinExprResponse: class PrintableMaterializedColumn: table: Optional[str] column: str + is_nullable: bool def __str__(self) -> str: if self.table is None: @@ -1321,10 +1326,11 @@ def __get_all_materialized_property_sources( field_name = cast(Union[Literal["properties"], Literal["person_properties"]], field.name) materialized_column = self._get_materialized_column(table_name, property_name, field_name) - if materialized_column: + if materialized_column is not None: yield PrintableMaterializedColumn( self.visit(field_type.table_type), - self._print_identifier(materialized_column), + self._print_identifier(materialized_column.name), + is_nullable=materialized_column.is_nullable, ) 
if self.context.modifiers.propertyGroupsMode in ( @@ -1352,8 +1358,12 @@ def __get_all_materialized_property_sources( materialized_column = self._get_materialized_column("events", property_name, "person_properties") else: materialized_column = self._get_materialized_column("person", property_name, "properties") - if materialized_column: - yield PrintableMaterializedColumn(None, self._print_identifier(materialized_column)) + if materialized_column is not None: + yield PrintableMaterializedColumn( + None, + self._print_identifier(materialized_column.name), + is_nullable=materialized_column.is_nullable, + ) def visit_property_type(self, type: ast.PropertyType): if type.joined_subquery is not None and type.joined_subquery_field_name is not None: @@ -1361,7 +1371,10 @@ def visit_property_type(self, type: ast.PropertyType): materialized_property_source = self.__get_materialized_property_source_for_property_type(type) if materialized_property_source is not None: - if isinstance(materialized_property_source, PrintableMaterializedColumn): + if ( + isinstance(materialized_property_source, PrintableMaterializedColumn) + and not materialized_property_source.is_nullable + ): # TODO: rematerialize all columns to properly support empty strings and "null" string values. if self.context.modifiers.materializationMode == MaterializationMode.LEGACY_NULL_AS_STRING: materialized_property_sql = f"nullIf({materialized_property_source}, '')" @@ -1511,9 +1524,10 @@ def _unsafe_json_extract_trim_quotes(self, unsafe_field: str, unsafe_args: list[ def _get_materialized_column( self, table_name: str, property_name: PropertyName, field_name: TableColumn - ) -> Optional[str]: - materialized_columns = get_enabled_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) - return materialized_columns.get((property_name, field_name), None) + ) -> MaterializedColumn | None: + return get_materialized_column_for_property( + cast(TablesWithMaterializedColumns, table_name), field_name, property_name + ) def _get_timezone(self) -> str: return self.context.database.get_timezone() if self.context.database else "UTC" diff --git a/posthog/hogql/test/test_printer.py b/posthog/hogql/test/test_printer.py index 8d7dad46040ac..4f2422263d0c8 100644 --- a/posthog/hogql/test/test_printer.py +++ b/posthog/hogql/test/test_printer.py @@ -460,14 +460,22 @@ def test_hogql_properties_materialized_json_access(self): self.assertEqual(1 + 2, 3) return - materialize("events", "withmat") context = HogQLContext(team_id=self.team.pk) + materialize("events", "withmat") self.assertEqual( self._expr("properties.withmat.json.yet", context), "replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(nullIf(nullIf(events.mat_withmat, ''), 'null'), %(hogql_val_0)s, %(hogql_val_1)s), ''), 'null'), '^\"|\"$', '')", ) self.assertEqual(context.values, {"hogql_val_0": "json", "hogql_val_1": "yet"}) + context = HogQLContext(team_id=self.team.pk) + materialize("events", "withmat_nullable", is_nullable=True) + self.assertEqual( + self._expr("properties.withmat_nullable.json.yet", context), + "replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.mat_withmat_nullable, %(hogql_val_0)s, %(hogql_val_1)s), ''), 'null'), '^\"|\"$', '')", + ) + self.assertEqual(context.values, {"hogql_val_0": "json", "hogql_val_1": "yet"}) + def test_materialized_fields_and_properties(self): try: from ee.clickhouse.materialized_columns.analyze import materialize @@ -499,6 +507,12 @@ def test_materialized_fields_and_properties(self): "nullIf(nullIf(events.`mat_$browser_______`, ''), 'null')", ) 
+ materialize("events", "nullable_property", is_nullable=True) + self.assertEqual( + self._expr("properties['nullable_property']"), + "events.mat_nullable_property", + ) + def test_property_groups(self): context = HogQLContext( team_id=self.team.pk, diff --git a/posthog/hogql/transforms/property_types.py b/posthog/hogql/transforms/property_types.py index 6dbac74590da6..e561607629f1f 100644 --- a/posthog/hogql/transforms/property_types.py +++ b/posthog/hogql/transforms/property_types.py @@ -1,6 +1,10 @@ -from typing import Literal, Optional, cast +from typing import Literal, cast -from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns +from posthog.clickhouse.materialized_columns import ( + MaterializedColumn, + TablesWithMaterializedColumns, + get_materialized_column_for_property, +) from posthog.hogql import ast from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import ( @@ -258,7 +262,7 @@ def _add_property_notice( message = f"{property_type.capitalize()} property '{property_name}' is of type '{field_type}'." if self.context.debug: - if materialized_column: + if materialized_column is not None: message += " This property is materialized ⚡️." else: message += " This property is not materialized 🐢." @@ -277,6 +281,7 @@ def _add_notice(self, node: ast.Field, message: str): def _get_materialized_column( self, table_name: str, property_name: PropertyName, field_name: TableColumn - ) -> Optional[str]: - materialized_columns = get_enabled_materialized_columns(cast(TablesWithMaterializedColumns, table_name)) - return materialized_columns.get((property_name, field_name), None) + ) -> MaterializedColumn | None: + return get_materialized_column_for_property( + cast(TablesWithMaterializedColumns, table_name), field_name, property_name + ) diff --git a/posthog/models/property/util.py b/posthog/models/property/util.py index 90651b6cd1e5f..ef63b2f69c670 100644 --- a/posthog/models/property/util.py +++ b/posthog/models/property/util.py @@ -14,10 +14,7 @@ from rest_framework import exceptions from posthog.clickhouse.kafka_engine import trim_quotes_expr -from posthog.clickhouse.materialized_columns import ( - TableWithProperties, - get_enabled_materialized_columns, -) +from posthog.clickhouse.materialized_columns import TableWithProperties, get_materialized_column_for_property from posthog.constants import PropertyOperatorType from posthog.hogql import ast from posthog.hogql.hogql import HogQLContext @@ -711,17 +708,18 @@ def get_property_string_expr( (optional) alias of the table being queried :return: """ - materialized_columns = get_enabled_materialized_columns(table) if allow_denormalized_props else {} - table_string = f"{table_alias}." 
if table_alias is not None and table_alias != "" else "" if ( allow_denormalized_props - and (property_name, materialised_table_column) in materialized_columns + and ( + materialized_column := get_materialized_column_for_property(table, materialised_table_column, property_name) + ) + and not materialized_column.is_nullable and "group" not in materialised_table_column ): return ( - f'{table_string}"{materialized_columns[(property_name, materialised_table_column)]}"', + f'{table_string}"{materialized_column.name}"', True, ) diff --git a/posthog/queries/column_optimizer/foss_column_optimizer.py b/posthog/queries/column_optimizer/foss_column_optimizer.py index 4fffbd1faa350..c998d92480b5a 100644 --- a/posthog/queries/column_optimizer/foss_column_optimizer.py +++ b/posthog/queries/column_optimizer/foss_column_optimizer.py @@ -3,7 +3,7 @@ from typing import Union, cast from collections.abc import Generator -from posthog.clickhouse.materialized_columns import ColumnName, get_enabled_materialized_columns +from posthog.clickhouse.materialized_columns import ColumnName, get_materialized_column_for_property from posthog.constants import TREND_FILTER_TYPE_ACTIONS, FunnelCorrelationType from posthog.models.action.util import ( get_action_tables_and_properties, @@ -72,12 +72,14 @@ def columns_to_query( table_column: str = "properties", ) -> set[ColumnName]: "Transforms a list of property names to what columns are needed for that query" - - materialized_columns = get_enabled_materialized_columns(table) - return { - materialized_columns.get((property_name, table_column), table_column) - for property_name, _, _ in used_properties - } + column_names = set() + for property_name, _, _ in used_properties: + column = get_materialized_column_for_property(table, table_column, property_name) + if column is not None and not column.is_nullable: + column_names.add(column.name) + else: + column_names.add(table_column) + return column_names @cached_property def is_using_person_properties(self) -> bool: diff --git a/posthog/tasks/test/__snapshots__/test_usage_report.ambr b/posthog/tasks/test/__snapshots__/test_usage_report.ambr index 2230c532da5ca..36733da586c57 100644 --- a/posthog/tasks/test/__snapshots__/test_usage_report.ambr +++ b/posthog/tasks/test/__snapshots__/test_usage_report.ambr @@ -3,7 +3,7 @@ ''' SELECT team_id, - multiIf(event LIKE 'helicone%', 'helicone_events', event LIKE 'langfuse%', 'langfuse_events', event LIKE 'keywords_ai%', 'keywords_ai_events', event LIKE 'traceloop%', 'traceloop_events', JSONExtractString(properties, '$lib') = 'web', 'web_events', JSONExtractString(properties, '$lib') = 'js', 'web_lite_events', JSONExtractString(properties, '$lib') = 'posthog-node', 'node_events', JSONExtractString(properties, '$lib') = 'posthog-android', 'android_events', JSONExtractString(properties, '$lib') = 'posthog-flutter', 'flutter_events', JSONExtractString(properties, '$lib') = 'posthog-ios', 'ios_events', JSONExtractString(properties, '$lib') = 'posthog-go', 'go_events', JSONExtractString(properties, '$lib') = 'posthog-java', 'java_events', JSONExtractString(properties, '$lib') = 'posthog-react-native', 'react_native_events', JSONExtractString(properties, '$lib') = 'posthog-ruby', 'ruby_events', JSONExtractString(properties, '$lib') = 'posthog-python', 'python_events', JSONExtractString(properties, '$lib') = 'posthog-php', 'php_events', 'other') AS metric, + multiIf(event LIKE 'helicone%', 'helicone_events', event LIKE 'langfuse%', 'langfuse_events', event LIKE 'keywords_ai%', 'keywords_ai_events', 
event LIKE 'traceloop%', 'traceloop_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'web', 'web_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'js', 'web_lite_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-node', 'node_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-android', 'android_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-flutter', 'flutter_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-ios', 'ios_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-go', 'go_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-java', 'java_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-react-native', 'react_native_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-ruby', 'ruby_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-python', 'python_events', replaceRegexpAll(JSONExtractRaw(properties, '$lib'), '^"|"$', '') = 'posthog-php', 'php_events', 'other') AS metric, count(1) as count FROM events WHERE timestamp BETWEEN '2022-01-10 00:00:00' AND '2022-01-10 23:59:59' diff --git a/posthog/tasks/usage_report.py b/posthog/tasks/usage_report.py index a2671bd97ebd5..9bd8619317fa6 100644 --- a/posthog/tasks/usage_report.py +++ b/posthog/tasks/usage_report.py @@ -19,7 +19,6 @@ from posthog import version_requirement from posthog.clickhouse.client.connection import Workload -from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns from posthog.client import sync_execute from posthog.cloud_utils import get_cached_instance_license, is_cloud from posthog.constants import FlagRequestType @@ -29,6 +28,7 @@ from posthog.models.feature_flag import FeatureFlag from posthog.models.organization import Organization from posthog.models.plugin import PluginConfig +from posthog.models.property.util import get_property_string_expr from posthog.models.team.team import Team from posthog.models.utils import namedtuplefetchall from posthog.settings import CLICKHOUSE_CLUSTER, INSTANCE_TAG @@ -460,10 +460,8 @@ def get_teams_with_event_count_with_groups_in_period(begin: datetime, end: datet @timed_log() @retry(tries=QUERY_RETRIES, delay=QUERY_RETRY_DELAY, backoff=QUERY_RETRY_BACKOFF) def get_all_event_metrics_in_period(begin: datetime, end: datetime) -> dict[str, list[tuple[int, int]]]: - materialized_columns = get_enabled_materialized_columns("events") - # Check if $lib is materialized - lib_expression = materialized_columns.get(("$lib", "properties"), "JSONExtractString(properties, '$lib')") + lib_expression, _ = get_property_string_expr("events", "$lib", "'$lib'", "properties") results = sync_execute( f""" diff --git a/posthog/test/base.py b/posthog/test/base.py index 43dcc0e130964..59a3bd9c71cb1 100644 --- a/posthog/test/base.py +++ b/posthog/test/base.py @@ -30,7 +30,6 @@ from posthog import rate_limit, redis from posthog.clickhouse.client import sync_execute from posthog.clickhouse.client.connection import ch_pool -from posthog.clickhouse.materialized_columns import get_materialized_columns from posthog.clickhouse.plugin_log_entries import TRUNCATE_PLUGIN_LOG_ENTRIES_TABLE_SQL from posthog.cloud_utils import TEST_clear_instance_license_cache from posthog.models import Dashboard, DashboardTile, Insight, 
Organization, Team, User @@ -575,35 +574,31 @@ def stripResponse(response, remove=("action", "label", "persons_urls", "filter") return response -def default_materialised_columns(): +def cleanup_materialized_columns(): try: + from ee.clickhouse.materialized_columns.columns import get_materialized_columns from ee.clickhouse.materialized_columns.test.test_columns import EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS except: # EE not available? Skip - return [] - - default_columns = [] - for prop in EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS: - column_name = get_materialized_columns("events")[(prop, "properties")] - default_columns.append(column_name) - - return default_columns - + return -def cleanup_materialized_columns(): def optionally_drop(table, filter=None): drops = ",".join( [ - f"DROP COLUMN {column_name}" - for column_name in get_materialized_columns(table).values() - if filter is None or filter(column_name) + f"DROP COLUMN {column.name}" + for column in get_materialized_columns(table).values() + if filter is None or filter(column.name) ] ) if drops: sync_execute(f"ALTER TABLE {table} {drops} SETTINGS mutations_sync = 2") - default_columns = default_materialised_columns() - optionally_drop("events", lambda name: name not in default_columns) + default_column_names = { + get_materialized_columns("events")[(prop, "properties")].name + for prop in EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS + } + + optionally_drop("events", lambda name: name not in default_column_names) optionally_drop("person") optionally_drop("groups")
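
For reference, the sketch below shows how the API changed by this patch is intended to fit together. It is illustrative only and not part of the diff: the property name "my_prop" and the JSONExtractString fallback are assumptions, while materialize(), get_materialized_column_for_property(), and the is_nullable flag come from the changes above (an EE deployment is assumed).

    # Illustrative sketch, not part of the patch. Assumes an EE deployment where
    # ee.clickhouse.materialized_columns is importable; "my_prop" is a made-up property name.
    from ee.clickhouse.materialized_columns.columns import materialize
    from posthog.clickhouse.materialized_columns import get_materialized_column_for_property

    # With is_nullable=True the column is created as Nullable(String) and populated via
    # JSONExtract(properties, 'my_prop', 'Nullable(String)') rather than the trimmed
    # JSONExtractRaw expression used by the legacy String columns.
    materialize("events", "my_prop", is_nullable=True)

    # Readers now look up one property at a time. The helper returns None when the
    # MATERIALIZED_COLUMNS_ENABLED instance setting is off or EE is unavailable.
    column = get_materialized_column_for_property("events", "properties", "my_prop")
    if column is not None and not column.is_nullable:
        # Non-nullable (legacy) columns are still read directly by the query builders.
        expr = column.name
    else:
        # Nullable columns are not yet consumed by these readers, so fall back to
        # extracting from the raw properties column (fallback expression is illustrative).
        expr = "JSONExtractString(properties, 'my_prop')"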