Skip to content

Commit

Permalink
feat(trends): parallelize trends query runner for multiple series (#19784)
Browse files Browse the repository at this point in the history

* Parallelize trends query runner for multiple series

* Fixed tests

* Update query snapshots

* Added comment for closing db connection

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
Gilbert09 and github-actions[bot] authored Jan 17, 2024
1 parent 45c4443 commit 103280f
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 20 deletions.
3 changes: 1 addition & 2 deletions mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,7 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type]
posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Need type annotation for "timings" (hint: "timings: List[<type>] = ...") [var-annotated]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "extend" of "list" has incompatible type "list[QueryTiming] | None"; expected "Iterable[Any]" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Unused "type: ignore" comment [unused-ignore]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type]
posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type]
Expand Down
3 changes: 3 additions & 0 deletions posthog/hogql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

if TYPE_CHECKING:
from posthog.hogql.database.database import Database
from posthog.models import Team


@dataclass
Expand All @@ -22,6 +23,8 @@ class HogQLContext:

# Team making the queries
team_id: Optional[int]
# Team making the queries - if team is passed in, then the team isn't queried when creating the database
team: Optional["Team"] = None
# Virtual database we're querying, will be populated from team_id if not present
database: Optional["Database"] = None
# If set, will save string constants to this dict. Inlines strings into the query if None.
Expand Down
12 changes: 9 additions & 3 deletions posthog/hogql/database/database.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, ClassVar, Dict, List, Literal, Optional, TypedDict
from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Literal, Optional, TypedDict
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from pydantic import ConfigDict, BaseModel

Expand Down Expand Up @@ -51,6 +51,10 @@
from posthog.schema import HogQLQueryModifiers, PersonsOnEventsMode


if TYPE_CHECKING:
from posthog.models import Team


class Database(BaseModel):
model_config = ConfigDict(extra="allow")

Expand Down Expand Up @@ -121,7 +125,9 @@ def add_warehouse_tables(self, **field_definitions: Any):
setattr(self, f_name, f_def)


def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] = None) -> Database:
def create_hogql_database(
team_id: int, modifiers: Optional[HogQLQueryModifiers] = None, team_arg: Optional["Team"] = None
) -> Database:
from posthog.models import Team
from posthog.hogql.query import create_default_modifiers_for_team
from posthog.warehouse.models import (
Expand All @@ -130,7 +136,7 @@ def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers]
DataWarehouseViewLink,
)

team = Team.objects.get(pk=team_id)
team = team_arg or Team.objects.get(pk=team_id)
modifiers = create_default_modifiers_for_team(team, modifiers)
database = Database(timezone=team.timezone, week_start_day=team.week_start_day)

Expand Down
2 changes: 1 addition & 1 deletion posthog/hogql/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def prepare_ast_for_printing(
settings: Optional[HogQLGlobalSettings] = None,
) -> ast.Expr:
with context.timings.measure("create_hogql_database"):
context.database = context.database or create_hogql_database(context.team_id, context.modifiers)
context.database = context.database or create_hogql_database(context.team_id, context.modifiers, context.team)

