Skip to content

Commit

Permalink
Merge branch 'master' into taxonomy-project
Browse files Browse the repository at this point in the history
  • Loading branch information
Twixes authored Dec 5, 2024
2 parents 6cca91d + dcb77c3 commit b25fb29
Show file tree
Hide file tree
Showing 93 changed files with 2,183 additions and 1,217 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/browserslist-update.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Weekly maintenance job: refresh the caniuse/Browserslist database and open a
# pull request with the updated lockfile when anything changed.
name: Update Browserslist database

on:
  schedule:
    - cron: '0 12 * * MON' # every Monday at 12:00 UTC
  workflow_dispatch: # allow manual runs from the Actions tab

# The update action pushes a branch and opens a PR, so it needs both scopes.
permissions:
  contents: write
  pull-requests: write

jobs:
  update-browserslist-database:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        # v4 keeps the checkout action consistent with the v4 actions below.
        uses: actions/checkout@v4
        with:
          fetch-depth: 0 # full history so the action can branch off master

      - name: Configure git
        run: |
          git config --global user.email "[email protected]"
          git config --global user.name "Browserslist Update Action"

      - name: Install pnpm
        uses: pnpm/action-setup@v4

      - name: Set up Node.js
        uses: actions/setup-node@v4
        with:
          node-version: 18.12.1

      - name: Update Browserslist database and create PR if applies
        uses: c2corg/browserslist-update-action@v2
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          commit_message: 'build: update Browserslist db'
          title: 'build: update Browserslist db'
          labels: 'dependencies, automerge'
2 changes: 1 addition & 1 deletion ee/benchmarks/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

django.setup()

