Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(insights): Lifecycle insight to HogQL #17135

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions frontend/src/lib/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ export const FEATURE_FLAGS = {
SURVEY_NPS_RESULTS: 'survey-nps-results', // owner: @liyiy
// owner: #team-monitoring
SESSION_RECORDING_ALLOW_V1_SNAPSHOTS: 'session-recording-allow-v1-snapshots',
HOGQL_INSIGHTS: 'hogql-insights', // owner: @mariusandra
} as const
export type FeatureFlagKey = (typeof FEATURE_FLAGS)[keyof typeof FEATURE_FLAGS]

Expand Down
19 changes: 13 additions & 6 deletions frontend/src/queries/query.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import posthog from 'posthog-js'
import { DataNode, HogQLQueryResponse, PersonsNode } from './schema'
import {
isInsightQueryNode,
isDataTableNode,
isEventsQuery,
isHogQLQuery,
isInsightQueryNode,
isInsightVizNode,
isLifecycleQuery,
isPersonsNode,
isTimeToSeeDataSessionsQuery,
isTimeToSeeDataQuery,
isDataTableNode,
isTimeToSeeDataSessionsNode,
isHogQLQuery,
isInsightVizNode,
isTimeToSeeDataSessionsQuery,
} from './utils'
import api, { ApiMethodOptions } from 'lib/api'
import { getCurrentTeamId } from 'lib/utils/logics'
Expand All @@ -27,6 +28,8 @@ import { toParams } from 'lib/utils'
import { queryNodeToFilter } from './nodes/InsightQuery/utils/queryNodeToFilter'
import { now } from 'lib/dayjs'
import { currentSessionId } from 'lib/internalMetrics'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'

const EXPORT_MAX_LIMIT = 10000

