Skip to content

Commit

Permalink
tidy up
Browse files Browse the repository at this point in the history
  • Loading branch information
tkaemming committed Aug 8, 2024
1 parent 0cee719 commit bdfa5b0
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions posthog/clickhouse/property_groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ def __init__(
def __get_map_column_name(self, column: ColumnName, group_name: PropertyGroupName) -> str:
return f"{column}_group_{group_name}"

def __get_map_expression(self, column: ColumnName, definition: PropertyGroupDefinition) -> str:
return f"mapSort(mapFilter((key, _) -> {definition.key_filter_expression}, CAST(JSONExtractKeysAndValues({column}, 'String'), 'Map(String, String)')))"

def get_property_group_columns(self, table: TableName, column: ColumnName, property_key: str) -> Iterable[str]:
if (table_groups := self.__groups.get(table)) and (column_groups := table_groups.get(column)):
for group_name, group_definition in column_groups.items():
Expand All @@ -50,37 +47,39 @@ def __get_column_definition(self, table: TableName, column: ColumnName, group_na
if not group_definition.is_materialized:
return column_definition
else:
return f"{column_definition} MATERIALIZED {self.__get_map_expression(column, group_definition)} CODEC({group_definition.codec})"
return f"""\
{column_definition}
MATERIALIZED mapSort(
mapFilter((key, _) -> {group_definition.key_filter_expression},
CAST(JSONExtractKeysAndValues({column}, 'String'), 'Map(String, String)'))
)
CODEC({group_definition.codec})
"""

def __get_index_definitions(
self, table: TableName, column: ColumnName, group_name: PropertyGroupName
) -> Iterable[str]:
group_definition = self.__groups[table][column][group_name]
if not group_definition.is_materialized:
return

def __get_index_definitions(self, table: TableName, column: ColumnName, group_name: PropertyGroupName) -> str:
_ = self.__groups[table][column][group_name] # just to make sure the group exists
map_column_name = self.__get_map_column_name(column, group_name)
return [
f"{map_column_name}_keys_bf mapKeys({map_column_name}) TYPE bloom_filter",
f"{map_column_name}_values_bf mapValues({map_column_name}) TYPE bloom_filter",
]
yield f"{map_column_name}_keys_bf mapKeys({map_column_name}) TYPE bloom_filter"
yield f"{map_column_name}_values_bf mapValues({map_column_name}) TYPE bloom_filter"

def get_create_table_pieces(self, table: TableName) -> Iterable[str]:
for column, groups in self.__groups[table].items():
for group_name, group_definition in groups.items():
for group_name in groups:
yield self.__get_column_definition(table, column, group_name)
if group_definition.is_materialized:
for index_definition in self.__get_index_definitions(table, column, group_name):
yield f"INDEX {index_definition}"
for index_definition in self.__get_index_definitions(table, column, group_name):
yield f"INDEX {index_definition}"

def get_alter_create_statements(
self, table: TableName, column: ColumnName, group_name: PropertyGroupName
) -> Iterable[str]:
group_definition = self.__groups[table][column][group_name]
statements = [
f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD COLUMN IF NOT EXISTS {self.__get_column_definition(table, column, group_name)}"
]
if group_definition.is_materialized:
statements.extend(
f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD INDEX IF NOT EXISTS {index_definition}"
for index_definition in self.__get_index_definitions(table, column, group_name)
)
return statements
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD COLUMN IF NOT EXISTS {self.__get_column_definition(table, column, group_name)}"
for index_definition in self.__get_index_definitions(table, column, group_name):
yield f"ALTER TABLE {table} ON CLUSTER {self.__cluster} ADD INDEX IF NOT EXISTS {index_definition}"


ignore_custom_properties = [
Expand Down

0 comments on commit bdfa5b0

Please sign in to comment.