fix(insights): Make async query execution use the same cache as sync (#22234)

* Add `test_insight_refreshing_query_async`

It's red (failing) at this point.

* Allow `enqueue_process_query_task` to short-circuit if the cache is warm
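
A minimal sketch of the short-circuit idea – `QueryStatusSketch` and the callables below are illustrative stand-ins, not the real PostHog task API:

```python
# Sketch only: on a warm cache, return a completed status immediately
# instead of enqueuing a background worker task.
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class QueryStatusSketch:
    id: str
    query_async: bool = True
    complete: bool = False
    results: Optional[Any] = None


def enqueue_if_cache_cold(
    cache_get: Callable[[str], Optional[Any]],
    enqueue: Callable[[], None],
    cache_key: str,
    query_id: str,
    refresh_requested: bool = False,
) -> QueryStatusSketch:
    if not refresh_requested:
        cached = cache_get(cache_key)
        if cached is not None:
            # Cache is warm: short-circuit and hand back the result directly.
            return QueryStatusSketch(id=query_id, complete=True, results=cached)
    # Cache miss (or forced refresh): fall back to the async worker.
    enqueue()
    return QueryStatusSketch(id=query_id, complete=False)
```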

* Make cache key consistent between sync and async execution
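
Roughly, the invariant being established (a sketch, not the actual query-runner code): both execution paths derive the key from the same `get_cache_key()`, so a result written by the async worker is readable by the sync path and vice versa.

```python
# Sketch: a single key derivation shared by the sync endpoint and the async
# worker; `runner` stands in for a PostHog query runner instance.
from typing import Any, Callable, Optional


def run_with_shared_cache(
    runner: Any,
    cache_get: Callable[[str], Optional[dict]],
    cache_set: Callable[[str, dict], None],
    execute: Callable[[Any], dict],
) -> dict:
    key = runner.get_cache_key()  # single source of truth for the key
    cached = cache_get(key)
    if cached is not None:
        return cached
    result = execute(runner)  # either the sync call or the worker body
    cache_set(key, result)
    return result
```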

* Remove obsolete comment

* Fix test_execute_async.py

* Fix `execute_process_query` call

* Fix `enqueue_process_query_task` call

* Update expected cache keys

This is to be expected, because `get_cache_key()` now returns the _full_ cache key, i.e. one that already includes f"...._{self._limit_context_aliased_for_cache}_v2".
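
For illustration, a toy version of what a "full" key looks like now (the exact layout lives in PostHog's query runner; the hashing below is made up):

```python
# Illustrative only: the point is that the limit-context alias and version
# suffix are part of the key itself, so callers no longer append them.
import hashlib
import json


def full_cache_key(team_id: int, query: dict, limit_context_alias: str) -> str:
    payload = json.dumps(query, sort_keys=True)
    query_hash = hashlib.md5(f"{team_id}:{payload}".encode()).hexdigest()
    return f"cache_{query_hash}_{limit_context_alias}_v2"
```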

* Update mypy-baseline.txt

* Make `process_query_...` return actual model instances

This makes `CacheMissResponse` checks easy and obvious.
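
A sketch of why this helps – with model instances returned, a cache miss is a plain `isinstance()` check (the `CacheMissResponse` below is a local stand-in for the schema class):

```python
# Sketch: callers can branch on the model type and only serialize at the
# API boundary (as posthog/api/query.py now does with model_dump()).
from typing import Union

from pydantic import BaseModel


class CacheMissResponse(BaseModel):
    cache_key: Union[str, None] = None


def to_api_payload(result: Union[dict, BaseModel]) -> dict:
    if isinstance(result, CacheMissResponse):
        raise LookupError("no cached result for this query")
    return result.model_dump() if isinstance(result, BaseModel) else result
```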

* Fix post-refactor issues

* Stabilize `test_create_two_similarly_named_organizations`

It looks like we're now calling `random()` somewhere earlier in the flow, which causes the test to see a different disambiguated org slug than before. The test shouldn't depend on global RNG state anyway – `secrets.choice()` just needs to be mocked.
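
A minimal illustration of the pattern – the `disambiguate_slug` helper is hypothetical (PostHog's real slug logic differs), but the `secrets.choice` patching matches the test change below:

```python
# Patching secrets.choice makes the random suffix deterministic, unlike
# random.seed(), which breaks as soon as something else consumes the RNG.
import secrets
from unittest.mock import patch


def disambiguate_slug(base: str, length: int = 4) -> str:
    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
    suffix = "".join(secrets.choice(alphabet) for _ in range(length))
    return f"{base}-{suffix}"


with patch("secrets.choice", return_value="Y"):
    assert disambiguate_slug("my-org") == "my-org-YYYY"
```
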
Twixes authored May 21, 2024
1 parent b11ebf5 commit 64aff85
Showing 21 changed files with 263 additions and 133 deletions.
8 changes: 3 additions & 5 deletions ee/api/test/test_organization.py
@@ -1,5 +1,4 @@
import datetime as dt
import random
from unittest import mock
from unittest.mock import ANY, call, patch

@@ -29,9 +28,8 @@ def test_create_organization(self):
OrganizationMembership.Level.OWNER,
)

def test_create_two_similarly_named_organizations(self):
random.seed(0)

@patch("secrets.choice", return_value="Y")
def test_create_two_similarly_named_organizations(self, mock_choice):
response = self.client.post(
"/api/organizations/",
{"name": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"},
@@ -53,7 +51,7 @@ def test_create_two_similarly_named_organizations(self):
self.assertDictContainsSubset(
{
"name": "#XXxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxX",
"slug": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-yWAc",
"slug": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx-YYYY",
},
response.json(),
)
13 changes: 7 additions & 6 deletions ee/clickhouse/materialized_columns/test/test_columns.py
@@ -1,6 +1,6 @@
import random
from datetime import timedelta
from time import sleep
from unittest.mock import patch

from freezegun import freeze_time

@@ -71,19 +71,20 @@ def test_caching_and_materializing(self):
["$foo", "$bar", "abc", *EVENTS_TABLE_DEFAULT_MATERIALIZED_COLUMNS],
)

def test_materialized_column_naming(self):
random.seed(0)

@patch("secrets.choice", return_value="X")
def test_materialized_column_naming(self, mock_choice):
materialize("events", "$foO();--sqlinject", create_minmax_index=True)
mock_choice.return_value = "Y"
materialize("events", "$foO();ääsqlinject", create_minmax_index=True)
mock_choice.return_value = "Z"
materialize("events", "$foO_____sqlinject", create_minmax_index=True)
materialize("person", "SoMePrOp", create_minmax_index=True)

self.assertDictContainsSubset(
{
("$foO();--sqlinject", "properties"): "mat_$foO_____sqlinject",
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_yWAc",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_qGFz",
("$foO();ääsqlinject", "properties"): "mat_$foO_____sqlinject_YYYY",
("$foO_____sqlinject", "properties"): "mat_$foO_____sqlinject_ZZZZ",
},
get_materialized_columns("events"),
)
7 changes: 6 additions & 1 deletion frontend/src/queries/query.ts
@@ -118,9 +118,14 @@ async function executeQuery<N extends DataNode>(

const response = await api.query(queryNode, methodOptions, queryId, refresh, isAsyncQuery)

if (!isAsyncQuery || !response.query_async) {
if (!response.query_async) {
// Executed query synchronously
return response
}
if (response.complete || response.error) {
// Async query returned immediately
return response.results
}

const pollStart = performance.now()
let currentDelay = 300 // start low, because all queries will take at minimum this
2 changes: 2 additions & 0 deletions frontend/src/queries/schema.json
@@ -6495,7 +6495,9 @@
"type": "string"
},
"query_async": {
"const": true,
"default": true,
"description": "ONLY async queries use QueryStatus.",
"type": "boolean"
},
"query_progress": {
7 changes: 5 additions & 2 deletions frontend/src/queries/schema.ts
@@ -937,8 +937,11 @@ export type ClickhouseQueryStatus = {

export type QueryStatus = {
id: string
/** @default true */
query_async: boolean
/**
* ONLY async queries use QueryStatus.
* @default true
*/
query_async: true
team_id: integer
/** @default false */
error: boolean
1 change: 0 additions & 1 deletion mypy-baseline.txt
@@ -669,7 +669,6 @@ posthog/api/search.py:0: error: Argument "extra_fields" to "class_queryset" has
posthog/api/search.py:0: error: "type[Model]" has no attribute "objects" [attr-defined]
posthog/api/search.py:0: error: Argument 1 to "reduce" has incompatible type "Callable[[SearchVector, SearchVector], CombinedExpression]"; expected "Callable[[SearchVector, SearchVector], SearchVector]" [arg-type]
posthog/api/search.py:0: error: Incompatible return value type (got "CombinedExpression", expected "SearchVector") [return-value]
posthog/api/query.py:0: error: Argument "user_id" to "enqueue_process_query_task" has incompatible type "int | AutoField[Combinable | int | str | None, int] | Any"; expected "int" [arg-type]
posthog/api/property_definition.py:0: error: Item "AnonymousUser" of "User | AnonymousUser" has no attribute "organization" [union-attr]
posthog/api/property_definition.py:0: error: Item "None" of "Organization | Any | None" has no attribute "is_feature_available" [union-attr]
posthog/api/property_definition.py:0: error: Item "ForeignObjectRel" of "Field[Any, Any] | ForeignObjectRel | GenericForeignKey" has no attribute "cached_col" [union-attr]
8 changes: 6 additions & 2 deletions posthog/api/query.py
@@ -1,8 +1,10 @@
import re
from typing import cast
import uuid

from django.http import JsonResponse
from drf_spectacular.utils import OpenApiResponse
from pydantic import BaseModel
from posthog.hogql_queries.query_runner import ExecutionMode
from rest_framework import viewsets
from rest_framework.decorators import action
@@ -66,8 +68,8 @@ def create(self, request, *args, **kwargs) -> Response:

if data.async_:
query_status = enqueue_process_query_task(
team_id=self.team.pk,
user_id=self.request.user.pk,
team=self.team,
user=cast(User, self.request.user),
query_json=request.data["query"],
query_id=client_query_id,
refresh_requested=data.refresh or False,
@@ -83,6 +85,8 @@
if data.refresh
else ExecutionMode.RECENT_CACHE_CALCULATE_IF_STALE,
)
if isinstance(result, BaseModel):
result = result.model_dump()
return Response(result)
except (ExposedHogQLError, ExposedCHQueryError) as e:
raise ValidationError(str(e), getattr(e, "code_name", None))
8 changes: 3 additions & 5 deletions posthog/api/services/query.py
@@ -28,14 +28,14 @@
logger = structlog.get_logger(__name__)


def process_query(
def process_query_dict(
team: Team,
query_json: dict,
*,
dashboard_filters_json: Optional[dict] = None,
limit_context: Optional[LimitContext] = None,
execution_mode: ExecutionMode = ExecutionMode.RECENT_CACHE_CALCULATE_IF_STALE,
) -> dict:
) -> dict | BaseModel:
model = QuerySchemaRoot.model_validate(query_json)
tag_queries(query=query_json)
dashboard_filters = DashboardFilter.model_validate(dashboard_filters_json) if dashboard_filters_json else None
@@ -55,7 +55,7 @@ def process_query_model(
dashboard_filters: Optional[DashboardFilter] = None,
limit_context: Optional[LimitContext] = None,
execution_mode: ExecutionMode = ExecutionMode.RECENT_CACHE_CALCULATE_IF_STALE,
) -> dict:
) -> dict | BaseModel:
result: dict | BaseModel

try:
@@ -98,6 +98,4 @@ def process_query_model(
query_runner.apply_dashboard_filters(dashboard_filters)
result = query_runner.run(execution_mode=execution_mode)

if isinstance(result, BaseModel):
return result.model_dump()
return result
86 changes: 86 additions & 0 deletions posthog/api/test/test_insight.py
@@ -1469,6 +1469,92 @@ def test_insight_refreshing_query_with_background_update(
self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
self.assertTrue(response["is_cached"])

@parameterized.expand(
[
[ # Property group filter, which is what's actually used these days
PropertyGroupFilter(
type=FilterLogicalOperator.AND,
values=[
PropertyGroupFilterValue(
type=FilterLogicalOperator.OR,
values=[EventPropertyFilter(key="another", value="never_return_this", operator="is_not")],
)
],
)
],
[ # Classic list of filters
[EventPropertyFilter(key="another", value="never_return_this", operator="is_not")]
],
]
)
@patch("posthog.hogql_queries.insights.trends.trends_query_runner.execute_hogql_query", wraps=execute_hogql_query)
def test_insight_refreshing_query_async(self, properties_filter, spy_execute_hogql_query) -> None:
dashboard_id, _ = self.dashboard_api.create_dashboard({"filters": {"date_from": "-14d"}})

with freeze_time("2012-01-14T03:21:34.000Z"):
_create_event(
team=self.team,
event="$pageview",
distinct_id="1",
properties={"prop": "val"},
)
_create_event(
team=self.team,
event="$pageview",
distinct_id="2",
properties={"prop": "another_val"},
)
_create_event(
team=self.team,
event="$pageview",
distinct_id="2",
properties={"prop": "val", "another": "never_return_this"},
)

query_dict = TrendsQuery(
series=[
EventsNode(
event="$pageview",
)
],
properties=properties_filter,
).model_dump()

with freeze_time("2012-01-15T04:01:34.000Z"):
response = self.client.post(
f"/api/projects/{self.team.id}/insights",
data={
"query": query_dict,
"dashboards": [dashboard_id],
},
).json()
self.assertNotIn("code", response) # Watching out for an error code
self.assertEqual(response["last_refresh"], None)
insight_id = response["id"]

response = self.client.get(f"/api/projects/{self.team.id}/insights/{insight_id}/?refresh=true").json()
self.assertNotIn("code", response)
self.assertEqual(spy_execute_hogql_query.call_count, 1)
self.assertEqual(response["result"][0]["data"], [0, 0, 0, 0, 0, 0, 2, 0])
self.assertEqual(response["last_refresh"], "2012-01-15T04:01:34Z")
self.assertEqual(response["last_modified_at"], "2012-01-15T04:01:34Z")
self.assertFalse(response["is_cached"])

with freeze_time("2012-01-15T05:17:39.000Z"):
# Make sure the /query/ endpoint reuses the same cached result - ASYNC EXECUTION HERE!
response = self.client.post(
f"/api/projects/{self.team.id}/query/", {"query": query_dict, "async": True}
).json()
self.assertNotIn("code", response)
self.assertTrue(response.get("query_async"))
self.assertTrue(response.get("complete"))
results = response["results"]

self.assertEqual(spy_execute_hogql_query.call_count, 1)
self.assertEqual(results["results"][0]["data"], [0, 0, 0, 0, 0, 0, 2, 0])
self.assertEqual(results["last_refresh"], "2012-01-15T04:01:34Z") # Using cached result
self.assertTrue(results["is_cached"])

def test_dashboard_filters_applied_to_sql_data_table_node(self):
dashboard_id, _ = self.dashboard_api.create_dashboard(
{"name": "the dashboard", "filters": {"date_from": "-180d"}}