refactor: Backfill materialized columns by column reference instead of property #26742

Merged (2 commits, Dec 9, 2024)
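At a glance, this PR changes `materialize` to return a `MaterializedColumn` object and `backfill_materialized_columns` to accept those column objects directly instead of `(property, table_column)` tuples. A minimal sketch of the new call pattern, mirroring the benchmarks change below (the table and property names are illustrative, not from the PR):

```python
# Sketch of the call pattern after this PR; property names are hypothetical.
from datetime import timedelta

from ee.clickhouse.materialized_columns.analyze import (
    backfill_materialized_columns,
    materialize,
)
from ee.clickhouse.materialized_columns.columns import MaterializedColumn

table = "events"
wanted_properties = {"$browser", "$os"}  # illustrative property names

# Skip properties that already have a materialized column on this table.
already_materialized = {
    column.details.property_name for column in MaterializedColumn.get_all(table)
}

# materialize() now returns a MaterializedColumn rather than a column name.
columns = [
    materialize(table, property_name)
    for property_name in wanted_properties - already_materialized
]

# backfill_materialized_columns() now takes the column objects directly,
# not (property, table_column) tuples.
backfill_materialized_columns(table, columns, backfill_period=timedelta(days=90))
```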
22 changes: 12 additions & 10 deletions ee/benchmarks/benchmarks.py
@@ -4,9 +4,9 @@
from datetime import timedelta
from ee.clickhouse.materialized_columns.analyze import (
backfill_materialized_columns,
get_materialized_columns,
materialize,
)
from ee.clickhouse.materialized_columns.columns import MaterializedColumn
from ee.clickhouse.queries.stickiness import ClickhouseStickiness
from ee.clickhouse.queries.funnels.funnel_correlation import FunnelCorrelation
from posthog.queries.funnels import ClickhouseFunnel
@@ -771,15 +771,17 @@ def track_person_property_values_materialized(self):

def setup(self):
for table, properties in MATERIALIZED_PROPERTIES.items():
existing_materialized_columns = get_materialized_columns(table)
for property in properties:
if (property, "properties") not in existing_materialized_columns:
materialize(table, property)
backfill_materialized_columns(
table,
[(property, "properties")],
backfill_period=timedelta(days=1_000),
)
columns = [
materialize(table, property)
for property in (
set(properties) - {column.details.property_name for column in MaterializedColumn.get_all(table)}
)
]
backfill_materialized_columns(
table,
columns,
backfill_period=timedelta(days=1_000),
)

# :TRICKY: Data in benchmark servers has ID=2
team = Team.objects.filter(id=2).first()
36 changes: 17 additions & 19 deletions ee/clickhouse/materialized_columns/analyze.py
@@ -8,6 +8,7 @@

from ee.clickhouse.materialized_columns.columns import (
DEFAULT_TABLE_COLUMN,
MaterializedColumn,
backfill_materialized_columns,
get_materialized_columns,
materialize,
@@ -164,7 +165,7 @@ def _analyze(since_hours_ago: int, min_query_time: int, team_id: Optional[int] =


def materialize_properties_task(
columns_to_materialize: Optional[list[Suggestion]] = None,
properties_to_materialize: Optional[list[Suggestion]] = None,
time_to_analyze_hours: int = MATERIALIZE_COLUMNS_ANALYSIS_PERIOD_HOURS,
maximum: int = MATERIALIZE_COLUMNS_MAX_AT_ONCE,
min_query_time: int = MATERIALIZE_COLUMNS_MINIMUM_QUERY_TIME,
@@ -177,37 +178,34 @@ def materialize_properties_task(
Creates materialized columns for event and person properties based off of slow queries
"""

if columns_to_materialize is None:
columns_to_materialize = _analyze(time_to_analyze_hours, min_query_time, team_id_to_analyze)
if properties_to_materialize is None:
properties_to_materialize = _analyze(time_to_analyze_hours, min_query_time, team_id_to_analyze)

columns_by_table: dict[TableWithProperties, list[tuple[TableColumn, PropertyName]]] = defaultdict(list)
for table, table_column, property_name in columns_to_materialize:
columns_by_table[table].append((table_column, property_name))
properties_by_table: dict[TableWithProperties, list[tuple[TableColumn, PropertyName]]] = defaultdict(list)
for table, table_column, property_name in properties_to_materialize:
properties_by_table[table].append((table_column, property_name))

result: list[Suggestion] = []
for table, columns in columns_by_table.items():
existing_materialized_columns = get_materialized_columns(table)
for table_column, property_name in columns:
if (property_name, table_column) not in existing_materialized_columns:
for table, properties in properties_by_table.items():
existing_materialized_properties = get_materialized_columns(table).keys()
for table_column, property_name in properties:
if (property_name, table_column) not in existing_materialized_properties:
result.append((table, table_column, property_name))

if len(result) > 0:
logger.info(f"Calculated columns that could be materialized. count={len(result)}")
else:
logger.info("Found no columns to materialize.")

properties: dict[TableWithProperties, list[tuple[PropertyName, TableColumn]]] = {
"events": [],
"person": [],
}
materialized_columns: dict[TableWithProperties, list[MaterializedColumn]] = defaultdict(list)
for table, table_column, property_name in result[:maximum]:
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))
materialized_columns[table].append(
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
)

if backfill_period_days > 0 and not dry_run:
logger.info(f"Starting backfill for new materialized columns. period_days={backfill_period_days}")
backfill_materialized_columns("events", properties["events"], timedelta(days=backfill_period_days))
backfill_materialized_columns("person", properties["person"], timedelta(days=backfill_period_days))
for table, columns in materialized_columns.items():
backfill_materialized_columns(table, columns, timedelta(days=backfill_period_days))
23 changes: 6 additions & 17 deletions ee/clickhouse/materialized_columns/columns.py
@@ -242,10 +242,10 @@ def materialize(
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
) -> MaterializedColumn:
if existing_column := get_materialized_columns(table).get((property, table_column)):
if TEST:
return None
return existing_column

raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}")

@@ -283,7 +283,7 @@ def materialize(
).execute
).result()

return column.name
return column


@dataclass
@@ -444,7 +444,7 @@ def execute(self, client: Client) -> None:

def backfill_materialized_columns(
table: TableWithProperties,
properties: list[tuple[PropertyName, TableColumn]],
columns: Iterable[MaterializedColumn],
backfill_period: timedelta,
test_settings=None,
) -> None:
@@ -453,25 +453,14 @@

This will require reading and writing a lot of data on clickhouse disk.
"""

if len(properties) == 0:
return

cluster = get_cluster()
table_info = tables[table]

# TODO: this will eventually need to handle duplicates
materialized_columns = {
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
}
columns = [materialized_columns[property] for property in properties]

table_info.map_data_nodes(
cluster,
BackfillColumnTask(
table_info.data_table,
columns,
[*columns],
backfill_period if table == "events" else None, # XXX
test_settings,
).execute,
57 changes: 25 additions & 32 deletions ee/clickhouse/materialized_columns/test/test_columns.py
@@ -132,27 +132,20 @@ def test_caching_and_materializing(self):

@patch("secrets.choice", return_value="X")
def test_materialized_column_naming(self, mock_choice):
materialize("events", "$foO();--sqlinject", create_minmax_index=True)
mock_choice.return_value = "Y"
materialize("events", "$foO();ääsqlinject", create_minmax_index=True)
mock_choice.return_value = "Z"
materialize("events", "$foO_____sqlinject", create_minmax_index=True)
materialize("person", "SoMePrOp", create_minmax_index=True)
assert materialize("events", "$foO();--sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject"

self.assertDictContainsSubset(
{
("$foO();--sqlinject", "properties"): "mat_$foO_____sqlinject",
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
{k: column.name for k, column in get_materialized_columns("events").items()},
mock_choice.return_value = "Y"
assert (
materialize("events", "$foO();ääsqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_YYYY"
)

self.assertEqual(
{k: column.name for k, column in get_materialized_columns("person").items()},
{("SoMePrOp", "properties"): "pmat_SoMePrOp"},
mock_choice.return_value = "Z"
assert (
materialize("events", "$foO_____sqlinject", create_minmax_index=True).name == "mat_$foO_____sqlinject_ZZZZ"
)

assert materialize("person", "SoMePrOp", create_minmax_index=True).name == "pmat_SoMePrOp"

def test_backfilling_data(self):
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_prop")
sync_execute("ALTER TABLE events DROP COLUMN IF EXISTS mat_another")
@@ -199,16 +192,18 @@ def test_backfilling_data(self):
properties={"another": 6},
)

materialize("events", "prop", create_minmax_index=True)
materialize("events", "another", create_minmax_index=True)
columns = [
materialize("events", "prop", create_minmax_index=True),
materialize("events", "another", create_minmax_index=True),
]

self.assertEqual(self._count_materialized_rows("mat_prop"), 0)
self.assertEqual(self._count_materialized_rows("mat_another"), 0)

with freeze_time("2021-05-10T14:00:01Z"):
backfill_materialized_columns(
"events",
[("prop", "properties"), ("another", "properties")],
columns,
timedelta(days=50),
test_settings={"mutations_sync": "0"},
)
@@ -243,8 +238,10 @@ def test_backfilling_data(self):
)

def test_column_types(self):
materialize("events", "myprop", create_minmax_index=True)
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True)
columns = [
materialize("events", "myprop", create_minmax_index=True),
materialize("events", "myprop_nullable", create_minmax_index=True, is_nullable=True),
]

expr_nonnullable = "replaceRegexpAll(JSONExtractRaw(properties, 'myprop'), '^\"|\"$', '')"
expr_nullable = "JSONExtract(properties, 'myprop_nullable', 'Nullable(String)')"
@@ -253,9 +250,7 @@
("Nullable(String)", "MATERIALIZED", expr_nullable), self._get_column_types("mat_myprop_nullable")
)

backfill_materialized_columns(
"events", [("myprop", "properties"), ("myprop_nullable", "properties")], timedelta(days=50)
)
backfill_materialized_columns("events", columns, timedelta(days=50))
self.assertEqual(("String", "DEFAULT", expr_nonnullable), self._get_column_types("mat_myprop"))
self.assertEqual(("Nullable(String)", "DEFAULT", expr_nullable), self._get_column_types("mat_myprop_nullable"))

@@ -312,9 +307,9 @@ def test_lifecycle(self):
# create materialized columns
materialized_columns = {}
for property_name in property_names:
destination_column = materialize(table, property_name, table_column=source_column, create_minmax_index=True)
if destination_column is not None:
materialized_columns[property_name] = destination_column
materialized_columns[property_name] = materialize(
table, property_name, table_column=source_column, create_minmax_index=True
).name

assert set(property_names) == materialized_columns.keys()

@@ -396,11 +391,10 @@ def test_drop_optimized_no_index(self):
source_column: TableColumn = "properties"

destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP INDEX" in mutation for mutation in mutations_ran)
@@ -414,13 +408,12 @@ def test_drop_optimized_no_column(self):

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1})
sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column.name}", settings={"alter_sync": 1})

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)
drop_column(table, destination_column.name)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)
2 changes: 1 addition & 1 deletion ee/management/commands/materialize_columns.py
@@ -87,7 +87,7 @@ def handle(self, *, is_nullable: bool, **options):
logger.info(f"Materializing column. table={options['property_table']}, property_name={options['property']}")

materialize_properties_task(
columns_to_materialize=[
properties_to_materialize=[
(
options["property_table"],
options["table_column"],
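As a closing usage note, a minimal sketch of invoking the renamed task directly, matching the keyword change shown in the management command above (the suggestion tuple values are illustrative only):

```python
from ee.clickhouse.materialized_columns.analyze import materialize_properties_task

# Suggestions are (table, table_column, property_name) tuples, as iterated in
# materialize_properties_task(); the values below are illustrative, not from the PR.
materialize_properties_task(
    properties_to_materialize=[
        ("events", "properties", "$browser"),
    ],
)
```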