Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support property group migrations on distributed and sharded tables #24274

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.clickhouse.property_groups import sharded_events_property_groups
from posthog.clickhouse.property_groups import property_groups

operations = [
run_sql_with_exceptions(statement)
for statement in [
*sharded_events_property_groups.get_alter_create_statements("custom"),
*sharded_events_property_groups.get_alter_create_statements("feature_flags"),
*property_groups.get_alter_create_statements("sharded_events", "properties", "custom"),
*property_groups.get_alter_create_statements("sharded_events", "properties", "feature_flags"),
]
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
from posthog.clickhouse.property_groups import property_groups

operations = [
run_sql_with_exceptions(statement)
for statement in [
*property_groups.get_alter_create_statements("events", "properties", "custom"),
*property_groups.get_alter_create_statements("events", "properties", "feature_flags"),
]
]
103 changes: 76 additions & 27 deletions posthog/clickhouse/property_groups.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,75 @@
from collections.abc import Iterable, MutableMapping
import dataclasses
from collections.abc import Iterable, Mapping
from dataclasses import dataclass

from posthog import settings


@dataclass
class PropertyGroupDefinition:
filter_expression: str
key_filter_expression: str
codec: str = "ZSTD(1)"
is_materialized: bool = True


TableName = str
ColumnName = str
PropertyGroupName = str

Comment on lines +15 to +17
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(These aliases are just for documentation purposes.)


class PropertyGroupManager:
def __init__(self, cluster: str, table: str, source_column: str) -> None:
def __init__(
self,
cluster: str,
groups: Mapping[TableName, Mapping[ColumnName, Mapping[PropertyGroupName, PropertyGroupDefinition]]],
) -> None:
self.__cluster = cluster
self.__table = table
self.__source_column = source_column
self.__groups: MutableMapping[str, PropertyGroupDefinition] = {}
self.__groups = groups

def __get_map_column_name(self, column: ColumnName, group_name: PropertyGroupName) -> str:
return f"{column}_group_{group_name}"

def register(self, name: str, definition: PropertyGroupDefinition) -> None:
assert name not in self.__groups, "property group names can only be used once"
self.__groups[name] = definition
def __get_column_definition(self, table: TableName, column: ColumnName, group_name: PropertyGroupName) -> str:
group_definition = self.__groups[table][column][group_name]
map_column_name = self.__get_map_column_name(column, group_name)
column_definition = f"{map_column_name} Map(String, String)"
if not group_definition.is_materialized:
return column_definition
else:
return f"""\
{column_definition}
MATERIALIZED mapSort(
mapFilter((key, _) -> {group_definition.key_filter_expression},
CAST(JSONExtractKeysAndValues({column}, 'String'), 'Map(String, String)'))
)
CODEC({group_definition.codec})
"""

def __get_map_expression(self, definition: PropertyGroupDefinition) -> str:
return f"mapSort(mapFilter((key, _) -> {definition.filter_expression}, CAST(JSONExtractKeysAndValues({self.__source_column}, 'String'), 'Map(String, String)')))"
def __get_index_definitions(
self, table: TableName, column: ColumnName, group_name: PropertyGroupName
) -> Iterable[str]:
group_definition = self.__groups[table][column][group_name]
if not group_definition.is_materialized:
return

def get_alter_create_statements(self, name: str) -> Iterable[str]:
column_name = f"{self.__source_column}_group_{name}"
definition = self.__groups[name]
return [
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD COLUMN {column_name} Map(String, String) MATERIALIZED {self.__get_map_expression(definition)} CODEC({definition.codec})",
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD INDEX {column_name}_keys_bf mapKeys({column_name}) TYPE bloom_filter",
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD INDEX {column_name}_values_bf mapValues({column_name}) TYPE bloom_filter",
]
map_column_name = self.__get_map_column_name(column, group_name)
yield f"{map_column_name}_keys_bf mapKeys({map_column_name}) TYPE bloom_filter"
yield f"{map_column_name}_values_bf mapValues({map_column_name}) TYPE bloom_filter"

def get_create_table_pieces(self, table: TableName) -> Iterable[str]:
for column, groups in self.__groups[table].items():
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the correct terminology is here. They're not expressions, but also not whole statements, so just went with something generic.

for group_name in groups:
yield self.__get_column_definition(table, column, group_name)
for index_definition in self.__get_index_definitions(table, column, group_name):
yield f"INDEX {index_definition}"

Comment on lines +61 to +64
Copy link
Member

@fuziontech fuziontech Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig this. Really nice use of python iterator building up table bits

def get_alter_create_statements(
self, table: TableName, column: ColumnName, group_name: PropertyGroupName
) -> Iterable[str]:
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD COLUMN IF NOT EXISTS {self.__get_column_definition(table, column, group_name)}"
for index_definition in self.__get_index_definitions(table, column, group_name):
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD INDEX IF NOT EXISTS {index_definition}"

sharded_events_property_groups = PropertyGroupManager(settings.CLICKHOUSE_CLUSTER, "sharded_events", "properties")

ignore_custom_properties = [
# `token` & `distinct_id` properties are sent with ~50% of events and by
Expand Down Expand Up @@ -65,11 +100,25 @@ def get_alter_create_statements(self, name: str) -> Iterable[str]:
"rdt_cid", # reddit
]

sharded_events_property_groups.register(
"custom",
PropertyGroupDefinition(
f"key NOT LIKE '$%' AND key NOT IN (" + f", ".join(f"'{name}'" for name in ignore_custom_properties) + f")"
),
)
event_property_group_definitions = {
"properties": {
"custom": PropertyGroupDefinition(
f"key NOT LIKE '$%' AND key NOT IN (" + f", ".join(f"'{name}'" for name in ignore_custom_properties) + f")"
),
"feature_flags": PropertyGroupDefinition("key like '$feature/%'"),
}
}

sharded_events_property_groups.register("feature_flags", PropertyGroupDefinition("key like '$feature/%'"))
property_groups = PropertyGroupManager(
settings.CLICKHOUSE_CLUSTER,
{
"sharded_events": event_property_group_definitions,
"events": {
column_name: {
group_name: dataclasses.replace(group_definition, is_materialized=False)
for group_name, group_definition in column_group_definitions.items()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of clunky — maybe in retrospect it would have been better to have this as some sort of flag up at the table level rather than each individual property group definition…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's definitely not easy reading

}
for column_name, column_group_definitions in event_property_group_definitions.items()
},
},
)
27 changes: 27 additions & 0 deletions posthog/clickhouse/test/__snapshots__/test_schema.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@
, elements_chain_texts Array(String) COMMENT 'column_materializer::elements_chain::texts'
, elements_chain_ids Array(String) COMMENT 'column_materializer::elements_chain::ids'
, elements_chain_elements Array(Enum('a', 'button', 'form', 'input', 'select', 'textarea', 'label')) COMMENT 'column_materializer::elements_chain::elements'
, properties_group_custom Map(String, String), properties_group_feature_flags Map(String, String)


, _timestamp DateTime
Expand Down Expand Up @@ -2188,6 +2189,19 @@
, INDEX `minmax_$group_4` `$group_4` TYPE minmax GRANULARITY 1
, INDEX `minmax_$window_id` `$window_id` TYPE minmax GRANULARITY 1
, INDEX `minmax_$session_id` `$session_id` TYPE minmax GRANULARITY 1
, properties_group_custom Map(String, String)
MATERIALIZED mapSort(
mapFilter((key, _) -> key NOT LIKE '$%' AND key NOT IN ('token', 'distinct_id', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'utm_term', 'gclid', 'gad_source', 'gclsrc', 'dclid', 'gbraid', 'wbraid', 'fbclid', 'msclkid', 'twclid', 'li_fat_id', 'mc_cid', 'igshid', 'ttclid', 'rdt_cid'),
CAST(JSONExtractKeysAndValues(properties, 'String'), 'Map(String, String)'))
)
CODEC(ZSTD(1))
, INDEX properties_group_custom_keys_bf mapKeys(properties_group_custom) TYPE bloom_filter, INDEX properties_group_custom_values_bf mapValues(properties_group_custom) TYPE bloom_filter, properties_group_feature_flags Map(String, String)
MATERIALIZED mapSort(
mapFilter((key, _) -> key like '$feature/%',
CAST(JSONExtractKeysAndValues(properties, 'String'), 'Map(String, String)'))
)
CODEC(ZSTD(1))
, INDEX properties_group_feature_flags_keys_bf mapKeys(properties_group_feature_flags) TYPE bloom_filter, INDEX properties_group_feature_flags_values_bf mapValues(properties_group_feature_flags) TYPE bloom_filter


, _timestamp DateTime
Expand Down Expand Up @@ -3282,6 +3296,19 @@
, INDEX `minmax_$group_4` `$group_4` TYPE minmax GRANULARITY 1
, INDEX `minmax_$window_id` `$window_id` TYPE minmax GRANULARITY 1
, INDEX `minmax_$session_id` `$session_id` TYPE minmax GRANULARITY 1
, properties_group_custom Map(String, String)
MATERIALIZED mapSort(
mapFilter((key, _) -> key NOT LIKE '$%' AND key NOT IN ('token', 'distinct_id', 'utm_source', 'utm_medium', 'utm_campaign', 'utm_content', 'utm_term', 'gclid', 'gad_source', 'gclsrc', 'dclid', 'gbraid', 'wbraid', 'fbclid', 'msclkid', 'twclid', 'li_fat_id', 'mc_cid', 'igshid', 'ttclid', 'rdt_cid'),
CAST(JSONExtractKeysAndValues(properties, 'String'), 'Map(String, String)'))
)
CODEC(ZSTD(1))
, INDEX properties_group_custom_keys_bf mapKeys(properties_group_custom) TYPE bloom_filter, INDEX properties_group_custom_values_bf mapValues(properties_group_custom) TYPE bloom_filter, properties_group_feature_flags Map(String, String)
MATERIALIZED mapSort(
mapFilter((key, _) -> key like '$feature/%',
CAST(JSONExtractKeysAndValues(properties, 'String'), 'Map(String, String)'))
)
CODEC(ZSTD(1))
, INDEX properties_group_feature_flags_keys_bf mapKeys(properties_group_feature_flags) TYPE bloom_filter, INDEX properties_group_feature_flags_values_bf mapValues(properties_group_feature_flags) TYPE bloom_filter


, _timestamp DateTime
Expand Down
5 changes: 4 additions & 1 deletion posthog/models/event/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
kafka_engine,
trim_quotes_expr,
)
from posthog.clickhouse.property_groups import property_groups
from posthog.clickhouse.table_engines import (
Distributed,
ReplacingMergeTree,
Expand Down Expand Up @@ -76,9 +77,10 @@
, INDEX `minmax_$group_4` `$group_4` TYPE minmax GRANULARITY 1
, INDEX `minmax_$window_id` `$window_id` TYPE minmax GRANULARITY 1
, INDEX `minmax_$session_id` `$session_id` TYPE minmax GRANULARITY 1
, {", ".join(property_groups.get_create_table_pieces("sharded_events"))}
"""

EVENTS_TABLE_PROXY_MATERIALIZED_COLUMNS = """
EVENTS_TABLE_PROXY_MATERIALIZED_COLUMNS = f"""
, $group_0 VARCHAR COMMENT 'column_materializer::$group_0'
, $group_1 VARCHAR COMMENT 'column_materializer::$group_1'
, $group_2 VARCHAR COMMENT 'column_materializer::$group_2'
Expand All @@ -90,6 +92,7 @@
, elements_chain_texts Array(String) COMMENT 'column_materializer::elements_chain::texts'
, elements_chain_ids Array(String) COMMENT 'column_materializer::elements_chain::ids'
, elements_chain_elements Array(Enum('a', 'button', 'form', 'input', 'select', 'textarea', 'label')) COMMENT 'column_materializer::elements_chain::elements'
, {", ".join(property_groups.get_create_table_pieces("events"))}
"""

EVENTS_DATA_TABLE_ENGINE = lambda: ReplacingMergeTree(
Expand Down
Loading