Skip to content

Commit

Permalink
feat(hogql): rename in_export_context to limit_context (#19080)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra authored Dec 5, 2023
1 parent 9cc2c22 commit ec24bc3
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 37 deletions.
7 changes: 4 additions & 3 deletions posthog/api/services/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from rest_framework.exceptions import ValidationError

from posthog.clickhouse.query_tagging import tag_queries
from posthog.hogql.constants import LimitContext
from posthog.hogql.database.database import create_hogql_database, serialize_database
from posthog.hogql.metadata import get_hogql_metadata
from posthog.hogql.modifiers import create_default_modifiers_for_team
Expand Down Expand Up @@ -54,7 +55,7 @@ def _unwrap_pydantic_dict(response: Any) -> Dict:
def process_query(
team: Team,
query_json: Dict,
in_export_context: Optional[bool] = False,
limit_context: Optional[LimitContext] = None,
refresh_requested: Optional[bool] = False,
) -> Dict:
# query_json has been parsed by QuerySchemaParser
Expand All @@ -63,10 +64,10 @@ def process_query(
tag_queries(query=query_json)

if query_kind in QUERY_WITH_RUNNER:
query_runner = get_query_runner(query_json, team, in_export_context=in_export_context)
query_runner = get_query_runner(query_json, team, limit_context=limit_context)
return _unwrap_pydantic_dict(query_runner.run(refresh_requested=refresh_requested))
elif query_kind in QUERY_WITH_RUNNER_NO_CACHE:
query_runner = get_query_runner(query_json, team, in_export_context=in_export_context)
query_runner = get_query_runner(query_json, team, limit_context=limit_context)
return _unwrap_pydantic_dict(query_runner.calculate())
elif query_kind == "HogQLMetadata":
metadata_query = HogQLMetadata.model_validate(query_json)
Expand Down
5 changes: 3 additions & 2 deletions posthog/api/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from rest_framework import status

from posthog.api.services.query import process_query
from posthog.hogql.query import LimitContext
from posthog.models.property_definition import PropertyDefinition, PropertyType
from posthog.models.utils import UUIDT
from posthog.schema import (
Expand Down Expand Up @@ -611,7 +612,7 @@ def test_full_hogql_query_limit_exported(self, MAX_SELECT_RETURNED_ROWS=15, DEFA
"kind": "HogQLQuery",
"query": f"select event from events where distinct_id='{random_uuid}'",
},
in_export_context=True, # This is the only difference
limit_context=LimitContext.EXPORT, # This is the only difference
)
self.assertEqual(len(response.get("results", [])), 15)

Expand Down Expand Up @@ -663,7 +664,7 @@ def test_full_events_query_limit_exported(self, MAX_SELECT_RETURNED_ROWS=15, DEF
"select": ["event"],
"where": [f"distinct_id = '{random_uuid}'"],
},
in_export_context=True,
limit_context=LimitContext.EXPORT,
)

self.assertEqual(len(response.get("results", [])), 15)
Expand Down
4 changes: 2 additions & 2 deletions posthog/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def redis_heartbeat():


@app.task(ignore_result=True, bind=True)
def process_query_task(self, team_id, query_id, query_json, in_export_context=False, refresh_requested=False):
def process_query_task(self, team_id, query_id, query_json, limit_context=None, refresh_requested=False):
"""
Kick off query
Once complete save results to redis
Expand All @@ -413,7 +413,7 @@ def process_query_task(self, team_id, query_id, query_json, in_export_context=Fa
team_id=team_id,
query_id=query_id,
query_json=query_json,
in_export_context=in_export_context,
limit_context=limit_context,
refresh_requested=refresh_requested,
)

Expand Down
11 changes: 7 additions & 4 deletions posthog/clickhouse/client/execute_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from posthog import celery, redis
from posthog.celery import process_query_task
from posthog.clickhouse.query_tagging import tag_queries
from posthog.hogql.constants import LimitContext
from posthog.schema import QueryStatus

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -69,7 +70,7 @@ def execute_process_query(
team_id,
query_id,
query_json,
in_export_context,
limit_context,
refresh_requested,
):
manager = QueryStatusManager(query_id, team_id)
Expand All @@ -90,7 +91,7 @@ def execute_process_query(
try:
tag_queries(client_query_id=query_id, team_id=team_id)
results = process_query(
team=team, query_json=query_json, in_export_context=in_export_context, refresh_requested=refresh_requested
team=team, query_json=query_json, limit_context=limit_context, refresh_requested=refresh_requested
)
logger.info("Got results for team %s query %s", team_id, query_id)
query_status.complete = True
Expand Down Expand Up @@ -135,10 +136,12 @@ def enqueue_process_query_task(

if bypass_celery:
# Call directly ( for testing )
process_query_task(team_id, query_id, query_json, in_export_context=True, refresh_requested=refresh_requested)
process_query_task(
team_id, query_id, query_json, limit_context=LimitContext.EXPORT, refresh_requested=refresh_requested
)
else:
task = process_query_task.delay(
team_id, query_id, query_json, in_export_context=True, refresh_requested=refresh_requested
team_id, query_id, query_json, limit_context=LimitContext.EXPORT, refresh_requested=refresh_requested
)
query_status.task_id = task.id
manager.store_query_status(query_status)
Expand Down
6 changes: 6 additions & 0 deletions posthog/hogql/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date, datetime
from enum import Enum
from typing import Optional, Literal, TypeAlias, Tuple, List
from uuid import UUID
from pydantic import ConfigDict, BaseModel
Expand Down Expand Up @@ -32,6 +33,11 @@
MAX_SELECT_RETURNED_ROWS = 10000 # sync with CSV_EXPORT_LIMIT


class LimitContext(Enum):
QUERY = "query"
EXPORT = "export"


# Settings applied at the SELECT level
class HogQLQuerySettings(BaseModel):
model_config = ConfigDict(extra="forbid")
Expand Down
8 changes: 5 additions & 3 deletions posthog/hogql/modifiers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from typing import Optional
from typing import Optional, TYPE_CHECKING

from posthog.models import Team
from posthog.schema import HogQLQueryModifiers, MaterializationMode
from posthog.utils import PersonOnEventsMode

if TYPE_CHECKING:
from posthog.models import Team


def create_default_modifiers_for_team(
team: Team, modifiers: Optional[HogQLQueryModifiers] = None
team: "Team", modifiers: Optional[HogQLQueryModifiers] = None
) -> HogQLQueryModifiers:
if modifiers is None:
modifiers = HogQLQueryModifiers()
Expand Down
8 changes: 4 additions & 4 deletions posthog/hogql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from posthog.clickhouse.client.connection import Workload
from posthog.errors import ExposedCHQueryError
from posthog.hogql import ast
from posthog.hogql.constants import HogQLGlobalSettings
from posthog.hogql.constants import HogQLGlobalSettings, LimitContext
from posthog.hogql.errors import HogQLException
from posthog.hogql.hogql import HogQLContext
from posthog.hogql.modifiers import create_default_modifiers_for_team
Expand Down Expand Up @@ -34,7 +34,7 @@ def execute_hogql_query(
workload: Workload = Workload.ONLINE,
settings: Optional[HogQLGlobalSettings] = None,
modifiers: Optional[HogQLQueryModifiers] = None,
in_export_context: Optional[bool] = False,
limit_context: Optional[LimitContext] = None,
timings: Optional[HogQLTimings] = None,
explain: Optional[bool] = False,
) -> HogQLQueryResponse:
Expand Down Expand Up @@ -80,7 +80,7 @@ def execute_hogql_query(
if one_query.limit is None:
# One more "max" of MAX_SELECT_RETURNED_ROWS (10k) in applied in the query printer.
one_query.limit = ast.Constant(
value=MAX_SELECT_RETURNED_ROWS if in_export_context else DEFAULT_RETURNED_ROWS
value=MAX_SELECT_RETURNED_ROWS if limit_context == LimitContext.EXPORT else DEFAULT_RETURNED_ROWS
)

# Get printed HogQL query, and returned columns. Using a cloned query.
Expand Down Expand Up @@ -122,7 +122,7 @@ def execute_hogql_query(
)

settings = settings or HogQLGlobalSettings()
if in_export_context:
if limit_context == LimitContext.EXPORT:
settings.max_execution_time = EXPORT_CONTEXT_MAX_EXECUTION_TIME

# Print the ClickHouse SQL query
Expand Down
6 changes: 3 additions & 3 deletions posthog/hogql_queries/events_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_order_expr
from posthog.hogql.property import action_to_expr, has_aggregation, property_to_expr
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.query import execute_hogql_query, LimitContext
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.models import Action, Person
Expand Down Expand Up @@ -187,7 +187,7 @@ def calculate(self) -> EventsQueryResponse:
query_type="EventsQuery",
timings=self.timings,
modifiers=self.modifiers,
in_export_context=self.in_export_context,
limit_context=self.limit_context,
)

# Convert star field from tuple to dict in each result
Expand Down Expand Up @@ -265,7 +265,7 @@ def limit(self) -> int:
return (
min(
MAX_SELECT_RETURNED_ROWS,
(MAX_SELECT_RETURNED_ROWS if self.in_export_context else DEFAULT_RETURNED_ROWS)
(MAX_SELECT_RETURNED_ROWS if self.limit_context == LimitContext.EXPORT else DEFAULT_RETURNED_ROWS)
if self.query.limit is None
else self.query.limit,
)
Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql_queries/hogql_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def calculate(self) -> HogQLQueryResponse:
team=self.team,
workload=Workload.ONLINE,
timings=self.timings,
in_export_context=self.in_export_context,
limit_context=self.limit_context,
explain=bool(self.query.explain),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class InsightPersonsQueryRunner(QueryRunner):

@cached_property
def source_runner(self) -> QueryRunner:
return get_query_runner(self.query.source, self.team, self.timings, self.in_export_context)
return get_query_runner(self.query.source, self.team, self.timings, self.limit_context)

def to_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
if isinstance(self.source_runner, LifecycleQueryRunner):
Expand Down
4 changes: 2 additions & 2 deletions posthog/hogql_queries/insights/trends/trends_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ def __init__(
team: Team,
timings: Optional[HogQLTimings] = None,
modifiers: Optional[HogQLQueryModifiers] = None,
in_export_context: Optional[bool] = None,
limit_context: Optional[bool] = None,
):
super().__init__(query, team=team, timings=timings, modifiers=modifiers, in_export_context=in_export_context)
super().__init__(query, team=team, timings=timings, modifiers=modifiers, limit_context=limit_context)
self.series = self.setup_series()

def _is_stale(self, cached_result_package):
Expand Down
23 changes: 12 additions & 11 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from posthog.clickhouse.query_tagging import tag_queries
from posthog.hogql import ast
from posthog.hogql.constants import LimitContext
from posthog.hogql.context import HogQLContext
from posthog.hogql.printer import print_ast
from posthog.hogql.query import create_default_modifiers_for_team
Expand Down Expand Up @@ -88,7 +89,7 @@ def get_query_runner(
query: Dict[str, Any] | RunnableQueryNode | BaseModel,
team: Team,
timings: Optional[HogQLTimings] = None,
in_export_context: Optional[bool] = False,
limit_context: Optional[LimitContext] = None,
modifiers: Optional[HogQLQueryModifiers] = None,
) -> "QueryRunner":
kind = None
Expand All @@ -106,7 +107,7 @@ def get_query_runner(
query=cast(LifecycleQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "TrendsQuery":
Expand All @@ -116,7 +117,7 @@ def get_query_runner(
query=cast(TrendsQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "EventsQuery":
Expand All @@ -126,7 +127,7 @@ def get_query_runner(
query=cast(EventsQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "PersonsQuery":
Expand All @@ -136,7 +137,7 @@ def get_query_runner(
query=cast(PersonsQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "InsightPersonsQuery":
Expand All @@ -146,7 +147,7 @@ def get_query_runner(
query=cast(InsightPersonsQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "HogQLQuery":
Expand All @@ -156,7 +157,7 @@ def get_query_runner(
query=cast(HogQLQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "SessionsTimelineQuery":
Expand Down Expand Up @@ -190,19 +191,19 @@ class QueryRunner(ABC):
team: Team
timings: HogQLTimings
modifiers: HogQLQueryModifiers
in_export_context: bool
limit_context: LimitContext

def __init__(
self,
query: RunnableQueryNode | BaseModel | Dict[str, Any],
team: Team,
timings: Optional[HogQLTimings] = None,
modifiers: Optional[HogQLQueryModifiers] = None,
in_export_context: Optional[bool] = False,
limit_context: Optional[LimitContext] = None,
):
self.team = team
self.timings = timings or HogQLTimings()
self.in_export_context = in_export_context or False
self.limit_context = limit_context or LimitContext.QUERY
self.modifiers = create_default_modifiers_for_team(team, modifiers)
if isinstance(query, self.query_type):
self.query = query # type: ignore
Expand All @@ -216,7 +217,7 @@ def calculate(self) -> BaseModel:
raise NotImplementedError()

def run(self, refresh_requested: Optional[bool] = None) -> CachedQueryResponse:
cache_key = self._cache_key() + ("_export" if self.in_export_context else "")
cache_key = self._cache_key() + ("_export" if self.limit_context == LimitContext.EXPORT else "")
tag_queries(cache_key=cache_key)

if not refresh_requested:
Expand Down
3 changes: 2 additions & 1 deletion posthog/tasks/exports/csv_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
EXPORT_TIMER,
)
from ...constants import CSV_EXPORT_LIMIT
from ...hogql.query import LimitContext

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -184,7 +185,7 @@ def _export_to_csv(exported_asset: ExportedAsset, limit: int = 1000) -> None:

if resource.get("source"):
query = resource.get("source")
query_response = process_query(team=exported_asset.team, query_json=query, in_export_context=True)
query_response = process_query(team=exported_asset.team, query_json=query, limit_context=LimitContext.EXPORT)
all_csv_rows = _convert_response_to_csv_data(query_response)

else:
Expand Down

0 comments on commit ec24bc3

Please sign in to comment.