Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Only drop columns and indexes associated with materialized columns if they exist #26664

Merged
merged 9 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import logging
import re
from collections.abc import Callable, Iterator
from copy import copy
Expand All @@ -22,6 +23,9 @@
from posthog.models.utils import generate_random_short_suffix
from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST


logger = logging.getLogger(__name__)

T = TypeVar("T")

DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties"
Expand Down Expand Up @@ -161,6 +165,10 @@ def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T])
}


def get_minmax_index_name(column: str) -> str:
return f"minmax_{column}"


@dataclass
class CreateColumnOnDataNodesTask:
table: str
Expand All @@ -182,7 +190,7 @@ def execute(self, client: Client) -> None:
parameters["comment"] = self.column.details.as_column_comment()

if self.create_minmax_index:
index_name = f"minmax_{self.column.name}"
index_name = get_minmax_index_name(self.column.name)
actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1")

client.execute(
Expand Down Expand Up @@ -289,26 +297,61 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
).result()


def check_index_exists(client: Client, table: str, index: str) -> bool:
[(count,)] = client.execute(
"""
SELECT count()
FROM system.data_skipping_indices
WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
""",
{"table": table, "name": index},
)
assert 1 >= count >= 0
return bool(count)


def check_column_exists(client: Client, table: str, column: str) -> bool:
[(count,)] = client.execute(
"""
SELECT count()
FROM system.columns
WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
""",
{"table": table, "name": column},
)
assert 1 >= count >= 0
return bool(count)


@dataclass
class DropColumnTask:
table: str
column_name: str
try_drop_index: bool

def execute(self, client: Client) -> None:
# XXX: copy/pasted from create task
actions = []

if self.try_drop_index:
index_name = f"minmax_{self.column_name}"
index_name = get_minmax_index_name(self.column_name)
drop_index_action = f"DROP INDEX IF EXISTS {index_name}"
if check_index_exists(client, self.table, index_name):
actions.append(drop_index_action)
else:
logger.info("Skipping %r, nothing to do...", drop_index_action)

drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}"
if check_column_exists(client, self.table, self.column_name):
actions.append(drop_column_action)
else:
logger.info("Skipping %r, nothing to do...", drop_column_action)

if actions:
client.execute(
f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}",
f"ALTER TABLE {self.table} " + ", ".join(actions),
settings={"alter_sync": 2 if TEST else 1},
)

client.execute(
f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}",
settings={"alter_sync": 2 if TEST else 1},
)


def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
cluster = get_cluster()
Expand Down
68 changes: 68 additions & 0 deletions ee/clickhouse/materialized_columns/test/test_columns.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import timedelta
from time import sleep
from collections.abc import Iterable
from unittest import TestCase
from unittest.mock import patch

Expand Down Expand Up @@ -336,3 +337,70 @@ def test_lifecycle(self):
assert key not in get_materialized_columns(table, exclude_disabled_columns=True)
with self.assertRaises(ValueError):
MaterializedColumn.get(table, destination_column)

def _get_latest_mutation_id(self, table: str) -> str:
[(mutation_id,)] = sync_execute(
"""
SELECT max(mutation_id)
FROM system.mutations
WHERE
database = currentDatabase()
AND table = %(table)s
""",
{"table": table},
)
return mutation_id

def _get_mutations_since_id(self, table: str, id: str) -> Iterable[str]:
return [
command
for (command,) in sync_execute(
"""
SELECT command
FROM system.mutations
WHERE
database = currentDatabase()
AND table = %(table)s
AND mutation_id > %(mutation_id)s
ORDER BY mutation_id
""",
{"table": table, "mutation_id": id},
)
]

def test_drop_optimized_no_index(self):
table: TablesWithMaterializedColumns = (
"person" # little bit easier than events because no shard awareness needed
)
property: PropertyName = "myprop"
source_column: TableColumn = "properties"

destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP INDEX" in mutation for mutation in mutations_ran)

def test_drop_optimized_no_column(self):
table: TablesWithMaterializedColumns = (
"person" # little bit easier than events because no shard awareness needed
)
property: PropertyName = "myprop"
source_column: TableColumn = "properties"

# create the materialized column
destination_column = materialize(table, property, table_column=source_column, create_minmax_index=False)
assert destination_column is not None

sync_execute(f"ALTER TABLE {table} DROP COLUMN {destination_column}", settings={"alter_sync": 1})

latest_mutation_id_before_drop = self._get_latest_mutation_id(table)

drop_column(table, destination_column)

mutations_ran = self._get_mutations_since_id(table, latest_mutation_id_before_drop)
assert not any("DROP COLUMN" in mutation for mutation in mutations_ran)
Loading