Skip to content

Commit

Permalink
feat(insights): add async_except_on_cache_miss for the query runner (
Browse files Browse the repository at this point in the history
…#25760)

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
skoob13 and github-actions[bot] authored Oct 25, 2024
1 parent 46d706f commit bfd3b50
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 22 deletions.
108 changes: 108 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -9998,6 +9998,105 @@
},
"required": ["questions"],
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"error": {
"description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.",
"type": "string"
},
"hogql": {
"description": "Generated HogQL query.",
"type": "string"
},
"modifiers": {
"$ref": "#/definitions/HogQLQueryModifiers",
"description": "Modifiers used when performing the query"
},
"query_status": {
"$ref": "#/definitions/QueryStatus",
"description": "Query status indicates whether next to the provided data, a query is still running."
},
"results": {
"$ref": "#/definitions/TeamTaxonomyResponse"
},
"timings": {
"description": "Measured timings for different parts of the query generation process",
"items": {
"$ref": "#/definitions/QueryTiming"
},
"type": "array"
}
},
"required": ["results"],
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"error": {
"description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.",
"type": "string"
},
"hogql": {
"description": "Generated HogQL query.",
"type": "string"
},
"modifiers": {
"$ref": "#/definitions/HogQLQueryModifiers",
"description": "Modifiers used when performing the query"
},
"query_status": {
"$ref": "#/definitions/QueryStatus",
"description": "Query status indicates whether next to the provided data, a query is still running."
},
"results": {
"$ref": "#/definitions/EventTaxonomyResponse"
},
"timings": {
"description": "Measured timings for different parts of the query generation process",
"items": {
"$ref": "#/definitions/QueryTiming"
},
"type": "array"
}
},
"required": ["results"],
"type": "object"
},
{
"additionalProperties": false,
"properties": {
"error": {
"description": "Query error. Returned only if 'explain' or `modifiers.debug` is true. Throws an error otherwise.",
"type": "string"
},
"hogql": {
"description": "Generated HogQL query.",
"type": "string"
},
"modifiers": {
"$ref": "#/definitions/HogQLQueryModifiers",
"description": "Modifiers used when performing the query"
},
"query_status": {
"$ref": "#/definitions/QueryStatus",
"description": "Query status indicates whether next to the provided data, a query is still running."
},
"results": {
"$ref": "#/definitions/ActorsPropertyTaxonomyResponse"
},
"timings": {
"description": "Measured timings for different parts of the query generation process",
"items": {
"$ref": "#/definitions/QueryTiming"
},
"type": "array"
}
},
"required": ["results"],
"type": "object"
}
]
},
Expand Down Expand Up @@ -10110,6 +10209,15 @@
},
{
"$ref": "#/definitions/SuggestedQuestionsQuery"
},
{
"$ref": "#/definitions/TeamTaxonomyQuery"
},
{
"$ref": "#/definitions/EventTaxonomyQuery"
},
{
"$ref": "#/definitions/ActorsPropertyTaxonomyQuery"
}
],
"required": ["kind"],
Expand Down
3 changes: 3 additions & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ export type QuerySchema =

// AI
| SuggestedQuestionsQuery
| TeamTaxonomyQuery
| EventTaxonomyQuery
| ActorsPropertyTaxonomyQuery

// Keep this, because QuerySchema itself will be collapsed as it is used in other models
export type QuerySchemaRoot = QuerySchema
Expand Down
16 changes: 7 additions & 9 deletions posthog/clickhouse/client/execute_async.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import datetime

import orjson as json
from typing import TYPE_CHECKING, Optional
import uuid
from typing import TYPE_CHECKING, Optional

from pydantic import BaseModel
import orjson as json
import sentry_sdk
import structlog
from prometheus_client import Histogram
from rest_framework.exceptions import NotFound, APIException
from pydantic import BaseModel
from rest_framework.exceptions import APIException, NotFound

from posthog import celery, redis
from posthog.clickhouse.client.async_task_chain import add_task_to_on_commit
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import ExposedCHQueryError, CHQueryErrorTooManySimultaneousQueries
from posthog.errors import CHQueryErrorTooManySimultaneousQueries, ExposedCHQueryError
from posthog.hogql.constants import LimitContext
from posthog.hogql.errors import ExposedHogQLError
from posthog.renderers import SafeJSONRenderer
from posthog.schema import QueryStatus
from posthog.schema import ClickhouseQueryProgress
from posthog.schema import ClickhouseQueryProgress, QueryStatus
from posthog.tasks.tasks import process_query_task