from posthog.clickhouse.materialized_columns import get_enabled_materialized_columns # noqa: E402
from ee.clickhouse.materialized_columns.columns import get_enabled_materialized_columns # noqa: E402
from posthog import client # noqa: E402
from posthog.clickhouse.query_tagging import reset_query_tags, tag_queries # noqa: E402
from posthog.models.utils import UUIDT # noqa: E402
Expand Down
3 changes: 2 additions & 1 deletion ee/clickhouse/materialized_columns/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def materialize_properties_task(
backfill_period_days: int = MATERIALIZE_COLUMNS_BACKFILL_PERIOD_DAYS,
dry_run: bool = False,
team_id_to_analyze: Optional[int] = None,
is_nullable: bool = False,
) -> None:
"""
Creates materialized columns for event and person properties based off of slow queries
Expand Down Expand Up @@ -203,7 +204,7 @@ def materialize_properties_task(
logger.info(f"Materializing column. table={table}, property_name={property_name}")

if not dry_run:
materialize(table, property_name, table_column=table_column)
materialize(table, property_name, table_column=table_column, is_nullable=is_nullable)
properties[table].append((property_name, table_column))

if backfill_period_days > 0 and not dry_run:
Expand Down
144 changes: 102 additions & 42 deletions ee/clickhouse/materialized_columns/columns.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
from __future__ import annotations

import logging
import re
from collections.abc import Callable, Iterator
from copy import copy
from dataclasses import dataclass, replace
from datetime import timedelta
from typing import Any, Literal, NamedTuple, TypeVar, cast
from typing import Any, Literal, TypeVar, cast

from clickhouse_driver import Client
from django.utils.timezone import now

from posthog.cache_utils import cache_for
from posthog.clickhouse.client.connection import default_client
from posthog.clickhouse.cluster import ClickhouseCluster, ConnectionInfo, FuturesMap, HostInfo
from posthog.clickhouse.kafka_engine import trim_quotes_expr
from posthog.clickhouse.materialized_columns import ColumnName, TablesWithMaterializedColumns
from posthog.client import sync_execute
from posthog.models.event.sql import EVENTS_DATA_TABLE
from posthog.models.instance_setting import get_instance_setting
from posthog.models.person.sql import PERSONS_TABLE
from posthog.models.property import PropertyName, TableColumn, TableWithProperties
from posthog.models.utils import generate_random_short_suffix
from posthog.settings import CLICKHOUSE_DATABASE, CLICKHOUSE_PER_TEAM_SETTINGS, TEST


logger = logging.getLogger(__name__)

T = TypeVar("T")

DEFAULT_TABLE_COLUMN: Literal["properties"] = "properties"

TRIM_AND_EXTRACT_PROPERTY = trim_quotes_expr("JSONExtractRaw({table_column}, %(property)s)")

SHORT_TABLE_COLUMN_NAME = {
"properties": "p",
"group_properties": "gp",
Expand All @@ -40,15 +42,36 @@
}


class MaterializedColumn(NamedTuple):
@dataclass
class MaterializedColumn:
name: ColumnName
details: MaterializedColumnDetails
is_nullable: bool

@property
def type(self) -> str:
    """ClickHouse column type for this materialized column.

    Nullable columns are stored as Nullable(String); everything else keeps
    the plain String type.
    """
    if self.is_nullable:
        return "Nullable(String)"
    else:
        return "String"

def get_expression_and_parameters(self) -> tuple[str, dict[str, Any]]:
    """Return the SQL expression used to materialize this column, plus the
    query parameters referenced by that expression.

    Nullable columns use a typed ``JSONExtract`` call (presumably yielding
    NULL when the property is absent -- TODO confirm against ClickHouse
    semantics); non-nullable columns keep the legacy ``JSONExtractRaw``
    expression with quote trimming.
    """
    if self.is_nullable:
        return (
            f"JSONExtract({self.details.table_column}, %(property_name)s, %(property_type)s)",
            {"property_name": self.details.property_name, "property_type": self.type},
        )
    else:
        return (
            trim_quotes_expr(f"JSONExtractRaw({self.details.table_column}, %(property)s)"),
            {"property": self.details.property_name},
        )

@staticmethod
def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn]:
rows = sync_execute(
"""
SELECT name, comment
SELECT name, comment, type like 'Nullable(%%)' as is_nullable
FROM system.columns
WHERE database = %(database)s
AND table = %(table)s
Expand All @@ -58,8 +81,8 @@ def get_all(table: TablesWithMaterializedColumns) -> Iterator[MaterializedColumn
{"database": CLICKHOUSE_DATABASE, "table": table},
)

for name, comment in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment))
for name, comment, is_nullable in rows:
yield MaterializedColumn(name, MaterializedColumnDetails.from_column_comment(comment), is_nullable)

@staticmethod
def get(table: TablesWithMaterializedColumns, column_name: ColumnName) -> MaterializedColumn:
Expand Down Expand Up @@ -111,22 +134,24 @@ def from_column_comment(cls, comment: str) -> MaterializedColumnDetails:

def get_materialized_columns(
table: TablesWithMaterializedColumns,
exclude_disabled_columns: bool = False,
) -> dict[tuple[PropertyName, TableColumn], ColumnName]:
if not get_instance_setting("MATERIALIZED_COLUMNS_ENABLED"):
return {}

) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
return {
(column.details.property_name, column.details.table_column): column.name
(column.details.property_name, column.details.table_column): column
for column in MaterializedColumn.get_all(table)
if not (exclude_disabled_columns and column.details.is_disabled)
}


@cache_for(timedelta(minutes=15))
def get_enabled_materialized_columns(
    table: TablesWithMaterializedColumns,
) -> dict[tuple[PropertyName, TableColumn], MaterializedColumn]:
    """Return the materialized columns for ``table`` keyed by
    (property name, source column), excluding disabled columns.

    Results are cached for 15 minutes via ``cache_for``.
    """
    enabled = {}
    for key, column in get_materialized_columns(table).items():
        if column.details.is_disabled:
            continue
        enabled[key] = column
    return enabled


