Skip to content

Commit

Permalink
feat: Support nullable materialized columns using native types (#26448)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Dec 5, 2024
1 parent f04d14c commit cc990fd
Show file tree
Hide file tree
Showing 20 changed files with 244 additions and 180 deletions.
2 changes: 1 addition & 1 deletion ee/benchmarks/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

django.setup()

from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns # noqa: E402
from ee.clickhouse.materialized_columns.columns import get_enabled_materialized_columns # noqa: E402
from posthog import client # noqa: E402
from posthog.clickhouse.query_tagging import reset_query_tags, tag_queries # noqa: E402
from posthog.models.utils import UUIDT # noqa: E402
Expand Down
3 changes: 2 additions & 1 deletion ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def materialize_properties_task(
backfill_period_days: int = MATERIALIZE_COLUMNS_BACKFILL_PERIOD_DAYS,
dry_run: bool = False,
team_id_to_analyze: Optional[int] = None,
is_nullable: bool = False,
) -> None:
"""
Creates materialized columns for event and person properties based off of slow queries
Expand Down Expand Up @@ -203,7 +204,7 @@ def materialize_properties_task(
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column)
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))

if backfill_period_days > 0 and not dry_run:
Expand Down
81 changes: 49 additions & 32 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
from copy import copy
from dataclasses import dataclass, replace
from datetime import timedelta
from typing import Any, Literal, NamedTuple, TypeVar, cast
from typing import Any, Literal, TypeVar, cast

from clickhouse_driver import Client
from django.utils.timezone import now

from posthog.cache_utils import cache_for
from posthog.clickhouse.client.connection import default_client
from posthog.clickhouse.cluster import ClickhouseCluster, ConnectionInfo, FuturesMap, HostInfo
from posthog.clickhouse.kafka_engine import trim_quotes_expr
from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns
from posthog.client import sync_execute
from posthog.models.event.sql import EVENTS_DATA_TABLE
from posthog.models.instance_setting import get_instance_setting
from posthog.models.person.sql import PERSONS_TABLE
from posthog.models.property import PropertyName, TableColumn, TableWithProperties
from posthog.models.utils import generate_random_short_suffix
Expand All @@ -30,8 +30,6 @@

DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties"

TRIM_AND_EXTRACT_PROPERTY = trim_quotes_expr("JSONExtractRaw({table_column}, %(property)s)")

SHORT_TABLE_COLUMN_NAME = {
"properties": "p",
"group_properties": "gp",
Expand All @@ -44,15 +42,36 @@
}


class MaterializedColumn(NamedTuple):
@dataclass
class MaterializedColumn:
name: ColumnName
details: MaterializedColumnDetails
is_nullable: bool

@property
def type(self) -> str:
if self.is_nullable:
return "Nullable(String)"
else:
return "String"

def get_expression_and_parameters(self) -> tuple[str, dict[str, Any]]:
if self.is_nullable:
return (
f"JSONExtract({self.details.table_column}, %(property_name)s, %(property_type)s)",
{"property_name": self.details.property_name, "property_type": self.type},
)
else:
return (
trim_quotes_expr(f"JSONExtractRaw({self.details.table_column}, %(property)s)"),
{"property": self.details.property_name},
)

@staticmethod
def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn]:
rows = sync_execute(
"""
SELECT name, comment
SELECT name, comment, type like 'Nullable(%%)' as is_nullable
FROM system.columns
WHERE database = %(database)s
AND table = %(table)s
Expand All @@ -62,8 +81,8 @@ def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn
{"database": CLICKHOUSE_DATABASE, "table": table},
)

for name, comment in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment))
for name, comment, is_nullable in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment), is_nullable)

@staticmethod
def get(table: TablesWithMaterializedColumns, column_name: ColumnName) -> MaterializedColumn:
Expand Down Expand Up @@ -115,18 +134,20 @@ def from_column_comment(cls, comment: str) -> MaterializedColumnDetails:

def get_materialized_columns(
table: TablesWithMaterializedColumns,
exclude_disabled_columns: bool = False,
) -> dict[tuple[PropertyName, TableColumn], ColumnName]:
if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"):
return {}

) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
return {
(column.details.property_name, column.details.table_column): column.name
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
if not (exclude_disabled_columns and column.details.is_disabled)
}