if TYPE_CHECKING:
Expand Down Expand Up @@ -143,7 +141,7 @@ def execute_process_query(
):
manager = QueryStatusManager(query_id, team_id)

from posthog.api.services.query import process_query_dict, ExecutionMode
from posthog.api.services.query import ExecutionMode, process_query_dict
from posthog.models import Team
from posthog.models.user import User

Expand Down
64 changes: 51 additions & 13 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, UTC
from datetime import UTC, datetime, timedelta
from enum import StrEnum
from typing import Any, Generic, Optional, TypeVar, Union, cast, TypeGuard
from typing import Any, Generic, Optional, TypeGuard, TypeVar, Union, cast

import structlog
from prometheus_client import Counter
from pydantic import BaseModel, ConfigDict
from sentry_sdk import capture_exception, push_scope, set_tag, get_traceparent
from sentry_sdk import capture_exception, get_traceparent, push_scope, set_tag

from posthog.caching.utils import is_stale, ThresholdMode, cache_target_age, last_refresh_from_cached_result
from posthog.clickhouse.client.execute_async import enqueue_process_query_task, get_query_status, QueryNotFoundError
from posthog.clickhouse.query_tagging import tag_queries, get_query_tag_value
from posthog.caching.utils import ThresholdMode, cache_target_age, is_stale, last_refresh_from_cached_result
from posthog.clickhouse.client.execute_async import QueryNotFoundError, enqueue_process_query_task, get_query_status
from posthog.clickhouse.query_tagging import get_query_tag_value, tag_queries
from posthog.hogql import ast
from posthog.hogql.constants import LimitContext
from posthog.hogql.context import HogQLContext
from posthog.hogql.modifiers import create_default_modifiers_for_user
from posthog.hogql.printer import print_ast
from posthog.hogql.query import create_default_modifiers_for_team
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_cache import QueryCacheManager
from posthog.metrics import LABEL_TEAM_ID
from posthog.models import Team, User
from posthog.schema import (
ActorsPropertyTaxonomyQuery,
ActorsQuery,
CacheMissResponse,
DashboardFilter,
DateRange,
EventsQuery,
EventTaxonomyQuery,
FilterLogicalOperator,
FunnelCorrelationActorsQuery,
FunnelCorrelationQuery,
FunnelsActorsQuery,
FunnelsQuery,
GenericCachedQueryResponse,
HogQLQuery,
HogQLQueryModifiers,
HogQLVariable,
Expand All @@ -40,25 +44,24 @@
PathsQuery,
PropertyGroupFilter,
PropertyGroupFilterValue,
QueryStatus,
QueryStatusResponse,
QueryTiming,
RetentionQuery,
SamplingRate,
SessionAttributionExplorerQuery,
SessionsTimelineQuery,
StickinessQuery,
SuggestedQuestionsQuery,
TeamTaxonomyQuery,
TrendsQuery,
WebGoalsQuery,
WebOverviewQuery,
WebStatsTableQuery,
WebTopClicksQuery,
QueryStatusResponse,
GenericCachedQueryResponse,
QueryStatus,
SessionAttributionExplorerQuery,
WebGoalsQuery,
)
from posthog.schema_helpers import to_dict, to_json
from posthog.utils import generate_cache_key, get_from_dict_or_attr
from posthog.hogql.modifiers import create_default_modifiers_for_user

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -90,6 +93,8 @@ class ExecutionMode(StrEnum):
"""Use cache for longer, kick off async calculation when results are missing or stale."""
CACHE_ONLY_NEVER_CALCULATE = "force_cache"
"""Do not initiate calculation."""
RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS = "async_except_on_cache_miss"
"""Use cache, kick off async calculation when results are stale, but block on cache miss."""


