From 41fa9736d5d75128a7b8a06204065547c5b24ed9 Mon Sep 17 00:00:00 2001 From: Julian Bez Date: Thu, 21 Dec 2023 11:30:54 +0000 Subject: [PATCH] chore(hogql): Use paginator in EventsQueryRunner and add tests for it (#19440) --- frontend/src/queries/schema.json | 8 +- frontend/src/queries/schema.ts | 4 + posthog/hogql_queries/events_query_runner.py | 47 ++-- posthog/hogql_queries/insights/paginators.py | 29 ++- .../insights/test/test_paginators.py | 209 ++++++++++++++++++ posthog/hogql_queries/persons_query_runner.py | 10 +- posthog/hogql_queries/query_runner.py | 2 + posthog/schema.py | 2 + 8 files changed, 269 insertions(+), 42 deletions(-) create mode 100644 posthog/hogql_queries/insights/test/test_paginators.py diff --git a/frontend/src/queries/schema.json b/frontend/src/queries/schema.json index 2f58b6ed2816c..529fdc752f618 100644 --- a/frontend/src/queries/schema.json +++ b/frontend/src/queries/schema.json @@ -1047,6 +1047,12 @@ "hogql": { "type": "string" }, + "limit": { + "type": "integer" + }, + "offset": { + "type": "integer" + }, "results": { "items": { "items": {}, @@ -1067,7 +1073,7 @@ "type": "array" } }, - "required": ["columns", "types", "results", "hogql"], + "required": ["columns", "types", "results", "hogql", "limit", "offset"], "type": "object" }, "FeaturePropertyFilter": { diff --git a/frontend/src/queries/schema.ts b/frontend/src/queries/schema.ts index f2c15b07032cb..94b739a189871 100644 --- a/frontend/src/queries/schema.ts +++ b/frontend/src/queries/schema.ts @@ -259,6 +259,10 @@ export interface EventsQueryResponse { hogql: string hasMore?: boolean timings?: QueryTiming[] + /** @asType integer */ + limit: number + /** @asType integer */ + offset: number } export interface EventsQueryPersonColumn { uuid: string diff --git a/posthog/hogql_queries/events_query_runner.py b/posthog/hogql_queries/events_query_runner.py index bc9e9810f3698..df88a1c48d162 100644 --- a/posthog/hogql_queries/events_query_runner.py +++ b/posthog/hogql_queries/events_query_runner.py @@ -10,11 +10,10 @@ from posthog.api.utils import get_pk_or_uuid from posthog.clickhouse.client.connection import Workload from posthog.hogql import ast -from posthog.hogql.constants import get_max_limit_for_context, get_default_limit_for_context from posthog.hogql.parser import parse_expr, parse_order_expr from posthog.hogql.property import action_to_expr, has_aggregation, property_to_expr -from posthog.hogql.query import execute_hogql_query from posthog.hogql.timings import HogQLTimings +from posthog.hogql_queries.insights.paginators import HogQLHasMorePaginator from posthog.hogql_queries.query_runner import QueryRunner from posthog.models import Action, Person from posthog.models.element import chain_to_elements @@ -40,15 +39,18 @@ class EventsQueryRunner(QueryRunner): query: EventsQuery query_type = EventsQuery + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.paginator = HogQLHasMorePaginator.from_limit_context( + limit_context=self.limit_context, limit=self.query.limit, offset=self.query.offset + ) + def to_query(self) -> ast.SelectQuery: # Note: This code is inefficient and problematic, see https://github.com/PostHog/posthog/issues/13485 for details. if self.timings is None: self.timings = HogQLTimings() with self.timings.measure("build_ast"): - # limit & offset - offset = 0 if self.query.offset is None else self.query.offset - # columns & group_by with self.timings.measure("columns"): select_input: List[str] = [] @@ -175,13 +177,11 @@ def to_query(self) -> ast.SelectQuery: having=having, group_by=group_by if has_any_aggregation else None, order_by=order_by, - limit=ast.Constant(value=self.limit()), - offset=ast.Constant(value=offset), ) return stmt def calculate(self) -> EventsQueryResponse: - query_result = execute_hogql_query( + query_result = self.paginator.execute_hogql_query( query=self.to_query(), team=self.team, workload=Workload.ONLINE, @@ -195,8 +195,8 @@ def calculate(self) -> EventsQueryResponse: if "*" in self.select_input_raw(): with self.timings.measure("expand_asterisk"): star_idx = self.select_input_raw().index("*") - for index, result in enumerate(query_result.results): - query_result.results[index] = list(result) + for index, result in enumerate(self.paginator.results): + self.paginator.results[index] = list(result) select = result[star_idx] new_result = dict(zip(SELECT_STAR_FROM_EVENTS_FIELDS, select)) new_result["properties"] = json.loads(new_result["properties"]) @@ -204,18 +204,18 @@ def calculate(self) -> EventsQueryResponse: new_result["elements"] = ElementSerializer( chain_to_elements(new_result["elements_chain"]), many=True ).data - query_result.results[index][star_idx] = new_result + self.paginator.results[index][star_idx] = new_result person_indices: List[int] = [] for index, col in enumerate(self.select_input_raw()): if col.split("--")[0].strip() == "person": person_indices.append(index) - if len(person_indices) > 0 and len(query_result.results) > 0: + if len(person_indices) > 0 and len(self.paginator.results) > 0: with self.timings.measure("person_column_extra_query"): # Make a query into postgres to fetch person person_idx = person_indices[0] - distinct_ids = list(set(event[person_idx] for event in query_result.results)) + distinct_ids = list(set(event[person_idx] for event in self.paginator.results)) persons = get_persons_by_distinct_ids(self.team.pk, distinct_ids) persons = persons.prefetch_related(Prefetch("persondistinctid_set", to_attr="distinct_ids_cache")) distinct_to_person: Dict[str, Person] = {} @@ -226,41 +226,34 @@ def calculate(self) -> EventsQueryResponse: # Loop over all columns in case there is more than one "person" column for column_index in person_indices: - for index, result in enumerate(query_result.results): + for index, result in enumerate(self.paginator.results): distinct_id: str = result[column_index] - query_result.results[index] = list(result) + self.paginator.results[index] = list(result) if distinct_to_person.get(distinct_id): person = distinct_to_person[distinct_id] - query_result.results[index][column_index] = { + self.paginator.results[index][column_index] = { "uuid": person.uuid, "created_at": person.created_at, "properties": person.properties or {}, "distinct_id": distinct_id, } else: - query_result.results[index][column_index] = { + self.paginator.results[index][column_index] = { "distinct_id": distinct_id, } - received_extra_row = len(query_result.results) == self.limit() # limit was +=1'd above return EventsQueryResponse( - results=query_result.results[: self.limit() - 1] if received_extra_row else query_result.results, + results=self.paginator.results, columns=self.select_input_raw(), - types=[type for _, type in query_result.types], - hasMore=received_extra_row, + types=[t for _, t in query_result.types] if query_result.types else None, timings=self.timings.to_list(), hogql=query_result.hogql, + **self.paginator.response_params(), ) def select_input_raw(self) -> List[str]: return ["*"] if len(self.query.select) == 0 else self.query.select - def limit(self) -> int: - # adding +1 to the limit to check if there's a "next page" after the requested results - max_rows = get_max_limit_for_context(self.limit_context) - default_rows = get_default_limit_for_context(self.limit_context) - return min(max_rows, default_rows if self.query.limit is None else self.query.limit) + 1 - def _is_stale(self, cached_result_package): return True diff --git a/posthog/hogql_queries/insights/paginators.py b/posthog/hogql_queries/insights/paginators.py index 8b02be1bd5dd3..a2fc474346303 100644 --- a/posthog/hogql_queries/insights/paginators.py +++ b/posthog/hogql_queries/insights/paginators.py @@ -1,6 +1,12 @@ -from typing import List, Any, Optional, cast, Sequence +from typing import Any, Optional, cast from posthog.hogql import ast +from posthog.hogql.constants import ( + get_max_limit_for_context, + get_default_limit_for_context, + LimitContext, + DEFAULT_RETURNED_ROWS, +) from posthog.hogql.query import execute_hogql_query from posthog.schema import HogQLQueryResponse @@ -11,15 +17,24 @@ class HogQLHasMorePaginator: Takes care of setting the limit and offset on the query. """ - def __init__(self, limit: int, offset: int): + def __init__(self, *, limit: Optional[int], offset: Optional[int]): self.response: Optional[HogQLQueryResponse] = None - self.results: Sequence[Any] = [] - self.limit = limit - self.offset = offset + self.results: list[Any] = [] + self.limit = limit if limit and limit > 0 else DEFAULT_RETURNED_ROWS + self.offset = offset if offset and offset > 0 else 0 + + @classmethod + def from_limit_context( + cls, *, limit_context: LimitContext, limit: Optional[int], offset: Optional[int] + ) -> "HogQLHasMorePaginator": + max_rows = get_max_limit_for_context(limit_context) + default_rows = get_default_limit_for_context(limit_context) + limit = min(max_rows, default_rows if (limit is None or limit <= 0) else limit) + return cls(limit=limit, offset=offset) def paginate(self, query: ast.SelectQuery) -> ast.SelectQuery: query.limit = ast.Constant(value=self.limit + 1) - query.offset = ast.Constant(value=self.offset or 0) + query.offset = ast.Constant(value=self.offset) return query def has_more(self) -> bool: @@ -28,7 +43,7 @@ def has_more(self) -> bool: return len(self.response.results) > self.limit - def trim_results(self) -> List[Any]: + def trim_results(self) -> list[Any]: if not self.response or not self.response.results: return [] diff --git a/posthog/hogql_queries/insights/test/test_paginators.py b/posthog/hogql_queries/insights/test/test_paginators.py new file mode 100644 index 0000000000000..7954f35daf7f2 --- /dev/null +++ b/posthog/hogql_queries/insights/test/test_paginators.py @@ -0,0 +1,209 @@ +from posthog.hogql.constants import ( + LimitContext, + get_default_limit_for_context, + get_max_limit_for_context, + MAX_SELECT_RETURNED_ROWS, +) +from posthog.hogql.parser import parse_select +from posthog.hogql_queries.insights.paginators import HogQLHasMorePaginator +from posthog.hogql_queries.persons_query_runner import PersonsQueryRunner +from posthog.models.utils import UUIDT +from posthog.schema import ( + PersonsQuery, + PersonPropertyFilter, + PropertyOperator, +) +from posthog.test.base import ( + APIBaseTest, + ClickhouseTestMixin, + _create_person, + flush_persons_and_events, + _create_event, +) + + +class TestHogQLHasMorePaginator(ClickhouseTestMixin, APIBaseTest): + maxDiff = None + random_uuid: str + + def _create_random_persons(self) -> str: + random_uuid = f"RANDOM_TEST_ID::{UUIDT()}" + for index in range(10): + _create_person( + properties={ + "email": f"jacob{index}@{random_uuid}.posthog.com", + "name": f"Mr Jacob {random_uuid}", + "random_uuid": random_uuid, + "index": index, + }, + team=self.team, + distinct_ids=[f"id-{random_uuid}-{index}"], + is_identified=True, + ) + _create_event( + distinct_id=f"id-{random_uuid}-{index}", + event=f"clicky-{index}", + team=self.team, + ) + + flush_persons_and_events() + return random_uuid + + def _create_runner(self, query: PersonsQuery) -> PersonsQueryRunner: + return PersonsQueryRunner(team=self.team, query=query) + + def setUp(self): + super().setUp() + self.random_uuid = self._create_random_persons() + + def test_persons_query_limit(self): + runner = self._create_runner( + PersonsQuery(select=["properties.email"], orderBy=["properties.email DESC"], limit=1) + ) + response = runner.calculate() + self.assertEqual(response.results, [[f"jacob9@{self.random_uuid}.posthog.com"]]) + self.assertEqual(response.hasMore, True) + + runner = self._create_runner( + PersonsQuery( + select=["properties.email"], + orderBy=["properties.email DESC"], + limit=1, + offset=2, + ) + ) + response = runner.calculate() + self.assertEqual(response.results, [[f"jacob7@{self.random_uuid}.posthog.com"]]) + self.assertEqual(response.hasMore, True) + + def test_zero_limit(self): + """Test behavior with limit set to zero.""" + runner = self._create_runner(PersonsQuery(select=["properties.email"], limit=0)) + response = runner.calculate() + self.assertEqual(runner.paginator.limit, 100) + self.assertEqual(response.limit, 100) + self.assertEqual(len(response.results), 10) + self.assertFalse(response.hasMore) + + def test_negative_limit(self): + """Test behavior with negative limit value.""" + runner = self._create_runner(PersonsQuery(select=["properties.email"], limit=-1)) + response = runner.calculate() + self.assertEqual(runner.paginator.limit, 100) + self.assertEqual(response.limit, 100) + self.assertEqual(len(response.results), 10) + self.assertFalse(response.hasMore) + + def test_exact_limit_match(self): + """Test when available items equal the limit.""" + runner = self._create_runner(PersonsQuery(select=["properties.email"], limit=10)) + response = runner.calculate() + self.assertEqual(len(response.results), 10) + self.assertFalse(response.hasMore) + + def test_empty_result_set(self): + """Test behavior when query returns no results.""" + runner = self._create_runner( + PersonsQuery( + select=["properties.email"], + limit=10, + properties=[ + PersonPropertyFilter(key="email", value="random", operator=PropertyOperator.exact), + ], + ) + ) + response = runner.calculate() + self.assertEqual(len(response.results), 0) + self.assertFalse(response.hasMore) + + def test_large_offset(self): + """Test behavior with offset larger than the total number of items.""" + self.random_uuid = self._create_random_persons() + runner = self._create_runner(PersonsQuery(select=["properties.email"], limit=5, offset=100)) + response = runner.calculate() + self.assertEqual(len(response.results), 0) + self.assertFalse(response.hasMore) + + def test_offset_plus_limit_exceeding_total(self): + """Test when sum of offset and limit exceeds total items.""" + runner = self._create_runner(PersonsQuery(select=["properties.email"], limit=10, offset=5)) + response = runner.calculate() + self.assertEqual(runner.paginator.offset, 5) + self.assertEqual(len(response.results), 5) + self.assertFalse(response.hasMore) + + def test_response_params_consistency(self): + """Test consistency of response_params method.""" + paginator = HogQLHasMorePaginator(limit=5, offset=10) + paginator.response = paginator.execute_hogql_query( + "test_query", + parse_select("SELECT * FROM persons"), + team=self.team, + ) + params = paginator.response_params() + self.assertEqual(params["limit"], 5) + self.assertEqual(params["offset"], 10) + self.assertEqual(params["hasMore"], paginator.has_more()) + + def test_handle_none_response(self): + """Test handling of None response.""" + paginator = HogQLHasMorePaginator(limit=5, offset=0) + paginator.response = None # Simulate a None response + self.assertEqual(paginator.trim_results(), []) + self.assertFalse(paginator.has_more()) + + def test_limit_context_variations(self): + limit_context = LimitContext.QUERY + + test_cases = [ + { + "limit": 5, + "offset": 10, + "expected_limit": 5, + "expected_offset": 10, + }, + { + "limit": None, + "offset": 10, + "expected_limit": get_default_limit_for_context(limit_context), + "expected_offset": 10, + }, + { + "limit": 0, + "offset": 10, + "expected_limit": get_default_limit_for_context(limit_context), + "expected_offset": 10, + }, + { + "limit": -1, + "offset": 10, + "expected_limit": get_default_limit_for_context(limit_context), + "expected_offset": 10, + }, + { + "limit": MAX_SELECT_RETURNED_ROWS, + "offset": 10, + "expected_limit": get_max_limit_for_context(limit_context), + "expected_offset": 10, + }, + { + "limit": 5, + "offset": None, + "expected_limit": 5, + "expected_offset": 0, + }, + { + "limit": 5, + "offset": -1, + "expected_limit": 5, + "expected_offset": 0, + }, + ] + + for case in test_cases: + with self.subTest(case=case): + paginator = HogQLHasMorePaginator.from_limit_context( + limit_context=limit_context, limit=case["limit"], offset=case["offset"] + ) + self.assertEqual(paginator.limit, case["expected_limit"]) + self.assertEqual(paginator.offset, case["expected_offset"]) diff --git a/posthog/hogql_queries/persons_query_runner.py b/posthog/hogql_queries/persons_query_runner.py index 6574843ffb7ad..2b97a21811f65 100644 --- a/posthog/hogql_queries/persons_query_runner.py +++ b/posthog/hogql_queries/persons_query_runner.py @@ -1,7 +1,6 @@ from datetime import timedelta from typing import List, Generator, Sequence, Iterator, Optional from posthog.hogql import ast -from posthog.hogql.constants import get_max_limit_for_context, get_default_limit_for_context from posthog.hogql.parser import parse_expr, parse_order_expr from posthog.hogql.property import has_aggregation from posthog.hogql_queries.actor_strategies import ActorStrategy, PersonStrategy, GroupStrategy @@ -17,7 +16,9 @@ class PersonsQueryRunner(QueryRunner): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.paginator = HogQLHasMorePaginator(limit=self.query_limit(), offset=self.query.offset or 0) + self.paginator = HogQLHasMorePaginator.from_limit_context( + limit_context=self.limit_context, limit=self.query.limit, offset=self.query.offset + ) self.source_query_runner: Optional[QueryRunner] = None if self.query.source: @@ -80,11 +81,6 @@ def input_columns(self) -> List[str]: return self.strategy.input_columns() - def query_limit(self) -> int: - max_rows = get_max_limit_for_context(self.limit_context) - default_rows = get_default_limit_for_context(self.limit_context) - return min(max_rows, default_rows if self.query.limit is None else self.query.limit) - def source_id_column(self, source_query: ast.SelectQuery) -> List[str]: # Figure out the id column of the source query, first column that has id in the name for column in source_query.select: diff --git a/posthog/hogql_queries/query_runner.py b/posthog/hogql_queries/query_runner.py index 85c2d29372676..afe2b0aa84544 100644 --- a/posthog/hogql_queries/query_runner.py +++ b/posthog/hogql_queries/query_runner.py @@ -59,6 +59,8 @@ class QueryResponse(BaseModel, Generic[DataT]): columns: Optional[List[str]] = None hogql: Optional[str] = None hasMore: Optional[bool] = None + limit: Optional[int] = None + offset: Optional[int] = None class CachedQueryResponse(QueryResponse): diff --git a/posthog/schema.py b/posthog/schema.py index 4f9ad7413ae48..4bdd25e88378d 100644 --- a/posthog/schema.py +++ b/posthog/schema.py @@ -810,6 +810,8 @@ class EventsQueryResponse(BaseModel): columns: List hasMore: Optional[bool] = None hogql: str + limit: int + offset: int results: List[List] timings: Optional[List[QueryTiming]] = None types: List[str]