Skip to content

Commit

Permalink
feat(data-warehouse): Views backend (#16573)
Browse files Browse the repository at this point in the history
* backend basics for a view

* view parsing "working"

* add tests and make view name unique

* adjust tests

* api tests and edge cases

* typing

* try comment

* fix migration check

* rename

* block overlapping names at api level

* add model validator

* more naming changes and full integration test

* update migration

* update migration

* remove repeat

* ClickHouse

* regex

* update validator name

* update migration

* update constraint

* casing

---------

Co-authored-by: Michael Matloka <[email protected]>
  • Loading branch information
EDsCODE and Twixes authored Jul 28, 2023
1 parent da3b2c9 commit ec15c51
Show file tree
Hide file tree
Showing 18 changed files with 564 additions and 24 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0337_more_session_recording_fields
posthog: 0338_datawarehouse_saved_query
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
3 changes: 2 additions & 1 deletion posthog/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from posthog.api.routing import DefaultRouterPlusPlus
from posthog.batch_exports import http as batch_exports
from posthog.settings import EE_AVAILABLE
from posthog.warehouse.api import table
from posthog.warehouse.api import saved_query, table

from . import (
activity_log,
Expand Down Expand Up @@ -135,6 +135,7 @@ def api_not_found(request):
batch_exports_router.register(r"runs", batch_exports.BatchExportRunViewSet, "runs", ["team_id", "batch_export_id"])

projects_router.register(r"warehouse_table", table.TableViewSet, "warehouse_api", ["team_id"])
projects_router.register(r"warehouse_view", saved_query.DataWarehouseSavedQueryViewSet, "warehouse_api", ["team_id"])

# Organizations nested endpoints
organizations_router = router.register(r"organizations", organization.OrganizationViewSet, "organizations")
Expand Down
32 changes: 32 additions & 0 deletions posthog/api/test/__snapshots__/test_query.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,38 @@
allow_experimental_object_type=True
'
---
# name: TestQuery.test_full_hogql_query_view
'
/* user_id:0 request:_snapshot_ */
SELECT events.event AS event,
events.distinct_id AS distinct_id,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, 'key'), ''), 'null'), '^"|"$', '') AS key
FROM events
WHERE equals(events.team_id, 2)
ORDER BY toTimeZone(events.timestamp, 'UTC') ASC
LIMIT 100 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=True
'
---
# name: TestQuery.test_full_hogql_query_view.1
'
/* user_id:0 request:_snapshot_ */
SELECT event_view.event,
event_view.distinct_id,
event_view.key
FROM
(SELECT events.event AS event,
events.distinct_id AS distinct_id,
replaceRegexpAll(nullIf(nullIf(JSONExtractRaw(events.properties, 'key'), ''), 'null'), '^"|"$', '') AS key
FROM events
WHERE equals(events.team_id, 2)
ORDER BY toTimeZone(events.timestamp, 'UTC') ASC) AS event_view
LIMIT 100 SETTINGS readonly=2,
max_execution_time=60,
allow_experimental_object_type=True
'
---
# name: TestQuery.test_hogql_property_filter
'
/* user_id:0 request:_snapshot_ */
Expand Down
47 changes: 47 additions & 0 deletions posthog/api/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,3 +473,50 @@ def test_invalid_query_kind(self):
assert api_response.json()["code"] == "parse_error"
assert "validation errors for Model" in api_response.json()["detail"]
assert "type=value_error.const; given=Tomato Soup" in api_response.json()["detail"]