Expand Down Expand Up @@ -104,10 +107,14 @@ export async function query<N extends DataNode = DataNode>(
const logParams: Record<string, any> = {}
const startTime = performance.now()

const hogQLInsightsFlagEnabled = Boolean(
featureFlagLogic.findMounted()?.values.featureFlags?.[FEATURE_FLAGS.HOGQL_INSIGHTS]
)

try {
if (isPersonsNode(queryNode)) {
response = await api.get(getPersonsEndpoint(queryNode), methodOptions)
} else if (isInsightQueryNode(queryNode)) {
} else if (isInsightQueryNode(queryNode) && !(hogQLInsightsFlagEnabled && isLifecycleQuery(queryNode))) {
const filters = queryNodeToFilter(queryNode)
const params = {
...filters,
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
9 changes: 7 additions & 2 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from django.http import HttpResponse, JsonResponse
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter
from posthog.models.event.events_query import run_events_query
from pydantic import BaseModel
from rest_framework import viewsets
from rest_framework.decorators import action
Expand All @@ -26,13 +27,13 @@
from posthog.hogql.metadata import get_hogql_metadata
from posthog.hogql.query import execute_hogql_query
from posthog.models import Team
from posthog.models.event.events_query import run_events_query
from posthog.models.user import User
from posthog.nodes.lifecycle_hogql_query import run_lifecycle_query
from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
from posthog.queries.time_to_see_data.serializers import SessionEventsQuerySerializer, SessionsQuerySerializer
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.rate_limit import AIBurstRateThrottle, AISustainedRateThrottle, TeamRateThrottle
from posthog.schema import EventsQuery, HogQLQuery, HogQLMetadata
from posthog.schema import EventsQuery, HogQLQuery, HogQLMetadata, LifecycleQuery


class QueryThrottle(TeamRateThrottle):
Expand Down Expand Up @@ -219,6 +220,10 @@ def process_query(team: Team, query_json: Dict, default_limit: Optional[int] = N
metadata_query = HogQLMetadata.parse_obj(query_json)
response = get_hogql_metadata(query=metadata_query, team=team)
return _unwrap_pydantic_dict(response)
elif query_kind == "LifecycleQuery":
lifecycle_query = LifecycleQuery.parse_obj(query_json)
response = run_lifecycle_query(query=lifecycle_query, team=team)
return _unwrap_pydantic_dict(response)
elif query_kind == "DatabaseSchemaQuery":
database = create_hogql_database(team.pk)
return serialize_database(database)
Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ class JoinConstraint(Expr):
@dataclass(kw_only=True)
class JoinExpr(Expr):
# :TRICKY: When adding new fields, make sure they're handled in visitor.py and resolver.py
type: Optional[TableOrSelectType]
type: Optional[TableOrSelectType] = field(default=None)

join_type: Optional[str] = None
table: Optional[Union["SelectQuery", "SelectUnionQuery", Field]] = None
Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql/placeholders.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, placeholders: Optional[Dict[str, ast.Expr]]):
def visit_placeholder(self, node):
if not self.placeholders:
raise HogQLException(f"Placeholders, such as {{{node.field}}}, are not supported in this context")
if node.field in self.placeholders:
if node.field in self.placeholders and self.placeholders[node.field]:
new_node = self.placeholders[node.field]
new_node.start = node.start
new_node.end = node.end
Expand Down
27 changes: 23 additions & 4 deletions posthog/hogql/property.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
from posthog.models.property import PropertyGroup
from posthog.models.property.util import build_selector_regex
from posthog.models.property_definition import PropertyType
from posthog.schema import PropertyOperator
from posthog.schema import (
PropertyOperator,
PropertyGroupFilter,
FilterLogicalOperator,
PropertyGroupFilterValue,
)


def has_aggregation(expr: AST) -> bool:
Expand Down Expand Up @@ -59,16 +64,30 @@ def property_to_expr(property: Union[BaseModel, PropertyGroup, Property, dict, l
return ast.And(exprs=properties)
elif isinstance(property, Property):
pass
elif isinstance(property, PropertyGroup):
if property.type != PropertyOperatorType.AND and property.type != PropertyOperatorType.OR:
elif (
isinstance(property, PropertyGroup)
or isinstance(property, PropertyGroupFilter)
or isinstance(property, PropertyGroupFilterValue)
):
if (
isinstance(property, PropertyGroup)
and property.type != PropertyOperatorType.AND
and property.type != PropertyOperatorType.OR
):
raise NotImplementedException(f'PropertyGroup of unknown type "{property.type}"')
if (
(isinstance(property, PropertyGroupFilter) or isinstance(property, PropertyGroupFilterValue))
and property.type != FilterLogicalOperator.AND
and property.type != FilterLogicalOperator.OR
):
raise NotImplementedException(f'PropertyGroupFilter of unknown type "{property.type}"')

if len(property.values) == 0:
return ast.Constant(value=True)
if len(property.values) == 1:
return property_to_expr(property.values[0], team)

if property.type == PropertyOperatorType.AND:
if property.type == PropertyOperatorType.AND or property.type == FilterLogicalOperator.AND:
return ast.And(exprs=[property_to_expr(p, team) for p in property.values])
else:
return ast.Or(exprs=[property_to_expr(p, team) for p in property.values])
Expand Down
243 changes: 243 additions & 0 deletions posthog/nodes/lifecycle_hogql_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
import re
from typing import Dict, Any, Optional

from django.utils.timezone import datetime

from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_select
from posthog.hogql.property import property_to_expr, action_to_expr
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.models import Team, Action
from posthog.nodes.query_date_range import QueryDateRange
from posthog.schema import LifecycleQuery, IntervalType


def create_events_query(
    interval: str, event_filter: ast.Expr, timings: HogQLTimings, sampling_factor: Optional[float] = None
):
    """Build the inner HogQL SELECT that classifies each person's activity per interval.

    For every person we collect the sorted distinct set of active periods, then derive a
    'new' / 'returning' / 'resurrecting' status for each active period, and emit one extra
    'dormant' row for the period immediately after a person's activity stops.

    Args:
        interval: lowercase interval name ("day", "week", ...). Interpolated into the
            toInterval<Interval>() function name, so callers must pre-validate it.
        event_filter: expression for the WHERE clause; any falsy value is replaced
            with a constant TRUE (i.e. no filtering).
        timings: timing collector threaded through query parsing.
        sampling_factor: when numeric, adds a SAMPLE clause with this ratio to the
            events table.

    Returns:
        The parsed SELECT query AST.
    """
    one_interval_period = parse_expr(f"toInterval{interval.capitalize()}(1)")

    if not event_filter:
        event_filter = ast.Constant(value=True)

    placeholders = {
        "event_filter": event_filter,
        "interval": ast.Constant(value=interval),
        "one_interval_period": one_interval_period,
    }

    events_query = parse_select(
        """
            SELECT
                events.person.id as person_id,
                min(events.person.created_at) AS created_at,
                arraySort(groupUniqArray(dateTrunc({interval}, events.timestamp))) AS all_activity,
                arrayPopBack(arrayPushFront(all_activity, dateTrunc({interval}, created_at))) as previous_activity,
                arrayPopFront(arrayPushBack(all_activity, dateTrunc({interval}, toDateTime('1970-01-01 00:00:00')))) as following_activity,
                arrayMap((previous, current, index) -> (previous = current ? 'new' : ((current - {one_interval_period}) = previous AND index != 1) ? 'returning' : 'resurrecting'), previous_activity, all_activity, arrayEnumerate(all_activity)) as initial_status,
                arrayMap((current, next) -> (current + {one_interval_period} = next ? '' : 'dormant'), all_activity, following_activity) as dormant_status,
                arrayMap(x -> x + {one_interval_period}, arrayFilter((current, is_dormant) -> is_dormant = 'dormant', all_activity, dormant_status)) as dormant_periods,
                arrayMap(x -> 'dormant', dormant_periods) as dormant_label,
                arrayConcat(arrayZip(all_activity, initial_status), arrayZip(dormant_periods, dormant_label)) as temp_concat,
                arrayJoin(temp_concat) as period_status_pairs,
                period_status_pairs.1 as start_of_period,
                period_status_pairs.2 as status
            FROM events
            WHERE {event_filter}
            GROUP BY person_id
        """,
        placeholders=placeholders,
        timings=timings,
    )

    # isinstance(None, (int, float)) is False, so a separate None check is redundant.
    # (bool is a subclass of int and would be accepted here — same as the original check.)
    if isinstance(sampling_factor, (int, float)):
        sample_expr = ast.SampleExpr(sample_value=ast.RatioExpr(left=ast.Constant(value=sampling_factor)))
        events_query.select_from.sample = sample_expr

    return events_query


def assume_not_null(expr: ast.Expr) -> ast.Expr:
    """Wrap `expr` in ClickHouse's assumeNotNull(), marking the value as non-nullable."""
    return ast.Call(name="assumeNotNull", args=[expr])


def to_datetime(expr: ast.Expr) -> ast.Expr:
    """Wrap `expr` in a toDateTime() call, converting the value to a ClickHouse DateTime."""
    return ast.Call(name="toDateTime", args=[expr])


def run_lifecycle_query(
    team: Team,
    query: LifecycleQuery,
) -> Dict[str, Any]:
    """Execute a Lifecycle insight via HogQL.

    Builds an events sub-query (per-person activity classified as new / returning /
    resurrecting / dormant), cross-joins a generated series of periods so every
    period/status pair has a row, and aggregates counts per status.

    Args:
        team: the team whose events are queried (also supplies test-account filters).
        query: the LifecycleQuery node with date range, interval, series and properties.

    Returns:
        Dict with "result" (one series dict per lifecycle status: data, count, labels,
        days, label, status) and "timings" from the executed HogQL query.

    Raises:
        ValueError: on an invalid interval or an unknown series kind.
    """
    now_dt = datetime.now()
    timings = HogQLTimings()
    interval = query.interval or IntervalType.day
    # The interval name is interpolated into function names (toIntervalDay etc.) below,
    # so reject anything that isn't a plain lowercase word. FIX: use re.search, not
    # re.match — re.match only tests the first character, so e.g. a name with a bad
    # character later in the string would previously have slipped through.
    if not isinstance(interval, IntervalType) or re.search(r"[^a-z]", interval.name):
        raise ValueError(f"Invalid interval: {interval}")
    interval_period = interval.name.capitalize()
    one_interval_period = ast.Call(name=f"toInterval{interval_period}", args=[ast.Constant(value=1)])
    number_interval_period = ast.Call(name=f"toInterval{interval_period}", args=[ast.Field(chain=["number"])])

    # Collected WHERE-clause conjuncts; folded into a single expression further down.
    event_filter = []
    with timings.measure("date_range"):
        # NOTE(review): passes the raw query.interval (possibly None) rather than the
        # normalized `interval` — presumably QueryDateRange applies the same default;
        # confirm and consider passing `interval` for consistency.
        query_date_range = QueryDateRange(date_range=query.dateRange, team=team, interval=query.interval, now=now_dt)
        date_from = assume_not_null(to_datetime(ast.Constant(value=query_date_range.date_from)))
        date_to = assume_not_null(to_datetime(ast.Constant(value=query_date_range.date_to)))
        # Widen the scanned window by one interval on each side so edge periods can be
        # classified correctly (a period's status depends on its neighbours).
        event_filter.append(
            parse_expr(
                "timestamp >= dateTrunc({interval}, {date_from}) - {one_interval}",
                {
                    "interval": ast.Constant(value=interval.name),
                    "one_interval": one_interval_period,
                    "date_from": date_from,
                },
                timings=timings,
            )
        )
        event_filter.append(
            parse_expr(
                "timestamp < dateTrunc({interval}, {date_to}) + {one_interval}",
                {
                    "interval": ast.Constant(value=interval.name),
                    "one_interval": one_interval_period,
                    "date_to": date_to,
                },
                timings=timings,
            )
        )

    with timings.measure("properties"):
        if query.properties is not None and query.properties != []:
            event_filter.append(property_to_expr(query.properties, team))

    with timings.measure("series_filters"):
        for serie in query.series or []:
            if serie.kind == "ActionsNode":
                action = Action.objects.get(pk=int(serie.id), team=team)
                event_filter.append(action_to_expr(action))
            elif serie.kind == "EventsNode":
                if serie.event is not None:
                    event_filter.append(
                        ast.CompareOperation(
                            op=ast.CompareOperationOp.Eq,
                            left=ast.Field(chain=["event"]),
                            right=ast.Constant(value=str(serie.event)),
                        )
                    )
            else:
                raise ValueError(f"Invalid serie kind: {serie.kind}")
            if serie.properties is not None and serie.properties != []:
                event_filter.append(property_to_expr(serie.properties, team))

    with timings.measure("test_account_filters"):
        if (
            query.filterTestAccounts
            and isinstance(team.test_account_filters, list)
            and len(team.test_account_filters) > 0
        ):
            for property in team.test_account_filters:
                event_filter.append(property_to_expr(property, team))

    # Fold the list of conjuncts into one expression (TRUE when there are none).
    if len(event_filter) == 0:
        event_filter = ast.Constant(value=True)
    elif len(event_filter) == 1:
        event_filter = event_filter[0]
    else:
        event_filter = ast.And(exprs=event_filter)

    placeholders = {
        "interval": ast.Constant(value=interval.name),
        "one_interval_period": one_interval_period,
        "number_interval_period": number_interval_period,
        "event_filter": event_filter,
        "date_from": date_from,
        "date_to": date_to,
    }

    with timings.measure("events_query"):
        events_query = create_events_query(
            interval=interval.name, event_filter=event_filter, sampling_factor=query.samplingFactor, timings=timings
        )

    with timings.measure("periods_query"):
        # One row per period in the requested range, newest first (date_to minus N intervals).
        periods = parse_select(
            """
                SELECT (
                    dateTrunc({interval}, {date_to}) - {number_interval_period}
                ) AS start_of_period
                FROM numbers(
                    dateDiff(
                        {interval},
                        dateTrunc({interval}, {date_from}),
                        dateTrunc({interval}, {date_to} + {one_interval_period})
                    )
                )
            """,
            placeholders=placeholders,
        )

    with timings.measure("lifecycle_query"):
        # Cross-join periods x statuses with zero counts so every combination appears,
        # union in the real counts, then aggregate into one array-valued row per status.
        # Dormant counts are negated so they plot below the axis.
        lifecycle_sql = parse_select(
            """
                SELECT groupArray(start_of_period) AS date,
                       groupArray(counts) AS total,
                       status
                FROM (
                    SELECT
                        status = 'dormant' ? negate(sum(counts)) : negate(negate(sum(counts))) as counts,
                        start_of_period,
                        status
                    FROM (
                        SELECT
                            periods.start_of_period as start_of_period,
                            0 AS counts,
                            status
                        FROM {periods} as periods
                        CROSS JOIN (
                            SELECT status
                            FROM (SELECT 1)
                            ARRAY JOIN ['new', 'returning', 'resurrecting', 'dormant'] as status
                        ) as sec
                        ORDER BY status, start_of_period
                        UNION ALL
                        SELECT
                            start_of_period, count(DISTINCT person_id) AS counts, status
                        FROM {events_query}
                        GROUP BY start_of_period, status
                    )
                    WHERE start_of_period <= dateTrunc({interval}, {date_to})
                      AND start_of_period >= dateTrunc({interval}, {date_from})
                    GROUP BY start_of_period, status
                    ORDER BY start_of_period ASC
                )
                GROUP BY status
            """,
            {**placeholders, "periods": periods, "events_query": events_query},
        )

    response = execute_hogql_query(
        team=team,
        query=lifecycle_sql,
        query_type="LifecycleQuery",
    )

    # Shape the rows into the legacy insight response format.
    res = []
    for val in response.results:
        counts = val[1]
        # NOTE(review): %-d is a glibc extension and is not supported on Windows —
        # confirm deployment targets or use day.lstrip("0")-style formatting.
        labels = [item.strftime("%-d-%b-%Y{}".format(" %H:%M" if interval.name == "hour" else "")) for item in val[0]]
        days = [item.strftime("%Y-%m-%d{}".format(" %H:%M:%S" if interval.name == "hour" else "")) for item in val[0]]

        label = "{} - {}".format("", val[2])  # entity.name
        additional_values = {"label": label, "status": val[2]}
        res.append(
            {
                "data": [float(c) for c in counts],
                "count": float(sum(counts)),
                "labels": labels,
                "days": days,
                **additional_values,
            }
        )

    return {"result": res, "timings": response.timings}
Loading