Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support nullable materialized columns using native types #26448

Merged
merged 17 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ee/benchmarks/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

django.setup()

from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns # noqa: E402
from ee.clickhouse.materialized_columns.columns import get_enabled_materialized_columns # noqa: E402
from posthog import client # noqa: E402
from posthog.clickhouse.query_tagging import reset_query_tags, tag_queries # noqa: E402
from posthog.models.utils import UUIDT # noqa: E402
Expand Down
3 changes: 2 additions & 1 deletion ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def materialize_properties_task(
backfill_period_days: int = MATERIALIZE_COLUMNS_BACKFILL_PERIOD_DAYS,
dry_run: bool = False,
team_id_to_analyze: Optional[int] = None,
is_nullable: bool = False,
) -> None:
"""
Creates materialized columns for event and person properties based off of slow queries
Expand Down Expand Up @@ -203,7 +204,7 @@ def materialize_properties_task(
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column)
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))

if backfill_period_days > 0 and not dry_run:
Expand Down
81 changes: 49 additions & 32 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@
from copy import copy
from dataclasses import dataclass, replace
from datetime import timedelta
from typing import Any, Literal, NamedTuple, TypeVar, cast
from typing import Any, Literal, TypeVar, cast

from clickhouse_driver import Client
from django.utils.timezone import now

from posthog.cache_utils import cache_for
from posthog.clickhouse.client.connection import default_client
from posthog.clickhouse.cluster import ClickhouseCluster, ConnectionInfo, FuturesMap, HostInfo
from posthog.clickhouse.kafka_engine import trim_quotes_expr
from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns
from posthog.client import sync_execute
from posthog.models.event.sql import EVENTS_DATA_TABLE
from posthog.models.instance_setting import get_instance_setting
from posthog.models.person.sql import PERSONS_TABLE
from posthog.models.property import PropertyName, TableColumn, TableWithProperties
from posthog.models.utils import generate_random_short_suffix
Expand All @@ -26,8 +26,6 @@

DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties"

TRIM_AND_EXTRACT_PROPERTY = trim_quotes_expr("JSONExtractRaw({table_column}, %(property)s)")

SHORT_TABLE_COLUMN_NAME = {
"properties": "p",
"group_properties": "gp",
Expand All @@ -40,15 +38,36 @@
}


class MaterializedColumn(NamedTuple):
@dataclass
class MaterializedColumn:
name: ColumnName
details: MaterializedColumnDetails
Copy link
Contributor Author

@tkaemming tkaemming Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll probably consolidate these two classes (MaterializedColumn & MaterializedColumnDetails) at some point (there's not much benefit to them being separate at this point) but not in a big hurry to do that.

is_nullable: bool

@property
def type(self) -> str:
if self.is_nullable:
return "Nullable(String)"
else:
return "String"

def get_expression_and_parameters(self) -> tuple[str, dict[str, Any]]:
if self.is_nullable:
return (
f"JSONExtract({self.details.table_column}, %(property_name)s, %(property_type)s)",
{"property_name": self.details.property_name, "property_type": self.type},
)
else:
return (
trim_quotes_expr(f"JSONExtractRaw({self.details.table_column}, %(property)s)"),
{"property": self.details.property_name},
)

@staticmethod
def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn]:
rows = sync_execute(
"""
SELECT name, comment
SELECT name, comment, type like 'Nullable(%%)' as is_nullable
FROM system.columns
WHERE database = %(database)s
AND table = %(table)s
Expand All @@ -58,8 +77,8 @@ def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn
{"database": CLICKHOUSE_DATABASE, "table": table},
)

for name, comment in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment))
for name, comment, is_nullable in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment), is_nullable)

@staticmethod
def get(table: TablesWithMaterializedColumns, column_name: ColumnName) -> MaterializedColumn:
Expand Down Expand Up @@ -111,18 +130,20 @@ def from_column_comment(cls, comment: str) -> MaterializedColumnDetails:

def get_materialized_columns(
table: TablesWithMaterializedColumns,
exclude_disabled_columns: bool = False,
) -> dict[tuple[PropertyName, TableColumn], ColumnName]:
if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"):
return {}

) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
return {
(column.details.property_name, column.details.table_column): column.name
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
if not (exclude_disabled_columns and column.details.is_disabled)
}