@cache_for(timedelta(minutes=15))
def get_enabled_materialized_columns(
    table: TablesWithMaterializedColumns,
) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
    """Return the materialized columns of ``table`` that are not disabled.

    The result maps ``(property name, source table column)`` keys to their
    ``MaterializedColumn``, keeping only entries whose ``details.is_disabled``
    flag is falsy.

    NOTE(review): the ``cache_for`` decorator presumably memoizes the result
    for 15 minutes, so recently toggled columns may not be reflected
    immediately -- confirm against ``posthog.cache_utils``.
    """
    enabled: dict[tuple[PropertyName, TableColumn], MaterializedColumn] = {}
    for key, column in get_materialized_columns(table).items():
        if column.details.is_disabled:
            continue
        enabled[key] = column
    return enabled


def get_cluster() -> ClickhouseCluster:
extra_hosts = []
for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
Expand Down Expand Up @@ -177,13 +198,10 @@ class CreateColumnOnDataNodesTask:
add_column_comment: bool

def execute(self, client: Client) -> None:
expression, parameters = self.column.get_expression_and_parameters()
actions = [
f"""
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR
MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=self.column.details.table_column)}
""",
f"ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type} MATERIALIZED {expression}",
]
parameters = {"property": self.column.details.property_name}

if self.add_column_comment:
actions.append(f"COMMENT COLUMN {self.column.name} %(comment)s")
Expand All @@ -209,7 +227,7 @@ def execute(self, client: Client) -> None:
client.execute(
f"""
ALTER TABLE {self.table}
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR,
ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type},
COMMENT COLUMN {self.column.name} %(comment)s
""",
{"comment": self.column.details.as_column_comment()},
Expand All @@ -223,6 +241,7 @@ def materialize(
column_name: ColumnName | None = None,
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
if TEST:
Expand All @@ -243,6 +262,7 @@ def materialize(
property_name=property,
is_disabled=False,
),
is_nullable=is_nullable,
)

table_info.map_data_nodes(
Expand Down Expand Up @@ -283,16 +303,12 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
cluster = get_cluster()
table_info = tables[table]

column = MaterializedColumn.get(table, column_name)

cluster.map_all_hosts(
UpdateColumnCommentTask(
table_info.read_table,
MaterializedColumn(
name=column_name,
details=replace(
MaterializedColumn.get(table, column_name).details,
is_disabled=is_disabled,
),
),
replace(column, details=replace(column.details, is_disabled=is_disabled)),
).execute
).result()

Expand Down Expand Up @@ -388,12 +404,13 @@ def execute(self, client: Client) -> None:
# Note that for this to work all inserts should list columns explicitly
# Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved
for column in self.columns:
expression, parameters = column.get_expression_and_parameters()
client.execute(
f"""
ALTER TABLE {self.table}
MODIFY COLUMN {column.name} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=column.details.table_column)}
MODIFY COLUMN {column.name} {column.type} DEFAULT {expression}
""",
{"property": column.details.property_name},
parameters,
settings=self.test_settings,
)

Expand Down Expand Up @@ -463,10 +480,10 @@ def _materialized_column_name(
prefix += f"{SHORT_TABLE_COLUMN_NAME[table_column]}_"
property_str = re.sub("[^0-9a-zA-Z$]", "_", property)

existing_materialized_columns = set(get_materialized_columns(table).values())
existing_materialized_column_names = {column.name for column in get_materialized_columns(table).values()}
suffix = ""

while f"{prefix}{property_str}{suffix}" in existing_materialized_columns:
while f"{prefix}{property_str}{suffix}" in existing_materialized_column_names:
suffix = "_" + generate_random_short_suffix()

return f"{prefix}{property_str}{suffix}"
8 changes: 4 additions & 4 deletions ee/clickhouse/materialized_columns/test/test_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def test_mat_columns(self, patch_backfill, patch_materialize):
materialize_properties_task()
patch_materialize.assert_has_calls(
[
call("events", "materialize_me", table_column="properties"),
call("events", "materialize_me2", table_column="properties"),
call("events", "materialize_person_prop", table_column="person_properties"),
call("events", "materialize_me3", table_column="properties"),
call("events", "materialize_me", table_column="properties", is_nullable=False),
call("events", "materialize_me2", table_column="properties", is_nullable=False),
call("events", "materialize_person_prop", table_column="person_properties", is_nullable=False),
call("events", "materialize_me3", table_column="properties", is_nullable=False),
]
)
52 changes: 30 additions & 22 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
MaterializedColumnDetails,
backfill_materialized_columns,
drop_column,
get_enabled_materialized_columns,
get_materialized_columns,
materialize,
update_column_is_disabled,
)
from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns
from ee.tasks.materialized_columns import mark_all_materialized
from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns
from posthog.client import sync_execute
from posthog.conftest import create_clickhouse_tables
from posthog.constants import GROUP_TYPES_LIMIT
Expand Down Expand Up @@ -143,11 +145,11 @@ def test_materialized_column_naming(self, mock_choice):
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
get_materialized_columns("events"),
{k: column.name for k, column in get_materialized_columns("events").items()},
)

self.assertEqual(
get_materialized_columns("person"),
{k: column.name for k, column in get_materialized_columns("person").items()},
{("SoMePrOp", "properties"): "pmat_SoMePrOp"},
)

Expand Down Expand Up @@ -242,20 +244,26 @@ def test_backfilling_data(self):

def test_column_types(self):
materialize("events", "myprop", create_minmax_index=True)
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True)