_REFRESH_TO_EXECUTION_MODE: dict[str | bool, ExecutionMode] = {
Expand Down Expand Up @@ -397,6 +402,36 @@ def get_query_runner(
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "TeamTaxonomyQuery":
from .ai.team_taxonomy_query_runner import TeamTaxonomyQueryRunner

return TeamTaxonomyQueryRunner(
query=cast(TeamTaxonomyQuery | dict[str, Any], query),
team=team,
timings=timings,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "EventTaxonomyQuery":
from .ai.event_taxonomy_query_runner import EventTaxonomyQueryRunner

return EventTaxonomyQueryRunner(
query=cast(EventTaxonomyQuery | dict[str, Any], query),
team=team,
timings=timings,
limit_context=limit_context,
modifiers=modifiers,
)
if kind == "ActorsPropertyTaxonomyQuery":
from .ai.actors_property_taxonomy_query_runner import ActorsPropertyTaxonomyQueryRunner

return ActorsPropertyTaxonomyQueryRunner(
query=cast(ActorsPropertyTaxonomyQuery | dict[str, Any], query),
team=team,
timings=timings,
limit_context=limit_context,
modifiers=modifiers,
)

raise ValueError(f"Can't get a runner for an unknown query kind: {kind}")

Expand Down Expand Up @@ -555,7 +590,10 @@ def handle_cache_and_async_logic(
if execution_mode == ExecutionMode.CACHE_ONLY_NEVER_CALCULATE:
cached_response.query_status = self.get_async_query_status(cache_key=cache_manager.cache_key)
return cached_response
elif execution_mode == ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE:
elif execution_mode in (
ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE,
ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS,
):
# We're allowed to calculate, but we'll do it asynchronously and attach the query status
cached_response.query_status = self.enqueue_async_calculation(
cache_manager=cache_manager, user=user, refresh_requested=True
Expand Down
42 changes: 42 additions & 0 deletions posthog/hogql_queries/test/test_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest import mock
from zoneinfo import ZoneInfo

from django.core.cache import cache
from freezegun import freeze_time
from pydantic import BaseModel

Expand All @@ -29,6 +30,10 @@ class TestQuery(BaseModel):
class TestQueryRunner(BaseTest):
maxDiff = None

def tearDown(self):
super().tearDown()
cache.clear()

def setup_test_query_runner_class(self):
"""Setup required methods and attributes of the abstract base class."""

Expand Down Expand Up @@ -206,6 +211,43 @@ def test_cache_response(self, mock_on_commit):
self.assertEqual(response.is_cached, True)
mock_on_commit.assert_called_once()

@mock.patch("django.db.transaction.on_commit")
def test_recent_cache_calculate_async_if_stale_and_blocking_on_miss(self, mock_on_commit):
TestQueryRunner = self.setup_test_query_runner_class()

runner = TestQueryRunner(query={"some_attr": "bla"}, team=self.team)

with freeze_time(datetime(2023, 2, 4, 13, 37, 42)):
# in cache-only mode, returns cache miss response if uncached
response = runner.run(execution_mode=ExecutionMode.CACHE_ONLY_NEVER_CALCULATE)
self.assertIsInstance(response, CacheMissResponse)

response = runner.run(
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS
)
self.assertIsInstance(response, TestCachedBasicQueryResponse)
self.assertEqual(response.is_cached, False)
self.assertEqual(response.last_refresh.isoformat(), "2023-02-04T13:37:42+00:00")
self.assertEqual(response.next_allowed_client_refresh.isoformat(), "2023-02-04T13:41:42+00:00")

# returns cached response afterwards
response = runner.run(
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS
)
self.assertIsInstance(response, TestCachedBasicQueryResponse)
self.assertEqual(response.is_cached, True)

with freeze_time(datetime(2023, 2, 4, 13, 37 + 11, 42)):
# returns fresh response if stale
response = runner.run(
execution_mode=ExecutionMode.RECENT_CACHE_CALCULATE_ASYNC_IF_STALE_AND_BLOCKING_ON_MISS
)
self.assertIsInstance(response, TestCachedBasicQueryResponse)
# Should kick off the calculation in the background
self.assertEqual(response.is_cached, True)
self.assertEqual(response.last_refresh.isoformat(), "2023-02-04T13:37:42+00:00")
mock_on_commit.assert_called_once()

def test_modifier_passthrough(self):
try:
from ee.clickhouse.materialized_columns.analyze import materialize
Expand Down
Loading

0 comments on commit bfd3b50

Please sign in to comment.