Skip to content

Commit

Permalink
refactor: Backfill materialized columns by column reference instead of property (#26742)
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming authored Dec 9, 2024
1 parent d779154 commit 694d9c4
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 79 deletions.
22 changes: 12 additions & 10 deletions ee/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from datetime import timedelta
from ee.clickhouse.materialized_columns.analyze import (
backfill_materialized_columns,
get_materialized_columns,
materialize,
)
from ee.clickhouse.materialized_columns.columns import MaterializedColumn
from ee.clickhouse.queries.stickiness import ClickhouseStickiness
from ee.clickhouse.queries.funnels.funnel_correlation import FunnelCorrelation
from posthog.queries.funnels import ClickhouseFunnel
Expand Down Expand Up @@ -771,15 +771,17 @@ def track_person_property_values_materialized(self):

def setup(self):
for table, properties in MATERIALIZED_PROPERTIES.items():
existing_materialized_columns = get_materialized_columns(table)
for property in properties:
if (property, "properties") not in existing_materialized_columns:
materialize(table, property)
backfill_materialized_columns(
table,
[(property, "properties")],
backfill_period=timedelta(days=1_000),
)
columns = [
materialize(table, property)
for property in (
set(properties) - {column.details.property_name for column in MaterializedColumn.get_all(table)}
)
]
backfill_materialized_columns(
table,
columns,
backfill_period=timedelta(days=1_000),
)

# :TRICKY: Data in benchmark servers has ID=2
team = Team.objects.filter(id=2).first()
Expand Down
36 changes: 17 additions & 19 deletions ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ee.clickhouse.materialized_columns.columns import (
DEFAULT_TABLE_COLUMN,
MaterializedColumn,
backfill_materialized_columns,
get_materialized_columns,
materialize,
Expand Down Expand Up @@ -164,7 +165,7 @@ def _analyze(since_hours_ago: int, min_query_time: int, team_id: Optional[int] =


def materialize_properties_task(
columns_to_materialize: Optional[list[Suggestion]] = None,
properties_to_materialize: Optional[list[Suggestion]] = None,
time_to_analyze_hours: int = MATERIALIZE_COLUMNS_ANALYSIS_PERIOD_HOURS,
maximum: int = MATERIALIZE_COLUMNS_MAX_AT_ONCE,
min_query_time: int = MATERIALIZE_COLUMNS_MINIMUM_QUERY_TIME,
Expand All @@ -177,37 +178,34 @@ def materialize_properties_task(
Creates materialized columns for event and person properties based off of slow queries
"""

if columns_to_materialize is None:
columns_to_materialize = _analyze(time_to_analyze_hours, min_query_time, team_id_to_analyze)
if properties_to_materialize is None:
properties_to_materialize = _analyze(time_to_analyze_hours, min_query_time, team_id_to_analyze)

columns_by_table: dict[TableWithProperties, list[tuple[TableColumn, PropertyName]]] = defaultdict(list)
for table, table_column, property_name in columns_to_materialize:
columns_by_table[table].append((table_column, property_name))
properties_by_table: dict[TableWithProperties, list[tuple[TableColumn, PropertyName]]] = defaultdict(list)
for table, table_column, property_name in properties_to_materialize:
properties_by_table[table].append((table_column, property_name))

result: list[Suggestion] = []
for table, columns in columns_by_table.items():
existing_materialized_columns = get_materialized_columns(table)
for table_column, property_name in columns:
if (property_name, table_column) not in existing_materialized_columns:
for table, properties in properties_by_table.items():
existing_materialized_properties = get_materialized_columns(table).keys()
for table_column, property_name in properties:
if (property_name, table_column) not in existing_materialized_properties:
result.append((table, table_column, property_name))

if len(result) > 0:
logger.info(f"Calculated columns that could be materialized. count={len(result)}")
else:
logger.info("Found no columns to materialize.")

properties: dict[TableWithProperties, list[tuple[PropertyName, TableColumn]]] = {
"events": [],
"person": [],
}
materialized_columns: dict[TableWithProperties, list[MaterializedColumn]] = defaultdict(list)
for table, table_column, property_name in result[:maximum]:
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))
materialized_columns[table].append(
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
)

if backfill_period_days > 0 and not dry_run:
logger.info(f"Starting backfill for new materialized columns. period_days={backfill_period_days}")
backfill_materialized_columns("events", properties["events"], timedelta(days=backfill_period_days))
backfill_materialized_columns("person", properties["person"], timedelta(days=backfill_period_days))
for table, columns in materialized_columns.items():
backfill_materialized_columns(table, columns, timedelta(days=backfill_period_days))
23 changes: 6 additions & 17 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ def materialize(
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
) -> MaterializedColumn:
if existing_column := get_materialized_columns(table).get((property, table_column)):
if TEST:
return None
return existing_column

raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}")

Expand Down Expand Up @@ -283,7 +283,7 @@ def materialize(
).execute
).result()

return column.name
return column


@dataclass
Expand Down Expand Up @@ -444,7 +444,7 @@ def execute(self, client: Client) -> None:

def backfill_materialized_columns(
table: TableWithProperties,
properties: list[tuple[PropertyName, TableColumn]],
columns: Iterable[MaterializedColumn],
backfill_period: timedelta,
test_settings=None,
) -> None:
Expand All @@ -453,25 +453,14 @@ def backfill_materialized_columns(
This will require reading and writing a lot of data on clickhouse disk.
"""

if len(properties) == 0:
return

cluster = get_cluster()
table_info = tables[table]

# TODO: this will eventually need to handle duplicates
materialized_columns = {
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
}
columns = [materialized_columns[property] for property in properties]

table_info.map_data_nodes(
cluster,
BackfillColumnTask(
table_info.data_table,
columns,
[*columns],
backfill_period if table == "events" else None, # XXX
test_settings,
).execute,
Expand Down
57 changes: 25 additions & 32 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,20 @@ def test_caching_and_materializing(self):

@patch("secrets.choice", return_value="X")
def test_materialized_column_naming(self, mock_choice):
materialize("events", "$foO();--sqlinject", create_minmax_index=True)
mock_choice.return_value = "Y"
materialize("events", "$foO();ääsqlinject", create_minmax_index=True)
mock_choice.return_value = "Z"
materialize("events", "$foO_____sqlinject", create_minmax_index=True)
materialize("person", "SoMePrOp", create_minmax_index=True)
assert materialize("events", "$foO();--sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject"

self.assertDictContainsSubset(
{
("$foO();--sqlinject", "properties"): "mat_$foO_____sqlinject",
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
{k: column.name for k, column in get_materialized_columns("events").items()},
mock_choice.return_value = "Y"
assert (
materialize("events", "$foO();ääsqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_YYYY"
)

self.assertEqual(
{k: column.name for k, column in get_materialized_columns("person").items()},
{("SoMePrOp", "properties"): "pmat_SoMePrOp"},
mock_choice.return_value = "Z"
assert (
materialize("events", "$foO_____sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_ZZZZ"
)

assert materialize("person", "SoMePrOp", create_minmax_index=True).name == "pmat_SoMePrOp"

def test_backfilling_data(self):
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_prop")
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_another")
Expand Down Expand Up @@ -199,16 +192,18 @@ def test_backfilling_data(self):
properties={"another": 6},
)

materialize("events", "prop", create_minmax_index=True)
materialize("events", "another", create_minmax_index=True)
columns = [
materialize("events", "prop", create_minmax_index=True),
materialize("events", "another", create_minmax_index=True),
]

self.assertEqual(self._count_materialized_rows("mat_prop"), 0)
self.assertEqual(self._count_materialized_rows("mat_another"), 0)

with freeze_time("2021-05-10T14:00:01Z"):
backfill_materialized_columns(
"events",
[("prop", "properties"), ("another", "properties")],
columns,
timedelta(days=50),
test_settings={"mutations_sync": "0"},
)
Expand Down Expand Up @@ -243,8 +238,10 @@ def test_backfilling_data(self):
)

def test_column_types(self):
materialize("events", "myprop", create_minmax_index=True)
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True)
columns = [
materialize("events", "myprop", create_minmax_index=True),
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True),
]

expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')"
Expand All @@ -253,9 +250,7 @@ def test_column_types(self):
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

backfill_materialized_columns(
"events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50)
)
backfill_materialized_columns("events", columns, timedelta(days=50))
self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable"))

Expand Down Expand Up @@ -312,9 +307,9 @@ def test_lifecycle(self):
# create materialized columns
materialized_columns = {}
for property_name in property_names:
destination_column = materialize(table, property_name, table_column=source_column, create_minmax_index=True)
if destination_column is not None:
materialized_columns[property_name] = destination_column
materialized_columns[property_name] = materialize(
table, property_name, table_column=source_column, create_minmax_index=True
).name

assert set(property_names) == materialized_columns.keys()

Expand Down Expand Up @@ -396,11 +391,10 @@ def test_drop_optimized_no_index(self):
source_column: TableColumn = "properties"

destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP INDEX" in mutation for mutation in mutations_ran)
Expand All @@ -414,13 +408,12 @@ def test_drop_optimized_no_column(self):

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1})
sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column.name}", settings={"alter_sync": 1})

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)
2 changes: 1 addition & 1 deletion ee/management/commands/materialize_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def handle(self, *, is_nullable: bool, **options):
logger.info(f"Materializing column. table={options['property_table']}, property_name={options['property']}")

materialize_properties_task(
columns_to_materialize=[
properties_to_materialize=[
(
options["property_table"],
options["table_column"],
Expand Down

0 comments on commit 694d9c4

Please sign in to comment.