def get_cluster() -> ClickhouseCluster:
extra_hosts = []
for host_config in map(copy, CLICKHOUSE_PER_TEAM_SETTINGS.values()):
extra_hosts.append(ConnectionInfo(host_config.pop("host"), host_config.pop("port", None)))
extra_hosts.append(ConnectionInfo(host_config.pop("host")))
assert len(host_config) == 0, f"unexpected values: {host_config!r}"
return ClickhouseCluster(default_client(), extra_hosts=extra_hosts)

Expand Down Expand Up @@ -161,6 +186,10 @@ def map_data_nodes(self, cluster: ClickhouseCluster, fn: Callable[[Client], T])
}


def get_minmax_index_name(column: str) -> str:
    """Name of the minmax data-skipping index paired with a materialized column."""
    return "minmax_" + column


@dataclass
class CreateColumnOnDataNodesTask:
table: str
Expand All @@ -169,20 +198,17 @@ class CreateColumnOnDataNodesTask:
add_column_comment: bool

def execute(self, client: Client) -> None:
expression, parameters = self.column.get_expression_and_parameters()
actions = [
f"""
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR
MATERIALIZED {TRIM_AND_EXTRACT_PROPERTY.format(table_column=self.column.details.table_column)}
""",
f"ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type} MATERIALIZED {expression}",
]
parameters = {"property": self.column.details.property_name}

if self.add_column_comment:
actions.append(f"COMMENT COLUMN {self.column.name} %(comment)s")
parameters["comment"] = self.column.details.as_column_comment()

if self.create_minmax_index:
index_name = f"minmax_{self.column.name}"
index_name = get_minmax_index_name(self.column.name)
actions.append(f"ADD INDEX IF NOT EXISTS {index_name} {self.column.name} TYPE minmax GRANULARITY 1")

