Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queries): consolidate query runners #17732

Merged
merged 3 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 14 additions & 27 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@
from posthog.hogql.metadata import get_hogql_metadata
from posthog.hogql.query import execute_hogql_query

from posthog.hogql_queries.insights.lifecycle_query_runner import LifecycleQueryRunner
from posthog.hogql_queries.insights.trends_query_runner import TrendsQueryRunner
from posthog.hogql_queries.web_analytics.top_clicks import WebTopClicksQueryRunner
from posthog.hogql_queries.web_analytics.top_pages import WebTopPagesQueryRunner
from posthog.hogql_queries.web_analytics.top_sources import WebTopSourcesQueryRunner
from posthog.hogql_queries.query_runner import get_query_runner
from posthog.models import Team
from posthog.models.event.events_query import run_events_query
from posthog.models.user import User
Expand All @@ -41,6 +37,14 @@
from posthog.schema import EventsQuery, HogQLQuery, HogQLMetadata
from posthog.utils import refresh_requested_by_client

QUERY_WITH_RUNNER = [
"LifecycleQuery",
"TrendsQuery",
"WebTopSourcesQuery",
"WebTopClicksQuery",
"WebTopPagesQuery",
]


class QueryThrottle(TeamRateThrottle):
scope = "query"
Expand Down Expand Up @@ -206,10 +210,13 @@ def process_query(
# query_json has been parsed by QuerySchemaParser
# it _should_ be impossible to end up in here with a "bad" query
query_kind = query_json.get("kind")

tag_queries(query=query_json)

if query_kind == "EventsQuery":
if query_kind in QUERY_WITH_RUNNER:
refresh_requested = refresh_requested_by_client(request) if request else False
query_runner = get_query_runner(query_json, team)
return _unwrap_pydantic_dict(query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "EventsQuery":
events_query = EventsQuery.model_validate(query_json)
events_response = run_events_query(query=events_query, team=team, default_limit=default_limit)
return _unwrap_pydantic_dict(events_response)
Expand All @@ -227,14 +234,6 @@ def process_query(
metadata_query = HogQLMetadata.model_validate(query_json)
metadata_response = get_hogql_metadata(query=metadata_query, team=team)
return _unwrap_pydantic_dict(metadata_response)
elif query_kind == "LifecycleQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
lifecycle_query_runner = LifecycleQueryRunner(query_json, team)
return _unwrap_pydantic_dict(lifecycle_query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "TrendsQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
trends_query_runner = TrendsQueryRunner(query_json, team)
return _unwrap_pydantic_dict(trends_query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "DatabaseSchemaQuery":
database = create_hogql_database(team.pk)
return serialize_database(database)
Expand All @@ -253,18 +252,6 @@ def process_query(
)
serializer.is_valid(raise_exception=True)
return get_session_events(serializer) or {}
elif query_kind == "WebTopSourcesQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
web_top_sources_query_runner = WebTopSourcesQueryRunner(query_json, team)
return _unwrap_pydantic_dict(web_top_sources_query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "WebTopClicksQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
web_top_clicks_query_runner = WebTopClicksQueryRunner(query_json, team)
return _unwrap_pydantic_dict(web_top_clicks_query_runner.run(refresh_requested=refresh_requested))
elif query_kind == "WebTopPagesQuery":
refresh_requested = refresh_requested_by_client(request) if request else False
web_top_pages_query_runner = WebTopPagesQueryRunner(query_json, team)
return _unwrap_pydantic_dict(web_top_pages_query_runner.run(refresh_requested=refresh_requested))
else:
if query_json.get("source"):
return process_query(team, query_json["source"])
Expand Down
46 changes: 36 additions & 10 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Generic, List, Optional, Type, Dict, TypeVar, Union, Tuple
from typing import Any, Generic, List, Optional, Type, Dict, TypeVar, Union, Tuple, cast

from django.conf import settings
from django.core.cache import cache
Expand All @@ -17,10 +17,6 @@
from posthog.schema import (
QueryTiming,
TrendsQuery,
FunnelsQuery,
RetentionQuery,
PathsQuery,
StickinessQuery,
LifecycleQuery,
WebTopSourcesQuery,
WebTopClicksQuery,
Expand Down Expand Up @@ -64,17 +60,47 @@ class CachedQueryResponse(QueryResponse):

RunnableQueryNode = Union[
TrendsQuery,
FunnelsQuery,
RetentionQuery,
PathsQuery,
StickinessQuery,
LifecycleQuery,
WebTopSourcesQuery,
WebTopClicksQuery,
WebTopPagesQuery,
]


def get_query_runner(
query: Dict[str, Any] | RunnableQueryNode, team: Team, timings: Optional[HogQLTimings] = None
) -> "QueryRunner":
kind = None
if isinstance(query, dict):
kind = query.get("kind", None)
elif hasattr(query, "kind"):
kind = query.kind

if kind == "LifecycleQuery":
from .insights.lifecycle_query_runner import LifecycleQueryRunner

return LifecycleQueryRunner(query=cast(LifecycleQuery | Dict[str, Any], query), team=team, timings=timings)
if kind == "TrendsQuery":
from .insights.trends_query_runner import TrendsQueryRunner

return TrendsQueryRunner(query=cast(TrendsQuery | Dict[str, Any], query), team=team, timings=timings)

if kind == "WebTopSourcesQuery":
from .web_analytics.top_sources import WebTopSourcesQueryRunner

return WebTopSourcesQueryRunner(query=query, team=team, timings=timings)
if kind == "WebTopClicksQuery":
from .web_analytics.top_clicks import WebTopClicksQueryRunner

return WebTopClicksQueryRunner(query=query, team=team, timings=timings)
if kind == "WebTopPagesQuery":
from .web_analytics.top_pages import WebTopPagesQueryRunner

return WebTopPagesQueryRunner(query=query, team=team, timings=timings)

raise ValueError(f"Can't get a runner for an unknown query kind: {kind}")


class QueryRunner(ABC):
query: RunnableQueryNode
query_type: Type[RunnableQueryNode]
Expand Down Expand Up @@ -124,7 +150,7 @@ def run(self, refresh_requested: bool) -> CachedQueryResponse:
def to_query(self) -> ast.SelectQuery:
raise NotImplementedError()

def to_persons_query(self) -> str:
def to_persons_query(self) -> ast.SelectQuery:
# TODO: add support for selecting and filtering by breakdowns
raise NotImplementedError()

Expand Down