-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Support property group migrations on distributed and sharded tables #24274
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,10 @@ | ||
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions | ||
from posthog.clickhouse.property_groups import sharded_events_property_groups | ||
from posthog.clickhouse.property_groups import property_groups | ||
|
||
operations = [ | ||
run_sql_with_exceptions(statement) | ||
for statement in [ | ||
*sharded_events_property_groups.get_alter_create_statements("custom"), | ||
*sharded_events_property_groups.get_alter_create_statements("feature_flags"), | ||
*property_groups.get_alter_create_statements("sharded_events", "properties", "custom"), | ||
*property_groups.get_alter_create_statements("sharded_events", "properties", "feature_flags"), | ||
] | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions | ||
from posthog.clickhouse.property_groups import property_groups | ||
|
||
operations = [ | ||
run_sql_with_exceptions(statement) | ||
for statement in [ | ||
*property_groups.get_alter_create_statements("events", "properties", "custom"), | ||
*property_groups.get_alter_create_statements("events", "properties", "feature_flags"), | ||
] | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,40 +1,76 @@ | ||
from collections.abc import Iterable, MutableMapping | ||
from collections.abc import Iterable | ||
from dataclasses import dataclass | ||
import dataclasses | ||
from collections.abc import Mapping | ||
|
||
from posthog import settings | ||
|
||
|
||
@dataclass | ||
class PropertyGroupDefinition: | ||
filter_expression: str | ||
key_filter_expression: str | ||
codec: str = "ZSTD(1)" | ||
is_materialized: bool = True | ||
|
||
|
||
TableName = str | ||
ColumnName = str | ||
PropertyGroupName = str | ||
|
||
|
||
class PropertyGroupManager: | ||
def __init__(self, cluster: str, table: str, source_column: str) -> None: | ||
def __init__( | ||
self, | ||
cluster: str, | ||
groups: Mapping[TableName, Mapping[ColumnName, Mapping[PropertyGroupName, PropertyGroupDefinition]]], | ||
) -> None: | ||
self.__cluster = cluster | ||
self.__table = table | ||
self.__source_column = source_column | ||
self.__groups: MutableMapping[str, PropertyGroupDefinition] = {} | ||
self.__groups = groups | ||
|
||
def __get_map_column_name(self, column: ColumnName, group_name: PropertyGroupName) -> str: | ||
return f"{column}_group_{group_name}" | ||
|
||
def register(self, name: str, definition: PropertyGroupDefinition) -> None: | ||
assert name not in self.__groups, "property group names can only be used once" | ||
self.__groups[name] = definition | ||
def __get_column_definition(self, table: TableName, column: ColumnName, group_name: PropertyGroupName) -> str: | ||
group_definition = self.__groups[table][column][group_name] | ||
map_column_name = self.__get_map_column_name(column, group_name) | ||
column_definition = f"{map_column_name} Map(String, String)" | ||
if not group_definition.is_materialized: | ||
return column_definition | ||
else: | ||
return f"""\ | ||
{column_definition} | ||
MATERIALIZED mapSort( | ||
mapFilter((key, _) -> {group_definition.key_filter_expression}, | ||
CAST(JSONExtractKeysAndValues({column}, 'String'), 'Map(String, String)')) | ||
) | ||
CODEC({group_definition.codec}) | ||
""" | ||
|
||
def __get_map_expression(self, definition: PropertyGroupDefinition) -> str: | ||
return f"mapSort(mapFilter((key, _) -> {definition.filter_expression}, CAST(JSONExtractKeysAndValues({self.__source_column}, 'String'), 'Map(String, String)')))" | ||
def __get_index_definitions( | ||
self, table: TableName, column: ColumnName, group_name: PropertyGroupName | ||
) -> Iterable[str]: | ||
group_definition = self.__groups[table][column][group_name] | ||
if not group_definition.is_materialized: | ||
return | ||
|
||
def get_alter_create_statements(self, name: str) -> Iterable[str]: | ||
column_name = f"{self.__source_column}_group_{name}" | ||
definition = self.__groups[name] | ||
return [ | ||
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD COLUMN {column_name} Map(String, String) MATERIALIZED {self.__get_map_expression(definition)} CODEC({definition.codec})", | ||
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD INDEX {column_name}_keys_bf mapKeys({column_name}) TYPE bloom_filter", | ||
f"ALTER TABLE {self.__table} ON CLUSTER {self.__cluster} ADD INDEX {column_name}_values_bf mapValues({column_name}) TYPE bloom_filter", | ||
] | ||
map_column_name = self.__get_map_column_name(column, group_name) | ||
yield f"{map_column_name}_keys_bf mapKeys({map_column_name}) TYPE bloom_filter" | ||
yield f"{map_column_name}_values_bf mapValues({map_column_name}) TYPE bloom_filter" | ||
|
||
def get_create_table_pieces(self, table: TableName) -> Iterable[str]: | ||
for column, groups in self.__groups[table].items(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what the correct terminology is here. They're not expressions, but also not whole statements, so just went with something generic. |
||
for group_name in groups: | ||
yield self.__get_column_definition(table, column, group_name) | ||
for index_definition in self.__get_index_definitions(table, column, group_name): | ||
yield f"INDEX {index_definition}" | ||
|
||
Comment on lines
+61
to
+64
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dig this. Really nice use of python iterator building up table bits |
||
def get_alter_create_statements( | ||
self, table: TableName, column: ColumnName, group_name: PropertyGroupName | ||
) -> Iterable[str]: | ||
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD COLUMN IF NOT EXISTS {self.__get_column_definition(table, column, group_name)}" | ||
for index_definition in self.__get_index_definitions(table, column, group_name): | ||
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD INDEX IF NOT EXISTS {index_definition}" | ||
|
||
sharded_events_property_groups = PropertyGroupManager(settings.CLICKHOUSE_CLUSTER, "sharded_events", "properties") | ||
|
||
ignore_custom_properties = [ | ||
# `token` & `distinct_id` properties are sent with ~50% of events and by | ||
|
@@ -65,11 +101,25 @@ def get_alter_create_statements(self, name: str) -> Iterable[str]: | |
"rdt_cid", # reddit | ||
] | ||
|
||
sharded_events_property_groups.register( | ||
"custom", | ||
PropertyGroupDefinition( | ||
f"key NOT LIKE '$%' AND key NOT IN (" + f", ".join(f"'{name}'" for name in ignore_custom_properties) + f")" | ||
), | ||
) | ||
event_property_group_definitions = { | ||
"properties": { | ||
"custom": PropertyGroupDefinition( | ||
f"key NOT LIKE '$%' AND key NOT IN (" + f", ".join(f"'{name}'" for name in ignore_custom_properties) + f")" | ||
), | ||
"feature_flags": PropertyGroupDefinition("key like '$feature/%'"), | ||
} | ||
} | ||
|
||
sharded_events_property_groups.register("feature_flags", PropertyGroupDefinition("key like '$feature/%'")) | ||
property_groups = PropertyGroupManager( | ||
settings.CLICKHOUSE_CLUSTER, | ||
{ | ||
"sharded_events": event_property_group_definitions, | ||
"events": { | ||
column_name: { | ||
group_name: dataclasses.replace(group_definition, is_materialized=False) | ||
for group_name, group_definition in column_group_definitions.items() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is kind of clunky — maybe in retrospect it would have been better to have this as some sort of flag up at the table level rather than each individual property group definition… There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah it's definitely not easy reading |
||
} | ||
for column_name, column_group_definitions in event_property_group_definitions.items() | ||
}, | ||
}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(These aliases are just for documentation purposes.)