Skip to content

Commit

Permalink
perf: Only drop columns and indexes associated with materialized colu…
Browse files Browse the repository at this point in the history
…mns if they exist (#26664)
  • Loading branch information
tkaemming authored Dec 5, 2024
1 parent 4c95677 commit f04d14c
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 9 deletions.
61 changes: 52 additions & 9 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import re
from collections.abc import Callable, Iterator
from copy import copy
Expand All @@ -22,6 +23,9 @@
from posthog.models.utils import generate_random_short_suffix
from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST


logger = logging.getLogger(__name__)

T = TypeVar("T")

DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties"
Expand Down Expand Up @@ -161,6 +165,10 @@ def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T])
}


def get_minmax_index_name(column: str) -> str:
return f"minmax_{column}"


@dataclass
class CreateColumnOnDataNodesTask:
table: str
Expand All @@ -182,7 +190,7 @@ def execute(self, client: Client) -> None:
parameters["comment"] = self.column.details.as_column_comment()

if self.create_minmax_index:
index_name = f"minmax_{self.column.name}"
index_name = get_minmax_index_name(self.column.name)
actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1")

client.execute(
Expand Down Expand Up @@ -289,26 +297,61 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
).result()


def check_index_exists(client: Client, table: str, index: str) -> bool:
[(count,)] = client.execute(
"""
SELECT count()
FROM system.data_skipping_indices
WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
""",
{"table": table, "name": index},
)
assert 1 >= count >= 0
return bool(count)


def check_column_exists(client: Client, table: str, column: str) -> bool:
[(count,)] = client.execute(
"""
SELECT count()
FROM system.columns
WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
""",
{"table": table, "name": column},
)
assert 1 >= count >= 0
return bool(count)


@dataclass
class DropColumnTask:
table: str
column_name: str
try_drop_index: bool

def execute(self, client: Client) -> None:
# XXX: copy/pasted from create task
actions = []

if self.try_drop_index:
index_name = f"minmax_{self.column_name}"
index_name = get_minmax_index_name(self.column_name)
drop_index_action = f"DROP INDEX IF EXISTS {index_name}"
if check_index_exists(client, self.table, index_name):
actions.append(drop_index_action)
else:
logger.info("Skipping %r, nothing to do...", drop_index_action)

drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}"
if check_column_exists(client, self.table, self.column_name):
actions.append(drop_column_action)
else:
logger.info("Skipping %r, nothing to do...", drop_column_action)

if actions:
client.execute(
f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}",
f"ALTER TABLE {self.table} " + ", ".join(actions),
settings={"alter_sync": 2 if TEST else 1},
)

client.execute(
f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}",
settings={"alter_sync": 2 if TEST else 1},
)


def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
cluster = get_cluster()
Expand Down
68 changes: 68 additions & 0 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta
from time import sleep
from collections.abc import Iterable
from unittest import TestCase
from unittest.mock import patch

Expand Down Expand Up @@ -336,3 +337,70 @@ def test_lifecycle(self):
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)

def _get_latest_mutation_id(self, table: str) -> str:
[(mutation_id,)] = sync_execute(
"""
SELECT max(mutation_id)
FROM system.mutations
WHERE
database = currentDatabase()
AND table = %(table)s
""",
{"table": table},
)
return mutation_id

def _get_mutations_since_id(self, table: str, id: str) -> Iterable[str]:
return [
command
for (command,) in sync_execute(
"""
SELECT command
FROM system.mutations
WHERE
database = currentDatabase()
AND table = %(table)s
AND mutation_id > %(mutation_id)s
ORDER BY mutation_id
""",
{"table": table, "mutation_id": id},
)
]

def test_drop_optimized_no_index(self):
table: TablesWithMaterializedColumns = (
"person" # little bit easier than events because no shard awareness needed
)
property: PropertyName = "myprop"
source_column: TableColumn = "properties"

destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP INDEX" in mutation for mutation in mutations_ran)

def test_drop_optimized_no_column(self):
table: TablesWithMaterializedColumns = (
"person" # little bit easier than events because no shard awareness needed
)
property: PropertyName = "myprop"
source_column: TableColumn = "properties"

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1})

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)

0 comments on commit f04d14c

Please sign in to comment.