expr = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop"))
expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')"
self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

backfill_materialized_columns("events", [("myprop", "properties")], timedelta(days=50))
self.assertEqual(("DEFAULT", expr), self._get_column_types("mat_myprop"))
backfill_materialized_columns(
"events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50)
)
self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable"))

try:
from ee.tasks.materialized_columns import mark_all_materialized
except ImportError:
pass
else:
mark_all_materialized()
self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop"))
mark_all_materialized()
self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

def _count_materialized_rows(self, column):
return sync_execute(
Expand Down Expand Up @@ -285,7 +293,7 @@ def _get_count_of_mutations_running(self) -> int:
def _get_column_types(self, column: str):
return sync_execute(
"""
SELECT default_kind, default_expression
SELECT type, default_kind, default_expression
FROM system.columns
WHERE database = %(database)s AND table = %(table)s AND name = %(column)s
""",
Expand All @@ -307,34 +315,34 @@ def test_lifecycle(self):

# ensure it exists everywhere
key = (property, source_column)
assert get_materialized_columns(table)[key] == destination_column
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)

# disable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=True)
assert get_materialized_columns(table)[key] == destination_column
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=True),
is_nullable=False,
)

# re-enable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=False)
assert get_materialized_columns(table, exclude_disabled_columns=False)[key] == destination_column
assert get_materialized_columns(table, exclude_disabled_columns=True)[key] == destination_column
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)

# drop it and ensure updates apply as needed
drop_column(table, destination_column)
assert key not in get_materialized_columns(table, exclude_disabled_columns=False)
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
assert key not in get_materialized_columns(table)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)

Expand Down
13 changes: 6 additions & 7 deletions ee/clickhouse/queries/funnels/funnel_correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from ee.clickhouse.queries.column_optimizer import EnterpriseColumnOptimizer
from ee.clickhouse.queries.groups_join_query import GroupsJoinQuery
from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns
from posthog.clickhouse.materialized_columns import get_materialized_column_for_property
from posthog.constants import (
AUTOCAPTURE_EVENT,
TREND_FILTER_TYPE_ACTIONS,
Expand Down Expand Up @@ -156,19 +156,18 @@ def properties_to_include(self) -> list[str]:
):
# When dealing with properties, make sure funnel response comes with properties
# so we don't have to join on persons/groups to get these properties again
mat_event_cols = get_enabled_materialized_columns("events")

for property_name in cast(list, self._filter.correlation_property_names):
if self._filter.aggregation_group_type_index is not None:
continue # We don't support group properties on events at this time
else:
if "$all" == property_name:
return [f"person_properties"]

possible_mat_col = mat_event_cols.get((property_name, "person_properties"))

if possible_mat_col is not None:
props_to_include.append(possible_mat_col)
possible_mat_col = get_materialized_column_for_property(
"events", "person_properties", property_name
)
if possible_mat_col is not None and not possible_mat_col.is_nullable:
props_to_include.append(possible_mat_col.name)
else:
props_to_include.append(f"person_properties")

Expand Down
Loading

0 comments on commit cc990fd

Please sign in to comment.