Skip to content

Commit

Permalink
fix: warming and filter (#25674)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
aspicer and github-actions[bot] authored Oct 18, 2024
1 parent a867559 commit f56e4c1
Show file tree
Hide file tree
Showing 15 changed files with 728 additions and 163 deletions.
11 changes: 9 additions & 2 deletions posthog/caching/warming.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from posthog.caching.utils import largest_teams
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import CHQueryErrorTooManySimultaneousQueries
from posthog.hogql.constants import LimitContext
from posthog.hogql_queries.query_cache import QueryCacheManager
from posthog.hogql_queries.legacy_compatibility.flagged_conversion_manager import conversion_to_query_based
from posthog.hogql_queries.query_runner import ExecutionMode
Expand Down Expand Up @@ -126,13 +127,18 @@ def schedule_warming_for_teams_task():
max_retries=3,
)
def warm_insight_cache_task(insight_id: int, dashboard_id: Optional[int]):
insight = Insight.objects.get(pk=insight_id)
try:
insight = Insight.objects.get(pk=insight_id)
except Insight.DoesNotExist:
logger.info(f"Warming insight cache failed 404 insight not found: {insight_id}")
return

dashboard = None

tag_queries(team_id=insight.team_id, insight_id=insight.pk, trigger="warmingV2")
if dashboard_id:
tag_queries(dashboard_id=dashboard_id)
dashboard = insight.dashboards.get(pk=dashboard_id)
dashboard = insight.dashboards.filter(pk=dashboard_id).first()

