Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queries): persons query source #18060

Merged
merged 6 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions frontend/src/queries/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,16 @@
"$ref": "#/definitions/HogQLExpression"
},
"type": "array"
},
"source": {
"anyOf": [
{
"$ref": "#/definitions/LifecycleQuery"
},
{
"$ref": "#/definitions/HogQLQuery"
}
]
}
},
"required": ["kind"],
Expand Down
1 change: 1 addition & 0 deletions frontend/src/queries/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ export interface PersonsQueryResponse {

export interface PersonsQuery extends DataNode {
kind: NodeKind.PersonsQuery
source?: LifecycleQuery | HogQLQuery
select?: HogQLExpression[]
search?: string
properties?: AnyPropertyFilter[]
Expand Down
23 changes: 2 additions & 21 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
from posthog.api.routing import StructuredViewSetMixin
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import ExposedCHQueryError
from posthog.hogql import ast
from posthog.hogql.ai import PromptUnclear, write_sql_from_prompt
from posthog.hogql.database.database import create_hogql_database, serialize_database
from posthog.hogql.errors import HogQLException
from posthog.hogql.metadata import get_hogql_metadata
from posthog.hogql.modifiers import create_default_modifiers_for_team
from posthog.hogql.query import execute_hogql_query

from posthog.hogql_queries.query_runner import get_query_runner
from posthog.models import Team
Expand All @@ -35,7 +33,7 @@
from posthog.queries.time_to_see_data.serializers import SessionEventsQuerySerializer, SessionsQuerySerializer
from posthog.queries.time_to_see_data.sessions import get_session_events, get_sessions
from posthog.rate_limit import AIBurstRateThrottle, AISustainedRateThrottle, TeamRateThrottle
from posthog.schema import HogQLQuery, HogQLMetadata
from posthog.schema import HogQLMetadata
from posthog.utils import refresh_requested_by_client

QUERY_WITH_RUNNER = [
Expand All @@ -50,6 +48,7 @@
# Query kinds that process_query hands to get_query_runner(...) and answers by
# calling the runner's calculate() directly.
QUERY_WITH_RUNNER_NO_CACHE = [
"EventsQuery",
"PersonsQuery",
"HogQLQuery",
]


Expand Down Expand Up @@ -226,24 +225,6 @@ def process_query(
elif query_kind in QUERY_WITH_RUNNER_NO_CACHE:
query_runner = get_query_runner(query_json, team, in_export_context=in_export_context)
return _unwrap_pydantic_dict(query_runner.calculate())
elif query_kind == "HogQLQuery":
hogql_query = HogQLQuery.model_validate(query_json)
values = (
{key: ast.Constant(value=value) for key, value in hogql_query.values.items()}
if hogql_query.values
else None
)
hogql_response = execute_hogql_query(
query_type="HogQLQuery",
query=hogql_query.query,
team=team,
filters=hogql_query.filters,
modifiers=hogql_query.modifiers,
placeholders=values,
in_export_context=in_export_context,
explain=hogql_query.explain,
)
return _unwrap_pydantic_dict(hogql_response)
elif query_kind == "HogQLMetadata":
metadata_query = HogQLMetadata.model_validate(query_json)
metadata_response = get_hogql_metadata(query=metadata_query, team=team)
Expand Down
4 changes: 2 additions & 2 deletions posthog/hogql/filters.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional

from dateutil.parser import isoparse

Expand All @@ -17,7 +17,7 @@ def replace_filters(node: ast.Expr, filters: HogQLFilters, team: Team) -> ast.Ex


class ReplaceFilters(CloningVisitor):
def __init__(self, filters: HogQLFilters, team: Team = None):
def __init__(self, filters: Optional[HogQLFilters], team: Team = None):
super().__init__()
self.filters = filters
self.team = team
Expand Down
68 changes: 68 additions & 0 deletions posthog/hogql_queries/hogql_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from datetime import timedelta
from typing import Dict, Optional, Any

from posthog.clickhouse.client.connection import Workload
from posthog.hogql import ast
from posthog.hogql.filters import replace_filters
from posthog.hogql.parser import parse_select
from posthog.hogql.placeholders import find_placeholders
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.models import Team
from posthog.schema import HogQLQuery, HogQLQueryResponse


class HogQLQueryRunner(QueryRunner):
    """Query runner for raw ``HogQLQuery`` nodes.

    Parses the HogQL string into an AST, substituting ``values`` as constant
    placeholders and (when filters are supplied) the ``{filters}`` placeholder,
    then executes it through ``execute_hogql_query``.
    """

    query: HogQLQuery
    query_type = HogQLQuery

    def __init__(
        self,
        query: HogQLQuery | Dict[str, Any],
        team: Team,
        timings: Optional[HogQLTimings] = None,
        in_export_context: Optional[bool] = False,
    ):
        """Store the query, validating it into a ``HogQLQuery`` when given as a dict.

        Args:
            query: A ``HogQLQuery`` instance or a dict that validates as one.
            team: Team the query is executed for.
            timings: Optional timings collector; one is created lazily in
                ``to_query`` when omitted.
            in_export_context: Forwarded to ``execute_hogql_query``.
        """
        super().__init__(query, team, timings, in_export_context)
        if isinstance(query, HogQLQuery):
            self.query = query
        else:
            self.query = HogQLQuery.model_validate(query)

    def to_query(self) -> ast.SelectQuery:
        """Parse the HogQL string into a select AST with placeholders applied."""
        if self.timings is None:
            self.timings = HogQLTimings()

        # User-supplied ``values`` become constant placeholders at parse time.
        values = (
            {key: ast.Constant(value=value) for key, value in self.query.values.items()} if self.query.values else None
        )
        with self.timings.measure("parse_select"):
            parsed_select = parse_select(str(self.query.query), timings=self.timings, placeholders=values)

        # NOTE(review): when no filters are supplied, a `{filters}` placeholder in
        # the query is left untouched here; `calculate` still forwards `filters`
        # to execute_hogql_query, which presumably resolves it - confirm for
        # `to_persons_query` callers.
        if self.query.filters:
            with self.timings.measure("filters"):
                placeholders_in_query = find_placeholders(parsed_select)
                if "filters" in placeholders_in_query:
                    parsed_select = replace_filters(parsed_select, self.query.filters, self.team)
        return parsed_select

    def to_persons_query(self) -> ast.SelectQuery:
        """Return the same AST as ``to_query``."""
        return self.to_query()

    def calculate(self) -> HogQLQueryResponse:
        """Execute the parsed query and return the HogQL response."""
        return execute_hogql_query(
            query_type="HogQLQuery",
            query=self.to_query(),
            filters=self.query.filters,
            modifiers=self.query.modifiers,
            team=self.team,
            workload=Workload.ONLINE,
            timings=self.timings,
            in_export_context=self.in_export_context,
            # Forward the explain flag, as the inline HogQLQuery handling in the
            # query API did; without it the flag is silently ignored.
            explain=self.query.explain,
        )

    def _is_stale(self, cached_result_package):
        """Always report the cached result as stale."""
        return True

    def _refresh_frequency(self):
        """Return a one-minute refresh interval."""
        return timedelta(minutes=1)
2 changes: 1 addition & 1 deletion posthog/hogql_queries/insights/lifecycle_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def to_persons_query(self) -> ast.SelectQuery | ast.SelectUnionQuery:
return parse_select(
"""
SELECT
person_id, start_of_period as breakdown_1, status as breakdown_2
person_id --, start_of_period as breakdown_1, status as breakdown_2
FROM
{events_query}
""",
Expand Down
13 changes: 12 additions & 1 deletion posthog/hogql_queries/persons_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from posthog.hogql.property import property_to_expr, has_aggregation
from posthog.hogql.query import execute_hogql_query
from posthog.hogql.timings import HogQLTimings
from posthog.hogql_queries.query_runner import QueryRunner
from posthog.hogql_queries.query_runner import QueryRunner, get_query_runner
from posthog.models import Team
from posthog.schema import PersonsQuery, PersonsQueryResponse

Expand Down Expand Up @@ -63,6 +63,17 @@ def calculate(self) -> PersonsQueryResponse:
def filter_conditions(self) -> List[ast.Expr]:
where_exprs: List[ast.Expr] = []

if self.query.source:
source = self.query.source
try:
source_query_runner = get_query_runner(source, self.team, self.timings)
source_query = source_query_runner.to_persons_query()
where_exprs.append(
ast.CompareOperation(left=ast.Field(chain=["id"]), op=ast.CompareOperationOp.In, right=source_query)
)
except NotImplementedError:
raise ValueError(f"Queries of type '{source.kind}' are not implemented as a PersonsQuery sources.")

if self.query.properties:
where_exprs.append(property_to_expr(self.query.properties, self.team, scope="person"))

Expand Down
11 changes: 11 additions & 0 deletions posthog/hogql_queries/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
PersonsQuery,
EventsQuery,
WebStatsTableQuery,
HogQLQuery,
)
from posthog.utils import generate_cache_key, get_safe_cache

Expand Down Expand Up @@ -64,6 +65,7 @@ class CachedQueryResponse(QueryResponse):


RunnableQueryNode = Union[
HogQLQuery,
TrendsQuery,
LifecycleQuery,
EventsQuery,
Expand Down Expand Up @@ -122,6 +124,15 @@ def get_query_runner(
timings=timings,
in_export_context=in_export_context,
)
if kind == "HogQLQuery":
from .hogql_query_runner import HogQLQueryRunner

return HogQLQueryRunner(
query=cast(HogQLQuery | Dict[str, Any], query),
team=team,
timings=timings,
in_export_context=in_export_context,
)
if kind == "WebOverviewStatsQuery":
from .web_analytics.overview_stats import WebOverviewStatsQueryRunner

Expand Down
85 changes: 85 additions & 0 deletions posthog/hogql_queries/test/test_hogql_query_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from posthog.hogql import ast
from posthog.hogql.visitor import clear_locations
from posthog.hogql_queries.hogql_query_runner import HogQLQueryRunner
from posthog.models.utils import UUIDT
from posthog.schema import HogQLPropertyFilter, HogQLQuery, HogQLFilters
from posthog.test.base import APIBaseTest, ClickhouseTestMixin, _create_person, flush_persons_and_events, _create_event


class TestHogQLQueryRunner(ClickhouseTestMixin, APIBaseTest):
    """Checks the AST produced by HogQLQueryRunner and the results of running it."""

    maxDiff = None
    random_uuid: str

    def _create_random_persons(self) -> str:
        """Create ten identified persons, each with one ``clicky-<index>`` event.

        Returns the random UUID shared by all created persons.
        """
        random_uuid = str(UUIDT())
        for index in range(10):
            distinct_id = f"id-{random_uuid}-{index}"
            person_properties = {
                "email": f"jacob{index}@{random_uuid}.posthog.com",
                "name": f"Mr Jacob {random_uuid}",
                "random_uuid": random_uuid,
                "index": index,
            }
            _create_person(
                properties=person_properties,
                team=self.team,
                distinct_ids=[distinct_id],
                is_identified=True,
            )
            _create_event(distinct_id=distinct_id, event=f"clicky-{index}", team=self.team)
        flush_persons_and_events()
        return random_uuid

    def _create_runner(self, query: HogQLQuery) -> HogQLQueryRunner:
        """Build a runner for ``query`` bound to the test team."""
        return HogQLQueryRunner(query=query, team=self.team)

    @staticmethod
    def _count_events_select(**clauses) -> ast.SelectQuery:
        """Expected AST for ``select count(event) from events`` plus optional clauses."""
        return ast.SelectQuery(
            select=[ast.Call(name="count", args=[ast.Field(chain=["event"])])],
            select_from=ast.JoinExpr(table=ast.Field(chain=["events"])),
            **clauses,
        )

    @staticmethod
    def _event_equals(event_name: str) -> ast.CompareOperation:
        """Expected AST for ``event = '<event_name>'``."""
        return ast.CompareOperation(
            left=ast.Field(chain=["event"]),
            op=ast.CompareOperationOp.Eq,
            right=ast.Constant(value=event_name),
        )

    def setUp(self):
        super().setUp()
        self.random_uuid = self._create_random_persons()

    def test_default_hogql_query(self):
        """A plain count query parses to the bare select and counts all ten events."""
        runner = self._create_runner(HogQLQuery(query="select count(event) from events"))
        parsed = clear_locations(runner.to_query())
        expected = self._count_events_select()
        self.assertEqual(clear_locations(parsed), expected)

        response = runner.calculate()
        self.assertEqual(response.results[0][0], 10)

    def test_hogql_query_filters(self):
        """The ``{filters}`` placeholder is replaced by the HogQL property filter."""
        hogql_filters = HogQLFilters(properties=[HogQLPropertyFilter(key="event='clicky-3'")])
        runner = self._create_runner(
            HogQLQuery(query="select count(event) from events where {filters}", filters=hogql_filters)
        )
        parsed = clear_locations(runner.to_query())
        expected = self._count_events_select(where=self._event_equals("clicky-3"))
        self.assertEqual(clear_locations(parsed), expected)

        response = runner.calculate()
        self.assertEqual(response.results[0][0], 1)

    def test_hogql_query_values(self):
        """Entries in ``values`` are substituted into the query as constants."""
        runner = self._create_runner(
            HogQLQuery(query="select count(event) from events where event={e}", values={"e": "clicky-3"})
        )
        parsed = clear_locations(runner.to_query())
        expected = self._count_events_select(where=self._event_equals("clicky-3"))
        self.assertEqual(clear_locations(parsed), expected)

        response = runner.calculate()
        self.assertEqual(response.results[0][0], 1)
Loading
Loading