@cache_for(timedelta(minutes=15))
def get_enabled_materialized_columns(
table: TablesWithMaterializedColumns,
) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
return {k: column for k, column in get_materialized_columns(table).items() if not column.details.is_disabled}


def get_cluster() -> ClickhouseCluster:
extra_hosts = []
for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
Expand Down Expand Up @@ -169,13 +190,10 @@ class CreateColumnOnDataNodesTask:
add_column_comment: bool

def execute(self, client: Client) -> None:
expression, parameters = self.column.get_expression_and_parameters()
actions = [
f"""
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR
MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=self.column.details.table_column)}
""",
f"ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type} MATERIALIZED {expression}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:chef-kiss:

]
parameters = {"property": self.column.details.property_name}

if self.add_column_comment:
actions.append(f"COMMENT COLUMN {self.column.name} %(comment)s")
Expand All @@ -201,7 +219,7 @@ def execute(self, client: Client) -> None:
client.execute(
f"""
ALTER TABLE {self.table}
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR,
ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type},
COMMENT COLUMN {self.column.name} %(comment)s
""",
{"comment": self.column.details.as_column_comment()},
Expand All @@ -215,6 +233,7 @@ def materialize(
column_name: ColumnName | None = None,
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
if TEST:
Expand All @@ -235,6 +254,7 @@ def materialize(
property_name=property,
is_disabled=False,
),
is_nullable=is_nullable,
)

table_info.map_data_nodes(
Expand Down Expand Up @@ -275,16 +295,12 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
cluster = get_cluster()
table_info = tables[table]

column = MaterializedColumn.get(table, column_name)

cluster.map_all_hosts(
UpdateColumnCommentTask(
table_info.read_table,
MaterializedColumn(
name=column_name,
details=replace(
MaterializedColumn.get(table, column_name).details,
is_disabled=is_disabled,
),
),
replace(column, details=replace(column.details, is_disabled=is_disabled)),
).execute
).result()

Expand Down Expand Up @@ -345,12 +361,13 @@ def execute(self, client: Client) -> None:
# Note that for this to work all inserts should list columns explicitly
# Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved
for column in self.columns:
expression, parameters = column.get_expression_and_parameters()
client.execute(
f"""
ALTER TABLE {self.table}
MODIFY COLUMN {column.name} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=column.details.table_column)}
MODIFY COLUMN {column.name} {column.type} DEFAULT {expression}
""",
{"property": column.details.property_name},
parameters,
settings=self.test_settings,
)

Expand Down Expand Up @@ -420,10 +437,10 @@ def _materialized_column_name(
prefix += f"{SHORT_TABLE_COLUMN_NAME[table_column]}_"
property_str = re.sub("[^0-9a-zA-Z$]", "_", property)

existing_materialized_columns = set(get_materialized_columns(table).values())
existing_materialized_column_names = {column.name for column in get_materialized_columns(table).values()}
suffix = ""

while f"{prefix}{property_str}{suffix}" in existing_materialized_columns:
while f"{prefix}{property_str}{suffix}" in existing_materialized_column_names:
suffix = "_" + generate_random_short_suffix()

return f"{prefix}{property_str}{suffix}"
8 changes: 4 additions & 4 deletions ee/clickhouse/materialized_columns/test/test_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def test_mat_columns(self, patch_backfill, patch_materialize):
materialize_properties_task()
patch_materialize.assert_has_calls(
[
call("events", "materialize_me", table_column="properties"),
call("events", "materialize_me2", table_column="properties"),
call("events", "materialize_person_prop", table_column="person_properties"),
call("events", "materialize_me3", table_column="properties"),
call("events", "materialize_me", table_column="properties", is_nullable=False),
call("events", "materialize_me2", table_column="properties", is_nullable=False),
call("events", "materialize_person_prop", table_column="person_properties", is_nullable=False),
call("events", "materialize_me3", table_column="properties", is_nullable=False),
]
)
52 changes: 30 additions & 22 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
MaterializedColumnDetails,
backfill_materialized_columns,
drop_column,
get_enabled_materialized_columns,
get_materialized_columns,
materialize,
update_column_is_disabled,
)
from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns, get_enabled_materialized_columns
from ee.tasks.materialized_columns import mark_all_materialized
from posthog.clickhouse.materialized_columns import TablesWithMaterializedColumns
from posthog.client import sync_execute
from posthog.conftest import create_clickhouse_tables
from posthog.constants import GROUP_TYPES_LIMIT
Expand Down Expand Up @@ -142,11 +144,11 @@ def test_materialized_column_naming(self, mock_choice):
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
get_materialized_columns("events"),
{k: column.name for k, column in get_materialized_columns("events").items()},
)

self.assertEqual(
get_materialized_columns("person"),
{k: column.name for k, column in get_materialized_columns("person").items()},
{("SoMePrOp", "properties"): "pmat_SoMePrOp"},
)

Expand Down Expand Up @@ -241,20 +243,26 @@ def test_backfilling_data(self):

def test_column_types(self):
materialize("events", "myprop", create_minmax_index=True)
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True)