if context.modifiers.inCohortVia == InCohortVia.leftjoin_conjoined:
with context.timings.measure("resolve_in_cohorts_conjoined"):
Expand Down
2 changes: 2 additions & 0 deletions posthog/hogql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def execute_hogql_query(
with timings.measure("prepare_ast"):
hogql_query_context = HogQLContext(
team_id=team.pk,
team=team,
enable_select_queries=True,
timings=timings,
modifiers=query_modifiers,
Expand Down Expand Up @@ -121,6 +122,7 @@ def execute_hogql_query(
with timings.measure("print_ast"):
clickhouse_context = HogQLContext(
team_id=team.pk,
team=team,
enable_select_queries=True,
timings=timings,
modifiers=query_modifiers,
Expand Down
1 change: 1 addition & 0 deletions posthog/hogql_queries/insights/trends/test/test_trends.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ def convert_filter_to_trends_query(filter: Filter) -> TrendsQuery:
return tq


@override_settings(IN_UNIT_TESTING=True)
class TestTrends(ClickhouseTestMixin, APIBaseTest):
maxDiff = None

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from dataclasses import dataclass
from typing import Dict, List, Optional
from unittest.mock import patch
from django.test import override_settings
from freezegun import freeze_time
from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner
from posthog.models.cohort.cohort import Cohort
Expand Down Expand Up @@ -41,6 +43,7 @@ class SeriesTestData:
properties: Dict[str, str | int]


@override_settings(IN_UNIT_TESTING=True)
class TestQuery(ClickhouseTestMixin, APIBaseTest):
default_date_from = "2020-01-09"
default_date_to = "2020-01-19"
Expand Down Expand Up @@ -1158,3 +1161,22 @@ def test_smoothing(self):
)

assert response.results[0]["data"] == [1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0]

# Checks that an exception raised by execute_hogql_query surfaces to the caller of the trends query
# with its original message intact.
# NOTE(review): the enclosing test class is decorated with override_settings(IN_UNIT_TESTING=True), so this
# presumably exercises the sequential (non-threaded) path despite the "inside thread" wording - confirm.
@patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query")
def test_should_throw_exception(self, patch_sync_execute):
# Make the patched execute_hogql_query raise whenever it is called
patch_sync_execute.side_effect = Exception("Error thrown inside thread")

# Running a single-series trends query is expected to raise
with self.assertRaises(Exception) as e:
self._run_trends_query(
"2020-01-09",
"2020-01-20",
IntervalType.day,
[EventsNode(event="$pageview")],
None,
None,
)

# The message of the exception that reached the caller must match the one injected above
self.assertEqual(
str(e.exception),
"Error thrown inside thread",
)
75 changes: 62 additions & 13 deletions posthog/hogql_queries/insights/trends/trends_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
from itertools import groupby
from math import ceil
from operator import itemgetter
import threading
from typing import List, Optional, Any, Dict
from django.conf import settings

from django.utils.timezone import datetime
from posthog.caching.insights_api import (
Expand Down Expand Up @@ -40,6 +42,7 @@
EventsNode,
HogQLQueryResponse,
InCohortVia,
QueryTiming,
TrendsQuery,
TrendsQueryResponse,
HogQLQueryModifiers,
Expand Down Expand Up @@ -129,23 +132,69 @@ def to_actors_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
def calculate(self):
queries = self.to_query()

res = []
timings = []
res_matrix: List[List[Any] | Any | None] = [None] * len(queries)
timings_matrix: List[List[QueryTiming] | None] = [None] * len(queries)
errors: List[Exception] = []

for index, query in enumerate(queries):
series_with_extra = self.series[index]
def run(index: int, query: ast.SelectQuery | ast.SelectUnionQuery, is_parallel: bool):
try:
series_with_extra = self.series[index]

response = execute_hogql_query(
query_type="TrendsQuery",
query=query,
team=self.team,
timings=self.timings,
modifiers=self.modifiers,
)
response = execute_hogql_query(
query_type="TrendsQuery",
query=query,
team=self.team,
timings=self.timings,
modifiers=self.modifiers,
)

timings.extend(response.timings)
timings_matrix[index] = response.timings
res_matrix[index] = self.build_series_response(response, series_with_extra, len(queries))
except Exception as e:
errors.append(e)
finally:
if is_parallel:
from django.db import connection

# This will only close the DB connection for the newly spawned thread and not the whole app
connection.close()

# This exists so that we're not spawning threads during unit tests. We can't spawn
# threads there right now due to Django's lack of multithreading support
if settings.IN_UNIT_TESTING: # type: ignore
for index, query in enumerate(queries):
run(index, query, False)
elif len(queries) == 1:
run(0, queries[0], False)
else:
jobs = [threading.Thread(target=run, args=(index, query, True)) for index, query in enumerate(queries)]

res.extend(self.build_series_response(response, series_with_extra, len(queries)))
# Start the threads
for j in jobs:
j.start()

# Ensure all of the threads have finished
for j in jobs:
j.join()

# Raise any errors raised in a separate thread
if len(errors) > 0:
raise errors[0]

# Flatten res and timings
res = []
for result in res_matrix:
if isinstance(result, List):
res.extend(result)
else:
res.append(result)

timings = []
for result in timings_matrix:
if isinstance(result, List):
timings.extend(result)
else:
timings.append(result)

if (
self.query.trendsFilter is not None
Expand Down
6 changes: 5 additions & 1 deletion posthog/models/filters/base_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ def __init__(
# Set the HogQL context for the request
self.hogql_context = self.kwargs.get(
"hogql_context",
HogQLContext(within_non_hogql_query=True, team_id=self.team.pk if self.team else None),
HogQLContext(
within_non_hogql_query=True,
team_id=self.team.pk if self.team else None,
team=self.team if self.team else None,
),
)
if self.team:
self.hogql_context.person_on_events_mode = self.team.person_on_events_mode
Expand Down
2 changes: 2 additions & 0 deletions posthog/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@

PROM_PUSHGATEWAY_ADDRESS = os.getenv("PROM_PUSHGATEWAY_ADDRESS", None)

IN_UNIT_TESTING = get_from_env("IN_UNIT_TESTING", False, type_cast=str_to_bool)

# Extend and override these settings with EE's ones
if "ee.apps.EnterpriseConfig" in INSTALLED_APPS:
from ee.settings import * # noqa: F401, F403

0 comments on commit 103280f

Please sign in to comment.