From bfd3b502e413ac2e3ee4bfe5199044182a1c2e29 Mon Sep 17 00:00:00 2001 From: Georgiy Tarasov Date: Fri, 25 Oct 2024 15:07:42 +0200 Subject: [PATCH] feat(insights): add `async_except_on_cache_miss` for the query runner (#25760) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- frontend/src/queries/schema.json | 108 ++++++++++++++++++ frontend/src/queries/schema.ts | 3 + posthog/clickhouse/client/execute_async.py | 16 ++- posthog/hogql_queries/query_runner.py | 64 ++++++++--- .../hogql_queries/test/test_query_runner.py | 42 +++++++ posthog/schema.py | 78 +++++++++++++ 6 files changed, 289 insertions(+), 22 deletions(-) diff --git a/frontend/src/queries/schema.json b/frontend/src/queries/schema.json index de989c53b1c72..315dfaa5e1926 100644 --- a/frontend/src/queries/schema.json +++ b/frontend/src/queries/schema.json @@ -9998,6 +9998,105 @@ }, "required": ["questions"], "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "error": { + "description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + "type": "string" + }, + "hogql": { + "description": "Generated HogQL query.", + "type": "string" + }, + "modifiers": { + "$ref": "#/definitions/HogQLQueryModifiers", + "description": "Modifiers used when performing the query" + }, + "query_status": { + "$ref": "#/definitions/QueryStatus", + "description": "Query status indicates whether next to the provided data, a query is still running." + }, + "results": { + "$ref": "#/definitions/TeamTaxonomyResponse" + }, + "timings": { + "description": "Measured timings for different parts of the query generation process", + "items": { + "$ref": "#/definitions/QueryTiming" + }, + "type": "array" + } + }, + "required": ["results"], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "error": { + "description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + "type": "string" + }, + "hogql": { + "description": "Generated HogQL query.", + "type": "string" + }, + "modifiers": { + "$ref": "#/definitions/HogQLQueryModifiers", + "description": "Modifiers used when performing the query" + }, + "query_status": { + "$ref": "#/definitions/QueryStatus", + "description": "Query status indicates whether next to the provided data, a query is still running." + }, + "results": { + "$ref": "#/definitions/EventTaxonomyResponse" + }, + "timings": { + "description": "Measured timings for different parts of the query generation process", + "items": { + "$ref": "#/definitions/QueryTiming" + }, + "type": "array" + } + }, + "required": ["results"], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "error": { + "description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + "type": "string" + }, + "hogql": { + "description": "Generated HogQL query.", + "type": "string" + }, + "modifiers": { + "$ref": "#/definitions/HogQLQueryModifiers", + "description": "Modifiers used when performing the query" + }, + "query_status": { + "$ref": "#/definitions/QueryStatus", + "description": "Query status indicates whether next to the provided data, a query is still running." + }, + "results": { + "$ref": "#/definitions/ActorsPropertyTaxonomyResponse" + }, + "timings": { + "description": "Measured timings for different parts of the query generation process", + "items": { + "$ref": "#/definitions/QueryTiming" + }, + "type": "array" + } + }, + "required": ["results"], + "type": "object" } ] }, @@ -10110,6 +10209,15 @@ }, { "$ref": "#/definitions/SuggestedQuestionsQuery" + }, + { + "$ref": "#/definitions/TeamTaxonomyQuery" + }, + { + "$ref": "#/definitions/EventTaxonomyQuery" + }, + { + "$ref": "#/definitions/ActorsPropertyTaxonomyQuery" } ], "required": ["kind"], diff --git a/frontend/src/queries/schema.ts b/frontend/src/queries/schema.ts index 3fee10aa88c33..397e0487de994 100644 --- a/frontend/src/queries/schema.ts +++ b/frontend/src/queries/schema.ts @@ -183,6 +183,9 @@ export type QuerySchema = // AI | SuggestedQuestionsQuery + | TeamTaxonomyQuery + | EventTaxonomyQuery + | ActorsPropertyTaxonomyQuery // Keep this, because QuerySchema itself will be collapsed as it is used in other models export type QuerySchemaRoot = QuerySchema diff --git a/posthog/clickhouse/client/execute_async.py b/posthog/clickhouse/client/execute_async.py index 8c7fa9cdfaec5..bf85bba7e4393 100644 --- a/posthog/clickhouse/client/execute_async.py +++ b/posthog/clickhouse/client/execute_async.py @@ -1,24 +1,22 @@ import datetime - -import orjson as json -from typing import TYPE_CHECKING, Optional import uuid +from typing import TYPE_CHECKING, Optional -from pydantic import BaseModel +import orjson as json import sentry_sdk import structlog from prometheus_client import Histogram -from rest_framework.exceptions import NotFound, APIException +from pydantic import BaseModel +from rest_framework.exceptions import APIException, NotFound from posthog import celery, redis from posthog.clickhouse.client.async_task_chain import add_task_to_on_commit from posthog.clickhouse.query_tagging import tag_queries -from posthog.errors import ExposedCHQueryError, CHQueryErrorTooManySimultaneousQueries +from posthog.errors import CHQueryErrorTooManySimultaneousQueries, ExposedCHQueryError from posthog.hogql.constants import LimitContext from posthog.hogql.errors import ExposedHogQLError from posthog.renderers import SafeJSONRenderer -from posthog.schema import QueryStatus -from posthog.schema import ClickhouseQueryProgress +from posthog.schema import ClickhouseQueryProgress, QueryStatus from posthog.tasks.tasks import process_query_task if TYPE_CHECKING: @@ -143,7 +141,7 @@ def execute_process_query( ): manager = QueryStatusManager(query_id, team_id) - from posthog.api.services.query import process_query_dict, ExecutionMode + from posthog.api.services.query import ExecutionMode, process_query_dict from posthog.models import Team from posthog.models.user import User diff --git a/posthog/hogql_queries/query_runner.py b/posthog/hogql_queries/query_runner.py index 664430cc7da04..40918e51b5e97 100644 --- a/posthog/hogql_queries/query_runner.py +++ b/posthog/hogql_queries/query_runner.py @@ -1,19 +1,20 @@ from abc import ABC, abstractmethod -from datetime import datetime, timedelta, UTC +from datetime import UTC, datetime, timedelta from enum import StrEnum -from typing import Any, Generic, Optional, TypeVar, Union, cast, TypeGuard +from typing import Any, Generic, Optional, TypeGuard, TypeVar, Union, cast import structlog from prometheus_client import Counter from pydantic import BaseModel, ConfigDict -from sentry_sdk import capture_exception, push_scope, set_tag, get_traceparent +from sentry_sdk import capture_exception, get_traceparent, push_scope, set_tag -from posthog.caching.utils import is_stale, ThresholdMode, cache_target_age, last_refresh_from_cached_result -from posthog.clickhouse.client.execute_async import enqueue_process_query_task, get_query_status, QueryNotFoundError -from posthog.clickhouse.query_tagging import tag_queries, get_query_tag_value +from posthog.caching.utils import ThresholdMode, cache_target_age, is_stale, last_refresh_from_cached_result +from posthog.clickhouse.client.execute_async import QueryNotFoundError, enqueue_process_query_task, get_query_status +from posthog.clickhouse.query_tagging import get_query_tag_value, tag_queries from posthog.hogql import ast from posthog.hogql.constants import LimitContext from posthog.hogql.context import HogQLContext +from posthog.hogql.modifiers import create_default_modifiers_for_user from posthog.hogql.printer import print_ast from posthog.hogql.query import create_default_modifiers_for_team from posthog.hogql.timings import HogQLTimings @@ -21,16 +22,19 @@ from posthog.metrics import LABEL_TEAM_ID from posthog.models import Team, User from posthog.schema import ( + ActorsPropertyTaxonomyQuery, ActorsQuery, CacheMissResponse, DashboardFilter, DateRange, EventsQuery, + EventTaxonomyQuery, FilterLogicalOperator, FunnelCorrelationActorsQuery, FunnelCorrelationQuery, FunnelsActorsQuery, FunnelsQuery, + GenericCachedQueryResponse, HogQLQuery, HogQLQueryModifiers, HogQLVariable, @@ -40,25 +44,24 @@ PathsQuery, PropertyGroupFilter, PropertyGroupFilterValue, + QueryStatus, + QueryStatusResponse, QueryTiming, RetentionQuery, SamplingRate, + SessionAttributionExplorerQuery, SessionsTimelineQuery, StickinessQuery, SuggestedQuestionsQuery, + TeamTaxonomyQuery, TrendsQuery, + WebGoalsQuery, WebOverviewQuery, WebStatsTableQuery, WebTopClicksQuery, - QueryStatusResponse, - GenericCachedQueryResponse, - QueryStatus, - SessionAttributionExplorerQuery, - WebGoalsQuery, ) from posthog.schema_helpers import to_dict, to_json from posthog.utils import generate_cache_key, get_from_dict_or_attr -from posthog.hogql.modifiers import create_default_modifiers_for_user logger = structlog.get_logger(__name__) @@ -90,6 +93,8 @@ class ExecutionMode(StrEnum): """Use cache for longer, kick off async calculation when results are missing or stale.""" CACHE_ONLY_NEVER_CALCULATE = "force_cache" """Do not initiate calculation.""" + RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS = "async_except_on_cache_miss" + """Use cache, kick off async calculation when results are stale, but block on cache miss.""" _REFRESH_TO_EXECUTION_MODE: dict[str | bool, ExecutionMode] = { @@ -397,6 +402,36 @@ def get_query_runner( limit_context=limit_context, modifiers=modifiers, ) + if kind == "TeamTaxonomyQuery": + from .ai.team_taxonomy_query_runner import TeamTaxonomyQueryRunner + + return TeamTaxonomyQueryRunner( + query=cast(TeamTaxonomyQuery | dict[str, Any], query), + team=team, + timings=timings, + limit_context=limit_context, + modifiers=modifiers, + ) + if kind == "EventTaxonomyQuery": + from .ai.event_taxonomy_query_runner import EventTaxonomyQueryRunner + + return EventTaxonomyQueryRunner( + query=cast(EventTaxonomyQuery | dict[str, Any], query), + team=team, + timings=timings, + limit_context=limit_context, + modifiers=modifiers, + ) + if kind == "ActorsPropertyTaxonomyQuery": + from .ai.actors_property_taxonomy_query_runner import ActorsPropertyTaxonomyQueryRunner + + return ActorsPropertyTaxonomyQueryRunner( + query=cast(ActorsPropertyTaxonomyQuery | dict[str, Any], query), + team=team, + timings=timings, + limit_context=limit_context, + modifiers=modifiers, + ) raise ValueError(f"Can't get a runner for an unknown query kind: {kind}") @@ -555,7 +590,10 @@ def handle_cache_and_async_logic( if execution_mode == ExecutionMode.CACHE_ONLY_NEVER_CALCULATE: cached_response.query_status = self.get_async_query_status(cache_key=cache_manager.cache_key) return cached_response - elif execution_mode == ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE: + elif execution_mode in ( + ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE, + ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS, + ): # We're allowed to calculate, but we'll do it asynchronously and attach the query status cached_response.query_status = self.enqueue_async_calculation( cache_manager=cache_manager, user=user, refresh_requested=True diff --git a/posthog/hogql_queries/test/test_query_runner.py b/posthog/hogql_queries/test/test_query_runner.py index 927930dad2da0..462515d5ffb88 100644 --- a/posthog/hogql_queries/test/test_query_runner.py +++ b/posthog/hogql_queries/test/test_query_runner.py @@ -3,6 +3,7 @@ from unittest import mock from zoneinfo import ZoneInfo +from django.core.cache import cache from freezegun import freeze_time from pydantic import BaseModel @@ -29,6 +30,10 @@ class TestQuery(BaseModel): class TestQueryRunner(BaseTest): maxDiff = None + def tearDown(self): + super().tearDown() + cache.clear() + def setup_test_query_runner_class(self): """Setup required methods and attributes of the abstract base class.""" @@ -206,6 +211,43 @@ def test_cache_response(self, mock_on_commit): self.assertEqual(response.is_cached, True) mock_on_commit.assert_called_once() + @mock.patch("django.db.transaction.on_commit") + def test_recent_cache_calculate_async_if_stale_and_blocking_on_miss(self, mock_on_commit): + TestQueryRunner = self.setup_test_query_runner_class() + + runner = TestQueryRunner(query={"some_attr": "bla"}, team=self.team) + + with freeze_time(datetime(2023, 2, 4, 13, 37, 42)): + # in cache-only mode, returns cache miss response if uncached + response = runner.run(execution_mode=ExecutionMode.CACHE_ONLY_NEVER_CALCULATE) + self.assertIsInstance(response, CacheMissResponse) + + response = runner.run( + execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS + ) + self.assertIsInstance(response, TestCachedBasicQueryResponse) + self.assertEqual(response.is_cached, False) + self.assertEqual(response.last_refresh.isoformat(), "2023-02-04T13:37:42+00:00") + self.assertEqual(response.next_allowed_client_refresh.isoformat(), "2023-02-04T13:41:42+00:00") + + # returns cached response afterwards + response = runner.run( + execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS + ) + self.assertIsInstance(response, TestCachedBasicQueryResponse) + self.assertEqual(response.is_cached, True) + + with freeze_time(datetime(2023, 2, 4, 13, 37 + 11, 42)): + # returns fresh response if stale + response = runner.run( + execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS + ) + self.assertIsInstance(response, TestCachedBasicQueryResponse) + # Should kick off the calculation in the background + self.assertEqual(response.is_cached, True) + self.assertEqual(response.last_refresh.isoformat(), "2023-02-04T13:37:42+00:00") + mock_on_commit.assert_called_once() + def test_modifier_passthrough(self): try: from ee.clickhouse.materialized_columns.analyze import materialize diff --git a/posthog/schema.py b/posthog/schema.py index 3c7f04dd893ee..b04a1bb6ebdf6 100644 --- a/posthog/schema.py +++ b/posthog/schema.py @@ -3832,6 +3832,69 @@ class QueryResponseAlternative36(BaseModel): types: Optional[list] = None +class QueryResponseAlternative39(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + error: Optional[str] = Field( + default=None, + description="Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + ) + hogql: Optional[str] = Field(default=None, description="Generated HogQL query.") + modifiers: Optional[HogQLQueryModifiers] = Field( + default=None, description="Modifiers used when performing the query" + ) + query_status: Optional[QueryStatus] = Field( + default=None, description="Query status indicates whether next to the provided data, a query is still running." + ) + results: list[TeamTaxonomyItem] + timings: Optional[list[QueryTiming]] = Field( + default=None, description="Measured timings for different parts of the query generation process" + ) + + +class QueryResponseAlternative40(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + error: Optional[str] = Field( + default=None, + description="Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + ) + hogql: Optional[str] = Field(default=None, description="Generated HogQL query.") + modifiers: Optional[HogQLQueryModifiers] = Field( + default=None, description="Modifiers used when performing the query" + ) + query_status: Optional[QueryStatus] = Field( + default=None, description="Query status indicates whether next to the provided data, a query is still running." + ) + results: list[EventTaxonomyItem] + timings: Optional[list[QueryTiming]] = Field( + default=None, description="Measured timings for different parts of the query generation process" + ) + + +class QueryResponseAlternative41(BaseModel): + model_config = ConfigDict( + extra="forbid", + ) + error: Optional[str] = Field( + default=None, + description="Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.", + ) + hogql: Optional[str] = Field(default=None, description="Generated HogQL query.") + modifiers: Optional[HogQLQueryModifiers] = Field( + default=None, description="Modifiers used when performing the query" + ) + query_status: Optional[QueryStatus] = Field( + default=None, description="Query status indicates whether next to the provided data, a query is still running." + ) + results: ActorsPropertyTaxonomyResponse + timings: Optional[list[QueryTiming]] = Field( + default=None, description="Measured timings for different parts of the query generation process" + ) + + class RetentionFilter(BaseModel): model_config = ConfigDict( extra="forbid", @@ -5907,6 +5970,9 @@ class QueryResponseAlternative( QueryResponseAlternative36, QueryResponseAlternative37, QueryResponseAlternative38, + QueryResponseAlternative39, + QueryResponseAlternative40, + QueryResponseAlternative41, ] ] ): @@ -5947,6 +6013,9 @@ class QueryResponseAlternative( QueryResponseAlternative36, QueryResponseAlternative37, QueryResponseAlternative38, + QueryResponseAlternative39, + QueryResponseAlternative40, + QueryResponseAlternative41, ] @@ -6440,6 +6509,9 @@ class QueryRequest(BaseModel): FunnelCorrelationQuery, DatabaseSchemaQuery, SuggestedQuestionsQuery, + TeamTaxonomyQuery, + EventTaxonomyQuery, + ActorsPropertyTaxonomyQuery, ] = Field( ..., description=( @@ -6505,6 +6577,9 @@ class QuerySchemaRoot( FunnelCorrelationQuery, DatabaseSchemaQuery, SuggestedQuestionsQuery, + TeamTaxonomyQuery, + EventTaxonomyQuery, + ActorsPropertyTaxonomyQuery, ] ] ): @@ -6544,6 +6619,9 @@ class QuerySchemaRoot( FunnelCorrelationQuery, DatabaseSchemaQuery, SuggestedQuestionsQuery, + TeamTaxonomyQuery, + EventTaxonomyQuery, + ActorsPropertyTaxonomyQuery, ] = Field(..., discriminator="kind")