From c565f106080af421bb9dbc14b2fd12229f9fc660 Mon Sep 17 00:00:00 2001 From: ted kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 5 Dec 2024 19:11:24 -0800 Subject: [PATCH] refactor: Support updating and dropping multiple materialized columns at once (#26707) --- ee/clickhouse/materialized_columns/columns.py | 57 +++++++------ .../materialized_columns/test/test_columns.py | 80 +++++++++++-------- .../commands/update_materialized_column.py | 18 +++-- 3 files changed, 90 insertions(+), 65 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index caa5cae1401c0..66be7d75f0e1d 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -2,7 +2,7 @@ import logging import re -from collections.abc import Callable, Iterator +from collections.abc import Callable, Iterable, Iterator from copy import copy from dataclasses import dataclass, replace from datetime import timedelta @@ -289,26 +289,35 @@ def materialize( @dataclass class UpdateColumnCommentTask: table: str - column: MaterializedColumn + columns: list[MaterializedColumn] def execute(self, client: Client) -> None: + actions = [] + parameters = {} + for i, column in enumerate(self.columns): + parameter_name = f"comment_{i}" + actions.append(f"COMMENT COLUMN {column.name} %({parameter_name})s") + parameters[parameter_name] = column.details.as_column_comment() + client.execute( - f"ALTER TABLE {self.table} COMMENT COLUMN {self.column.name} %(comment)s", - {"comment": self.column.details.as_column_comment()}, + f"ALTER TABLE {self.table} " + ", ".join(actions), + parameters, settings={"alter_sync": 2 if TEST else 1}, ) -def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: str, is_disabled: bool) -> None: +def update_column_is_disabled( + table: TablesWithMaterializedColumns, column_names: Iterable[str], is_disabled: bool +) -> None: cluster = get_cluster() table_info = tables[table] - column = MaterializedColumn.get(table, column_name) + columns = [MaterializedColumn.get(table, column_name) for column_name in column_names] cluster.map_all_hosts( UpdateColumnCommentTask( table_info.read_table, - replace(column, details=replace(column.details, is_disabled=is_disabled)), + [replace(column, details=replace(column.details, is_disabled=is_disabled)) for column in columns], ).execute ).result() @@ -342,25 +351,26 @@ def check_column_exists(client: Client, table: str, column: str) -> bool: @dataclass class DropColumnTask: table: str - column_name: str + column_names: list[str] try_drop_index: bool def execute(self, client: Client) -> None: actions = [] - if self.try_drop_index: - index_name = get_minmax_index_name(self.column_name) - drop_index_action = f"DROP INDEX IF EXISTS {index_name}" - if check_index_exists(client, self.table, index_name): - actions.append(drop_index_action) + for column_name in self.column_names: + if self.try_drop_index: + index_name = get_minmax_index_name(column_name) + drop_index_action = f"DROP INDEX IF EXISTS {index_name}" + if check_index_exists(client, self.table, index_name): + actions.append(drop_index_action) + else: + logger.info("Skipping %r, nothing to do...", drop_index_action) + + drop_column_action = f"DROP COLUMN IF EXISTS {column_name}" + if check_column_exists(client, self.table, column_name): + actions.append(drop_column_action) else: - logger.info("Skipping %r, nothing to do...", drop_index_action) - - drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}" - if check_column_exists(client, self.table, self.column_name): - actions.append(drop_column_action) - else: - logger.info("Skipping %r, nothing to do...", drop_column_action) + logger.info("Skipping %r, nothing to do...", drop_column_action) if actions: client.execute( @@ -369,15 +379,16 @@ def execute(self, client: Client) -> None: ) -def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: +def drop_column(table: TablesWithMaterializedColumns, column_names: Iterable[str]) -> None: cluster = get_cluster() table_info = tables[table] + column_names = [*column_names] if isinstance(table_info, ShardedTableInfo): cluster.map_all_hosts( DropColumnTask( table_info.dist_table, - column_name, + column_names, try_drop_index=False, # no indexes on distributed tables ).execute ).result() @@ -386,7 +397,7 @@ def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: cluster, DropColumnTask( table_info.data_table, - column_name, + column_names, try_drop_index=True, ).execute, ).result() diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index 993c1a7aa2f65..ba2739bf34760 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -306,45 +306,57 @@ def _get_column_types(self, column: str): def test_lifecycle(self): table: TablesWithMaterializedColumns = "events" - property: PropertyName = "myprop" + property_names = ["foo", "bar"] source_column: TableColumn = "properties" - # create the materialized column - destination_column = materialize(table, property, table_column=source_column, create_minmax_index=True) - assert destination_column is not None - - # ensure it exists everywhere - key = (property, source_column) - assert get_materialized_columns(table)[key].name == destination_column - assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( - destination_column, - MaterializedColumnDetails(source_column, property, is_disabled=False), - is_nullable=False, - ) + # create materialized columns + materialized_columns = {} + for property_name in property_names: + destination_column = materialize(table, property_name, table_column=source_column, create_minmax_index=True) + if destination_column is not None: + materialized_columns[property_name] = destination_column + + assert set(property_names) == materialized_columns.keys() + + # ensure they exist everywhere + for property_name, destination_column in materialized_columns.items(): + key = (property_name, source_column) + assert get_materialized_columns(table)[key].name == destination_column + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property_name, is_disabled=False), + is_nullable=False, + ) - # disable it and ensure updates apply as needed - update_column_is_disabled(table, destination_column, is_disabled=True) - assert get_materialized_columns(table)[key].name == destination_column - assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( - destination_column, - MaterializedColumnDetails(source_column, property, is_disabled=True), - is_nullable=False, - ) + # disable them and ensure updates apply as needed + update_column_is_disabled(table, materialized_columns.values(), is_disabled=True) + for property_name, destination_column in materialized_columns.items(): + key = (property_name, source_column) + assert get_materialized_columns(table)[key].name == destination_column + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property_name, is_disabled=True), + is_nullable=False, + ) - # re-enable it and ensure updates apply as needed - update_column_is_disabled(table, destination_column, is_disabled=False) - assert get_materialized_columns(table)[key].name == destination_column - assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( - destination_column, - MaterializedColumnDetails(source_column, property, is_disabled=False), - is_nullable=False, - ) + # re-enable them and ensure updates apply as needed + update_column_is_disabled(table, materialized_columns.values(), is_disabled=False) + for property_name, destination_column in materialized_columns.items(): + key = (property_name, source_column) + assert get_materialized_columns(table)[key].name == destination_column + assert MaterializedColumn.get(table, destination_column) == MaterializedColumn( + destination_column, + MaterializedColumnDetails(source_column, property_name, is_disabled=False), + is_nullable=False, + ) - # drop it and ensure updates apply as needed - drop_column(table, destination_column) - assert key not in get_materialized_columns(table) - with self.assertRaises(ValueError): - MaterializedColumn.get(table, destination_column) + # drop them and ensure updates apply as needed + drop_column(table, materialized_columns.values()) + for property_name, destination_column in materialized_columns.items(): + key = (property_name, source_column) + assert key not in get_materialized_columns(table) + with self.assertRaises(ValueError): + MaterializedColumn.get(table, destination_column) def _get_latest_mutation_id(self, table: str) -> str: [(mutation_id,)] = sync_execute( diff --git a/ee/management/commands/update_materialized_column.py b/ee/management/commands/update_materialized_column.py index b45444eb4831b..bb55a61545dd6 100644 --- a/ee/management/commands/update_materialized_column.py +++ b/ee/management/commands/update_materialized_column.py @@ -1,7 +1,7 @@ import logging from typing import Any -from collections.abc import Callable +from collections.abc import Callable, Iterable from django.core.management.base import BaseCommand, CommandParser from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns @@ -9,9 +9,9 @@ logger = logging.getLogger(__name__) -COLUMN_OPERATIONS: dict[str, Callable[[TablesWithMaterializedColumns, ColumnName], Any]] = { - "enable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=False), - "disable": lambda table, column_name: update_column_is_disabled(table, column_name, is_disabled=True), +COLUMN_OPERATIONS: dict[str, Callable[[TablesWithMaterializedColumns, Iterable[ColumnName]], Any]] = { + "enable": lambda table, column_names: update_column_is_disabled(table, column_names, is_disabled=False), + "disable": lambda table, column_names: update_column_is_disabled(table, column_names, is_disabled=True), "drop": drop_column, } @@ -20,10 +20,12 @@ class Command(BaseCommand): def add_arguments(self, parser: CommandParser) -> None: parser.add_argument("operation", choices=COLUMN_OPERATIONS.keys()) parser.add_argument("table") - parser.add_argument("column_name") + parser.add_argument("column_names", nargs="+", metavar="column") - def handle(self, operation: str, table: TablesWithMaterializedColumns, column_name: ColumnName, **options): - logger.info("Running %r for %r.%r...", operation, table, column_name) + def handle( + self, operation: str, table: TablesWithMaterializedColumns, column_names: Iterable[ColumnName], **options + ): + logger.info("Running %r on %r for %r...", operation, table, column_names) fn = COLUMN_OPERATIONS[operation] - fn(table, column_name) + fn(table, column_names) logger.info("Success!")