with conversion_to_query_based(insight):
logger.info(f"Warming insight cache: {insight.pk} for team {insight.team_id} and dashboard {dashboard_id}")
Expand All @@ -145,6 +151,7 @@ def warm_insight_cache_task(insight_id: int, dashboard_id: Optional[int]):
# We need an execution mode with recent cache:
# - in case someone refreshed after this task was triggered
# - if insight + dashboard combinations have the same cache key, we prevent needless recalculations
limit_context=LimitContext.QUERY_ASYNC,
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_BLOCKING_IF_STALE,
insight_id=insight_id,
dashboard_id=dashboard_id,
Expand Down
1 change: 1 addition & 0 deletions posthog/hogql/functions/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def compare_types(arg_types: list[ConstantType], sig_arg_types: tuple[ConstantTy
"arraySplit": HogQLFunctionMeta("arraySplit", 2, None),
"arrayReverseFill": HogQLFunctionMeta("arrayReverseFill", 2, None),
"arrayReverseSplit": HogQLFunctionMeta("arrayReverseSplit", 2, None),
"arrayRotateLeft": HogQLFunctionMeta("arrayRotateLeft", 2, 2),
"arrayRotateRight": HogQLFunctionMeta("arrayRotateRight", 2, 2),
"arrayExists": HogQLFunctionMeta("arrayExists", 1, None),
"arrayAll": HogQLFunctionMeta("arrayAll", 1, None),
Expand Down
19 changes: 19 additions & 0 deletions posthog/hogql_queries/insights/funnels/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ def get_step_counts_query(self) -> ast.SelectQuery:
def get_step_counts_without_aggregation_query(self) -> ast.SelectQuery:
raise NotImplementedError()

# This is a simple heuristic to reduce the number of events we look at in UDF funnels (thus are serialized and sent over)
# We remove an event if it matches one or zero steps and there was already the same type of event before and after it (that don't have the same timestamp)
# arrayRotateRight turns [1,2,3] into [3,1,2]
# arrayRotateLeft turns [1,2,3] into [2,3,1]
# For some reason, using these uses much less memory than using indexing in clickhouse to check the previous and next element
def _udf_event_array_filter(self, timestamp_index: int, prop_val_index: int, steps_index: int) -> str:
    """Return a ClickHouse ``arrayFilter(...)`` SQL expression that prunes redundant
    events from ``events_array`` before it is handed to the funnel UDF.

    The three ``*_index`` arguments are 1-based ClickHouse tuple-element positions
    inside each event tuple of ``events_array`` (subclasses pass different layouts):

    - ``timestamp_index``: position of the event timestamp
    - ``prop_val_index``: position of the breakdown/property value
    - ``steps_index``: position of the array of matched step numbers

    An event ``x`` is dropped when it matches at most one step and both rotated
    neighbours (``x_before`` = predecessor via arrayRotateRight, ``x_after`` =
    successor via arrayRotateLeft) carry the same matched steps and property value.
    The strict timestamp comparisons both exclude same-timestamp events and guard
    the wrap-around pair produced by the rotations at the array boundaries
    (``events_array`` is sorted by timestamp by the callers' ``arraySort``).
    """
    return f"""arrayFilter(
(x, x_before, x_after) -> not (
length(x.{steps_index}) <= 1
and x.{steps_index} == x_before.{steps_index}
and x.{steps_index} == x_after.{steps_index}
and x.{prop_val_index} == x_before.{prop_val_index}
and x.{prop_val_index} == x_after.{prop_val_index}
and x.{timestamp_index} > x_before.{timestamp_index}
and x.{timestamp_index} < x_after.{timestamp_index}),
events_array,
arrayRotateRight(events_array, 1),
arrayRotateLeft(events_array, 1))"""

@cached_property
def breakdown_cohorts(self) -> list[Cohort]:
team, breakdown = self.context.team, self.context.breakdown
Expand Down
9 changes: 6 additions & 3 deletions posthog/hogql_queries/insights/funnels/funnel_trends_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from posthog.hogql.constants import HogQLQuerySettings
from posthog.hogql.parser import parse_select, parse_expr
from posthog.hogql_queries.insights.funnels import FunnelTrends
from posthog.hogql_queries.insights.funnels.funnel_udf import udf_event_array_filter
from posthog.hogql_queries.insights.utils.utils import get_start_of_interval_hogql_str
from posthog.schema import BreakdownType, BreakdownAttributionType
from posthog.utils import DATERANGE_MAP, relative_date_parse
Expand Down Expand Up @@ -47,6 +46,9 @@ def matched_event_select(self):
"""
return ""

def udf_event_array_filter(self):
    """ClickHouse expression that pre-filters ``events_array`` for the trends UDF.

    The indices are 1-based tuple positions within each event tuple of
    ``events_array``: 1 = timestamp, 4 = breakdown/property value,
    5 = matched-steps array (per the argument names of the shared helper).
    """
    timestamp_index, prop_val_index, steps_index = 1, 4, 5
    return self._udf_event_array_filter(timestamp_index, prop_val_index, steps_index)

# This is the function that calls the UDF
# This is used by both the query itself and the actors query
def _inner_aggregation_query(self):
Expand Down Expand Up @@ -103,15 +105,16 @@ def _inner_aggregation_query(self):
_toUInt64(toDateTime({get_start_of_interval_hogql_str(self.context.interval.value, team=self.context.team, source='timestamp')})),
uuid,
{prop_selector},
arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
arrayFilter((x) -> x != 0, [{steps}{exclusions}])
))) as events_array,
arrayJoin({fn}(
{from_step},
{max_steps},
{self.conversion_window_limit()},
'{breakdown_attribution_string}',
'{self.context.funnelsFilter.funnelOrderType}',
{prop_vals},
{udf_event_array_filter(self.context.funnelsFilter.funnelOrderType)}
{self.udf_event_array_filter()}
)) as af_tuple,
toTimeZone(toDateTime(_toUInt64(af_tuple.1)), '{self.context.team.timezone}') as entrance_period_start,
af_tuple.2 as success_bool,
Expand Down
29 changes: 11 additions & 18 deletions posthog/hogql_queries/insights/funnels/funnel_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,13 @@
from posthog.hogql import ast
from posthog.hogql.parser import parse_select, parse_expr
from posthog.hogql_queries.insights.funnels.base import FunnelBase
from posthog.schema import BreakdownType, BreakdownAttributionType, StepOrderValue
from posthog.schema import BreakdownType, BreakdownAttributionType
from posthog.utils import DATERANGE_MAP

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
HUMAN_READABLE_TIMESTAMP_FORMAT = "%-d-%b-%Y"


# Legacy pruning used only for strict-order funnels to reduce the events we
# serialize and send to the UDF. zip-ing events_array with
# arrayRotateRight(events_array, 1) pairs each event x with its predecessor x2
# (arrayRotateRight turns [1,2,3] into [3,1,2]); the x.1 > x2.1 timestamp check
# both skips same-timestamp events and guards the wrap-around pair at index 0.
# For some reason this uses much less ClickHouse memory than explicit indexing
# to look up the previous element.
def udf_event_array_filter(funnelOrderType: StepOrderValue | None):
    """Return the events-array expression passed to the funnel UDF.

    Strict-order funnels get a pruning ``arrayFilter`` that drops an event whose
    matched-steps array (position 4) is empty when its predecessor also matched
    nothing and shares the same property value (position 3); every other order
    type passes ``events_array`` through untouched.
    """
    if funnelOrderType != "strict":
        return "events_array"
    return f"""
arrayFilter(
(x, x2) -> not (empty(x.4) and empty(x2.4) and x.3 == x2.3 and x.1 > x2.1),
events_array,
arrayRotateRight(events_array, 1))
"""


class FunnelUDF(FunnelBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand All @@ -51,6 +36,9 @@ def matched_event_arrays_selects(self):
"""
return ""

def udf_event_array_filter(self):
    """ClickHouse expression that pre-filters ``events_array`` for the funnel UDF.

    Delegates to the shared base helper with this query's 1-based tuple layout:
    timestamp at position 1, breakdown/property value at 3, matched steps at 4.
    """
    # Keep the layout in one named place so the positional call below stays readable.
    layout = {"timestamp": 1, "prop_val": 3, "steps": 4}
    return self._udf_event_array_filter(layout["timestamp"], layout["prop_val"], layout["steps"])

# This is the function that calls the UDF
# This is used by both the query itself and the actors query
def _inner_aggregation_query(self):
Expand Down Expand Up @@ -92,14 +80,19 @@ def _inner_aggregation_query(self):
inner_select = parse_select(
f"""
SELECT
arraySort(t -> t.1, groupArray(tuple(toFloat(timestamp), uuid, {prop_selector}, arrayFilter((x) -> x != 0, [{steps}{exclusions}])))) as events_array,
arraySort(t -> t.1, groupArray(tuple(
toFloat(timestamp),
uuid,
{prop_selector},
arrayFilter((x) -> x != 0, [{steps}{exclusions}])
))) as events_array,
arrayJoin({fn}(
{self.context.max_steps},
{self.conversion_window_limit()},
'{breakdown_attribution_string}',
'{self.context.funnelsFilter.funnelOrderType}',
{prop_vals},
{udf_event_array_filter(self.context.funnelsFilter.funnelOrderType)}
{self.udf_event_array_filter()}
)) as af_tuple,
af_tuple.1 as step_reached,
af_tuple.1 + 1 as steps, -- Backward compatibility
Expand Down
Loading

0 comments on commit f56e4c1

Please sign in to comment.