From ec24bc332f49b38567b1605e1525220e3bbb8e1d Mon Sep 17 00:00:00 2001 From: Marius Andra Date: Tue, 5 Dec 2023 13:05:42 +0100 Subject: [PATCH] feat(hogql): rename in_export_context to limit_context (#19080) --- posthog/api/services/query.py | 7 +++--- posthog/api/test/test_query.py | 5 ++-- posthog/celery.py | 4 ++-- posthog/clickhouse/client/execute_async.py | 11 +++++---- posthog/hogql/constants.py | 6 +++++ posthog/hogql/modifiers.py | 8 ++++--- posthog/hogql/query.py | 8 +++---- posthog/hogql_queries/events_query_runner.py | 6 ++--- posthog/hogql_queries/hogql_query_runner.py | 2 +- .../insights/insight_persons_query_runner.py | 2 +- .../insights/trends/trends_query_runner.py | 4 ++-- posthog/hogql_queries/query_runner.py | 23 ++++++++++--------- posthog/tasks/exports/csv_exporter.py | 3 ++- 13 files changed, 52 insertions(+), 37 deletions(-) diff --git a/posthog/api/services/query.py b/posthog/api/services/query.py index 1ef831bde1b82..48339aa38bad9 100644 --- a/posthog/api/services/query.py +++ b/posthog/api/services/query.py @@ -5,6 +5,7 @@ from rest_framework.exceptions import ValidationError from posthog.clickhouse.query_tagging import tag_queries +from posthog.hogql.constants import LimitContext from posthog.hogql.database.database import create_hogql_database, serialize_database from posthog.hogql.metadata import get_hogql_metadata from posthog.hogql.modifiers import create_default_modifiers_for_team @@ -54,7 +55,7 @@ def _unwrap_pydantic_dict(response: Any) -> Dict: def process_query( team: Team, query_json: Dict, - in_export_context: Optional[bool] = False, + limit_context: Optional[LimitContext] = None, refresh_requested: Optional[bool] = False, ) -> Dict: # query_json has been parsed by QuerySchemaParser @@ -63,10 +64,10 @@ def process_query( tag_queries(query=query_json) if query_kind in QUERY_WITH_RUNNER: - query_runner = get_query_runner(query_json, team, in_export_context=in_export_context) + query_runner = get_query_runner(query_json, team, limit_context=limit_context) return _unwrap_pydantic_dict(query_runner.run(refresh_requested=refresh_requested)) elif query_kind in QUERY_WITH_RUNNER_NO_CACHE: - query_runner = get_query_runner(query_json, team, in_export_context=in_export_context) + query_runner = get_query_runner(query_json, team, limit_context=limit_context) return _unwrap_pydantic_dict(query_runner.calculate()) elif query_kind == "HogQLMetadata": metadata_query = HogQLMetadata.model_validate(query_json) diff --git a/posthog/api/test/test_query.py b/posthog/api/test/test_query.py index ff03704605014..d538e5a241cdf 100644 --- a/posthog/api/test/test_query.py +++ b/posthog/api/test/test_query.py @@ -6,6 +6,7 @@ from rest_framework import status from posthog.api.services.query import process_query +from posthog.hogql.query import LimitContext from posthog.models.property_definition import PropertyDefinition, PropertyType from posthog.models.utils import UUIDT from posthog.schema import ( @@ -611,7 +612,7 @@ def test_full_hogql_query_limit_exported(self, MAX_SELECT_RETURNED_ROWS=15, DEFA "kind": "HogQLQuery", "query": f"select event from events where distinct_id='{random_uuid}'", }, - in_export_context=True, # This is the only difference + limit_context=LimitContext.EXPORT, # This is the only difference ) self.assertEqual(len(response.get("results", [])), 15) @@ -663,7 +664,7 @@ def test_full_events_query_limit_exported(self, MAX_SELECT_RETURNED_ROWS=15, DEF "select": ["event"], "where": [f"distinct_id = '{random_uuid}'"], }, - in_export_context=True, + limit_context=LimitContext.EXPORT, ) self.assertEqual(len(response.get("results", [])), 15) diff --git a/posthog/celery.py b/posthog/celery.py index 90037463e1358..d1804524760ac 100644 --- a/posthog/celery.py +++ b/posthog/celery.py @@ -402,7 +402,7 @@ def redis_heartbeat(): @app.task(ignore_result=True, bind=True) -def process_query_task(self, team_id, query_id, query_json, in_export_context=False, refresh_requested=False): +def process_query_task(self, team_id, query_id, query_json, limit_context=None, refresh_requested=False): """ Kick off query Once complete save results to redis @@ -413,7 +413,7 @@ def process_query_task(self, team_id, query_id, query_json, in_export_context=Fa team_id=team_id, query_id=query_id, query_json=query_json, - in_export_context=in_export_context, + limit_context=limit_context, refresh_requested=refresh_requested, ) diff --git a/posthog/clickhouse/client/execute_async.py b/posthog/clickhouse/client/execute_async.py index 211c685a068b1..9be449596fdf0 100644 --- a/posthog/clickhouse/client/execute_async.py +++ b/posthog/clickhouse/client/execute_async.py @@ -9,6 +9,7 @@ from posthog import celery, redis from posthog.celery import process_query_task from posthog.clickhouse.query_tagging import tag_queries +from posthog.hogql.constants import LimitContext from posthog.schema import QueryStatus logger = structlog.get_logger(__name__) @@ -69,7 +70,7 @@ def execute_process_query( team_id, query_id, query_json, - in_export_context, + limit_context, refresh_requested, ): manager = QueryStatusManager(query_id, team_id) @@ -90,7 +91,7 @@ def execute_process_query( try: tag_queries(client_query_id=query_id, team_id=team_id) results = process_query( - team=team, query_json=query_json, in_export_context=in_export_context, refresh_requested=refresh_requested + team=team, query_json=query_json, limit_context=limit_context, refresh_requested=refresh_requested ) logger.info("Got results for team %s query %s", team_id, query_id) query_status.complete = True @@ -135,10 +136,12 @@ def enqueue_process_query_task( if bypass_celery: # Call directly ( for testing ) - process_query_task(team_id, query_id, query_json, in_export_context=True, refresh_requested=refresh_requested) + process_query_task( + team_id, query_id, query_json, limit_context=LimitContext.EXPORT, refresh_requested=refresh_requested + ) else: task = process_query_task.delay( - team_id, query_id, query_json, in_export_context=True, refresh_requested=refresh_requested + team_id, query_id, query_json, limit_context=LimitContext.EXPORT, refresh_requested=refresh_requested ) query_status.task_id = task.id manager.store_query_status(query_status) diff --git a/posthog/hogql/constants.py b/posthog/hogql/constants.py index 0a2806ca99878..8ea5670321267 100644 --- a/posthog/hogql/constants.py +++ b/posthog/hogql/constants.py @@ -1,4 +1,5 @@ from datetime import date, datetime +from enum import Enum from typing import Optional, Literal, TypeAlias, Tuple, List from uuid import UUID from pydantic import ConfigDict, BaseModel @@ -32,6 +33,11 @@ MAX_SELECT_RETURNED_ROWS = 10000 # sync with CSV_EXPORT_LIMIT +class LimitContext(Enum): + QUERY = "query" + EXPORT = "export" + + # Settings applied at the SELECT level class HogQLQuerySettings(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/posthog/hogql/modifiers.py b/posthog/hogql/modifiers.py index 8884f197afcf6..fd49ba2bc270c 100644 --- a/posthog/hogql/modifiers.py +++ b/posthog/hogql/modifiers.py @@ -1,12 +1,14 @@ -from typing import Optional +from typing import Optional, TYPE_CHECKING -from posthog.models import Team from posthog.schema import HogQLQueryModifiers, MaterializationMode from posthog.utils import PersonOnEventsMode +if TYPE_CHECKING: + from posthog.models import Team + def create_default_modifiers_for_team( - team: Team, modifiers: Optional[HogQLQueryModifiers] = None + team: "Team", modifiers: Optional[HogQLQueryModifiers] = None ) -> HogQLQueryModifiers: if modifiers is None: modifiers = HogQLQueryModifiers() diff --git a/posthog/hogql/query.py b/posthog/hogql/query.py index 751b9fb46b860..8ca5f5b582ab1 100644 --- a/posthog/hogql/query.py +++ b/posthog/hogql/query.py @@ -3,7 +3,7 @@ from posthog.clickhouse.client.connection import Workload from posthog.errors import ExposedCHQueryError from posthog.hogql import ast -from posthog.hogql.constants import HogQLGlobalSettings +from posthog.hogql.constants import HogQLGlobalSettings, LimitContext from posthog.hogql.errors import HogQLException from posthog.hogql.hogql import HogQLContext from posthog.hogql.modifiers import create_default_modifiers_for_team @@ -34,7 +34,7 @@ def execute_hogql_query( workload: Workload = Workload.ONLINE, settings: Optional[HogQLGlobalSettings] = None, modifiers: Optional[HogQLQueryModifiers] = None, - in_export_context: Optional[bool] = False, + limit_context: Optional[LimitContext] = None, timings: Optional[HogQLTimings] = None, explain: Optional[bool] = False, ) -> HogQLQueryResponse: @@ -80,7 +80,7 @@ def execute_hogql_query( if one_query.limit is None: # One more "max" of MAX_SELECT_RETURNED_ROWS (10k) in applied in the query printer. one_query.limit = ast.Constant( - value=MAX_SELECT_RETURNED_ROWS if in_export_context else DEFAULT_RETURNED_ROWS + value=MAX_SELECT_RETURNED_ROWS if limit_context == LimitContext.EXPORT else DEFAULT_RETURNED_ROWS ) # Get printed HogQL query, and returned columns. Using a cloned query. @@ -122,7 +122,7 @@ def execute_hogql_query( ) settings = settings or HogQLGlobalSettings() - if in_export_context: + if limit_context == LimitContext.EXPORT: settings.max_execution_time = EXPORT_CONTEXT_MAX_EXECUTION_TIME # Print the ClickHouse SQL query diff --git a/posthog/hogql_queries/events_query_runner.py b/posthog/hogql_queries/events_query_runner.py index e7ec26a441ded..f9ee10c648f25 100644 --- a/posthog/hogql_queries/events_query_runner.py +++ b/posthog/hogql_queries/events_query_runner.py @@ -12,7 +12,7 @@ from posthog.hogql import ast from posthog.hogql.parser import parse_expr, parse_order_expr from posthog.hogql.property import action_to_expr, has_aggregation, property_to_expr -from posthog.hogql.query import execute_hogql_query +from posthog.hogql.query import execute_hogql_query, LimitContext from posthog.hogql.timings import HogQLTimings from posthog.hogql_queries.query_runner import QueryRunner from posthog.models import Action, Person @@ -187,7 +187,7 @@ def calculate(self) -> EventsQueryResponse: query_type="EventsQuery", timings=self.timings, modifiers=self.modifiers, - in_export_context=self.in_export_context, + limit_context=self.limit_context, ) # Convert star field from tuple to dict in each result @@ -265,7 +265,7 @@ def limit(self) -> int: return ( min( MAX_SELECT_RETURNED_ROWS, - (MAX_SELECT_RETURNED_ROWS if self.in_export_context else DEFAULT_RETURNED_ROWS) + (MAX_SELECT_RETURNED_ROWS if self.limit_context == LimitContext.EXPORT else DEFAULT_RETURNED_ROWS) if self.query.limit is None else self.query.limit, ) diff --git a/posthog/hogql_queries/hogql_query_runner.py b/posthog/hogql_queries/hogql_query_runner.py index a79e875d14a73..1a6bcc89c730c 100644 --- a/posthog/hogql_queries/hogql_query_runner.py +++ b/posthog/hogql_queries/hogql_query_runner.py @@ -49,7 +49,7 @@ def calculate(self) -> HogQLQueryResponse: team=self.team, workload=Workload.ONLINE, timings=self.timings, - in_export_context=self.in_export_context, + limit_context=self.limit_context, explain=bool(self.query.explain), ) diff --git a/posthog/hogql_queries/insights/insight_persons_query_runner.py b/posthog/hogql_queries/insights/insight_persons_query_runner.py index 51cf792346992..e0681bc5af08a 100644 --- a/posthog/hogql_queries/insights/insight_persons_query_runner.py +++ b/posthog/hogql_queries/insights/insight_persons_query_runner.py @@ -16,7 +16,7 @@ class InsightPersonsQueryRunner(QueryRunner): @cached_property def source_runner(self) -> QueryRunner: - return get_query_runner(self.query.source, self.team, self.timings, self.in_export_context) + return get_query_runner(self.query.source, self.team, self.timings, self.limit_context) def to_query(self) -> ast.SelectQuery | ast.SelectUnionQuery: if isinstance(self.source_runner, LifecycleQueryRunner): diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index 76f204c8a310f..eb450338a446a 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -51,9 +51,9 @@ def __init__( team: Team, timings: Optional[HogQLTimings] = None, modifiers: Optional[HogQLQueryModifiers] = None, - in_export_context: Optional[bool] = None, + limit_context: Optional[bool] = None, ): - super().__init__(query, team=team, timings=timings, modifiers=modifiers, in_export_context=in_export_context) + super().__init__(query, team=team, timings=timings, modifiers=modifiers, limit_context=limit_context) self.series = self.setup_series() def _is_stale(self, cached_result_package): diff --git a/posthog/hogql_queries/query_runner.py b/posthog/hogql_queries/query_runner.py index 1f2b0d43ad743..ed08a9fcbb34e 100644 --- a/posthog/hogql_queries/query_runner.py +++ b/posthog/hogql_queries/query_runner.py @@ -9,6 +9,7 @@ from posthog.clickhouse.query_tagging import tag_queries from posthog.hogql import ast +from posthog.hogql.constants import LimitContext from posthog.hogql.context import HogQLContext from posthog.hogql.printer import print_ast from posthog.hogql.query import create_default_modifiers_for_team @@ -88,7 +89,7 @@ def get_query_runner( query: Dict[str, Any] | RunnableQueryNode | BaseModel, team: Team, timings: Optional[HogQLTimings] = None, - in_export_context: Optional[bool] = False, + limit_context: Optional[LimitContext] = None, modifiers: Optional[HogQLQueryModifiers] = None, ) -> "QueryRunner": kind = None @@ -106,7 +107,7 @@ def get_query_runner( query=cast(LifecycleQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "TrendsQuery": @@ -116,7 +117,7 @@ def get_query_runner( query=cast(TrendsQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "EventsQuery": @@ -126,7 +127,7 @@ def get_query_runner( query=cast(EventsQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "PersonsQuery": @@ -136,7 +137,7 @@ def get_query_runner( query=cast(PersonsQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "InsightPersonsQuery": @@ -146,7 +147,7 @@ def get_query_runner( query=cast(InsightPersonsQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "HogQLQuery": @@ -156,7 +157,7 @@ def get_query_runner( query=cast(HogQLQuery | Dict[str, Any], query), team=team, timings=timings, - in_export_context=in_export_context, + limit_context=limit_context, modifiers=modifiers, ) if kind == "SessionsTimelineQuery": @@ -190,7 +191,7 @@ class QueryRunner(ABC): team: Team timings: HogQLTimings modifiers: HogQLQueryModifiers - in_export_context: bool + limit_context: LimitContext def __init__( self, @@ -198,11 +199,11 @@ def __init__( team: Team, timings: Optional[HogQLTimings] = None, modifiers: Optional[HogQLQueryModifiers] = None, - in_export_context: Optional[bool] = False, + limit_context: Optional[LimitContext] = None, ): self.team = team self.timings = timings or HogQLTimings() - self.in_export_context = in_export_context or False + self.limit_context = limit_context or LimitContext.QUERY self.modifiers = create_default_modifiers_for_team(team, modifiers) if isinstance(query, self.query_type): self.query = query # type: ignore @@ -216,7 +217,7 @@ def calculate(self) -> BaseModel: raise NotImplementedError() def run(self, refresh_requested: Optional[bool] = None) -> CachedQueryResponse: - cache_key = self._cache_key() + ("_export" if self.in_export_context else "") + cache_key = self._cache_key() + ("_export" if self.limit_context == LimitContext.EXPORT else "") tag_queries(cache_key=cache_key) if not refresh_requested: diff --git a/posthog/tasks/exports/csv_exporter.py b/posthog/tasks/exports/csv_exporter.py index 8f6fffd0c9f90..c0dc99ff436fc 100644 --- a/posthog/tasks/exports/csv_exporter.py +++ b/posthog/tasks/exports/csv_exporter.py @@ -19,6 +19,7 @@ EXPORT_TIMER, ) from ...constants import CSV_EXPORT_LIMIT +from ...hogql.query import LimitContext logger = structlog.get_logger(__name__) @@ -184,7 +185,7 @@ def _export_to_csv(exported_asset: ExportedAsset, limit: int = 1000) -> None: if resource.get("source"): query = resource.get("source") - query_response = process_query(team=exported_asset.team, query_json=query, in_export_context=True) + query_response = process_query(team=exported_asset.team, query_json=query, limit_context=LimitContext.EXPORT) all_csv_rows = _convert_response_to_csv_data(query_response) else: