From 0a48625bf96a1efcb09df148e84c8521741d8c57 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:11:09 -0800 Subject: [PATCH 1/9] conditionally drop index --- ee/clickhouse/materialized_columns/columns.py | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index c9624bf96bacd..0bdab9e786ef5 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import re from collections.abc import Callable, Iterator from copy import copy @@ -22,6 +23,9 @@ from posthog.models.utils import generate_random_short_suffix from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST + +logger = logging.getLogger(__name__) + T = TypeVar("T") DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties" @@ -296,13 +300,22 @@ class DropColumnTask: try_drop_index: bool def execute(self, client: Client) -> None: - # XXX: copy/pasted from create task if self.try_drop_index: + # XXX: copy/pasted from create task index_name = f"minmax_{self.column_name}" - client.execute( - f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) + match client.execute( + "SELECT count() FROM system.data_skipping_indices WHERE table = %(table)s AND name = %(name)s", + {"table": self.table, "name": index_name}, + ): + case [(1,)]: + client.execute( + f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + case [(0,)]: + logger.info("Skipping DROP INDEX for %r, nothing to do...", index_name) + case _: + raise Exception("received unexpected response") client.execute( f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", From 44d8008deccc92c7955d85a5eff420f3a0ca13d4 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:13:42 -0800 Subject: [PATCH 2/9] clean up --- ee/clickhouse/materialized_columns/columns.py | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 0bdab9e786ef5..c4cde6df0b847 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -293,6 +293,19 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: ).result() +def check_index_exists(client: Client, table: str, index: str) -> bool: + match client.execute( + "SELECT count() FROM system.data_skipping_indices WHERE table = %(table)s AND name = %(name)s", + {"table": table, "name": index}, + ): + case [(1,)]: + return True + case [(0,)]: + return False + case _: + raise Exception("received unexpected response") + + @dataclass class DropColumnTask: table: str @@ -303,19 +316,13 @@ def execute(self, client: Client) -> None: if self.try_drop_index: # XXX: copy/pasted from create task index_name = f"minmax_{self.column_name}" - match client.execute( - "SELECT count() FROM system.data_skipping_indices WHERE table = %(table)s AND name = %(name)s", - {"table": self.table, "name": index_name}, - ): - case [(1,)]: - client.execute( - f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) - case [(0,)]: - logger.info("Skipping DROP INDEX for %r, nothing to do...", index_name) - case _: - raise Exception("received unexpected response") + if check_index_exists(client, self.table, index_name): + client.execute( + f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + else: + logger.info("Skipping DROP INDEX for %r, nothing to do...", index_name) client.execute( f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", From 77a292a274ce22761342bb135e58b885d3d4bfe5 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:15:39 -0800 Subject: [PATCH 3/9] also make drop column conditional --- ee/clickhouse/materialized_columns/columns.py | 26 +++++++++++++++---- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index c4cde6df0b847..406ecadae7bbb 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -306,6 +306,19 @@ def check_index_exists(client: Client, table: str, index: str) -> bool: raise Exception("received unexpected response") +def check_column_exists(client: Client, table: str, column: str) -> bool: + match client.execute( + "SELECT count() FROM system.columns WHERE table = %(table)s AND name = %(name)s", + {"table": table, "name": column}, + ): + case [(1,)]: + return True + case [(0,)]: + return False + case _: + raise Exception("received unexpected response") + + @dataclass class DropColumnTask: table: str @@ -322,12 +335,15 @@ def execute(self, client: Client) -> None: settings={"alter_sync": 2 if TEST else 1}, ) else: - logger.info("Skipping DROP INDEX for %r, nothing to do...", index_name) + logger.info("Skipping DROP INDEX for %r, nothing to do...", self) - client.execute( - f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) + if check_column_exists(client, self.table, self.column_name): + client.execute( + f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + else: + logger.info("Skipping DROP COLUMN for %r, nothing to do...", self) def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: From fdbae9f40bda0e9aab41a8a0730521b989a866ca Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:20:37 -0800 Subject: [PATCH 4/9] single statement --- ee/clickhouse/materialized_columns/columns.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 406ecadae7bbb..50bfa454d22d1 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -326,24 +326,26 @@ class DropColumnTask: try_drop_index: bool def execute(self, client: Client) -> None: + actions = [] + if self.try_drop_index: # XXX: copy/pasted from create task index_name = f"minmax_{self.column_name}" if check_index_exists(client, self.table, index_name): - client.execute( - f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) + actions.append(f"DROP INDEX IF EXISTS {index_name}") else: logger.info("Skipping DROP INDEX for %r, nothing to do...", self) if check_column_exists(client, self.table, self.column_name): + actions.append(f"DROP COLUMN IF EXISTS {self.column_name}") + else: + logger.info("Skipping DROP COLUMN for %r, nothing to do...", self) + + if actions: client.execute( - f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", + f"ALTER TABLE {self.table} " + ", ".join(actions), settings={"alter_sync": 2 if TEST else 1}, ) - else: - logger.info("Skipping DROP COLUMN for %r, nothing to do...", self) def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: From 8a03a0f332eaa2e87665562e909c5e52b59ac3d1 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:23:00 -0800 Subject: [PATCH 5/9] qualify current database --- ee/clickhouse/materialized_columns/columns.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 50bfa454d22d1..d19f8d5cdecdd 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -295,7 +295,11 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: def check_index_exists(client: Client, table: str, index: str) -> bool: match client.execute( - "SELECT count() FROM system.data_skipping_indices WHERE table = %(table)s AND name = %(name)s", + """ + SELECT count() + FROM system.data_skipping_indices + WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s + """, {"table": table, "name": index}, ): case [(1,)]: @@ -308,7 +312,11 @@ def check_index_exists(client: Client, table: str, index: str) -> bool: def check_column_exists(client: Client, table: str, column: str) -> bool: match client.execute( - "SELECT count() FROM system.columns WHERE table = %(table)s AND name = %(name)s", + """ + SELECT count() + FROM system.columns + WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s + """, {"table": table, "name": column}, ): case [(1,)]: From 9ce036756d73c4337455aba0b861d7597eb188c7 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Wed, 4 Dec 2024 17:24:39 -0800 Subject: [PATCH 6/9] tidy up logging --- ee/clickhouse/materialized_columns/columns.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index d19f8d5cdecdd..63f1bc8fdf9a4 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -339,15 +339,17 @@ def execute(self, client: Client) -> None: if self.try_drop_index: # XXX: copy/pasted from create task index_name = f"minmax_{self.column_name}" + drop_index_action = f"DROP INDEX IF EXISTS {index_name}" if check_index_exists(client, self.table, index_name): - actions.append(f"DROP INDEX IF EXISTS {index_name}") + actions.append(drop_index_action) else: - logger.info("Skipping DROP INDEX for %r, nothing to do...", self) + logger.info("Skipping %r, nothing to do...", drop_index_action) + drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}" if check_column_exists(client, self.table, self.column_name): - actions.append(f"DROP COLUMN IF EXISTS {self.column_name}") + actions.append(drop_column_action) else: - logger.info("Skipping DROP COLUMN for %r, nothing to do...", self) + logger.info("Skipping %r, nothing to do...", drop_column_action) if actions: client.execute( From 50b80068da704fa1996ef65fddadddd4622ab224 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:26:11 -0800 Subject: [PATCH 7/9] test and tidy --- ee/clickhouse/materialized_columns/columns.py | 9 ++- .../materialized_columns/test/test_columns.py | 69 +++++++++++++++++++ 2 files changed, 75 insertions(+), 3 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 63f1bc8fdf9a4..6df964f4a3841 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -165,6 +165,10 @@ def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T]) } +def get_minmax_index_name(column: str) -> str: + return f"minmax_{column}" + + @dataclass class CreateColumnOnDataNodesTask: table: str @@ -186,7 +190,7 @@ def execute(self, client: Client) -> None: parameters["comment"] = self.column.details.as_column_comment() if self.create_minmax_index: - index_name = f"minmax_{self.column.name}" + index_name = get_minmax_index_name(self.column.name) actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1") client.execute( @@ -337,8 +341,7 @@ def execute(self, client: Client) -> None: actions = [] if self.try_drop_index: - # XXX: copy/pasted from create task - index_name = f"minmax_{self.column_name}" + index_name = get_minmax_index_name(self.column_name) drop_index_action = f"DROP INDEX IF EXISTS {index_name}" if check_index_exists(client, self.table, index_name): actions.append(drop_index_action) diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index 4cbbef0c4a416..b9874b161b3d4 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -1,5 +1,6 @@ from datetime import timedelta from time import sleep +from collections.abc import Iterable from unittest import TestCase from unittest.mock import patch @@ -336,3 +337,71 @@ def test_lifecycle(self): assert key not in get_materialized_columns(table, exclude_disabled_columns=True) with self.assertRaises(ValueError): MaterializedColumn.get(table, destination_column) + + def _get_latest_mutation_id(self, table: str) -> str: + match sync_execute( + """ + SELECT max(mutation_id) + FROM system.mutations + WHERE + database = currentDatabase() + AND table = %(table)s + """, + {"table": table}, + ): + case [(mutation_id,)]: + return mutation_id + + def _get_mutations_since_id(self, table: str, id: str) -> Iterable[str]: + return [ + command + for (command,) in sync_execute( + """ + SELECT command + FROM system.mutations + WHERE + database = currentDatabase() + AND table = %(table)s + AND mutation_id > %(mutation_id)s + ORDER BY mutation_id + """, + {"table": table, "mutation_id": id}, + ) + ] + + def test_drop_optimized_no_index(self): + table: TablesWithMaterializedColumns = ( + "person" # little bit easier than events because no shard awareness needed + ) + property: PropertyName = "myprop" + source_column: TableColumn = "properties" + + destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False) + assert destination_column is not None + + latest_mutation_id_before_drop = self._get_latest_mutation_id(table) + + drop_column(table, destination_column) + + mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop) + assert not any("DROP INDEX" in mutation for mutation in mutations_ran) + + def test_drop_optimized_no_column(self): + table: TablesWithMaterializedColumns = ( + "person" # little bit easier than events because no shard awareness needed + ) + property: PropertyName = "myprop" + source_column: TableColumn = "properties" + + # create the materialized column + destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False) + assert destination_column is not None + + sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1}) + + latest_mutation_id_before_drop = self._get_latest_mutation_id(table) + + drop_column(table, destination_column) + + mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop) + assert not any("DROP COLUMN" in mutation for mutation in mutations_ran) From cd5ed5b40e8a41ff7ae005b82fa255eca4f532bf Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:35:18 -0800 Subject: [PATCH 8/9] mypy is NOT my friend --- ee/clickhouse/materialized_columns/columns.py | 24 +++++++------------ 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 6df964f4a3841..3815c84751608 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -298,37 +298,29 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: def check_index_exists(client: Client, table: str, index: str) -> bool: - match client.execute( + [(count,)] = client.execute( """ SELECT count() FROM system.data_skipping_indices WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s """, {"table": table, "name": index}, - ): - case [(1,)]: - return True - case [(0,)]: - return False - case _: - raise Exception("received unexpected response") + ) + assert 1 >= count >= 0 + return bool(count) def check_column_exists(client: Client, table: str, column: str) -> bool: - match client.execute( + [(count,)] = client.execute( """ SELECT count() FROM system.columns WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s """, {"table": table, "name": column}, - ): - case [(1,)]: - return True - case [(0,)]: - return False - case _: - raise Exception("received unexpected response") + ) + assert 1 >= count >= 0 + return bool(count) @dataclass From a900fdcffeaf748f908811588f8684f4030c83f8 Mon Sep 17 00:00:00 2001 From: Ted Kaemming <65315+tkaemming@users.noreply.github.com> Date: Thu, 5 Dec 2024 11:36:48 -0800 Subject: [PATCH 9/9] let me match --- ee/clickhouse/materialized_columns/test/test_columns.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ee/clickhouse/materialized_columns/test/test_columns.py b/ee/clickhouse/materialized_columns/test/test_columns.py index b9874b161b3d4..3978d010f7246 100644 --- a/ee/clickhouse/materialized_columns/test/test_columns.py +++ b/ee/clickhouse/materialized_columns/test/test_columns.py @@ -339,7 +339,7 @@ def test_lifecycle(self): MaterializedColumn.get(table, destination_column) def _get_latest_mutation_id(self, table: str) -> str: - match sync_execute( + [(mutation_id,)] = sync_execute( """ SELECT max(mutation_id) FROM system.mutations @@ -348,9 +348,8 @@ def _get_latest_mutation_id(self, table: str) -> str: AND table = %(table)s """, {"table": table}, - ): - case [(mutation_id,)]: - return mutation_id + ) + return mutation_id def _get_mutations_since_id(self, table: str, id: str) -> Iterable[str]: return [