@snapshot_clickhouse_queries
def test_full_hogql_query_view(self):
    """Create a warehouse view from a HogQL query via the API, then query the
    view itself and check the ingested events round-trip through it.

    Snapshots both the view-creation query and the wrapped subquery SQL.
    """
    with freeze_time("2020-01-10 12:00:00"):
        _create_person(
            properties={"email": "[email protected]"},
            distinct_ids=["2", "some-random-uid"],
            team=self.team,
            immediate=True,
        )
        _create_event(team=self.team, event="sign up", distinct_id="2", properties={"key": "test_val1"})
    with freeze_time("2020-01-10 12:11:00"):
        _create_event(team=self.team, event="sign out", distinct_id="2", properties={"key": "test_val2"})
    with freeze_time("2020-01-10 12:12:00"):
        _create_event(team=self.team, event="sign out", distinct_id="3", properties={"key": "test_val2"})
    with freeze_time("2020-01-10 12:13:00"):
        _create_event(
            team=self.team, event="sign out", distinct_id="4", properties={"key": "test_val3", "path": "a/b/c"}
        )
    flush_persons_and_events()

    with freeze_time("2020-01-10 12:14:00"):
        # Save the query as a named view; it then resolves like a table named "event_view".
        self.client.post(
            f"/api/projects/{self.team.id}/warehouse_view/",
            {
                "name": "event_view",
                "query": {
                    "kind": "HogQLQuery",
                    # Plain string literal (the f-prefix here had no placeholders).
                    "query": "select event AS event, distinct_id as distinct_id, properties.key as key from events order by timestamp",
                },
            },
        )
        query = HogQLQuery(query="select * from event_view")
        api_response = self.client.post(f"/api/projects/{self.team.id}/query/", {"query": query.dict()}).json()
        query.response = HogQLQueryResponse.parse_obj(api_response)

        # All four events ingested above should surface through the view, in timestamp order.
        self.assertIsNotNone(query.response.results)
        self.assertEqual(len(query.response.results), 4)
        self.assertEqual(
            query.response.results,
            [
                ["sign up", "2", "test_val1"],
                ["sign out", "2", "test_val2"],
                ["sign out", "3", "test_val2"],
                ["sign out", "4", "test_val3"],
            ],
        )
19 changes: 18 additions & 1 deletion posthog/hogql/database/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ class Config:
raw_cohort_people: RawCohortPeople = RawCohortPeople()
raw_person_overrides: RawPersonOverridesTable = RawPersonOverridesTable()

# clunky: keep table names in sync with above
_table_names: List[Table] = [
"events",
"groups",
"person",
"person_distinct_id2",
"person_overrides",
"session_recording_events",
"session_replay_events",
"cohortpeople",
"person_static_cohort",
]

def __init__(self, timezone: Optional[str]):
super().__init__()
try:
Expand All @@ -77,7 +90,7 @@ def add_warehouse_tables(self, **field_definitions: Any):

def create_hogql_database(team_id: int) -> Database:
from posthog.models import Team
from posthog.warehouse.models import DataWarehouseTable
from posthog.warehouse.models import DataWarehouseTable, DataWarehouseSavedQuery

team = Team.objects.get(pk=team_id)
database = Database(timezone=team.timezone)
Expand All @@ -89,6 +102,10 @@ def create_hogql_database(team_id: int) -> Database:
tables = {}
for table in DataWarehouseTable.objects.filter(team_id=team.pk).exclude(deleted=True):
tables[table.name] = table.hogql_definition()

for table in DataWarehouseSavedQuery.objects.filter(team_id=team.pk).exclude(deleted=True):
tables[table.name] = table.hogql_definition()

database.add_warehouse_tables(**tables)

