Commit

basic caching
thmsobrmlr committed Sep 17, 2023
1 parent 25315e2 commit c57a870
Showing 6 changed files with 158 additions and 86 deletions.
14 changes: 9 additions & 5 deletions posthog/api/query.py
@@ -35,6 +35,7 @@
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.rate_limit import AIBurstRateThrottle, AISustainedRateThrottle, TeamRateThrottle
from posthog.schema import EventsQuery, HogQLQuery, HogQLMetadata
from posthog.utils import refresh_requested_by_client


class QueryThrottle(TeamRateThrottle):
@@ -92,10 +93,10 @@ def get_throttles(self):
    def list(self, request: Request, **kw) -> HttpResponse:
        self._tag_client_query_id(request.GET.get("client_query_id"))
        query_json = QuerySchemaParser.validate_query(self._query_json_from_request(request))

        refresh_requested = refresh_requested_by_client(request)
        # allow lists as well as dicts in response with safe=False
        try:
            return JsonResponse(process_query(self.team, query_json), safe=False)
            return JsonResponse(process_query(self.team, query_json, refresh_requested=refresh_requested), safe=False)
        except HogQLException as e:
            raise ValidationError(str(e))
        except ExposedCHQueryError as e:
@@ -105,9 +106,10 @@ def post(self, request, *args, **kwargs):
        request_json = request.data
        query_json = request_json.get("query")
        self._tag_client_query_id(request_json.get("client_query_id"))
        refresh_requested = refresh_requested_by_client(request)
        # allow lists as well as dicts in response with safe=False
        try:
            return JsonResponse(process_query(self.team, query_json), safe=False)
            return JsonResponse(process_query(self.team, query_json, refresh_requested=refresh_requested), safe=False)
        except HogQLException as e:
            raise ValidationError(str(e))
        except ExposedCHQueryError as e:
@@ -196,7 +198,9 @@ def _unwrap_pydantic_dict(response: Any) -> Dict:
    return cast(dict, _unwrap_pydantic(response))


def process_query(team: Team, query_json: Dict, default_limit: Optional[int] = None) -> Dict:
def process_query(
    team: Team, query_json: Dict, default_limit: Optional[int] = None, refresh_requested: bool = False
) -> Dict:
    # query_json has been parsed by QuerySchemaParser
    # it _should_ be impossible to end up in here with a "bad" query
    query_kind = query_json.get("kind")
@@ -223,7 +227,7 @@ def process_query(team: Team, query_json: Dict, default_limit: Optional[int] = N
        return _unwrap_pydantic_dict(metadata_response)
    elif query_kind == "LifecycleQuery":
        lifecycle_query_runner = LifecycleQueryRunner(query_json, team)
        return _unwrap_pydantic_dict(lifecycle_query_runner.run())
        return _unwrap_pydantic_dict(lifecycle_query_runner.run_cached(refresh_requested=refresh_requested))
    elif query_kind == "DatabaseSchemaQuery":
        database = create_hogql_database(team.pk)
        return serialize_database(database)
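In other words, callers of process_query can now opt out of the cache per request. A minimal sketch of the two paths follows; request, team and query_json stand for the objects already available in the viewset above, and the exact request flag that refresh_requested_by_client inspects is an assumption of this sketch, not shown in the diff.

from posthog.api.query import process_query
from posthog.utils import refresh_requested_by_client

refresh = refresh_requested_by_client(request)  # assumed to read the request's existing refresh flag
response = process_query(team, query_json, refresh_requested=refresh)
# refresh_requested=True forces a recomputation; refresh_requested=False may return a cached
# package, with "is_cached"/"last_refresh" set by QueryRunner.run_cached (see query_runner.py below).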
72 changes: 68 additions & 4 deletions posthog/caching/utils.py
@@ -1,20 +1,32 @@
import datetime
from typing import List, Optional, Set, Tuple, Union
from datetime import datetime
from typing import Any, List, Optional, Set, Tuple, Union
from zoneinfo import ZoneInfo

from dateutil.parser import parser

import posthoganalytics


from posthog.client import sync_execute
from posthog.cloud_utils import is_cloud
from posthog.datetime import start_of_day, start_of_hour, start_of_month, start_of_week
from posthog.models.filters.filter import Filter
from posthog.models.filters.path_filter import PathFilter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.filters.stickiness_filter import StickinessFilter
from posthog.models.team.team import Team
from posthog.redis import get_client


RECENTLY_ACCESSED_TEAMS_REDIS_KEY = "INSIGHT_CACHE_UPDATE_RECENTLY_ACCESSED_TEAMS"

IN_A_DAY = 86_400


def ensure_is_date(candidate: Optional[Union[str, datetime.datetime]]) -> Optional[datetime.datetime]:
def ensure_is_date(candidate: Optional[Union[str, datetime]]) -> Optional[datetime]:
    if candidate is None:
        return None
    if isinstance(candidate, datetime.datetime):
    if isinstance(candidate, datetime):
        return candidate
    return parser().parse(candidate)

@@ -48,3 +60,55 @@ def active_teams() -> Set[int]:
    all_teams = teams_by_recency

    return set(int(team_id) for team_id, _ in all_teams)


def stale_cache_invalidation_disabled(team: Team) -> bool:
    """Can be disabled temporarily to help in cases of service degradation."""
    if is_cloud():  # on PostHog Cloud, use the feature flag
        return not posthoganalytics.feature_enabled(
            "stale-cache-invalidation-enabled",
            str(team.uuid),
            groups={"organization": str(team.organization.id)},
            group_properties={
                "organization": {"id": str(team.organization.id), "created_at": team.organization.created_at}
            },
            only_evaluate_locally=True,
            send_feature_flag_events=False,
        )
    else:
        return False


def is_stale_filter(
    team: Team, filter: Filter | RetentionFilter | StickinessFilter | PathFilter, cached_result: Any
) -> bool:
    interval = filter.period.lower() if isinstance(filter, RetentionFilter) else filter.interval
    return is_stale(team, filter.date_to, interval, cached_result)


def is_stale(team: Team, date_to: datetime, interval: str, cached_result: Any) -> bool:
    """Indicates whether a cache item is obviously outdated based on filters,
    i.e. the next time interval was entered since the last computation. For
    example, an insight with a -7d date range that was last computed yesterday.
    The same insight refreshed today wouldn't be marked as stale.
    """

    if stale_cache_invalidation_disabled(team):
        return False

    last_refresh = cached_result.get("last_refresh", None)
    date_to = min([date_to, datetime.now(tz=ZoneInfo("UTC"))])  # can't be later than now

    if last_refresh is None:
        raise Exception("Cached results require a last_refresh")

    if interval == "hour":
        return start_of_hour(date_to) > start_of_hour(last_refresh)
    elif interval == "day":
        return start_of_day(date_to) > start_of_day(last_refresh)
    elif interval == "week":
        return start_of_week(date_to) > start_of_week(last_refresh)
    elif interval == "month":
        return start_of_month(date_to) > start_of_month(last_refresh)
    else:
        return False
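To make the bucket comparison concrete, here is a self-contained illustration of the "day" branch. This is a sketch: start_of_day is re-implemented locally and assumed to match posthog.datetime.start_of_day (truncation to midnight).

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo


def start_of_day(dt: datetime) -> datetime:
    # assumed equivalent of posthog.datetime.start_of_day, for this sketch only
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)


now = datetime(2023, 9, 17, 12, 0, tzinfo=ZoneInfo("UTC"))

# Cached yesterday while the date range ends today: a new day bucket has started -> stale.
print(start_of_day(now) > start_of_day(now - timedelta(days=1)))  # True

# Cached an hour ago: still the same day bucket -> the cached result can be served.
print(start_of_day(now) > start_of_day(now - timedelta(hours=1)))  # False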
59 changes: 2 additions & 57 deletions posthog/decorators.py
@@ -1,26 +1,17 @@
from datetime import datetime
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, TypeVar, Union, cast
from zoneinfo import ZoneInfo

import posthoganalytics
from django.urls import resolve
from django.utils.timezone import now
from rest_framework.request import Request
from rest_framework.viewsets import GenericViewSet
from statshog.defaults.django import statsd
from posthog.caching.utils import is_stale_filter

from posthog.clickhouse.query_tagging import tag_queries
from posthog.cloud_utils import is_cloud
from posthog.datetime import start_of_day, start_of_hour, start_of_month, start_of_week
from posthog.models import User
from posthog.models.filters.filter import Filter
from posthog.models.filters.path_filter import PathFilter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.filters.stickiness_filter import StickinessFilter
from posthog.models.filters.utils import get_filter
from posthog.models.team.team import Team
from posthog.utils import refresh_requested_by_client

from .utils import generate_cache_key, get_safe_cache
@@ -84,7 +75,7 @@ def wrapper(self, request) -> T:
route = "unknown"

if cached_result_package and cached_result_package.get("result"):
if not is_stale(team, filter, cached_result_package):
if not is_stale_filter(team, filter, cached_result_package):
cached_result_package["is_cached"] = True
statsd.incr("posthog_cached_function_cache_hit", tags={"route": route})
return cached_result_package
@@ -106,49 +97,3 @@ def wrapper(self, request) -> T:
        return fresh_result_package

    return wrapper


def stale_cache_invalidation_disabled(team: Team) -> bool:
    """Can be disabled temporarily to help in cases of service degradation."""
    if is_cloud():  # on PostHog Cloud, use the feature flag
        return not posthoganalytics.feature_enabled(
            "stale-cache-invalidation-enabled",
            str(team.uuid),
            groups={"organization": str(team.organization.id)},
            group_properties={
                "organization": {"id": str(team.organization.id), "created_at": team.organization.created_at}
            },
            only_evaluate_locally=True,
            send_feature_flag_events=False,
        )
    else:
        return False


def is_stale(team: Team, filter: Filter | RetentionFilter | StickinessFilter | PathFilter, cached_result: Any) -> bool:
    """Indicates whether a cache item is obviously outdated based on filters,
    i.e. the next time interval was entered since the last computation. For
    example, an insight with a -7d date range that was last computed yesterday.
    The same insight refreshed today wouldn't be marked as stale.
    """

    if stale_cache_invalidation_disabled(team):
        return False

    last_refresh = cached_result.get("last_refresh", None)
    date_to = min([filter.date_to, datetime.now(tz=ZoneInfo("UTC"))])  # can't be later than now
    interval = filter.period.lower() if isinstance(filter, RetentionFilter) else filter.interval

    if last_refresh is None:
        raise Exception("Cached results require a last_refresh")

    if interval == "hour":
        return start_of_hour(date_to) > start_of_hour(last_refresh)
    elif interval == "day":
        return start_of_day(date_to) > start_of_day(last_refresh)
    elif interval == "week":
        return start_of_week(date_to) > start_of_week(last_refresh)
    elif interval == "month":
        return start_of_month(date_to) > start_of_month(last_refresh)
    else:
        return False
6 changes: 6 additions & 0 deletions posthog/hogql_queries/lifecycle_query_runner.py
@@ -1,6 +1,7 @@
from typing import Optional, Any, Dict, List

from django.utils.timezone import datetime
from posthog.caching.utils import is_stale

from posthog.hogql import ast
from posthog.hogql.parser import parse_expr, parse_select
@@ -247,3 +248,8 @@ def periods_query(self):
            timings=self.timings,
        )
        return periods_query

    def is_stale(self, cached_result_package):
        date_to = self.query_date_range.date_to()
        interval = self.query_date_range.interval_name
        return is_stale(self.team, date_to, interval, cached_result_package)
55 changes: 54 additions & 1 deletion posthog/hogql_queries/query_runner.py
@@ -1,13 +1,32 @@
from abc import ABC, abstractmethod
from typing import Any, Optional, Type, Dict

from prometheus_client import Counter
from django.utils.timezone import now
from django.core.cache import cache
from django.conf import settings

from posthog.clickhouse.query_tagging import tag_queries
from posthog.hogql import ast
from posthog.hogql.context import HogQLContext
from posthog.hogql.printer import print_ast
from posthog.hogql.timings import HogQLTimings
from posthog.metrics import LABEL_TEAM_ID
from posthog.models import Team
from posthog.types import InsightQueryNode
from posthog.utils import generate_cache_key
from posthog.utils import generate_cache_key, get_safe_cache

QUERY_CACHE_WRITE_COUNTER = Counter(
    "query_cache_write_total",
    "When a query result was persisted in the cache.",
    labelnames=[LABEL_TEAM_ID],
)

QUERY_CACHE_HIT_COUNTER = Counter(
    "query_cache_hit_total",
    "Whether we could fetch the query from the cache or not.",
    labelnames=[LABEL_TEAM_ID, "cache_hit"],
)


class QueryRunner(ABC):
@@ -28,6 +47,36 @@ def __init__(self, query: InsightQueryNode | Dict[str, Any], team: Team, timings
    def run(self) -> InsightQueryNode:
        raise NotImplementedError()

    @abstractmethod
    def run_cached(self, refresh_requested: bool) -> InsightQueryNode:
        cache_key = self.cache_key()
        tag_queries(cache_key=cache_key)

        if not refresh_requested:
            cached_result_package = get_safe_cache(cache_key)

            if cached_result_package and cached_result_package.get("result"):
                if not self.is_stale(cached_result_package):
                    cached_result_package["is_cached"] = True
                    QUERY_CACHE_HIT_COUNTER.labels(team_id=self.team.pk, cache_hit=True).inc()
                    return cached_result_package
                else:
                    QUERY_CACHE_HIT_COUNTER.labels(team_id=self.team.pk, cache_hit=False).inc()
            else:
                QUERY_CACHE_HIT_COUNTER.labels(team_id=self.team.pk, cache_hit=False).inc()

        fresh_result_package = self.run()
        if isinstance(fresh_result_package, dict):
            result = fresh_result_package.get("result")
            if not isinstance(result, dict) or not result.get("loading"):
                timestamp = now()
                fresh_result_package["last_refresh"] = timestamp
                fresh_result_package["is_cached"] = False
                cache.set(cache_key, result, settings.CACHED_RESULTS_TTL)
                QUERY_CACHE_WRITE_COUNTER.inc()

        return fresh_result_package

    @abstractmethod
    def to_query(self) -> ast.SelectQuery:
        raise NotImplementedError()
@@ -54,3 +103,7 @@ def cache_key(self, cache_invalidation_key: Optional[str] = None):
            payload += f"_{cache_invalidation_key}"

        return generate_cache_key(payload)

    @abstractmethod
    def is_stale(self, cached_result_package):
        raise NotImplementedError()
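To illustrate the contract run_cached relies on, here is a hypothetical minimal subclass. ExampleEventCountRunner and its payloads are invented for illustration; only the QueryRunner base class and its run/run_cached/is_stale/cache_key hooks come from this diff.

from posthog.hogql import ast
from posthog.hogql.parser import parse_select
from posthog.hogql_queries.query_runner import QueryRunner


class ExampleEventCountRunner(QueryRunner):
    def to_query(self) -> ast.SelectQuery:
        return parse_select("select count() from events")

    def run(self):
        # run() is expected to return a package dict with a "result" key;
        # run_cached() then stamps "last_refresh"/"is_cached" and writes it to the cache.
        return {"result": {"count": 42}}

    def is_stale(self, cached_result_package):
        # real runners delegate to posthog.caching.utils.is_stale (see LifecycleQueryRunner above)
        return False


# Usage sketch (team and the query payload come from the caller, as in process_query):
# package = ExampleEventCountRunner(query={"kind": "ExampleQuery"}, team=team).run_cached(refresh_requested=False)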