Skip to content

Commit

Permalink
refactor: Backfill materialized columns by column reference instead of property
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Dec 6, 2024
1 parent 3759a04 commit b3bcd26
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 68 deletions.
22 changes: 12 additions & 10 deletions ee/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
from datetime import timedelta
from ee.clickhouse.materialized_columns.analyze import (
backfill_materialized_columns,
get_materialized_columns,
materialize,
)
from ee.clickhouse.materialized_columns.columns import MaterializedColumn
from ee.clickhouse.queries.stickiness import ClickhouseStickiness
from ee.clickhouse.queries.funnels.funnel_correlation import FunnelCorrelation
from posthog.queries.funnels import ClickhouseFunnel
Expand Down Expand Up @@ -771,15 +771,17 @@ def track_person_property_values_materialized(self):

def setup(self):
for table, properties in MATERIALIZED_PROPERTIES.items():
existing_materialized_columns = get_materialized_columns(table)
for property in properties:
if (property, "properties") not in existing_materialized_columns:
materialize(table, property)
backfill_materialized_columns(
table,
[(property, "properties")],
backfill_period=timedelta(days=1_000),
)
columns = [
materialize(table, property)
for property in (
set(properties) - {column.details.property_name for column in MaterializedColumn.get_all(table)}
)
]
backfill_materialized_columns(
table,
columns,
backfill_period=timedelta(days=1_000),
)

# :TRICKY: Data in benchmark servers has ID=2
team = Team.objects.filter(id=2).first()
Expand Down
16 changes: 7 additions & 9 deletions ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from ee.clickhouse.materialized_columns.columns import (
DEFAULT_TABLE_COLUMN,
MaterializedColumn,
backfill_materialized_columns,
get_materialized_columns,
materialize,
Expand Down Expand Up @@ -196,18 +197,15 @@ def materialize_properties_task(
else:
logger.info("Found no columns to materialize.")

properties: dict[TableWithProperties, list[tuple[PropertyName, TableColumn]]] = {
"events": [],
"person": [],
}
materialized_columns: dict[TableWithProperties, list[MaterializedColumn]] = defaultdict(list)
for table, table_column, property_name in result[:maximum]:
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))
materialized_columns[table].append(
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
)

if backfill_period_days > 0 and not dry_run:
logger.info(f"Starting backfill for new materialized columns. period_days={backfill_period_days}")
backfill_materialized_columns("events", properties["events"], timedelta(days=backfill_period_days))
backfill_materialized_columns("person", properties["person"], timedelta(days=backfill_period_days))
for table, columns in materialized_columns.items():

Check failure on line 210 in ee/clickhouse/materialized_columns/analyze.py — GitHub Actions / Python code quality checks: Incompatible types in assignment (expression has type "list[MaterializedColumn]", variable has type "list[tuple[Literal['properties', 'group_properties', 'person_properties'], str]]")
backfill_materialized_columns(table, columns, timedelta(days=backfill_period_days))

Check failure on line 211 in ee/clickhouse/materialized_columns/analyze.py — GitHub Actions / Python code quality checks: Argument 2 to "backfill_materialized_columns" has incompatible type "list[tuple[Literal['properties', 'group_properties', 'person_properties'], str]]"; expected "Iterable[MaterializedColumn]"
23 changes: 6 additions & 17 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,10 @@ def materialize(
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
) -> MaterializedColumn:
if existing_column := get_materialized_columns(table).get((property, table_column)):
if TEST:
return None
return existing_column

raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}")

Expand Down Expand Up @@ -283,7 +283,7 @@ def materialize(
).execute
).result()

return column.name
return column


@dataclass
Expand Down Expand Up @@ -444,7 +444,7 @@ def execute(self, client: Client) -> None:

def backfill_materialized_columns(
table: TableWithProperties,
properties: list[tuple[PropertyName, TableColumn]],
columns: Iterable[MaterializedColumn],
backfill_period: timedelta,
test_settings=None,
) -> None:
Expand All @@ -453,25 +453,14 @@ def backfill_materialized_columns(
This will require reading and writing a lot of data on clickhouse disk.
"""

if len(properties) == 0:
return

cluster = get_cluster()
table_info = tables[table]

# TODO: this will eventually need to handle duplicates
materialized_columns = {
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
}
columns = [materialized_columns[property] for property in properties]

table_info.map_data_nodes(
cluster,
BackfillColumnTask(
table_info.data_table,
columns,
[*columns],
backfill_period if table == "events" else None, # XXX
test_settings,
).execute,
Expand Down
57 changes: 25 additions & 32 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,20 @@ def test_caching_and_materializing(self):

@patch("secrets.choice", return_value="X")
def test_materialized_column_naming(self, mock_choice):
materialize("events", "$foO();--sqlinject", create_minmax_index=True)
mock_choice.return_value = "Y"
materialize("events", "$foO();ääsqlinject", create_minmax_index=True)
mock_choice.return_value = "Z"
materialize("events", "$foO_____sqlinject", create_minmax_index=True)
materialize("person", "SoMePrOp", create_minmax_index=True)
assert materialize("events", "$foO();--sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject"

self.assertDictContainsSubset(
{
("$foO();--sqlinject", "properties"): "mat_$foO_____sqlinject",
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
{k: column.name for k, column in get_materialized_columns("events").items()},
mock_choice.return_value = "Y"
assert (
materialize("events", "$foO();ääsqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_YYYY"
)

self.assertEqual(
{k: column.name for k, column in get_materialized_columns("person").items()},
{("SoMePrOp", "properties"): "pmat_SoMePrOp"},
mock_choice.return_value = "Z"
assert (
materialize("events", "$foO_____sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_ZZZZ"
)

assert materialize("person", "SoMePrOp", create_minmax_index=True).name == "pmat_SoMePrOp"

def test_backfilling_data(self):
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_prop")
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_another")
Expand Down Expand Up @@ -199,16 +192,18 @@ def test_backfilling_data(self):
properties={"another": 6},
)

materialize("events", "prop", create_minmax_index=True)
materialize("events", "another", create_minmax_index=True)
columns = [
materialize("events", "prop", create_minmax_index=True),
materialize("events", "another", create_minmax_index=True),
]

self.assertEqual(self._count_materialized_rows("mat_prop"), 0)
self.assertEqual(self._count_materialized_rows("mat_another"), 0)

with freeze_time("2021-05-10T14:00:01Z"):
backfill_materialized_columns(
"events",
[("prop", "properties"), ("another", "properties")],
columns,
timedelta(days=50),
test_settings={"mutations_sync": "0"},
)
Expand Down Expand Up @@ -243,8 +238,10 @@ def test_backfilling_data(self):
)

def test_column_types(self):
materialize("events", "myprop", create_minmax_index=True)
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True)
columns = [
materialize("events", "myprop", create_minmax_index=True),
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True),
]

expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')"
Expand All @@ -253,9 +250,7 @@ def test_column_types(self):
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

backfill_materialized_columns(
"events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50)
)
backfill_materialized_columns("events", columns, timedelta(days=50))
self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable"))

Expand Down Expand Up @@ -312,9 +307,9 @@ def test_lifecycle(self):
# create materialized columns
materialized_columns = {}
for property_name in property_names:
destination_column = materialize(table, property_name, table_column=source_column, create_minmax_index=True)
if destination_column is not None:
materialized_columns[property_name] = destination_column
materialized_columns[property_name] = materialize(
table, property_name, table_column=source_column, create_minmax_index=True
).name

assert set(property_names) == materialized_columns.keys()

Expand Down Expand Up @@ -396,11 +391,10 @@ def test_drop_optimized_no_index(self):
source_column: TableColumn = "properties"

destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP INDEX" in mutation for mutation in mutations_ran)
Expand All @@ -414,13 +408,12 @@ def test_drop_optimized_no_column(self):

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1})
sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column.name}", settings={"alter_sync": 1})

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)

0 comments on commit b3bcd26

Please sign in to comment.