diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index acfc972c7d3d7..592d17b1cb8f1 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import re from collections.abc import Callable, Iterator from copy import copy @@ -22,6 +23,9 @@ from posthog.models.utils import generate_random_short_suffix from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST + +logger = logging.getLogger(__name__) + T = TypeVar("T") DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties" @@ -161,6 +165,10 @@ def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T]) } +def get_minmax_index_name(column: str) -> str: + return f"minmax_{column}" + + @dataclass class CreateColumnOnDataNodesTask: table: str @@ -182,7 +190,7 @@ def execute(self, client: Client) -> None: parameters["comment"] = self.column.details.as_column_comment() if self.create_minmax_index: - index_name = f"minmax_{self.column.name}" + index_name = get_minmax_index_name(self.column.name) actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1") client.execute( @@ -289,6 +297,32 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: ).result() +def check_index_exists(client: Client, table: str, index: str) -> bool: + [(count,)] = client.execute( + """ + SELECT count() + FROM system.data_skipping_indices + WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s + """, + {"table": table, "name": index}, + ) + assert 1 >= count >= 0 + return bool(count) + + +def check_column_exists(client: Client, table: str, column: str) -> bool: + [(count,)] = client.execute( + """ + SELECT count() + FROM system.columns + WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s + """, + {"table": table, "name": column}, + ) + assert 1 >= count >= 0 + return bool(count) + + @dataclass class DropColumnTask: table: str @@ -296,19 +330,28 @@ class DropColumnTask: try_drop_index: bool def execute(self, client: Client) -> None: - # XXX: copy/pasted from create task + actions = [] + if self.try_drop_index: - index_name = f"minmax_{self.column_name}" + index_name = get_minmax_index_name(self.column_name) + drop_index_action = f"DROP INDEX IF EXISTS {index_name}" + if check_index_exists(client, self.table, index_name): + actions.append(drop_index_action) + else: + logger.info("Skipping %r, nothing to do...", drop_index_action) + + drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}" + if check_column_exists(client, self.table, self.column_name): + actions.append(drop_column_action) + else: + logger.info("Skipping %r, nothing to do...", drop_column_action) + + if actions: client.execute( - f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", + f"ALTER TABLE {self.table} " + ", ".join(actions), settings={"alter_sync": 2 if TEST else 1}, ) - client.execute( - f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) - def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: cluster = get_cluster() diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index 4cbbef0c4a416..3978d010f7246 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -1,5 +1,6 @@ from datetime import timedelta from time import sleep +from collections.abc import Iterable from unittest import TestCase from unittest.mock import patch @@ -336,3 +337,70 @@ def test_lifecycle(self): assert key not in get_materialized_columns(table, exclude_disabled_columns=True) with self.assertRaises(ValueError): MaterializedColumn.get(table, destination_column) + + def _get_latest_mutation_id(self, table: str) -> str: + [(mutation_id,)] = sync_execute( + """ + SELECT max(mutation_id) + FROM system.mutations + WHERE + database = currentDatabase() + AND table = %(table)s + """, + {"table": table}, + ) + return mutation_id + + def _get_mutations_since_id(self, table: str, id: str) -> Iterable[str]: + return [ + command + for (command,) in sync_execute( + """ + SELECT command + FROM system.mutations + WHERE + database = currentDatabase() + AND table = %(table)s + AND mutation_id > %(mutation_id)s + ORDER BY mutation_id + """, + {"table": table, "mutation_id": id}, + ) + ] + + def test_drop_optimized_no_index(self): + table: TablesWithMaterializedColumns = ( + "person" # little bit easier than events because no shard awareness needed + ) + property: PropertyName = "myprop" + source_column: TableColumn = "properties" + + destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False) + assert destination_column is not None + + latest_mutation_id_before_drop = self._get_latest_mutation_id(table) + + drop_column(table, destination_column) + + mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop) + assert not any("DROP INDEX" in mutation for mutation in mutations_ran) + + def test_drop_optimized_no_column(self): + table: TablesWithMaterializedColumns = ( + "person" # little bit easier than events because no shard awareness needed + ) + property: PropertyName = "myprop" + source_column: TableColumn = "properties" + + # create the materialized column + destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False) + assert destination_column is not None + + sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1}) + + latest_mutation_id_before_drop = self._get_latest_mutation_id(table) + + drop_column(table, destination_column) + + mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop) + assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)