Skip to content

Commit

Permalink
feat(hogql): implement basic caching for hogql insight queries (#17483)
Browse files Browse the repository at this point in the history
  • Loading branch information
thmsobrmlr authored Sep 19, 2023
1 parent 6668f74 commit c61a9d0
Show file tree
Hide file tree
Showing 19 changed files with 363 additions and 110 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 5 additions & 2 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1583,15 +1583,18 @@ const api = {
async query<T extends Record<string, any> = QuerySchema>(
query: T,
options?: ApiMethodOptions,
queryId?: string
queryId?: string,
refresh?: boolean
): Promise<
T extends { [response: string]: any }
? T['response'] extends infer P | undefined
? P
: T['response']
: Record<string, any>
> {
return await new ApiRequest().query().create({ ...options, data: { query, client_query_id: queryId } })
return await new ApiRequest()
.query()
.create({ ...options, data: { query, client_query_id: queryId, refresh: refresh } })
},

/** Fetch data from specified URL. The result already is JSON-parsed. */
Expand Down
11 changes: 9 additions & 2 deletions frontend/src/queries/nodes/DataNode/dataNodeLogic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import { loaders } from 'kea-loaders'
import type { dataNodeLogicType } from './dataNodeLogicType'
import { AnyResponseType, DataNode, EventsQuery, EventsQueryResponse, PersonsNode, QueryTiming } from '~/queries/schema'
import { query } from '~/queries/query'
import { isInsightQueryNode, isEventsQuery, isPersonsNode } from '~/queries/utils'
import { isInsightQueryNode, isEventsQuery, isPersonsNode, isQueryWithHogQLSupport } from '~/queries/utils'
import { subscriptions } from 'kea-subscriptions'
import { objectsEqual, shouldCancelQuery, uuid } from 'lib/utils'
import clsx from 'clsx'
Expand All @@ -29,6 +29,8 @@ import { teamLogic } from 'scenes/teamLogic'
import equal from 'fast-deep-equal'
import { filtersToQueryNode } from '../InsightQuery/utils/filtersToQueryNode'
import { compareInsightQuery } from 'scenes/insights/utils/compareInsightQuery'
import { featureFlagLogic } from 'lib/logic/featureFlagLogic'
import { FEATURE_FLAGS } from 'lib/constants'

export interface DataNodeLogicProps {
key: string
Expand All @@ -52,7 +54,7 @@ const queryEqual = (a: DataNode, b: DataNode): boolean => {
export const dataNodeLogic = kea<dataNodeLogicType>([
path(['queries', 'nodes', 'dataNodeLogic']),
connect({
values: [userLogic, ['user'], teamLogic, ['currentTeamId']],
values: [userLogic, ['user'], teamLogic, ['currentTeamId'], featureFlagLogic, ['featureFlags']],
}),
props({ query: {} } as DataNodeLogicProps),
key((props) => props.key),
Expand Down Expand Up @@ -96,6 +98,7 @@ export const dataNodeLogic = kea<dataNodeLogicType>([
}
if (
isInsightQueryNode(props.query) &&
!(values.hogQLInsightsFlagEnabled && isQueryWithHogQLSupport(props.query)) &&
props.cachedResults &&
props.cachedResults['id'] &&
props.cachedResults['filters'] &&
Expand Down Expand Up @@ -309,6 +312,10 @@ export const dataNodeLogic = kea<dataNodeLogicType>([
() => [(_, props) => props.cachedResults ?? null],
(cachedResults: AnyResponseType | null): boolean => !!cachedResults,
],
hogQLInsightsFlagEnabled: [
(s) => [s.featureFlags],
(featureFlags) => featureFlags[FEATURE_FLAGS.HOGQL_INSIGHTS],
],
query: [(_, p) => [p.query], (query) => query],
newQuery: [
(s, p) => [p.query, s.response],
Expand Down
6 changes: 3 additions & 3 deletions frontend/src/queries/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
isTimeToSeeDataSessionsNode,
isHogQLQuery,
isInsightVizNode,
isLifecycleQuery,
isQueryWithHogQLSupport,
} from './utils'
import api, { ApiMethodOptions } from 'lib/api'
import { getCurrentTeamId } from 'lib/utils/logics'
Expand Down Expand Up @@ -114,7 +114,7 @@ export async function query<N extends DataNode = DataNode>(
try {
if (isPersonsNode(queryNode)) {
response = await api.get(getPersonsEndpoint(queryNode), methodOptions)
} else if (isInsightQueryNode(queryNode) && !(hogQLInsightsFlagEnabled && isLifecycleQuery(queryNode))) {
} else if (isInsightQueryNode(queryNode) && !(hogQLInsightsFlagEnabled && isQueryWithHogQLSupport(queryNode))) {
const filters = queryNodeToFilter(queryNode)
const params = {
...filters,
Expand All @@ -141,7 +141,7 @@ export async function query<N extends DataNode = DataNode>(
methodOptions
)
} else {
response = await api.query(queryNode, methodOptions, queryId)
response = await api.query(queryNode, methodOptions, queryId, refresh)
if (isHogQLQuery(queryNode) && response && typeof response === 'object') {
logParams.clickhouse_sql = (response as HogQLQueryResponse)?.clickhouse
}
Expand Down
9 changes: 9 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,15 @@
"LifecycleQueryResponse": {
"additionalProperties": false,
"properties": {
"is_cached": {
"type": "boolean"
},
"last_refresh": {
"type": "string"
},
"next_allowed_client_refresh": {
"type": "string"
},
"result": {
"items": {
"type": "object"
Expand Down
11 changes: 9 additions & 2 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,16 @@ export type LifecycleFilter = Omit<LifecycleFilterType, keyof FilterType> & {
toggledLifecycles?: LifecycleToggle[]
} // using everything except what it inherits from FilterType

export interface LifecycleQueryResponse {
result: Record<string, any>[]
export interface QueryResponse {
result: unknown
timings?: QueryTiming[]
is_cached?: boolean
last_refresh?: string
next_allowed_client_refresh?: string
}

export interface LifecycleQueryResponse extends QueryResponse {
result: Record<string, any>[]
}

export interface LifecycleQuery extends InsightsQueryBase {
Expand Down
4 changes: 4 additions & 0 deletions frontend/src/queries/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ export function isLifecycleQuery(node?: Node | null): node is LifecycleQuery {
return node?.kind === NodeKind.LifecycleQuery
}

export function isQueryWithHogQLSupport(node?: Node | null): node is LifecycleQuery {
return isLifecycleQuery(node)
}

export function isInsightQueryWithDisplay(node?: Node | null): node is TrendsQuery | StickinessQuery {
return isTrendsQuery(node) || isStickinessQuery(node)
}
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
13 changes: 8 additions & 5 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.rate_limit import AIBurstRateThrottle, AISustainedRateThrottle, TeamRateThrottle
from posthog.schema import EventsQuery, HogQLQuery, HogQLMetadata
from posthog.utils import refresh_requested_by_client


class QueryThrottle(TeamRateThrottle):
Expand Down Expand Up @@ -92,10 +93,9 @@ def get_throttles(self):
def list(self, request: Request, **kw) -> HttpResponse:
self._tag_client_query_id(request.GET.get("client_query_id"))
query_json = QuerySchemaParser.validate_query(self._query_json_from_request(request))

# allow lists as well as dicts in response with safe=False
try:
return JsonResponse(process_query(self.team, query_json), safe=False)
return JsonResponse(process_query(self.team, query_json, request=request), safe=False)
except HogQLException as e:
raise ValidationError(str(e))
except ExposedCHQueryError as e:
Expand All @@ -107,7 +107,7 @@ def post(self, request, *args, **kwargs):
self._tag_client_query_id(request_json.get("client_query_id"))
# allow lists as well as dicts in response with safe=False
try:
return JsonResponse(process_query(self.team, query_json), safe=False)
return JsonResponse(process_query(self.team, query_json, request=request), safe=False)
except HogQLException as e:
raise ValidationError(str(e))
except ExposedCHQueryError as e:
Expand Down Expand Up @@ -196,7 +196,9 @@ def _unwrap_pydantic_dict(response: Any) -> Dict:
return cast(dict, _unwrap_pydantic(response))


def process_query(team: Team, query_json: Dict, default_limit: Optional[int] = None) -> Dict:
def process_query(
team: Team, query_json: Dict, default_limit: Optional[int] = None, request: Optional[Request] = None
) -> Dict:
# query_json has been parsed by QuerySchemaParser
# it _should_ be impossible to end up in here with a "bad" query
query_kind = query_json.get("kind")
Expand All @@ -222,8 +224,9 @@ def process_query(team: Team, query_json: Dict, default_limit: Optional[int] = N
metadata_response = get_hogql_metadata(query=metadata_query, team=team)
return _unwrap_pydantic_dict(metadata_response)
elif query_kind == "LifecycleQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
lifecycle_query_runner = LifecycleQueryRunner(query_json, team)
return _unwrap_pydantic_dict(lifecycle_query_runner.run())
return _unwrap_pydantic_dict(lifecycle_query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "DatabaseSchemaQuery":
database = create_hogql_database(team.pk)
return serialize_database(database)
Expand Down
78 changes: 74 additions & 4 deletions posthog/caching/utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,33 @@
import datetime
from typing import List, Optional, Set, Tuple, Union
from datetime import datetime
from dateutil.parser import isoparse
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from zoneinfo import ZoneInfo

from dateutil.parser import parser

import posthoganalytics


from posthog.client import sync_execute
from posthog.cloud_utils import is_cloud
from posthog.datetime import start_of_day, start_of_hour, start_of_month, start_of_week
from posthog.models.filters.filter import Filter
from posthog.models.filters.path_filter import PathFilter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.filters.stickiness_filter import StickinessFilter
from posthog.models.team.team import Team
from posthog.redis import get_client


RECENTLY_ACCESSED_TEAMS_REDIS_KEY = "INSIGHT_CACHE_UPDATE_RECENTLY_ACCESSED_TEAMS"

IN_A_DAY = 86_400


def ensure_is_date(candidate: Optional[Union[str, datetime.datetime]]) -> Optional[datetime.datetime]:
def ensure_is_date(candidate: Optional[Union[str, datetime]]) -> Optional[datetime]:
if candidate is None:
return None
if isinstance(candidate, datetime.datetime):
if isinstance(candidate, datetime):
return candidate
return parser().parse(candidate)

Expand Down Expand Up @@ -48,3 +61,60 @@ def active_teams() -> Set[int]:
all_teams = teams_by_recency

return set(int(team_id) for team_id, _ in all_teams)


def stale_cache_invalidation_disabled(team: Team) -> bool:
"""Can be disabled temporarly to help in cases of service degradation."""
if is_cloud(): # on PostHog Cloud, use the feature flag
return not posthoganalytics.feature_enabled(
"stale-cache-invalidation-enabled",
str(team.uuid),
groups={"organization": str(team.organization.id)},
group_properties={
"organization": {"id": str(team.organization.id), "created_at": team.organization.created_at}
},
only_evaluate_locally=True,
send_feature_flag_events=False,
)
else:
return False


def is_stale_filter(
team: Team, filter: Filter | RetentionFilter | StickinessFilter | PathFilter, cached_result: Any
) -> bool:
interval = filter.period.lower() if isinstance(filter, RetentionFilter) else filter.interval
return is_stale(team, filter.date_to, interval, cached_result)


def is_stale(team: Team, date_to: datetime, interval: str, cached_result: Any) -> bool:
"""Indicates wether a cache item is obviously outdated based on the last
requested date (date_to) and the granularity of the query (interval).
It is considered outdated when the next time interval was entered since the
last computation.
"""

if stale_cache_invalidation_disabled(team):
return False

last_refresh = (
cached_result.get("last_refresh", None) if isinstance(cached_result, Dict) else cached_result.last_refresh
)
date_to = min([date_to, datetime.now(tz=ZoneInfo("UTC"))]) # can't be later than now

if last_refresh is None:
raise Exception("Cached results require a last_refresh")

if isinstance(last_refresh, str):
last_refresh = isoparse(last_refresh)

if interval == "hour":
return start_of_hour(date_to) > start_of_hour(last_refresh)
elif interval == "day":
return start_of_day(date_to) > start_of_day(last_refresh)
elif interval == "week":
return start_of_week(date_to) > start_of_week(last_refresh)
elif interval == "month":
return start_of_month(date_to) > start_of_month(last_refresh)
else:
return False
59 changes: 2 additions & 57 deletions posthog/decorators.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
from datetime import datetime
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, List, TypeVar, Union, cast
from zoneinfo import ZoneInfo

import posthoganalytics
from django.urls import resolve
from django.utils.timezone import now
from rest_framework.request import Request
from rest_framework.viewsets import GenericViewSet
from statshog.defaults.django import statsd
from posthog.caching.utils import is_stale_filter

from posthog.clickhouse.query_tagging import tag_queries
from posthog.cloud_utils import is_cloud
from posthog.datetime import start_of_day, start_of_hour, start_of_month, start_of_week
from posthog.models import User
from posthog.models.filters.filter import Filter
from posthog.models.filters.path_filter import PathFilter
from posthog.models.filters.retention_filter import RetentionFilter
from posthog.models.filters.stickiness_filter import StickinessFilter
from posthog.models.filters.utils import get_filter
from posthog.models.team.team import Team
from posthog.utils import refresh_requested_by_client

from .utils import generate_cache_key, get_safe_cache
Expand Down Expand Up @@ -84,7 +75,7 @@ def wrapper(self, request) -> T:
route = "unknown"

if cached_result_package and cached_result_package.get("result"):
if not is_stale(team, filter, cached_result_package):
if not is_stale_filter(team, filter, cached_result_package):
cached_result_package["is_cached"] = True
statsd.incr("posthog_cached_function_cache_hit", tags={"route": route})
return cached_result_package
Expand All @@ -106,49 +97,3 @@ def wrapper(self, request) -> T:
return fresh_result_package

return wrapper


def stale_cache_invalidation_disabled(team: Team) -> bool:
"""Can be disabled temporarly to help in cases of service degradation."""
if is_cloud(): # on PostHog Cloud, use the feature flag
return not posthoganalytics.feature_enabled(
"stale-cache-invalidation-enabled",
str(team.uuid),
groups={"organization": str(team.organization.id)},
group_properties={
"organization": {"id": str(team.organization.id), "created_at": team.organization.created_at}
},
only_evaluate_locally=True,
send_feature_flag_events=False,
)
else:
return False


def is_stale(team: Team, filter: Filter | RetentionFilter | StickinessFilter | PathFilter, cached_result: Any) -> bool:
"""Indicates wether a cache item is obviously outdated based on filters,
i.e. the next time interval was entered since the last computation. For
example an insight with -7d date range that was last computed yesterday.
The same insight refreshed today wouldn't be marked as stale.
"""

if stale_cache_invalidation_disabled(team):
return False

last_refresh = cached_result.get("last_refresh", None)
date_to = min([filter.date_to, datetime.now(tz=ZoneInfo("UTC"))]) # can't be later than now
interval = filter.period.lower() if isinstance(filter, RetentionFilter) else filter.interval

if last_refresh is None:
raise Exception("Cached results require a last_refresh")

if interval == "hour":
return start_of_hour(date_to) > start_of_hour(last_refresh)
elif interval == "day":
return start_of_day(date_to) > start_of_day(last_refresh)
elif interval == "week":
return start_of_week(date_to) > start_of_week(last_refresh)
elif interval == "month":
return start_of_month(date_to) > start_of_month(last_refresh)
else:
return False
2 changes: 1 addition & 1 deletion posthog/hogql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


def execute_hogql_query(
query: Union[str, ast.SelectQuery],
query: Union[str, ast.SelectQuery, ast.SelectUnionQuery],
team: Team,
query_type: str = "hogql_query",
filters: Optional[HogQLFilters] = None,
Expand Down
Loading

0 comments on commit c61a9d0

Please sign in to comment.