Skip to content

Commit

Permalink
feat(queries): persons query source
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra committed Oct 18, 2023
1 parent 9aeda88 commit 589b26c
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 129 deletions.
10 changes: 10 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1714,6 +1714,16 @@
"$ref": "#/definitions/HogQLExpression"
},
"type": "array"
},
"source": {
"anyOf": [
{
"$ref": "#/definitions/LifecycleQuery"
},
{
"$ref": "#/definitions/HogQLQuery"
}
]
}
},
"required": ["kind"],
Expand Down
1 change: 1 addition & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ export interface PersonsQueryResponse {

export interface PersonsQuery extends DataNode {
kind: NodeKind.PersonsQuery
source?: LifecycleQuery | HogQLQuery
select?: HogQLExpression[]
search?: string
properties?: AnyPropertyFilter[]
Expand Down
23 changes: 2 additions & 21 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
from posthog.api.routing import StructuredViewSetMixin
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import ExposedCHQueryError
from posthog.hogql import ast
from posthog.hogql.ai import PromptUnclear, write_sql_from_prompt
from posthog.hogql.database.database import create_hogql_database, serialize_database
from posthog.hogql.errors import HogQLException
from posthog.hogql.metadata import get_hogql_metadata
from posthog.hogql.modifiers import create_default_modifiers_for_team
from posthog.hogql.query import execute_hogql_query

from posthog.hogql_queries.query_runner import get_query_runner
from posthog.models import Team
Expand All @@ -35,7 +33,7 @@
from posthog.queries.time_to_see_data.serializers import SessionEventsQuerySerializer, SessionsQuerySerializer
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.rate_limit import AIBurstRateThrottle, AISustainedRateThrottle, TeamRateThrottle
from posthog.schema import HogQLQuery, HogQLMetadata
from posthog.schema import HogQLMetadata
from posthog.utils import refresh_requested_by_client

QUERY_WITH_RUNNER = [
Expand All @@ -50,6 +48,7 @@
QUERY_WITH_RUNNER_NO_CACHE = [
"EventsQuery",
"PersonsQuery",
"HogQLQuery",
]


Expand Down Expand Up @@ -226,24 +225,6 @@ def process_query(
elif query_kind in QUERY_WITH_RUNNER_NO_CACHE:
query_runner = get_query_runner(query_json, team, in_export_context=in_export_context)
return _unwrap_pydantic_dict(query_runner.calculate())
elif query_kind == "HogQLQuery":
hogql_query = HogQLQuery.model_validate(query_json)
values = (
{key: ast.Constant(value=value) for key, value in hogql_query.values.items()}
if hogql_query.values
else None
)
hogql_response = execute_hogql_query(
query_type="HogQLQuery",
query=hogql_query.query,
team=team,
filters=hogql_query.filters,
modifiers=hogql_query.modifiers,
placeholders=values,
in_export_context=in_export_context,
explain=hogql_query.explain,
)
return _unwrap_pydantic_dict(hogql_response)
elif query_kind == "HogQLMetadata":
metadata_query = HogQLMetadata.model_validate(query_json)
metadata_response = get_hogql_metadata(query=metadata_query, team=team)
Expand Down
4 changes: 2 additions & 2 deletions posthog/hogql/filters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional

from dateutil.parser import isoparse

Expand All @@ -17,7 +17,7 @@ def replace_filters(node: ast.Expr, filters: HogQLFilters, team: Team) -> ast.Ex


class ReplaceFilters(CloningVisitor):
def __init__(self, filters: HogQLFilters, team: Team = None):
def __init__(self, filters: Optional[HogQLFilters], team: Team = None):
super().__init__()
self.filters = filters
self.team = team
Expand Down
68 changes: 68 additions & 0 deletions posthog/hogql_queries/hogql_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from datetime import timedelta
from typing import Dict, Optional, Any

from posthog.clickhouse.client.connection import Workload
from posthog.hogql import ast
from posthog.hogql.filters import replace_filters
from posthog.hogql.parser import parse_select
from posthog.hogql.placeholders import find_placeholders
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.models import Team
from posthog.schema import HogQLQuery, HogQLQueryResponse


class HogQLQueryRunner(QueryRunner):
query: HogQLQuery
query_type = HogQLQuery

def __init__(
self,
query: HogQLQuery | Dict[str, Any],
team: Team,
timings: Optional[HogQLTimings] = None,
in_export_context: Optional[bool] = False,
):
super().__init__(query, team, timings, in_export_context)
if isinstance(query, HogQLQuery):
self.query = query
else:
self.query = HogQLQuery.model_validate(query)

def to_query(self) -> ast.SelectQuery:
if self.timings is None:
self.timings = HogQLTimings()
values = (
{key: ast.Constant(value=value) for key, value in self.query.values.items()} if self.query.values else None
)
with self.timings.measure("parse_select"):
parsed_select = parse_select(str(self.query.query), timings=self.timings, placeholders=values)

if self.query.filters:
with self.timings.measure("filters"):
placeholders_in_query = find_placeholders(parsed_select)
if "filters" in placeholders_in_query:
parsed_select = replace_filters(parsed_select, self.query.filters, self.team)
return parsed_select

def to_persons_query(self) -> ast.SelectQuery:
return self.to_query()

def calculate(self) -> HogQLQueryResponse:
return execute_hogql_query(
query_type="HogQLQuery",
query=self.to_query(),
filters=self.query.filters,
modifiers=self.query.modifiers,
team=self.team,
workload=Workload.ONLINE,
timings=self.timings,
in_export_context=self.in_export_context,
)

def _is_stale(self, cached_result_package):
return True

def _refresh_frequency(self):
return timedelta(minutes=1)
2 changes: 1 addition & 1 deletion posthog/hogql_queries/insights/lifecycle_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def to_persons_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
return parse_select(
"""
SELECT
person_id, start_of_period as breakdown_1, status as breakdown_2
person_id --, start_of_period as breakdown_1, status as breakdown_2
FROM
{events_query}
""",
Expand Down
13 changes: 12 additions & 1 deletion posthog/hogql_queries/persons_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from posthog.hogql.property import property_to_expr, has_aggregation
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.hogql_queries.query_runner import QueryRunner, get_query_runner
from posthog.models import Team
from posthog.schema import PersonsQuery, PersonsQueryResponse

Expand Down Expand Up @@ -63,6 +63,17 @@ def calculate(self) -> PersonsQueryResponse:
def filter_conditions(self) -> List[ast.Expr]:
where_exprs: List[ast.Expr] = []

if self.query.source:
source = self.query.source
try:
source_query_runner = get_query_runner(source, self.team, self.timings)
source_query = source_query_runner.to_persons_query()
where_exprs.append(
ast.CompareOperation(left=ast.Field(chain=["id"]), op=ast.CompareOperationOp.In, right=source_query)
)
except NotImplementedError:
raise ValueError(f"Queries of type '{source.kind}' are not implemented as a PersonsQuery sources.")

if self.query.properties:
where_exprs.append(property_to_expr(self.query.properties, self.team, scope="person"))

Expand Down
11 changes: 11 additions & 0 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
PersonsQuery,
EventsQuery,
WebStatsTableQuery,
HogQLQuery,
)
from posthog.utils import generate_cache_key, get_safe_cache

Expand Down Expand Up @@ -64,6 +65,7 @@ class CachedQueryResponse(QueryResponse):


RunnableQueryNode = Union[
HogQLQuery,
TrendsQuery,
LifecycleQuery,
EventsQuery,
Expand Down Expand Up @@ -122,6 +124,15 @@ def get_query_runner(
timings=timings,
in_export_context=in_export_context,
)
if kind == "HogQLQuery":
from .hogql_query_runner import HogQLQueryRunner

return HogQLQueryRunner(
query=cast(HogQLQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
)
if kind == "WebOverviewStatsQuery":
from .web_analytics.overview_stats import WebOverviewStatsQueryRunner

Expand Down
85 changes: 85 additions & 0 deletions posthog/hogql_queries/test/test_hogql_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from posthog.hogql import ast
from posthog.hogql.visitor import clear_locations
from posthog.hogql_queries.hogql_query_runner import HogQLQueryRunner
from posthog.models.utils import UUIDT
from posthog.schema import HogQLPropertyFilter, HogQLQuery, HogQLFilters
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_person, flush_persons_and_events, _create_event


class TestHogQLQueryRunner(ClickhouseTestMixin, APIBaseTest):
maxDiff = None
random_uuid: str

def _create_random_persons(self) -> str:
random_uuid = str(UUIDT())
for index in range(10):
_create_person(
properties={
"email": f"jacob{index}@{random_uuid}.posthog.com",
"name": f"Mr Jacob {random_uuid}",
"random_uuid": random_uuid,
"index": index,
},
team=self.team,
distinct_ids=[f"id-{random_uuid}-{index}"],
is_identified=True,
)
_create_event(distinct_id=f"id-{random_uuid}-{index}", event=f"clicky-{index}", team=self.team)
flush_persons_and_events()
return random_uuid

def _create_runner(self, query: HogQLQuery) -> HogQLQueryRunner:
return HogQLQueryRunner(team=self.team, query=query)

def setUp(self):
super().setUp()
self.random_uuid = self._create_random_persons()

def test_default_hogql_query(self):
runner = self._create_runner(HogQLQuery(query="select count(event) from events"))
query = runner.to_query()
query = clear_locations(query)
expected = ast.SelectQuery(
select=[ast.Call(name="count", args=[ast.Field(chain=["event"])])],
select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
)
self.assertEqual(clear_locations(query), expected)
response = runner.calculate()
self.assertEqual(response.results[0][0], 10)

def test_hogql_query_filters(self):
runner = self._create_runner(
HogQLQuery(
query="select count(event) from events where {filters}",
filters=HogQLFilters(properties=[HogQLPropertyFilter(key="event='clicky-3'")]),
)
)
query = runner.to_query()
query = clear_locations(query)
expected = ast.SelectQuery(
select=[ast.Call(name="count", args=[ast.Field(chain=["event"])])],
select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
where=ast.CompareOperation(
left=ast.Field(chain=["event"]), op=ast.CompareOperationOp.Eq, right=ast.Constant(value="clicky-3")
),
)
self.assertEqual(clear_locations(query), expected)
response = runner.calculate()
self.assertEqual(response.results[0][0], 1)

def test_hogql_query_values(self):
runner = self._create_runner(
HogQLQuery(query="select count(event) from events where event={e}", values={"e": "clicky-3"})
)
query = runner.to_query()
query = clear_locations(query)
expected = ast.SelectQuery(
select=[ast.Call(name="count", args=[ast.Field(chain=["event"])])],
select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
where=ast.CompareOperation(
left=ast.Field(chain=["event"]), op=ast.CompareOperationOp.Eq, right=ast.Constant(value="clicky-3")
),
)
self.assertEqual(clear_locations(query), expected)
response = runner.calculate()
self.assertEqual(response.results[0][0], 1)
Loading

0 comments on commit 589b26c

Please sign in to comment.