return database
Expand Down
9 changes: 9 additions & 0 deletions posthog/hogql/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,12 @@ class FunctionCallTable(Table):
"""

name: str


class SavedQuery(Table):
    """
    A table that returns a subquery, e.g. my_saved_query -> (SELECT * FROM some_saved_table).
    The team_id guard is NOT added for the overall subquery; the tables referenced
    inside the saved query are expected to be guarded when that subquery is
    resolved and printed in turn.
    """

    # HogQL SELECT statement the view expands into when referenced in a query.
    query: str
    # Name under which the view is exposed in the HogQL database.
    name: str
40 changes: 40 additions & 0 deletions posthog/hogql/database/test/tables.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from posthog.hogql.database.models import DateDatabaseField, IntegerDatabaseField, FloatDatabaseField
from posthog.hogql.database.s3_table import S3Table
from posthog.hogql.database.models import SavedQuery


def create_aapl_stock_s3_table(name="aapl_stock") -> S3Table:
Expand All @@ -17,3 +18,42 @@ def create_aapl_stock_s3_table(name="aapl_stock") -> S3Table:
"OpenInt": IntegerDatabaseField(name="OpenInt"),
},
)


def _aapl_view_fields() -> dict:
    """Field definitions shared by the AAPL stock view fixtures below.

    Returns a fresh dict on every call so no field instances are shared
    between the fixture tables.
    """
    return {
        "Date": DateDatabaseField(name="Date"),
        "Open": FloatDatabaseField(name="Open"),
        "High": FloatDatabaseField(name="High"),
        "Low": FloatDatabaseField(name="Low"),
    }


def create_aapl_stock_table_view() -> SavedQuery:
    """A saved-query view over the aapl_stock S3 table."""
    return SavedQuery(
        name="aapl_stock_view",
        query="SELECT * FROM aapl_stock",
        fields=_aapl_view_fields(),
    )


def create_nested_aapl_stock_view() -> SavedQuery:
    """A view selecting from another view (aapl_stock_view), for nesting tests."""
    return SavedQuery(
        name="aapl_stock_nested_view",
        query="SELECT * FROM aapl_stock_view",
        fields=_aapl_view_fields(),
    )


def create_aapl_stock_table_self_referencing() -> SavedQuery:
    """A view whose query selects from itself, for self-reference handling tests."""
    return SavedQuery(
        name="aapl_stock_self",
        query="SELECT * FROM aapl_stock_self",
        fields=_aapl_view_fields(),
    )
65 changes: 65 additions & 0 deletions posthog/hogql/database/test/test_saved_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.database import create_hogql_database
from posthog.hogql.parser import parse_select
from posthog.hogql.printer import print_ast
from posthog.test.base import BaseTest
from posthog.hogql.database.test.tables import (
create_aapl_stock_table_view,
create_aapl_stock_s3_table,
create_nested_aapl_stock_view,
create_aapl_stock_table_self_referencing,
)


class TestSavedQuery(BaseTest):
    """Printer-level tests for saved queries (views): a view reference must
    expand into its underlying subquery, aliased to the view (or user) name."""

    # Show full diffs for the long printed-SQL assertions below.
    maxDiff = None

    def _init_database(self):
        # Build the team's HogQL database, then attach the S3 table and the
        # saved-query fixtures so they resolve by name like real tables.
        self.database = create_hogql_database(self.team.pk)
        self.database.aapl_stock_view = create_aapl_stock_table_view()
        self.database.aapl_stock = create_aapl_stock_s3_table()
        self.database.aapl_stock_nested_view = create_nested_aapl_stock_view()
        self.database.aapl_stock_self = create_aapl_stock_table_self_referencing()
        self.context = HogQLContext(team_id=self.team.pk, enable_select_queries=True, database=self.database)

    def _select(self, query: str, dialect: str = "clickhouse") -> str:
        """Parse *query* and print it in the given dialect using the test context."""
        return print_ast(parse_select(query), self.context, dialect=dialect)

    def test_saved_query_table_select(self):
        # Selecting from the view expands it into a subquery aliased to the view name;
        # the underlying S3 table appears as a CTE inside that subquery.
        self._init_database()

        hogql = self._select(query="SELECT * FROM aapl_stock LIMIT 10", dialect="hogql")
        self.assertEqual(hogql, "SELECT Date, Open, High, Low, Close, Volume, OpenInt FROM aapl_stock LIMIT 10")

        clickhouse = self._select(query="SELECT * FROM aapl_stock_view LIMIT 10", dialect="clickhouse")

        self.assertEqual(
            clickhouse,
            "SELECT aapl_stock_view.Date, aapl_stock_view.Open, aapl_stock_view.High, aapl_stock_view.Low, aapl_stock_view.Close, aapl_stock_view.Volume, aapl_stock_view.OpenInt FROM (WITH aapl_stock AS (SELECT * FROM s3Cluster('posthog', %(hogql_val_0_sensitive)s, %(hogql_val_1)s)) SELECT aapl_stock.Date, aapl_stock.Open, aapl_stock.High, aapl_stock.Low, aapl_stock.Close, aapl_stock.Volume, aapl_stock.OpenInt FROM aapl_stock) AS aapl_stock_view LIMIT 10",
        )

    def test_nested_saved_queries(self):
        # A view over a view expands recursively: the inner view's subquery is
        # nested inside the outer view's subquery.
        self._init_database()

        hogql = self._select(query="SELECT * FROM aapl_stock LIMIT 10", dialect="hogql")
        self.assertEqual(hogql, "SELECT Date, Open, High, Low, Close, Volume, OpenInt FROM aapl_stock LIMIT 10")

        clickhouse = self._select(query="SELECT * FROM aapl_stock_nested_view LIMIT 10", dialect="clickhouse")

        self.assertEqual(
            clickhouse,
            "SELECT aapl_stock_nested_view.Date, aapl_stock_nested_view.Open, aapl_stock_nested_view.High, aapl_stock_nested_view.Low, aapl_stock_nested_view.Close, aapl_stock_nested_view.Volume, aapl_stock_nested_view.OpenInt FROM (SELECT aapl_stock_view.Date, aapl_stock_view.Open, aapl_stock_view.High, aapl_stock_view.Low, aapl_stock_view.Close, aapl_stock_view.Volume, aapl_stock_view.OpenInt FROM (WITH aapl_stock AS (SELECT * FROM s3Cluster('posthog', %(hogql_val_0_sensitive)s, %(hogql_val_1)s)) SELECT aapl_stock.Date, aapl_stock.Open, aapl_stock.High, aapl_stock.Low, aapl_stock.Close, aapl_stock.Volume, aapl_stock.OpenInt FROM aapl_stock) AS aapl_stock_view) AS aapl_stock_nested_view LIMIT 10",
        )

    def test_saved_query_with_alias(self):
        # A user-supplied alias takes precedence over the view name as the
        # subquery alias.
        self._init_database()

        hogql = self._select(query="SELECT * FROM aapl_stock LIMIT 10", dialect="hogql")
        self.assertEqual(hogql, "SELECT Date, Open, High, Low, Close, Volume, OpenInt FROM aapl_stock LIMIT 10")

        clickhouse = self._select(query="SELECT * FROM aapl_stock_view AS some_alias LIMIT 10", dialect="clickhouse")

        self.assertEqual(
            clickhouse,
            "SELECT some_alias.Date, some_alias.Open, some_alias.High, some_alias.Low, some_alias.Close, some_alias.Volume, some_alias.OpenInt FROM (WITH aapl_stock AS (SELECT * FROM s3Cluster('posthog', %(hogql_val_0_sensitive)s, %(hogql_val_1)s)) SELECT aapl_stock.Date, aapl_stock.Open, aapl_stock.High, aapl_stock.Low, aapl_stock.Close, aapl_stock.Volume, aapl_stock.OpenInt FROM aapl_stock) AS some_alias LIMIT 10",
        )
9 changes: 6 additions & 3 deletions posthog/hogql/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
HOGQL_POSTHOG_FUNCTIONS,
)
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import Table, FunctionCallTable
from posthog.hogql.database.models import Table, FunctionCallTable, SavedQuery
from posthog.hogql.database.database import create_hogql_database
from posthog.hogql.database.s3_table import S3Table
from posthog.hogql.errors import HogQLException
Expand Down Expand Up @@ -75,7 +75,6 @@ def prepare_ast_for_printing(
if dialect == "clickhouse":
node = resolve_property_types(node, context)
resolve_lazy_tables(node, stack, context)

# We add a team_id guard right before printing. It's not a separate step here.
return node

Expand Down Expand Up @@ -259,7 +258,11 @@ def visit_join_expr(self, node: ast.JoinExpr) -> JoinExprResponse:

# :IMPORTANT: This assures a "team_id" where clause is present on every selected table.
# Skip function call tables like numbers(), s3(), etc.
if self.dialect == "clickhouse" and not isinstance(table_type.table, FunctionCallTable):
if (
self.dialect == "clickhouse"
and not isinstance(table_type.table, FunctionCallTable)
and not isinstance(table_type.table, SavedQuery)
):
extra_where = team_id_guard_for_table(node.type, self.context)

if self.dialect == "clickhouse":
Expand Down
10 changes: 9 additions & 1 deletion posthog/hogql/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from posthog.hogql.ast import FieldTraverserType, ConstantType
from posthog.hogql.functions import HOGQL_POSTHOG_FUNCTIONS
from posthog.hogql.context import HogQLContext
from posthog.hogql.database.models import StringJSONDatabaseField, FunctionCallTable, LazyTable
from posthog.hogql.database.models import StringJSONDatabaseField, FunctionCallTable, LazyTable, SavedQuery
from posthog.hogql.errors import ResolverException
from posthog.hogql.functions.cohort import cohort
from posthog.hogql.functions.mapping import validate_function_args
from posthog.hogql.functions.sparkline import sparkline
from posthog.hogql.parser import parse_select
from posthog.hogql.visitor import CloningVisitor, clone_expr
from posthog.models.utils import UUIDT

Expand Down Expand Up @@ -205,6 +206,13 @@ def visit_join_expr(self, node: ast.JoinExpr):

if self.database.has_table(table_name):
database_table = self.database.get_table(table_name)

if isinstance(database_table, SavedQuery):
node.table = parse_select(str(database_table.query))
node.alias = table_alias or database_table.name
node = self.visit(node)
return node

if isinstance(database_table, LazyTable):
node_table_type = ast.LazyTableType(table=database_table)
else:
Expand Down
2 changes: 1 addition & 1 deletion posthog/management/commands/test_migrations_are_safe.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def run_and_check_migration(variable):
sys.exit(1)
if "CONSTRAINT" in operation_sql and (
"-- existing-table-constraint-ignore" not in operation_sql
or (
and (
table_being_altered not in tables_created_so_far
or self._get_table("ALTER TABLE", operation_sql) not in new_tables
) # Ignore for brand-new tables
Expand Down
Loading

0 comments on commit ec15c51

Please sign in to comment.