diff --git a/ee/clickhouse/materialized_columns/columns.py b/ee/clickhouse/materialized_columns/columns.py index 308148597124c..c9624bf96bacd 100644 --- a/ee/clickhouse/materialized_columns/columns.py +++ b/ee/clickhouse/materialized_columns/columns.py @@ -1,21 +1,28 @@ from __future__ import annotations import re -from collections.abc import Iterator +from collections.abc import Callable, Iterator +from copy import copy from dataclasses import dataclass, replace from datetime import timedelta -from typing import Literal, NamedTuple, cast +from typing import Any, Literal, NamedTuple, TypeVar, cast -from clickhouse_driver.errors import ServerException +from clickhouse_driver import Client from django.utils.timezone import now +from posthog.clickhouse.client.connection import default_client +from posthog.clickhouse.cluster import ClickhouseCluster, ConnectionInfo, FuturesMap, HostInfo from posthog.clickhouse.kafka_engine import trim_quotes_expr from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns from posthog.client import sync_execute +from posthog.models.event.sql import EVENTS_DATA_TABLE from posthog.models.instance_setting import get_instance_setting +from posthog.models.person.sql import PERSONS_TABLE from posthog.models.property import PropertyName, TableColumn, TableWithProperties from posthog.models.utils import generate_random_short_suffix -from posthog.settings import CLICKHOUSE_CLUSTER, CLICKHOUSE_DATABASE, TEST +from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST + +T = TypeVar("T") DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties" @@ -116,135 +123,253 @@ def get_materialized_columns( } -def get_on_cluster_clause_for_table(table: TableWithProperties) -> str: - return f"ON CLUSTER '{CLICKHOUSE_CLUSTER}'" if table == "events" else "" +def get_cluster() -> ClickhouseCluster: + extra_hosts = [] + for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()): + extra_hosts.append(ConnectionInfo(host_config.pop("host"), host_config.pop("port", None))) + assert len(host_config) == 0, f"unexpected values: {host_config!r}" + return ClickhouseCluster(default_client(), extra_hosts=extra_hosts) -def materialize( - table: TableWithProperties, - property: PropertyName, - column_name: ColumnName | None = None, - table_column: TableColumn = DEFAULT_TABLE_COLUMN, - create_minmax_index=not TEST, -) -> ColumnName | None: - if (property, table_column) in get_materialized_columns(table): - if TEST: - return None +@dataclass +class TableInfo: + data_table: str - raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}") + @property + def read_table(self) -> str: + return self.data_table - if table_column not in SHORT_TABLE_COLUMN_NAME: - raise ValueError(f"Invalid table_column={table_column} for materialisation") + def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T]) -> FuturesMap[HostInfo, T]: + return cluster.map_all_hosts(fn) - column_name = column_name or _materialized_column_name(table, property, table_column) - on_cluster = get_on_cluster_clause_for_table(table) - if table == "events": - sync_execute( - f""" - ALTER TABLE sharded_{table} {on_cluster} - ADD COLUMN IF NOT EXISTS - {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} - """, - {"property": property}, - settings={"alter_sync": 2 if TEST else 1}, - ) - sync_execute( - f""" - ALTER TABLE {table} {on_cluster} - ADD COLUMN IF NOT EXISTS - {column_name} VARCHAR - """, - settings={"alter_sync": 2 if TEST else 1}, - ) - else: - sync_execute( - f""" - ALTER TABLE {table} {on_cluster} - ADD COLUMN IF NOT EXISTS - {column_name} VARCHAR MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} - """, - {"property": property}, - settings={"alter_sync": 2 if TEST else 1}, - ) +@dataclass +class ShardedTableInfo(TableInfo): + dist_table: str - sync_execute( - f"ALTER TABLE {table} {on_cluster} COMMENT COLUMN {column_name} %(comment)s", - {"comment": MaterializedColumnDetails(table_column, property, is_disabled=False).as_column_comment()}, - settings={"alter_sync": 2 if TEST else 1}, - ) + @property + def read_table(self) -> str: + return self.dist_table - if create_minmax_index: - add_minmax_index(table, column_name) + def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T]) -> FuturesMap[HostInfo, T]: + return cluster.map_one_host_per_shard(fn) - return column_name +tables: dict[str, TableInfo | ShardedTableInfo] = { + PERSONS_TABLE: TableInfo(PERSONS_TABLE), + "events": ShardedTableInfo(EVENTS_DATA_TABLE(), "events"), +} -def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: str, is_disabled: bool) -> None: - details = replace( - MaterializedColumn.get(table, column_name).details, - is_disabled=is_disabled, - ) - on_cluster = get_on_cluster_clause_for_table(table) - sync_execute( - f"ALTER TABLE {table} {on_cluster} COMMENT COLUMN {column_name} %(comment)s", - {"comment": details.as_column_comment()}, - settings={"alter_sync": 2 if TEST else 1}, - ) +@dataclass +class CreateColumnOnDataNodesTask: + table: str + column: MaterializedColumn + create_minmax_index: bool + add_column_comment: bool + def execute(self, client: Client) -> None: + actions = [ + f""" + ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR + MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=self.column.details.table_column)} + """, + ] + parameters = {"property": self.column.details.property_name} -def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: - drop_minmax_index(table, column_name) + if self.add_column_comment: + actions.append(f"COMMENT COLUMN {self.column.name} %(comment)s") + parameters["comment"] = self.column.details.as_column_comment() - on_cluster = get_on_cluster_clause_for_table(table) - sync_execute( - f"ALTER TABLE {table} {on_cluster} DROP COLUMN IF EXISTS {column_name}", - settings={"alter_sync": 2 if TEST else 1}, - ) + if self.create_minmax_index: + index_name = f"minmax_{self.column.name}" + actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1") - if table == "events": - sync_execute( - f"ALTER TABLE sharded_{table} {on_cluster} DROP COLUMN IF EXISTS {column_name}", - {"property": property}, + client.execute( + f"ALTER TABLE {self.table} " + ", ".join(actions), + parameters, settings={"alter_sync": 2 if TEST else 1}, ) -def add_minmax_index(table: TablesWithMaterializedColumns, column_name: ColumnName): - # Note: This will be populated on backfill - on_cluster = get_on_cluster_clause_for_table(table) - updated_table = "sharded_events" if table == "events" else table - index_name = f"minmax_{column_name}" +@dataclass +class CreateColumnOnQueryNodesTask: + table: str + column: MaterializedColumn - try: - sync_execute( + def execute(self, client: Client) -> None: + client.execute( f""" - ALTER TABLE {updated_table} {on_cluster} - ADD INDEX {index_name} {column_name} - TYPE minmax GRANULARITY 1 + ALTER TABLE {self.table} + ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR, + COMMENT COLUMN {self.column.name} %(comment)s """, + {"comment": self.column.details.as_column_comment()}, settings={"alter_sync": 2 if TEST else 1}, ) - except ServerException as err: - if "index with this name already exists" not in str(err): - raise - return index_name +def materialize( + table: TableWithProperties, + property: PropertyName, + column_name: ColumnName | None = None, + table_column: TableColumn = DEFAULT_TABLE_COLUMN, + create_minmax_index=not TEST, +) -> ColumnName | None: + if (property, table_column) in get_materialized_columns(table): + if TEST: + return None + + raise ValueError(f"Property already materialized. table={table}, property={property}, column={table_column}") -def drop_minmax_index(table: TablesWithMaterializedColumns, column_name: ColumnName) -> None: - on_cluster = get_on_cluster_clause_for_table(table) + if table_column not in SHORT_TABLE_COLUMN_NAME: + raise ValueError(f"Invalid table_column={table_column} for materialisation") - # XXX: copy/pasted from `add_minmax_index` - updated_table = "sharded_events" if table == "events" else table - index_name = f"minmax_{column_name}" + cluster = get_cluster() + table_info = tables[table] - sync_execute( - f"ALTER TABLE {updated_table} {on_cluster} DROP INDEX IF EXISTS {index_name}", - settings={"alter_sync": 2 if TEST else 1}, + column = MaterializedColumn( + name=column_name or _materialized_column_name(table, property, table_column), + details=MaterializedColumnDetails( + table_column=table_column, + property_name=property, + is_disabled=False, + ), ) + table_info.map_data_nodes( + cluster, + CreateColumnOnDataNodesTask( + table_info.data_table, + column, + create_minmax_index, + add_column_comment=table_info.read_table == table_info.data_table, + ).execute, + ).result() + + if isinstance(table_info, ShardedTableInfo): + cluster.map_all_hosts( + CreateColumnOnQueryNodesTask( + table_info.dist_table, + column, + ).execute + ).result() + + return column.name + + +@dataclass +class UpdateColumnCommentTask: + table: str + column: MaterializedColumn + + def execute(self, client: Client) -> None: + client.execute( + f"ALTER TABLE {self.table} COMMENT COLUMN {self.column.name} %(comment)s", + {"comment": self.column.details.as_column_comment()}, + settings={"alter_sync": 2 if TEST else 1}, + ) + + +def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name: str, is_disabled: bool) -> None: + cluster = get_cluster() + table_info = tables[table] + + cluster.map_all_hosts( + UpdateColumnCommentTask( + table_info.read_table, + MaterializedColumn( + name=column_name, + details=replace( + MaterializedColumn.get(table, column_name).details, + is_disabled=is_disabled, + ), + ), + ).execute + ).result() + + +@dataclass +class DropColumnTask: + table: str + column_name: str + try_drop_index: bool + + def execute(self, client: Client) -> None: + # XXX: copy/pasted from create task + if self.try_drop_index: + index_name = f"minmax_{self.column_name}" + client.execute( + f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + + client.execute( + f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}", + settings={"alter_sync": 2 if TEST else 1}, + ) + + +def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None: + cluster = get_cluster() + table_info = tables[table] + + if isinstance(table_info, ShardedTableInfo): + cluster.map_all_hosts( + DropColumnTask( + table_info.dist_table, + column_name, + try_drop_index=False, # no indexes on distributed tables + ).execute + ).result() + + table_info.map_data_nodes( + cluster, + DropColumnTask( + table_info.data_table, + column_name, + try_drop_index=True, + ).execute, + ).result() + + +@dataclass +class BackfillColumnTask: + table: str + columns: list[MaterializedColumn] + backfill_period: timedelta | None + test_settings: dict[str, Any] | None + + def execute(self, client: Client) -> None: + # Hack from https://github.com/ClickHouse/ClickHouse/issues/19785 + # Note that for this to work all inserts should list columns explicitly + # Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved + for column in self.columns: + client.execute( + f""" + ALTER TABLE {self.table} + MODIFY COLUMN {column.name} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=column.details.table_column)} + """, + {"property": column.details.property_name}, + settings=self.test_settings, + ) + + # Kick off mutations which will update clickhouse partitions in the background. This will return immediately + assignments = ", ".join(f"{column.name} = {column.name}" for column in self.columns) + + if self.backfill_period is not None: + where_clause = "timestamp > %(cutoff)s" + parameters = {"cutoff": (now() - self.backfill_period).strftime("%Y-%m-%d")} + else: + where_clause = "1 = 1" + parameters = {} + + client.execute( + f"ALTER TABLE {self.table} UPDATE {assignments} WHERE {where_clause}", + parameters, + settings=self.test_settings, + ) + def backfill_materialized_columns( table: TableWithProperties, @@ -261,40 +386,25 @@ def backfill_materialized_columns( if len(properties) == 0: return - updated_table = "sharded_events" if table == "events" else table - on_cluster = get_on_cluster_clause_for_table(table) - - materialized_columns = get_materialized_columns(table) - - # Hack from https://github.com/ClickHouse/ClickHouse/issues/19785 - # Note that for this to work all inserts should list columns explicitly - # Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved - for property, table_column in properties: - sync_execute( - f""" - ALTER TABLE {updated_table} {on_cluster} - MODIFY COLUMN - {materialized_columns[(property, table_column)]} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=table_column)} - """, - {"property": property}, - settings=test_settings, - ) - - # Kick off mutations which will update clickhouse partitions in the background. This will return immediately - assignments = ", ".join( - f"{materialized_columns[property_and_column]} = {materialized_columns[property_and_column]}" - for property_and_column in properties - ) + cluster = get_cluster() + table_info = tables[table] - sync_execute( - f""" - ALTER TABLE {updated_table} {on_cluster} - UPDATE {assignments} - WHERE {"timestamp > %(cutoff)s" if table == "events" else "1 = 1"} - """, - {"cutoff": (now() - backfill_period).strftime("%Y-%m-%d")}, - settings=test_settings, - ) + # TODO: this will eventually need to handle duplicates + materialized_columns = { + (column.details.property_name, column.details.table_column): column + for column in MaterializedColumn.get_all(table) + } + columns = [materialized_columns[property] for property in properties] + + table_info.map_data_nodes( + cluster, + BackfillColumnTask( + table_info.data_table, + columns, + backfill_period if table == "events" else None, # XXX + test_settings, + ).execute, + ).result() def _materialized_column_name( diff --git a/posthog/clickhouse/cluster.py b/posthog/clickhouse/cluster.py new file mode 100644 index 0000000000000..3aa67c94ff3b5 --- /dev/null +++ b/posthog/clickhouse/cluster.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import logging +from collections.abc import Callable, Iterator, Sequence +from concurrent.futures import ALL_COMPLETED, FIRST_EXCEPTION, Future, ThreadPoolExecutor, as_completed +from typing import Literal, NamedTuple, TypeVar + +from clickhouse_driver import Client +from clickhouse_pool import ChPool +from django.conf import settings + +from posthog.clickhouse.client.connection import make_ch_pool + + +logger = logging.getLogger(__name__) + + +K = TypeVar("K") +V = TypeVar("V") + + +class FuturesMap(dict[K, Future[V]]): + def as_completed(self, timeout: float | int | None = None) -> Iterator[tuple[K, Future[V]]]: + reverse_map = {v: k for k, v in self.items()} + assert len(reverse_map) == len(self) + + for f in as_completed(self.values(), timeout=timeout): + yield reverse_map[f], f + + def result( + self, + timeout: float | int | None = None, + return_when: Literal["FIRST_EXCEPTION", "ALL_COMPLETED"] = ALL_COMPLETED, + ) -> dict[K, V]: + results = {} + errors = {} + for k, future in self.as_completed(timeout=timeout): + try: + results[k] = future.result() + except Exception as e: + if return_when is FIRST_EXCEPTION: + raise + else: + errors[k] = e + + if errors: + # TODO: messaging could be improved here + raise ExceptionGroup("not all futures returned a result", [*errors.values()]) + + return results + + +class ConnectionInfo(NamedTuple): + address: str + port: int + + +class HostInfo(NamedTuple): + connection_info: ConnectionInfo + shard_num: int | None + replica_num: int | None + + +T = TypeVar("T") + + +class ClickhouseCluster: + def __init__(self, bootstrap_client: Client, extra_hosts: Sequence[ConnectionInfo] | None = None) -> None: + self.__hosts = [ + HostInfo(ConnectionInfo(host_address, port), shard_num, replica_num) + for (host_address, port, shard_num, replica_num) in bootstrap_client.execute( + """ + SELECT host_address, port, shard_num, replica_num + FROM system.clusters + WHERE name = %(name)s + ORDER BY shard_num, replica_num + """, + {"name": settings.CLICKHOUSE_CLUSTER}, + ) + ] + if extra_hosts is not None: + self.__hosts.extend( + [HostInfo(connection_info, shard_num=None, replica_num=None) for connection_info in extra_hosts] + ) + self.__pools: dict[HostInfo, ChPool] = {} + + def __get_task_function(self, host: HostInfo, fn: Callable[[Client], T]) -> Callable[[], T]: + pool = self.__pools.get(host) + if pool is None: + pool = self.__pools[host] = make_ch_pool(host=host.connection_info.address, port=host.connection_info.port) + + def task(): + with pool.get_client() as client: + logger.debug("Executing %r on %r...", fn, host) + try: + result = fn(client) + except Exception: + logger.exception("Failed to execute %r on %r!", fn, host) + raise + else: + logger.debug("Successfully executed %r on %r.", fn, host) + return result + + return task + + def map_all_hosts(self, fn: Callable[[Client], T]) -> FuturesMap[HostInfo, T]: + """ + Execute the callable once for each host in the cluster. + """ + with ThreadPoolExecutor() as executor: + return FuturesMap({host: executor.submit(self.__get_task_function(host, fn)) for host in self.__hosts}) + + def map_one_host_per_shard(self, fn: Callable[[Client], T]) -> FuturesMap[HostInfo, T]: + """ + Execute the callable once for each shard in the cluster. + """ + shard_hosts: dict[int, HostInfo] = {} + for host in self.__hosts: + if host.shard_num is not None and host.shard_num not in shard_hosts: + shard_hosts[host.shard_num] = host + + with ThreadPoolExecutor() as executor: + return FuturesMap( + {host: executor.submit(self.__get_task_function(host, fn)) for host in shard_hosts.values()} + )