From 103280fc06083710496b509ac58388bcad2c7add Mon Sep 17 00:00:00 2001 From: Tom Owers Date: Wed, 17 Jan 2024 16:54:37 +0000 Subject: [PATCH] feat(trends): parallelize trends query runner for multiple series (#19784) * Parallelize trends query runner for multiple series * Fixed tests * Update query snapshots * Added comment for closing db connection --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- mypy-baseline.txt | 3 +- posthog/hogql/context.py | 3 + posthog/hogql/database/database.py | 12 ++- posthog/hogql/printer.py | 2 +- posthog/hogql/query.py | 2 + .../insights/trends/test/test_trends.py | 1 + .../trends/test/test_trends_query_runner.py | 22 ++++++ .../insights/trends/trends_query_runner.py | 75 +++++++++++++++---- posthog/models/filters/base_filter.py | 6 +- posthog/settings/__init__.py | 2 + 10 files changed, 108 insertions(+), 20 deletions(-) diff --git a/mypy-baseline.txt b/mypy-baseline.txt index 7747af86b3112..061cc288ffcd5 100644 --- a/mypy-baseline.txt +++ b/mypy-baseline.txt @@ -421,8 +421,7 @@ posthog/api/person.py:0: error: Argument 1 to "loads" has incompatible type "str posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/api/person.py:0: error: Argument "user" to "log_activity" has incompatible type "User | AnonymousUser"; expected "User | None" [arg-type] posthog/hogql_queries/web_analytics/web_analytics_query_runner.py:0: error: Argument 1 to "append" of "list" has incompatible type "EventPropertyFilter"; expected "Expr" [arg-type] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Need type annotation for "timings" (hint: "timings: List[] = ...") [var-annotated] -posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "extend" of "list" has incompatible type "list[QueryTiming] | None"; expected "Iterable[Any]" [arg-type] 
+posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Unused "type: ignore" comment [unused-ignore] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Statement is unreachable [unreachable] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type] posthog/hogql_queries/insights/trends/trends_query_runner.py:0: error: Argument 1 to "FormulaAST" has incompatible type "map[Any]"; expected "list[list[float]]" [arg-type] diff --git a/posthog/hogql/context.py b/posthog/hogql/context.py index 7f45d66fa4f83..1b43362790f16 100644 --- a/posthog/hogql/context.py +++ b/posthog/hogql/context.py @@ -6,6 +6,7 @@ if TYPE_CHECKING: from posthog.hogql.database.database import Database + from posthog.models import Team @dataclass @@ -22,6 +23,8 @@ class HogQLContext: # Team making the queries team_id: Optional[int] + # Team making the queries - if team is passed in, then the team isn't queried when creating the database + team: Optional["Team"] = None # Virtual database we're querying, will be populated from team_id if not present database: Optional["Database"] = None # If set, will save string constants to this dict. Inlines strings into the query if None. 
diff --git a/posthog/hogql/database/database.py b/posthog/hogql/database/database.py index caac14649f573..b25c7c95c4ca3 100644 --- a/posthog/hogql/database/database.py +++ b/posthog/hogql/database/database.py @@ -1,4 +1,4 @@ -from typing import Any, ClassVar, Dict, List, Literal, Optional, TypedDict +from typing import TYPE_CHECKING, Any, ClassVar, Dict, List, Literal, Optional, TypedDict from zoneinfo import ZoneInfo, ZoneInfoNotFoundError from pydantic import ConfigDict, BaseModel @@ -51,6 +51,10 @@ from posthog.schema import HogQLQueryModifiers, PersonsOnEventsMode +if TYPE_CHECKING: + from posthog.models import Team + + class Database(BaseModel): model_config = ConfigDict(extra="allow") @@ -121,7 +125,9 @@ def add_warehouse_tables(self, **field_definitions: Any): setattr(self, f_name, f_def) -def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] = None) -> Database: +def create_hogql_database( + team_id: int, modifiers: Optional[HogQLQueryModifiers] = None, team_arg: Optional["Team"] = None +) -> Database: from posthog.models import Team from posthog.hogql.query import create_default_modifiers_for_team from posthog.warehouse.models import ( @@ -130,7 +136,7 @@ def create_hogql_database(team_id: int, modifiers: Optional[HogQLQueryModifiers] DataWarehouseViewLink, ) - team = Team.objects.get(pk=team_id) + team = team_arg or Team.objects.get(pk=team_id) modifiers = create_default_modifiers_for_team(team, modifiers) database = Database(timezone=team.timezone, week_start_day=team.week_start_day) diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index 428ab73a70811..98de2dd1bfcf1 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -97,7 +97,7 @@ def prepare_ast_for_printing( settings: Optional[HogQLGlobalSettings] = None, ) -> ast.Expr: with context.timings.measure("create_hogql_database"): - context.database = context.database or create_hogql_database(context.team_id, context.modifiers) + context.database 
= context.database or create_hogql_database(context.team_id, context.modifiers, context.team) if context.modifiers.inCohortVia == InCohortVia.leftjoin_conjoined: with context.timings.measure("resolve_in_cohorts_conjoined"): diff --git a/posthog/hogql/query.py b/posthog/hogql/query.py index 63b99ea516f5e..3ea5c3d49eb7a 100644 --- a/posthog/hogql/query.py +++ b/posthog/hogql/query.py @@ -81,6 +81,7 @@ def execute_hogql_query( with timings.measure("prepare_ast"): hogql_query_context = HogQLContext( team_id=team.pk, + team=team, enable_select_queries=True, timings=timings, modifiers=query_modifiers, @@ -121,6 +122,7 @@ def execute_hogql_query( with timings.measure("print_ast"): clickhouse_context = HogQLContext( team_id=team.pk, + team=team, enable_select_queries=True, timings=timings, modifiers=query_modifiers, diff --git a/posthog/hogql_queries/insights/trends/test/test_trends.py b/posthog/hogql_queries/insights/trends/test/test_trends.py index d2f927070c24f..874d02e246dc4 100644 --- a/posthog/hogql_queries/insights/trends/test/test_trends.py +++ b/posthog/hogql_queries/insights/trends/test/test_trends.py @@ -200,6 +200,7 @@ def convert_filter_to_trends_query(filter: Filter) -> TrendsQuery: return tq +@override_settings(IN_UNIT_TESTING=True) class TestTrends(ClickhouseTestMixin, APIBaseTest): maxDiff = None diff --git a/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py b/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py index 2fe0570ab104a..95e14d0fbd676 100644 --- a/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/test/test_trends_query_runner.py @@ -1,5 +1,7 @@ from dataclasses import dataclass from typing import Dict, List, Optional +from unittest.mock import patch +from django.test import override_settings from freezegun import freeze_time from posthog.hogql_queries.insights.trends.trends_query_runner import TrendsQueryRunner from 
posthog.models.cohort.cohort import Cohort @@ -41,6 +43,7 @@ class SeriesTestData: properties: Dict[str, str | int] +@override_settings(IN_UNIT_TESTING=True) class TestQuery(ClickhouseTestMixin, APIBaseTest): default_date_from = "2020-01-09" default_date_to = "2020-01-19" @@ -1158,3 +1161,22 @@ def test_smoothing(self): ) assert response.results[0]["data"] == [1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0] + + @patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query") + def test_should_throw_exception(self, patch_sync_execute): + patch_sync_execute.side_effect = Exception("Error thrown inside thread") + + with self.assertRaises(Exception) as e: + self._run_trends_query( + "2020-01-09", + "2020-01-20", + IntervalType.day, + [EventsNode(event="$pageview")], + None, + None, + ) + + self.assertEqual( + str(e.exception), + "Error thrown inside thread", + ) diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index cb4b5f5ca654d..b45ca18612bb0 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -3,7 +3,9 @@ from itertools import groupby from math import ceil from operator import itemgetter +import threading from typing import List, Optional, Any, Dict +from django.conf import settings from django.utils.timezone import datetime from posthog.caching.insights_api import ( @@ -40,6 +42,7 @@ EventsNode, HogQLQueryResponse, InCohortVia, + QueryTiming, TrendsQuery, TrendsQueryResponse, HogQLQueryModifiers, @@ -129,23 +132,69 @@ def to_actors_query(self) -> ast.SelectQuery | ast.SelectUnionQuery: def calculate(self): queries = self.to_query() - res = [] - timings = [] + res_matrix: List[List[Any] | Any | None] = [None] * len(queries) + timings_matrix: List[List[QueryTiming] | None] = [None] * len(queries) + errors: List[Exception] = [] - for index, query in enumerate(queries): - 
series_with_extra = self.series[index] + def run(index: int, query: ast.SelectQuery | ast.SelectUnionQuery, is_parallel: bool): + try: + series_with_extra = self.series[index] - response = execute_hogql_query( - query_type="TrendsQuery", - query=query, - team=self.team, - timings=self.timings, - modifiers=self.modifiers, - ) + response = execute_hogql_query( + query_type="TrendsQuery", + query=query, + team=self.team, + timings=self.timings, + modifiers=self.modifiers, + ) - timings.extend(response.timings) + timings_matrix[index] = response.timings + res_matrix[index] = self.build_series_response(response, series_with_extra, len(queries)) + except Exception as e: + errors.append(e) + finally: + if is_parallel: + from django.db import connection + + # This will only close the DB connection for the newly spawned thread and not the whole app + connection.close() + + # This exists so that we're not spawning threads during unit tests. We can't do + # this right now due to the lack of multithreaded support of Django + if settings.IN_UNIT_TESTING: # type: ignore + for index, query in enumerate(queries): + run(index, query, False) + elif len(queries) == 1: + run(0, queries[0], False) + else: + jobs = [threading.Thread(target=run, args=(index, query, True)) for index, query in enumerate(queries)] - res.extend(self.build_series_response(response, series_with_extra, len(queries))) + # Start the threads + for j in jobs: + j.start() + + # Ensure all of the threads have finished + for j in jobs: + j.join() + + # Raise any errors raised in a separate thread + if len(errors) > 0: + raise errors[0] + + # Flatten res and timings + res = [] + for result in res_matrix: + if isinstance(result, List): + res.extend(result) + else: + res.append(result) + + timings = [] + for result in timings_matrix: + if isinstance(result, List): + timings.extend(result) + else: + timings.append(result) if ( self.query.trendsFilter is not None diff --git a/posthog/models/filters/base_filter.py 
b/posthog/models/filters/base_filter.py index 10ca241ad8f16..ca2ef9e4c575f 100644 --- a/posthog/models/filters/base_filter.py +++ b/posthog/models/filters/base_filter.py @@ -56,7 +56,11 @@ def __init__( # Set the HogQL context for the request self.hogql_context = self.kwargs.get( "hogql_context", - HogQLContext(within_non_hogql_query=True, team_id=self.team.pk if self.team else None), + HogQLContext( + within_non_hogql_query=True, + team_id=self.team.pk if self.team else None, + team=self.team if self.team else None, + ), ) if self.team: self.hogql_context.person_on_events_mode = self.team.person_on_events_mode diff --git a/posthog/settings/__init__.py b/posthog/settings/__init__.py index 2e3e1d1e112de..3593d44f40c57 100644 --- a/posthog/settings/__init__.py +++ b/posthog/settings/__init__.py @@ -110,6 +110,8 @@ PROM_PUSHGATEWAY_ADDRESS = os.getenv("PROM_PUSHGATEWAY_ADDRESS", None) +IN_UNIT_TESTING = get_from_env("IN_UNIT_TESTING", False, type_cast=str_to_bool) + # Extend and override these settings with EE's ones if "ee.apps.EnterpriseConfig" in INSTALLED_APPS: from ee.settings import * # noqa: F401, F403