diff --git a/ee/clickhouse/queries/funnels/test/test_funnel_correlations_persons.py b/ee/clickhouse/queries/funnels/test/test_funnel_correlations_persons.py index afb7edb977bc1..4617ffde3c2d5 100644 --- a/ee/clickhouse/queries/funnels/test/test_funnel_correlations_persons.py +++ b/ee/clickhouse/queries/funnels/test/test_funnel_correlations_persons.py @@ -97,7 +97,7 @@ def _setup_basic_test(self): {"event": "negatively_related", "timestamp": datetime(2020, 1, 3, 14)}, {"event": "paid", "timestamp": datetime(2020, 1, 4, 14)}, ] - journeys_for(events_by_person, self.team) + journeys_for(events_by_person, self.team, create_people=False) return ( filter, diff --git a/ee/clickhouse/test/test_journeys.py b/ee/clickhouse/test/test_journeys.py deleted file mode 100644 index 0056acfc4f2d2..0000000000000 --- a/ee/clickhouse/test/test_journeys.py +++ /dev/null @@ -1,171 +0,0 @@ -import dataclasses -import json -from datetime import datetime -from typing import Any, Dict, List -from uuid import uuid4 - -from django.utils import timezone - -from posthog.client import sync_execute -from posthog.models import Group, Person, PersonDistinctId, Team -from posthog.models.event.sql import EVENTS_DATA_TABLE -from posthog.test.base import _create_event, flush_persons_and_events - - -def journeys_for( - events_by_person: Dict[str, List[Dict[str, Any]]], - team: Team, - create_people: bool = True, -) -> Dict[str, Person]: - """ - Helper for creating specific events for a team. - - Allows tests to be written in a declarative style - - # these things happened in the past for these people - events_by_person = { - "person1": [{"some": "events}], - "person2": [{"some": "more events}], - } - journeys_for(events_by_person, team) - - # then the application receives them - actual = system_under_test.runs() - - # and we can assert on the results of that - ... 
- - Writing tests in this way reduces duplication in test setup - And clarifies the preconditions of the test - """ - - def _create_event_from_args(**event): - return {**event} - - flush_persons_and_events() - people = {} - events_to_create = [] - for distinct_id, events in events_by_person.items(): - if create_people: - people[distinct_id] = update_or_create_person(distinct_ids=[distinct_id], team_id=team.pk) - else: - people[distinct_id] = Person.objects.get( - persondistinctid__distinct_id=distinct_id, - persondistinctid__team_id=team.pk, - ) - - for event in events: - # Populate group properties as well - group_props = {} - for property_key, value in (event.get("properties") or {}).items(): - if property_key.startswith("$group_"): - group_type_index = property_key[-1] - try: - group = Group.objects.get( - team_id=team.pk, - group_type_index=group_type_index, - group_key=value, - ) - group_property_key = f"group{group_type_index}_properties" - group_props = { - group_property_key: { - **group.group_properties, - **event.get(group_property_key, {}), - } - } - - except Group.DoesNotExist: - continue - - if "timestamp" not in event: - event["timestamp"] = datetime.now() - - events_to_create.append( - _create_event_from_args( - team=team, - distinct_id=distinct_id, - event=event["event"], - timestamp=event["timestamp"], - properties=event.get("properties", {}), - person_id=people[distinct_id].uuid, - person_properties=people[distinct_id].properties or {}, - group0_properties=event.get("group0_properties", {}) or group_props.get("group0_properties", {}), - group1_properties=event.get("group1_properties", {}) or group_props.get("group1_properties", {}), - group2_properties=event.get("group2_properties", {}) or group_props.get("group2_properties", {}), - group3_properties=event.get("group3_properties", {}) or group_props.get("group3_properties", {}), - group4_properties=event.get("group4_properties", {}) or group_props.get("group4_properties", {}), - ) - ) - - _create_all_events_raw(events_to_create) - - return people - - -def _create_all_events_raw(all_events: List[Dict]): - parsed = "" - for event in all_events: - data: Dict[str, Any] = { - "properties": {}, - "timestamp": timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f"), - "person_id": str(uuid4()), - "person_properties": {}, - "group0_properties": {}, - "group1_properties": {}, - "group2_properties": {}, - "group3_properties": {}, - "group4_properties": {}, - } - data.update(event) - in_memory_event = InMemoryEvent(**data) - parsed += f""" - ('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0) - """ - - sync_execute( - f""" - INSERT INTO {EVENTS_DATA_TABLE()} (uuid, event, properties, timestamp, team_id, distinct_id, elements_chain, person_id, person_properties, group0_properties, group1_properties, group2_properties, group3_properties, group4_properties, created_at, _timestamp, _offset) VALUES - {parsed} - """ - ) - - -def create_all_events(all_events: List[dict]): - for event in all_events: - _create_event(**event) - 
- -# We collect all events per test into an array and batch create the events to reduce creation time -@dataclasses.dataclass -class InMemoryEvent: - event: str - distinct_id: str - team: Team - timestamp: str - properties: Dict - person_id: str - person_properties: Dict - group0_properties: Dict - group1_properties: Dict - group2_properties: Dict - group3_properties: Dict - group4_properties: Dict - - -def update_or_create_person(distinct_ids: List[str], team_id: int, **kwargs): - (person, _) = Person.objects.update_or_create( - persondistinctid__distinct_id__in=distinct_ids, - persondistinctid__team_id=team_id, - defaults={**kwargs, "team_id": team_id}, - ) - for distinct_id in distinct_ids: - PersonDistinctId.objects.update_or_create( - distinct_id=distinct_id, - team_id=person.team_id, - defaults={ - "person_id": person.id, - "team_id": team_id, - "distinct_id": distinct_id, - }, - ) - return person diff --git a/ee/clickhouse/views/test/funnel/test_clickhouse_funnel_correlation.py b/ee/clickhouse/views/test/funnel/test_clickhouse_funnel_correlation.py index fcc51e7c7f7fb..829232d1bd94f 100644 --- a/ee/clickhouse/views/test/funnel/test_clickhouse_funnel_correlation.py +++ b/ee/clickhouse/views/test/funnel/test_clickhouse_funnel_correlation.py @@ -17,7 +17,7 @@ from posthog.models.element import Element from posthog.models.team import Team from posthog.test.base import BaseTest, _create_event, _create_person -from posthog.test.test_journeys import journeys_for, update_or_create_person +from posthog.test.test_journeys import journeys_for @pytest.mark.clickhouse_only @@ -506,16 +506,21 @@ def test_properties_correlation_endpoint_provides_people_drill_down_urls(self): with freeze_time("2020-01-01"): self.client.force_login(self.user) - update_or_create_person( + _create_person( distinct_ids=["Person 1"], team_id=self.team.pk, properties={"$browser": "1"}, ) - update_or_create_person( + _create_person( distinct_ids=["Person 2"], team_id=self.team.pk, properties={"$browser": "1"}, ) + _create_person( + distinct_ids=["Person 3"], + team_id=self.team.pk, + properties={}, + ) events = { "Person 1": [ @@ -534,7 +539,7 @@ def test_properties_correlation_endpoint_provides_people_drill_down_urls(self): ], } - journeys_for(events_by_person=events, team=self.team) + journeys_for(events_by_person=events, team=self.team, create_people=False) odds = get_funnel_correlation_ok( client=self.client, diff --git a/ee/clickhouse/views/test/test_clickhouse_trends.py b/ee/clickhouse/views/test/test_clickhouse_trends.py index 034234df3929e..75ab015e39a15 100644 --- a/ee/clickhouse/views/test/test_clickhouse_trends.py +++ b/ee/clickhouse/views/test/test_clickhouse_trends.py @@ -735,7 +735,7 @@ def test_insight_trends_cumulative(self): } ], } - created_people = journeys_for(events_by_person, self.team) + created_people = journeys_for(events_by_person, self.team, create_people=False) # Total Volume with freeze_time("2012-01-15T04:01:34.000Z"): diff --git a/frontend/src/queries/schema.json b/frontend/src/queries/schema.json index 899bdf57c13ba..c3d51a73dfabe 100644 --- a/frontend/src/queries/schema.json +++ b/frontend/src/queries/schema.json @@ -92,6 +92,9 @@ { "$ref": "#/definitions/PersonsQuery" }, + { + "$ref": "#/definitions/SessionsTimelineQuery" + }, { "$ref": "#/definitions/HogQLQuery" }, @@ -1881,6 +1884,7 @@ "HogQLQuery", "HogQLMetadata", "PersonsQuery", + "SessionsTimelineQuery", "DataTableNode", "SavedInsightNode", "InsightVizNode", @@ -2617,6 +2621,58 @@ "required": ["key", "operator", "type"], "type": 
"object" }, + "SessionsTimelineQuery": { + "additionalProperties": false, + "properties": { + "after": { + "description": "Only fetch sessions that started after this timestamp (default: '-24h')", + "type": "string" + }, + "before": { + "description": "Only fetch sessions that started before this timestamp (default: '+5s')", + "type": "string" + }, + "kind": { + "const": "SessionsTimelineQuery", + "type": "string" + }, + "personId": { + "description": "Fetch sessions only for a given person", + "type": "string" + }, + "response": { + "$ref": "#/definitions/SessionsTimelineQueryResponse", + "description": "Cached query response" + } + }, + "required": ["kind"], + "type": "object" + }, + "SessionsTimelineQueryResponse": { + "additionalProperties": false, + "properties": { + "hasMore": { + "type": "boolean" + }, + "hogql": { + "type": "string" + }, + "results": { + "items": { + "$ref": "#/definitions/TimelineEntry" + }, + "type": "array" + }, + "timings": { + "items": { + "$ref": "#/definitions/QueryTiming" + }, + "type": "array" + } + }, + "required": ["results"], + "type": "object" + }, "StepOrderValue": { "enum": ["strict", "unordered", "ordered"], "type": "string" @@ -2813,6 +2869,27 @@ "required": ["kind", "source"], "type": "object" }, + "TimelineEntry": { + "additionalProperties": false, + "properties": { + "events": { + "items": { + "$ref": "#/definitions/EventType" + }, + "type": "array" + }, + "recording_duration_s": { + "description": "Duration of the recording in seconds.", + "type": "number" + }, + "sessionId": { + "description": "Session ID. None means out-of-session events", + "type": "string" + } + }, + "required": ["events"], + "type": "object" + }, "TrendsFilter": { "additionalProperties": false, "description": "`TrendsFilterType` minus everything inherited from `FilterType` and `hidden_legend_keys` replaced by `hidden_legend_indexes`", diff --git a/frontend/src/queries/schema.ts b/frontend/src/queries/schema.ts index 98ce41024e7ed..1a13b65fddf4c 100644 --- a/frontend/src/queries/schema.ts +++ b/frontend/src/queries/schema.ts @@ -45,6 +45,7 @@ export enum NodeKind { HogQLQuery = 'HogQLQuery', HogQLMetadata = 'HogQLMetadata', PersonsQuery = 'PersonsQuery', + SessionsTimelineQuery = 'SessionsTimelineQuery', // Interface nodes DataTableNode = 'DataTableNode', @@ -81,6 +82,7 @@ export type AnyDataNode = | TimeToSeeDataSessionsQuery // old API | EventsQuery | PersonsQuery + | SessionsTimelineQuery | HogQLQuery | HogQLMetadata | WebOverviewQuery @@ -544,6 +546,32 @@ export interface PersonsQuery extends DataNode { response?: PersonsQueryResponse } +export interface TimelineEntry { + /** Session ID. None means out-of-session events */ + sessionId?: string + events: EventType[] + /** Duration of the recording in seconds. 
*/ + recording_duration_s?: number +} + +export interface SessionsTimelineQueryResponse { + results: TimelineEntry[] + hasMore?: boolean + timings?: QueryTiming[] + hogql?: string +} + +export interface SessionsTimelineQuery extends DataNode { + kind: NodeKind.SessionsTimelineQuery + /** Fetch sessions only for a given person */ + personId?: string + /** Only fetch sessions that started after this timestamp (default: '-24h') */ + after?: string + /** Only fetch sessions that started before this timestamp (default: '+5s') */ + before?: string + response?: SessionsTimelineQueryResponse +} + export type WebAnalyticsPropertyFilters = (EventPropertyFilter | HogQLPropertyFilter)[] export interface WebAnalyticsQueryBase { diff --git a/frontend/src/scenes/saved-insights/SavedInsights.tsx b/frontend/src/scenes/saved-insights/SavedInsights.tsx index 84e2251723ab8..2f01773ebd0b4 100644 --- a/frontend/src/scenes/saved-insights/SavedInsights.tsx +++ b/frontend/src/scenes/saved-insights/SavedInsights.tsx @@ -230,6 +230,12 @@ export const QUERY_TYPES_METADATA: Record = { icon: IconCoffee, inMenu: true, }, + [NodeKind.SessionsTimelineQuery]: { + name: 'Sessions', + description: 'Sessions timeline query', + icon: InsightsTrendsIcon, + inMenu: true, + }, [NodeKind.HogQLQuery]: { name: 'HogQL', description: 'Direct HogQL query', diff --git a/posthog/hogql/functions/mapping.py b/posthog/hogql/functions/mapping.py index 5c4933ca981a4..8d8fca037f21a 100644 --- a/posthog/hogql/functions/mapping.py +++ b/posthog/hogql/functions/mapping.py @@ -1,4 +1,5 @@ from dataclasses import dataclass +from itertools import chain from typing import List, Optional, Dict, Tuple, Type from posthog.hogql import ast from posthog.hogql.base import ConstantType @@ -139,6 +140,7 @@ class HogQLFunctionMeta: "not": HogQLFunctionMeta("not", 1, 1), # type conversions "toInt": HogQLFunctionMeta("toInt64OrNull", 1, 1), + "_toInt64": HogQLFunctionMeta("toInt64", 1, 1), "toFloat": HogQLFunctionMeta("toFloat64OrNull", 1, 1), "toDecimal": HogQLFunctionMeta("toDecimal64OrNull", 1, 1), "toDate": HogQLFunctionMeta( @@ -727,6 +729,10 @@ class HogQLFunctionMeta: "sparkline": HogQLFunctionMeta("sparkline", 1, 1), } +ALL_EXPOSED_FUNCTION_NAMES = [ + name for name in chain(HOGQL_CLICKHOUSE_FUNCTIONS.keys(), HOGQL_AGGREGATIONS.keys()) if not name.startswith("_") +] + # TODO: Make the below details part of function meta # Functions where we use a -OrNull variant by default ADD_OR_NULL_DATETIME_FUNCTIONS = ( diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index a2a541b3b0b41..fa55d34e586a8 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -29,7 +29,7 @@ escape_hogql_identifier, escape_hogql_string, ) -from posthog.hogql.functions.mapping import validate_function_args +from posthog.hogql.functions.mapping import ALL_EXPOSED_FUNCTION_NAMES, validate_function_args from posthog.hogql.resolver import ResolverException, resolve_types from posthog.hogql.resolver_utils import lookup_field_by_name from posthog.hogql.transforms.in_cohort import resolve_in_cohorts @@ -778,8 +778,7 @@ def visit_call(self, node: ast.Call): elif node.name in HOGQL_POSTHOG_FUNCTIONS: raise HogQLException(f"Unexpected unresolved HogQL function '{node.name}(...)'") else: - all_function_names = list(HOGQL_CLICKHOUSE_FUNCTIONS.keys()) + list(HOGQL_AGGREGATIONS.keys()) - close_matches = get_close_matches(node.name, all_function_names, 1) + close_matches = get_close_matches(node.name, ALL_EXPOSED_FUNCTION_NAMES, 1) if len(close_matches) > 0: raise 
HogQLException( f"Unsupported function call '{node.name}(...)'. Perhaps you meant '{close_matches[0]}(...)'?" diff --git a/posthog/hogql_queries/query_runner.py b/posthog/hogql_queries/query_runner.py index a83ab368b6ad0..fe10fdbbba156 100644 --- a/posthog/hogql_queries/query_runner.py +++ b/posthog/hogql_queries/query_runner.py @@ -17,6 +17,7 @@ from posthog.models import Team from posthog.schema import ( QueryTiming, + SessionsTimelineQuery, TrendsQuery, LifecycleQuery, WebTopClicksQuery, @@ -71,6 +72,7 @@ class CachedQueryResponse(QueryResponse): LifecycleQuery, EventsQuery, PersonsQuery, + SessionsTimelineQuery, WebOverviewQuery, WebTopClicksQuery, WebStatsTableQuery, @@ -134,6 +136,14 @@ def get_query_runner( timings=timings, in_export_context=in_export_context, ) + if kind == "SessionsTimelineQuery": + from .sessions_timeline_query_runner import SessionsTimelineQueryRunner + + return SessionsTimelineQueryRunner( + query=cast(SessionsTimelineQuery | Dict[str, Any], query), + team=team, + timings=timings, + ) if kind == "WebOverviewQuery": from .web_analytics.web_overview import WebOverviewQueryRunner diff --git a/posthog/hogql_queries/sessions_timeline_query_runner.py b/posthog/hogql_queries/sessions_timeline_query_runner.py new file mode 100644 index 0000000000000..7d9d1d29a1646 --- /dev/null +++ b/posthog/hogql_queries/sessions_timeline_query_runner.py @@ -0,0 +1,197 @@ +from datetime import timedelta +import json +from typing import Dict, Optional, Any, cast +from posthog.api.element import ElementSerializer + + +from posthog.clickhouse.client.connection import Workload +from posthog.hogql import ast +from posthog.hogql.parser import parse_select +from posthog.hogql.query import execute_hogql_query +from posthog.hogql.timings import HogQLTimings +from posthog.hogql_queries.query_runner import QueryRunner +from posthog.models import Team +from posthog.models.element.element import chain_to_elements +from posthog.schema import EventType, SessionsTimelineQuery, SessionsTimelineQueryResponse, TimelineEntry +from posthog.utils import relative_date_parse + + +class SessionsTimelineQueryRunner(QueryRunner): + """ + ## How does the sessions timeline work? + + A formal session on the timeline is defined as a collection of all events with a given session ID. + An informal session is defined as a collection of contiguous events that don't have a session ID. + Additionally, a new informal session is formed when the time between two consecutive events exceeds 30 minutes + (which does not apply to formal sessions). + + > Note that the logic above is not the same as that of Trends session duration. + > In Trends, only events with a session ID are considered (i.e. formal sessions). + + Now, the sessions timeline is a sequence of sessions (both formal and informal), starting with ones that started + most recently. Events within a session are also ordered with latest first. 
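+
+    As an illustrative example (borrowing the person1/person2 fixture from
+    test_formal_and_informal_sessions_global below; not actual query output), one person's events
+    would be grouped into timeline entries like this, most recently started entry first:
+
+        18:00  $pageview       (no session ID)   -> informal session of its own
+        17:00  $pageview       $session_id=s2    -> formal session "s2"
+        13:50  $pageview       (no session ID)   -> new informal session (40 min since the previous sessionless event)
+        13:10  $pageview       (no session ID)   -.
+        13:00  user signed up  (no session ID)   -'-> one informal session (events only 10 min apart)
+        12:00  $pageview       $session_id=s1    -> formal session "s1"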
+ """ + + EVENT_LIMIT = 1000 + + query: SessionsTimelineQuery + query_type = SessionsTimelineQuery + + def __init__( + self, + query: SessionsTimelineQuery | Dict[str, Any], + team: Team, + timings: Optional[HogQLTimings] = None, + ): + super().__init__(query, team, timings) + if isinstance(query, SessionsTimelineQuery): + self.query = query + else: + self.query = SessionsTimelineQuery.model_validate(query) + + def _get_events_subquery(self) -> ast.SelectQuery: + after = relative_date_parse(self.query.after or "-24h", self.team.timezone_info) + before = relative_date_parse(self.query.before or "-0h", self.team.timezone_info) + with self.timings.measure("build_events_subquery"): + event_conditions: list[ast.Expr] = [ + ast.CompareOperation( + op=ast.CompareOperationOp.Gt, + left=ast.Field(chain=["timestamp"]), + right=ast.Constant(value=after), + ), + ast.CompareOperation( + op=ast.CompareOperationOp.Lt, + left=ast.Field(chain=["timestamp"]), + right=ast.Constant(value=before), + ), + ] + if self.query.personId: + event_conditions.append( + ast.CompareOperation( + left=ast.Field(chain=["person_id"]), + right=ast.Constant(value=self.query.personId), + op=ast.CompareOperationOp.Eq, + ) + ) + select_query = parse_select( + """ + SELECT + uuid, + person_id AS person_id, + timestamp AS timestamp, + event, + properties, + distinct_id, + elements_chain, + $session_id AS session_id, + lagInFrame($session_id, 1) OVER ( + PARTITION BY person_id ORDER BY timestamp + ) AS prev_session_id + FROM events + WHERE {event_conditions} + ORDER BY timestamp DESC + LIMIT {event_limit_with_more}""", + placeholders={ + "event_limit_with_more": ast.Constant(value=self.EVENT_LIMIT + 1), + "event_conditions": ast.And(exprs=event_conditions), + }, + ) + return cast(ast.SelectQuery, select_query) + + def to_query(self) -> ast.SelectQuery: + if self.timings is None: + self.timings = HogQLTimings() + + with self.timings.measure("build_sessions_timeline_query"): + select_query = parse_select( + """ + SELECT + e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER ( + PARTITION BY (e.person_id, session_id_flip_index) ORDER BY _toInt64(timestamp) + RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW /* split informal session after 30+ min */ + ) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM ( + SELECT + *, + sum(session_id = prev_session_id ? 
0 : 1) OVER ( + PARTITION BY person_id ORDER BY timestamp ROWS UNBOUNDED PRECEDING + ) AS session_id_flip_index + FROM ({events_subquery}) + ) e + LEFT JOIN ( + SELECT start_time AS start_time, end_time AS end_time, session_id FROM session_replay_events + ) AS sre + ON e.session_id = sre.session_id + ORDER BY timestamp DESC""", + placeholders={"events_subquery": self._get_events_subquery()}, + ) + return cast(ast.SelectQuery, select_query) + + def to_persons_query(self): + return parse_select( + """SELECT DISTINCT person_id FROM {events_subquery}""", {"events_subquery": self._get_events_subquery()} + ) + + def calculate(self) -> SessionsTimelineQueryResponse: + query_result = execute_hogql_query( + query=self.to_query(), + team=self.team, + workload=Workload.ONLINE, + query_type="SessionsTimelineQuery", + timings=self.timings, + ) + assert query_result.results is not None + timeline_entries_map: Dict[str, TimelineEntry] = {} + for ( + uuid, + timestamp_parsed, + event, + properties_raw, + distinct_id, + elements_chain, + formal_session_id, + informal_session_id, + recording_duration_s, + ) in reversed( + query_result.results[: self.EVENT_LIMIT] + ): # The last result is a marker of more results + entry_id = str(formal_session_id or informal_session_id) + if entry_id not in timeline_entries_map: + timeline_entries_map[entry_id] = TimelineEntry( + sessionId=formal_session_id or None, events=[], recording_duration_s=recording_duration_s or None + ) + timeline_entries_map[entry_id].events.append( + EventType( + id=str(uuid), + distinct_id=distinct_id, + event=event, + timestamp=timestamp_parsed.isoformat(), + properties=json.loads(properties_raw), + elements_chain=elements_chain or None, + elements=ElementSerializer(chain_to_elements(elements_chain), many=True).data, + ) + ) + timeline_entries = list(reversed(timeline_entries_map.values())) + for entry in timeline_entries: + entry.events.reverse() + + return SessionsTimelineQueryResponse( + results=timeline_entries, + hasMore=len(query_result.results) > self.EVENT_LIMIT, + timings=self.timings.to_list(), + hogql=query_result.hogql, + ) + + def _is_stale(self, cached_result_package): + return True # TODO: Make sure this is cached + + def _refresh_frequency(self): + return timedelta(minutes=1) # TODO: Make sure this is cached diff --git a/posthog/hogql_queries/test/__snapshots__/test_sessions_timeline_query_runner.ambr b/posthog/hogql_queries/test/__snapshots__/test_sessions_timeline_query_runner.ambr new file mode 100644 index 0000000000000..b73f7380c6b6b --- /dev/null +++ b/posthog/hogql_queries/test/__snapshots__/test_sessions_timeline_query_runner.ambr @@ -0,0 +1,434 @@ +# name: TestSessionsTimelineQueryRunner.test_before_and_after + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + 
toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 12:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-01 17:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_before_and_after_defaults + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-09-30 16:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-01 16:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp 
DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_event_limit_and_has_more + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 06:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-02 06:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp DESC + LIMIT 3)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_formal_and_informal_sessions_global + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) 
OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 06:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-02 06:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_formal_session_with_recording + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + 
INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 06:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-02 06:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_formal_sessions_for_person + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 06:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-02 06:00:00.000000', 6, 'UTC')), 0), ifNull(equals(person_id, 'd63e1ecf-f92f-1e50-71f9-501e16e9aebe'), 0)) + ORDER BY timestamp DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT 
min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- +# name: TestSessionsTimelineQueryRunner.test_formal_sessions_global + ' + SELECT e.uuid, + e.timestamp, + e.event, + e.properties, + e.distinct_id, + e.elements_chain, + e.session_id AS formal_session_id, + first_value(e.uuid) OVER (PARTITION BY tuple(e.person_id, e.session_id_flip_index) + ORDER BY toInt64(e.timestamp) ASC RANGE BETWEEN 1800 PRECEDING AND CURRENT ROW) AS informal_session_uuid, + dateDiff('s', sre.start_time, sre.end_time) AS recording_duration_s + FROM + (SELECT uuid, + person_id, timestamp, event, + properties, + distinct_id, + elements_chain, + session_id, + prev_session_id, + sum(if(ifNull(equals(session_id, prev_session_id), isNull(session_id) + and isNull(prev_session_id)), 0, 1)) OVER (PARTITION BY person_id + ORDER BY timestamp ASC ROWS UNBOUNDED PRECEDING) AS session_id_flip_index + FROM + (SELECT events.uuid, + events__pdi.person_id AS person_id, + toTimeZone(events.timestamp, 'UTC') AS timestamp, + events.event, + events.properties, + events.distinct_id, + events.elements_chain, + events.`$session_id` AS session_id, + lagInFrame(events.`$session_id`, 1) OVER (PARTITION BY person_id + ORDER BY timestamp ASC) AS prev_session_id + FROM events + INNER JOIN + (SELECT argMax(person_distinct_id2.person_id, person_distinct_id2.version) AS person_id, + person_distinct_id2.distinct_id AS distinct_id + FROM person_distinct_id2 + WHERE equals(person_distinct_id2.team_id, 2) + GROUP BY person_distinct_id2.distinct_id + HAVING ifNull(equals(argMax(person_distinct_id2.is_deleted, person_distinct_id2.version), 0), 0)) AS events__pdi ON equals(events.distinct_id, events__pdi.distinct_id) + WHERE and(equals(events.team_id, 2), ifNull(greater(timestamp, toDateTime64('2023-10-01 06:00:00.000000', 6, 'UTC')), 0), ifNull(less(timestamp, toDateTime64('2023-10-02 06:00:00.000000', 6, 'UTC')), 0)) + ORDER BY timestamp DESC + LIMIT 1001)) AS e + LEFT JOIN + (SELECT toTimeZone(session_replay_events.start_time, 'UTC') AS start_time, + toTimeZone(session_replay_events.end_time, 'UTC') AS end_time, + session_replay_events.session_id + FROM + (SELECT min(session_replay_events.min_first_timestamp) AS start_time, + max(session_replay_events.max_last_timestamp) AS end_time, + session_replay_events.session_id AS session_id + FROM session_replay_events + WHERE equals(session_replay_events.team_id, 2) + GROUP BY session_replay_events.session_id) AS session_replay_events) AS sre ON equals(e.session_id, sre.session_id) + ORDER BY e.timestamp DESC + LIMIT 100 SETTINGS readonly=2, + max_execution_time=60, + allow_experimental_object_type=1 + ' +--- diff --git a/posthog/hogql_queries/test/test_sessions_timeline_query_runner.py b/posthog/hogql_queries/test/test_sessions_timeline_query_runner.py new file mode 100644 index 0000000000000..075d556d739b9 --- /dev/null +++ b/posthog/hogql_queries/test/test_sessions_timeline_query_runner.py @@ -0,0 +1,653 @@ +from unittest.mock import patch + +from freezegun import freeze_time +from posthog.hogql_queries.sessions_timeline_query_runner import SessionsTimelineQueryRunner 
+from posthog.schema import EventType, SessionsTimelineQuery, TimelineEntry +from posthog.session_recordings.queries.test.session_replay_sql import produce_replay_summary +from posthog.test.base import APIBaseTest, ClickhouseTestMixin, snapshot_clickhouse_queries +from posthog.test.test_journeys import journeys_for + + +class TestSessionsTimelineQueryRunner(ClickhouseTestMixin, APIBaseTest): + def _create_runner(self, query: SessionsTimelineQuery) -> SessionsTimelineQueryRunner: + return SessionsTimelineQueryRunner(team=self.team, query=query) + + @snapshot_clickhouse_queries + def test_formal_sessions_global(self): + journeys_for( + team=self.team, + events_by_person={ + "person1": [ + # The sessions are sorted most recently started to least, + # while events within most recent to least recent + { + "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18", + "event": "$pageview", + "timestamp": "2023-10-01 12:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948", + "event": "user signed up", + "timestamp": "2023-10-01 13:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "b826a13e-aae3-4766-b407-0d3a582140e4", + "event": "$pageview", + "timestamp": "2023-10-01 14:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad", + "event": "$pageview", + "timestamp": "2023-10-01 17:00:00", + "properties": {"$session_id": "s2"}, + }, + ], + "person2": [ # Partly overlapping with person1 + { + "event_uuid": "605f6843-bf83-4d7b-b9a0-4d6f7f57415f", + "event": "$pageview", + "timestamp": "2023-10-01 13:30:00", + "properties": {"$session_id": "s3"}, + }, + { + "event_uuid": "04dde300-a6c3-4372-9366-80472c2d02b1", + "event": "did important thing", + "timestamp": "2023-10-02 02:00:00", + "properties": {"$session_id": "s3"}, + }, + ], + }, + ) + + runner = self._create_runner(SessionsTimelineQuery(before="2023-10-02T06:00:00Z", after="2023-10-01T06:00:00Z")) + response = runner.calculate() + + assert response.results == [ + TimelineEntry( + sessionId="s2", + events=[ + EventType( + id="e1208e6b-8101-4dde-ba21-c47781bb5bad", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T17:00:00+00:00", + properties={"$session_id": "s2"}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s3", + events=[ + EventType( + id="04dde300-a6c3-4372-9366-80472c2d02b1", + distinct_id="person2", + event="did important thing", + timestamp="2023-10-02T02:00:00+00:00", + properties={"$session_id": "s3"}, + elements=[], + ), + EventType( + id="605f6843-bf83-4d7b-b9a0-4d6f7f57415f", + distinct_id="person2", + event="$pageview", + timestamp="2023-10-01T13:30:00+00:00", + properties={"$session_id": "s3"}, + elements=[], + ), + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s1", + events=[ + EventType( + id="b826a13e-aae3-4766-b407-0d3a582140e4", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T14:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + EventType( + id="c15119f2-b243-4547-ab46-1b29a0435948", + distinct_id="person1", + event="user signed up", + timestamp="2023-10-01T13:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + EventType( + id="6e6e645b-2936-4613-b409-b33f4d9a0f18", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T12:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + ], + recording_duration_s=None, + ), + ] + assert 
response.hasMore is False + + @snapshot_clickhouse_queries + def test_formal_sessions_for_person(self): + persons = journeys_for( + team=self.team, + events_by_person={ + "person1": [ + # The sessions are sorted most recently started to least, + # while events within most recent to least recent + { + "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18", + "event": "$pageview", + "timestamp": "2023-10-01 12:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948", + "event": "user signed up", + "timestamp": "2023-10-01 13:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "b826a13e-aae3-4766-b407-0d3a582140e4", + "event": "$pageview", + "timestamp": "2023-10-01 14:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad", + "event": "$pageview", + "timestamp": "2023-10-01 17:00:00", + "properties": {"$session_id": "s2"}, + }, + ], + "person2": [ # Partly overlapping with person1 + { + "event_uuid": "605f6843-bf83-4d7b-b9a0-4d6f7f57415f", + "event": "$pageview", + "timestamp": "2023-10-01 13:30:00", + "properties": {"$session_id": "s3"}, + }, + { + "event_uuid": "04dde300-a6c3-4372-9366-80472c2d02b1", + "event": "did important thing", + "timestamp": "2023-10-02 02:00:00", + "properties": {"$session_id": "s3"}, + }, + ], + }, + ) + person_1_uuid = str(persons["person1"].uuid) + + runner = self._create_runner( + SessionsTimelineQuery(before="2023-10-02T06:00:00Z", after="2023-10-01T06:00:00Z", personId=person_1_uuid) + ) + response = runner.calculate() + + assert response.results == [ + TimelineEntry( + sessionId="s2", + events=[ + EventType( + id="e1208e6b-8101-4dde-ba21-c47781bb5bad", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T17:00:00+00:00", + properties={"$session_id": "s2"}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s1", + events=[ + EventType( + id="b826a13e-aae3-4766-b407-0d3a582140e4", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T14:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + EventType( + id="c15119f2-b243-4547-ab46-1b29a0435948", + distinct_id="person1", + event="user signed up", + timestamp="2023-10-01T13:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + EventType( + id="6e6e645b-2936-4613-b409-b33f4d9a0f18", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T12:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ), + ], + recording_duration_s=None, + ), + ] + assert response.hasMore is False + + @snapshot_clickhouse_queries + def test_formal_and_informal_sessions_global(self): + journeys_for( + team=self.team, + events_by_person={ + "person1": [ + { + "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18", + "event": "$pageview", + "timestamp": "2023-10-01 12:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948", + "event": "user signed up", + "timestamp": "2023-10-01 13:00:00", + "properties": {}, # No session ID + }, + { + "event_uuid": "b826a13e-aae3-4766-b407-0d3a582140e4", + "event": "$pageview", + "timestamp": "2023-10-01 13:10:00", + "properties": {}, # No session ID - this should be in the same entry as the previous event + }, + { + "event_uuid": "fa16ea8a-3fb9-4cb3-9ce6-de25b21e3016", + "event": "$pageview", + "timestamp": "2023-10-01 13:50:00", + "properties": {}, # No session ID - this should be in a new entry 
because of 40-minute gap + }, + { + "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad", + "event": "$pageview", + "timestamp": "2023-10-01 17:00:00", + "properties": {"$session_id": "s2"}, + }, + { + "event_uuid": "1389d75f-4717-4152-8f21-f3acee936a03", + "event": "$pageview", + "timestamp": "2023-10-01 18:00:00", + "properties": {}, # No session ID - this should be in a single-event entry + }, + ], + "person2": [ + { + "event_uuid": "605f6843-bf83-4d7b-b9a0-4d6f7f57415f", + "event": "$pageview", + "timestamp": "2023-10-01 13:30:00", + "properties": {"$session_id": "s3"}, + }, + { + "event_uuid": "04dde300-a6c3-4372-9366-80472c2d02b1", + "event": "did important thing", + "timestamp": "2023-10-01 19:00:00", + "properties": {}, # No session ID - this should be in a single-event entry + }, + ], + }, + ) + + runner = self._create_runner(SessionsTimelineQuery(before="2023-10-02T06:00:00Z", after="2023-10-01T06:00:00Z")) + response = runner.calculate() + + assert response.results == [ + TimelineEntry( + sessionId=None, + events=[ + EventType( + id="04dde300-a6c3-4372-9366-80472c2d02b1", + distinct_id="person2", + event="did important thing", + timestamp="2023-10-01T19:00:00+00:00", + properties={}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId=None, + events=[ + EventType( + id="1389d75f-4717-4152-8f21-f3acee936a03", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T18:00:00+00:00", + properties={}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s2", + events=[ + EventType( + id="e1208e6b-8101-4dde-ba21-c47781bb5bad", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T17:00:00+00:00", + properties={"$session_id": "s2"}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId=None, + events=[ + EventType( + id="fa16ea8a-3fb9-4cb3-9ce6-de25b21e3016", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T13:50:00+00:00", + properties={}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s3", + events=[ + EventType( + id="605f6843-bf83-4d7b-b9a0-4d6f7f57415f", + distinct_id="person2", + event="$pageview", + timestamp="2023-10-01T13:30:00+00:00", + properties={"$session_id": "s3"}, + elements=[], + ) + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId=None, + events=[ + EventType( + id="b826a13e-aae3-4766-b407-0d3a582140e4", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T13:10:00+00:00", + properties={}, + elements=[], + ), + EventType( + id="c15119f2-b243-4547-ab46-1b29a0435948", + distinct_id="person1", + event="user signed up", + timestamp="2023-10-01T13:00:00+00:00", + properties={}, + elements=[], + ), + ], + recording_duration_s=None, + ), + TimelineEntry( + sessionId="s1", + events=[ + EventType( + id="6e6e645b-2936-4613-b409-b33f4d9a0f18", + distinct_id="person1", + event="$pageview", + timestamp="2023-10-01T12:00:00+00:00", + properties={"$session_id": "s1"}, + elements=[], + ) + ], + recording_duration_s=None, + ), + ] + assert response.hasMore is False + + @snapshot_clickhouse_queries + def test_formal_session_with_recording(self): + journeys_for( + team=self.team, + events_by_person={ + "person1": [ + { + "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18", + "event": "$pageview", + "timestamp": "2023-10-01 12:00:00", + "properties": {"$session_id": "s1"}, + }, + { + "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948", + "event": "user signed 
up",
+                        "timestamp": "2023-10-01 13:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 17:00:00",
+                        "properties": {"$session_id": "s2"},
+                    },
+                ],
+            },
+        )
+        produce_replay_summary(
+            team_id=self.team.pk,
+            session_id="s1",
+            distinct_id="person1",
+            first_timestamp="2023-10-01 12:30:00",
+            last_timestamp="2023-10-01 12:39:00",
+        )
+
+        runner = self._create_runner(SessionsTimelineQuery(before="2023-10-02T06:00:00Z", after="2023-10-01T06:00:00Z"))
+        response = runner.calculate()
+
+        assert response.results == [
+            TimelineEntry(
+                sessionId="s2",
+                events=[
+                    EventType(
+                        id="e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        distinct_id="person1",
+                        event="$pageview",
+                        timestamp="2023-10-01T17:00:00+00:00",
+                        properties={"$session_id": "s2"},
+                        elements=[],
+                    )
+                ],
+                recording_duration_s=None,  # No recording
+            ),
+            TimelineEntry(
+                sessionId="s1",
+                events=[
+                    EventType(
+                        id="c15119f2-b243-4547-ab46-1b29a0435948",
+                        distinct_id="person1",
+                        event="user signed up",
+                        timestamp="2023-10-01T13:00:00+00:00",
+                        properties={"$session_id": "s1"},
+                        elements=[],
+                    ),
+                    EventType(
+                        id="6e6e645b-2936-4613-b409-b33f4d9a0f18",
+                        distinct_id="person1",
+                        event="$pageview",
+                        timestamp="2023-10-01T12:00:00+00:00",
+                        properties={"$session_id": "s1"},
+                        elements=[],
+                    ),
+                ],
+                recording_duration_s=540,
+            ),
+        ]
+        assert response.hasMore is False
+
+    @snapshot_clickhouse_queries
+    @patch("posthog.hogql_queries.sessions_timeline_query_runner.SessionsTimelineQueryRunner.EVENT_LIMIT", 2)
+    def test_event_limit_and_has_more(self):
+        journeys_for(
+            team=self.team,
+            events_by_person={
+                "person1": [
+                    {
+                        "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 12:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948",
+                        "event": "user signed up",
+                        "timestamp": "2023-10-01 13:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 17:00:00",
+                        "properties": {"$session_id": "s2"},
+                    },
+                ],
+            },
+        )
+
+        runner = self._create_runner(SessionsTimelineQuery(before="2023-10-02T06:00:00Z", after="2023-10-01T06:00:00Z"))
+        response = runner.calculate()
+
+        assert response.results == [
+            TimelineEntry(
+                sessionId="s2",
+                events=[
+                    EventType(
+                        id="e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        distinct_id="person1",
+                        event="$pageview",
+                        timestamp="2023-10-01T17:00:00+00:00",
+                        properties={"$session_id": "s2"},
+                        elements=[],
+                    )
+                ],
+                recording_duration_s=None,
+            ),
+            TimelineEntry(
+                sessionId="s1",
+                events=[
+                    EventType(
+                        id="c15119f2-b243-4547-ab46-1b29a0435948",
+                        distinct_id="person1",
+                        event="user signed up",
+                        timestamp="2023-10-01T13:00:00+00:00",
+                        properties={"$session_id": "s1"},
+                        elements=[],
+                    ),
+                    # The 2023-10-01 12:00:00 event is beyond the EVENT_LIMIT of 2
+                ],
+                recording_duration_s=None,
+            ),
+        ]
+        assert response.hasMore is True
+
+    @snapshot_clickhouse_queries
+    def test_before_and_after(self):
+        journeys_for(
+            team=self.team,
+            events_by_person={
+                "person1": [
+                    {
+                        "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 12:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948",
+                        "event": "user signed up",
+                        "timestamp": "2023-10-01 13:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 17:00:00",
+                        "properties": {"$session_id": "s2"},
+                    },
+                ],
+            },
+        )
+
+        runner = self._create_runner(SessionsTimelineQuery(before="2023-10-01T17:00:00Z", after="2023-10-01T12:00:00Z"))
+        response = runner.calculate()
+
+        assert response.results == [
+            TimelineEntry(
+                sessionId="s1",
+                events=[
+                    EventType(
+                        id="c15119f2-b243-4547-ab46-1b29a0435948",
+                        distinct_id="person1",
+                        event="user signed up",
+                        timestamp="2023-10-01T13:00:00+00:00",
+                        properties={"$session_id": "s1"},
+                        elements=[],
+                    ),
+                ],
+                recording_duration_s=None,
+            ),
+        ]
+
+    @snapshot_clickhouse_queries
+    def test_before_and_after_defaults(self):
+        journeys_for(
+            team=self.team,
+            events_by_person={
+                "person1": [
+                    {
+                        "event_uuid": "6e6e645b-2936-4613-b409-b33f4d9a0f18",
+                        "event": "$pageview",
+                        "timestamp": "2023-09-29 23:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "c15119f2-b243-4547-ab46-1b29a0435948",
+                        "event": "user signed up",
+                        "timestamp": "2023-10-01 13:00:00",
+                        "properties": {"$session_id": "s1"},
+                    },
+                    {
+                        "event_uuid": "e1208e6b-8101-4dde-ba21-c47781bb5bad",
+                        "event": "$pageview",
+                        "timestamp": "2023-10-01 17:00:00",
+                        "properties": {"$session_id": "s2"},
+                    },
+                ],
+            },
+        )
+
+        with freeze_time("2023-10-01T16:00:00Z"):
+            runner = self._create_runner(SessionsTimelineQuery())
+            response = runner.calculate()
+
+        assert response.results == [
+            TimelineEntry(
+                sessionId="s1",
+                events=[
+                    EventType(
+                        id="c15119f2-b243-4547-ab46-1b29a0435948",
+                        distinct_id="person1",
+                        event="user signed up",
+                        timestamp="2023-10-01T13:00:00+00:00",
+                        properties={"$session_id": "s1"},
+                        elements=[],
+                    ),
+                ],
+                recording_duration_s=None,
+            ),
+        ]
diff --git a/posthog/queries/funnels/test/breakdown_cases.py b/posthog/queries/funnels/test/breakdown_cases.py
index 9f8fd74785f63..273ce3b201601 100644
--- a/posthog/queries/funnels/test/breakdown_cases.py
+++ b/posthog/queries/funnels/test/breakdown_cases.py
@@ -739,7 +739,7 @@ def test_funnel_step_breakdown_person(self):
                     {"event": "play movie", "timestamp": datetime(2020, 1, 2, 16)},
                 ],
             }
-            journeys_for(peoples_journeys, self.team)
+            journeys_for(peoples_journeys, self.team, create_people=False)
 
             result = funnel.run()
 
@@ -1131,6 +1131,7 @@ def test_funnel_cohort_breakdown(self):
             people = journeys_for(
                 {"person1": [{"event": "sign up", "timestamp": datetime(2020, 1, 2, 12)}]},
                 self.team,
+                create_people=False,
             )
 
             cohort = Cohort.objects.create(
diff --git a/posthog/queries/funnels/test/test_funnel_persons.py b/posthog/queries/funnels/test/test_funnel_persons.py
index 995030339484a..46881af6d5b1d 100644
--- a/posthog/queries/funnels/test/test_funnel_persons.py
+++ b/posthog/queries/funnels/test/test_funnel_persons.py
@@ -132,6 +132,7 @@ def _create_browser_breakdown_events(self):
                 ],
             },
             self.team,
+            create_people=False,
         )
 
         return person1, person2
diff --git a/posthog/schema.py b/posthog/schema.py
index 9c5a88143669f..58f4e1dfd5756 100644
--- a/posthog/schema.py
+++ b/posthog/schema.py
@@ -328,6 +328,7 @@ class NodeKind(str, Enum):
     HogQLQuery = "HogQLQuery"
     HogQLMetadata = "HogQLMetadata"
     PersonsQuery = "PersonsQuery"
+    SessionsTimelineQuery = "SessionsTimelineQuery"
     DataTableNode = "DataTableNode"
     SavedInsightNode = "SavedInsightNode"
     InsightVizNode = "InsightVizNode"
@@ -571,6 +572,15 @@ class TimeToSeeDataWaterfallNode(BaseModel):
     source: TimeToSeeDataQuery
 
 
+class TimelineEntry(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    events: List[EventType]
+    recording_duration_s: Optional[float] = Field(default=None, description="Duration of the recording in seconds.")
+    sessionId: Optional[str] = Field(default=None, description="Session ID. None means out-of-session events")
+
+
 class TrendsFilter(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -896,6 +906,16 @@ class RetentionFilter(BaseModel):
     total_intervals: Optional[float] = None
 
 
+class SessionsTimelineQueryResponse(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    hasMore: Optional[bool] = None
+    hogql: Optional[str] = None
+    results: List[TimelineEntry]
+    timings: Optional[List[QueryTiming]] = None
+
+
 class TimeToSeeDataJSONNode(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -1285,6 +1305,21 @@ class PropertyGroupFilterValue(BaseModel):
     ]
 
 
+class SessionsTimelineQuery(BaseModel):
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    after: Optional[str] = Field(
+        default=None, description="Only fetch sessions that started after this timestamp (default: '-24h')"
+    )
+    before: Optional[str] = Field(
+        default=None, description="Only fetch sessions that started before this timestamp (default: '+5s')"
+    )
+    kind: Literal["SessionsTimelineQuery"] = "SessionsTimelineQuery"
+    personId: Optional[str] = Field(default=None, description="Fetch sessions only for a given person")
+    response: Optional[SessionsTimelineQueryResponse] = Field(default=None, description="Cached query response")
+
+
 class ActionsNode(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
@@ -1803,6 +1838,7 @@ class QuerySchema(RootModel):
             TimeToSeeDataSessionsQuery,
             EventsQuery,
             PersonsQuery,
+            SessionsTimelineQuery,
             HogQLQuery,
             HogQLMetadata,
             WebOverviewQuery,
diff --git a/posthog/test/test_journeys.py b/posthog/test/test_journeys.py
index 784a1d8867d81..6e2122c3b5da9 100644
--- a/posthog/test/test_journeys.py
+++ b/posthog/test/test_journeys.py
@@ -1,8 +1,10 @@
 import dataclasses
+from hashlib import md5
 import json
 from datetime import datetime
+import os
 from typing import Any, Dict, List
-from uuid import uuid4
+from uuid import UUID, uuid4
 
 from django.utils import timezone
 
@@ -38,16 +40,16 @@ def journeys_for(
     Writing tests in this way reduces duplication in test setup
     And clarifies the preconditions of the test
     """
-
-    def _create_event_from_args(**event):
-        return {**event}
-
     flush_persons_and_events()
     people = {}
    events_to_create = []
     for distinct_id, events in events_by_person.items():
         if create_people:
-            people[distinct_id] = update_or_create_person(distinct_ids=[distinct_id], team_id=team.pk)
+            # Create the person UUID from the distinct ID and test path, so that SQL snapshots are deterministic
+            derived_uuid = UUID(bytes=md5((os.environ["PYTEST_CURRENT_TEST"] + distinct_id).encode("utf-8")).digest())
+            people[distinct_id] = update_or_create_person(
+                distinct_ids=[distinct_id], team_id=team.pk, uuid=derived_uuid
+            )
         else:
             people[distinct_id] = Person.objects.get(
                 persondistinctid__distinct_id=distinct_id,
                 persondistinctid__team_id=team.pk,
@@ -75,7 +77,8 @@ def _create_event_from_args(**event):
             event["timestamp"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
 
         events_to_create.append(
-            _create_event_from_args(
+            dict(
+                event_uuid=UUID(event["event_uuid"]) if "event_uuid" in event else uuid4(),
                 team=team,
                 distinct_id=distinct_id,
                 event=event["event"],
@@ -148,7 +151,7 @@ def _create_all_events_raw(all_events: List[Dict]):
                 data[key] = timestamp
         in_memory_event = InMemoryEvent(**data)
         parsed += f"""
-            ('{str(uuid4())}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{in_memory_event.person_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{in_memory_event.group0_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group1_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group2_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group3_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group4_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
+            ('{in_memory_event.event_uuid}', '{in_memory_event.event}', '{json.dumps(in_memory_event.properties)}', '{in_memory_event.timestamp}', {in_memory_event.team.pk}, '{in_memory_event.distinct_id}', '', '{in_memory_event.person_id}', '{json.dumps(in_memory_event.person_properties)}', '{in_memory_event.person_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{json.dumps(in_memory_event.group0_properties)}', '{json.dumps(in_memory_event.group1_properties)}', '{json.dumps(in_memory_event.group2_properties)}', '{json.dumps(in_memory_event.group3_properties)}', '{json.dumps(in_memory_event.group4_properties)}', '{in_memory_event.group0_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group1_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group2_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group3_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{in_memory_event.group4_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")}', '{timezone.now().strftime("%Y-%m-%d %H:%M:%S.%f")}', now(), 0)
             """
 
     sync_execute(
@@ -165,8 +168,9 @@ def create_all_events(all_events: List[dict]):
 
 
 # We collect all events per test into an array and batch create the events to reduce creation time
-@dataclasses.dataclass
+@dataclasses.dataclass(kw_only=True)
 class InMemoryEvent:
+    event_uuid: UUID = dataclasses.field(default_factory=uuid4)
     event: str
     distinct_id: str
     team: Team