expr = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop"))
expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')"
self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

backfill_materialized_columns("events", [("myprop", "properties")], timedelta(days=50))
self.assertEqual(("DEFAULT", expr), self._get_column_types("mat_myprop"))
backfill_materialized_columns(
"events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50)
)
self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable"))

try:
from ee.tasks.materialized_columns import mark_all_materialized
except ImportError:
pass
else:
mark_all_materialized()
self.assertEqual(("MATERIALIZED", expr), self._get_column_types("mat_myprop"))
mark_all_materialized()
self.assertEqual(("String", "MATERIALIZED", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

def _count_materialized_rows(self, column):
return sync_execute(
Expand Down Expand Up @@ -284,7 +292,7 @@ def _get_count_of_mutations_running(self) -> int:
def _get_column_types(self, column: str):
return sync_execute(
"""
SELECT default_kind, default_expression
SELECT type, default_kind, default_expression
FROM system.columns
WHERE database = %(database)s AND table = %(table)s AND name = %(column)s
""",
Expand All @@ -306,33 +314,33 @@ def test_lifecycle(self):

# ensure it exists everywhere
key = (property, source_column)
assert get_materialized_columns(table)[key] == destination_column
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)

# disable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=True)
assert get_materialized_columns(table)[key] == destination_column
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=True),
is_nullable=False,
)

# re-enable it and ensure updates apply as needed
update_column_is_disabled(table, destination_column, is_disabled=False)
assert get_materialized_columns(table, exclude_disabled_columns=False)[key] == destination_column
assert get_materialized_columns(table, exclude_disabled_columns=True)[key] == destination_column
assert get_materialized_columns(table)[key].name == destination_column
assert MaterializedColumn.get(table, destination_column) == MaterializedColumn(
destination_column,
MaterializedColumnDetails(source_column, property, is_disabled=False),
is_nullable=False,
)

# drop it and ensure updates apply as needed
drop_column(table, destination_column)
assert key not in get_materialized_columns(table, exclude_disabled_columns=False)
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
assert key not in get_materialized_columns(table)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)
13 changes: 6 additions & 7 deletions ee/clickhouse/queries/funnels/funnel_correlation.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from ee.clickhouse.queries.column_optimizer import EnterpriseColumnOptimizer
from ee.clickhouse.queries.groups_join_query import GroupsJoinQuery
from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns
from posthog.clickhouse.materialized_columns import get_materialized_column_for_property
from posthog.constants import (
AUTOCAPTURE_EVENT,
TREND_FILTER_TYPE_ACTIONS,
Expand Down Expand Up @@ -156,19 +156,18 @@ def properties_to_include(self) -> list[str]:
):
# When dealing with properties, make sure funnel response comes with properties
# so we don't have to join on persons/groups to get these properties again
mat_event_cols = get_enabled_materialized_columns("events")

for property_name in cast(list, self._filter.correlation_property_names):
if self._filter.aggregation_group_type_index is not None:
continue # We don't support group properties on events at this time
else:
if "$all" == property_name:
return [f"person_properties"]

possible_mat_col = mat_event_cols.get((property_name, "person_properties"))

if possible_mat_col is not None:
props_to_include.append(possible_mat_col)
possible_mat_col = get_materialized_column_for_property(
"events", "person_properties", property_name
)
if possible_mat_col is not None and not possible_mat_col.is_nullable:
props_to_include.append(possible_mat_col.name)
else:
props_to_include.append(f"person_properties")

Expand Down
Loading
Loading