client.execute(
Expand All @@ -201,7 +227,7 @@ def execute(self, client: Client) -> None:
client.execute(
f"""
ALTER TABLE {self.table}
ADD COLUMN IF NOT EXISTS {self.column.name} VARCHAR,
ADD COLUMN IF NOT EXISTS {self.column.name} {self.column.type},
COMMENT COLUMN {self.column.name} %(comment)s
""",
{"comment": self.column.details.as_column_comment()},
Expand All @@ -215,6 +241,7 @@ def materialize(
column_name: ColumnName | None = None,
table_column: TableColumn = DEFAULT_TABLE_COLUMN,
create_minmax_index=not TEST,
is_nullable: bool = False,
) -> ColumnName | None:
if (property, table_column) in get_materialized_columns(table):
if TEST:
Expand All @@ -235,6 +262,7 @@ def materialize(
property_name=property,
is_disabled=False,
),
is_nullable=is_nullable,
)

table_info.map_data_nodes(
Expand Down Expand Up @@ -275,40 +303,71 @@ def update_column_is_disabled(table: TablesWithMaterializedColumns, column_name:
cluster = get_cluster()
table_info = tables[table]

column = MaterializedColumn.get(table, column_name)

cluster.map_all_hosts(
UpdateColumnCommentTask(
table_info.read_table,
MaterializedColumn(
name=column_name,
details=replace(
MaterializedColumn.get(table, column_name).details,
is_disabled=is_disabled,
),
),
replace(column, details=replace(column.details, is_disabled=is_disabled)),
).execute
).result()


def check_index_exists(client: Client, table: str, index: str) -> bool:
    """Return whether a data-skipping index named ``index`` exists on ``table``
    in the current database.
    """
    rows = client.execute(
        """
        SELECT count()
        FROM system.data_skipping_indices
        WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
        """,
        {"table": table, "name": index},
    )
    [(count,)] = rows
    # Index names are unique per table, so the count can only be 0 or 1.
    assert 1 >= count >= 0
    return count > 0


def check_column_exists(client: Client, table: str, column: str) -> bool:
    """Return whether a column named ``column`` exists on ``table`` in the
    current database.
    """
    rows = client.execute(
        """
        SELECT count()
        FROM system.columns
        WHERE database = currentDatabase() AND table = %(table)s AND name = %(name)s
        """,
        {"table": table, "name": column},
    )
    [(count,)] = rows
    # Column names are unique per table, so the count can only be 0 or 1.
    assert 1 >= count >= 0
    return count > 0


@dataclass
class DropColumnTask:
table: str
column_name: str
try_drop_index: bool

def execute(self, client: Client) -> None:
# XXX: copy/pasted from create task
actions = []

if self.try_drop_index:
index_name = f"minmax_{self.column_name}"
index_name = get_minmax_index_name(self.column_name)
drop_index_action = f"DROP INDEX IF EXISTS {index_name}"
if check_index_exists(client, self.table, index_name):
actions.append(drop_index_action)
else:
logger.info("Skipping %r, nothing to do...", drop_index_action)

drop_column_action = f"DROP COLUMN IF EXISTS {self.column_name}"
if check_column_exists(client, self.table, self.column_name):
actions.append(drop_column_action)
else:
logger.info("Skipping %r, nothing to do...", drop_column_action)

if actions:
client.execute(
f"ALTER TABLE {self.table} DROP INDEX IF EXISTS {index_name}",
f"ALTER TABLE {self.table} " + ", ".join(actions),
settings={"alter_sync": 2 if TEST else 1},
)

client.execute(
f"ALTER TABLE {self.table} DROP COLUMN IF EXISTS {self.column_name}",
settings={"alter_sync": 2 if TEST else 1},
)


def drop_column(table: TablesWithMaterializedColumns, column_name: str) -> None:
cluster = get_cluster()
Expand Down Expand Up @@ -345,12 +404,13 @@ def execute(self, client: Client) -> None:
# Note that for this to work all inserts should list columns explicitly
# Improve this if https://github.com/ClickHouse/ClickHouse/issues/27730 ever gets resolved
for column in self.columns:
expression, parameters = column.get_expression_and_parameters()
client.execute(
f"""
ALTER TABLE {self.table}
MODIFY COLUMN {column.name} VARCHAR DEFAULT {TRIM_AND_EXTRACT_PROPERTY.format(table_column=column.details.table_column)}
MODIFY COLUMN {column.name} {column.type} DEFAULT {expression}
""",
{"property": column.details.property_name},
parameters,
settings=self.test_settings,
)

Expand Down Expand Up @@ -420,10 +480,10 @@ def _materialized_column_name(
prefix += f"{SHORT_TABLE_COLUMN_NAME[table_column]}_"
property_str = re.sub("[^0-9a-zA-Z$]", "_", property)

existing_materialized_columns = set(get_materialized_columns(table).values())
existing_materialized_column_names = {column.name for column in get_materialized_columns(table).values()}
suffix = ""

while f"{prefix}{property_str}{suffix}" in existing_materialized_columns:
while f"{prefix}{property_str}{suffix}" in existing_materialized_column_names:
suffix = "_" + generate_random_short_suffix()

return f"{prefix}{property_str}{suffix}"
8 changes: 4 additions & 4 deletions ee/clickhouse/materialized_columns/test/test_analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ def test_mat_columns(self, patch_backfill, patch_materialize):
materialize_properties_task()
patch_materialize.assert_has_calls(
[
call("events", "materialize_me", table_column="properties"),
call("events", "materialize_me2", table_column="properties"),
call("events", "materialize_person_prop", table_column="person_properties"),
call("events", "materialize_me3", table_column="properties"),
call("events", "materialize_me", table_column="properties", is_nullable=False),
call("events", "materialize_me2", table_column="properties", is_nullable=False),
call("events", "materialize_person_prop", table_column="person_properties", is_nullable=False),
call("events", "materialize_me3", table_column="properties", is_nullable=False),
]
)
Loading

0 comments on commit b25fb29

Please sign in to comment.