diff --git a/latest_migrations.manifest b/latest_migrations.manifest index e92cd61fc6623e..5cde4ea42dad50 100644 --- a/latest_migrations.manifest +++ b/latest_migrations.manifest @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name ee: 0016_rolemembership_organization_member otp_static: 0002_throttling otp_totp: 0002_auto_20190420_0723 -posthog: 0462_change_replay_team_setting_defaults +posthog: 0463_datawarehousemodelpath_and_more sessions: 0001_initial social_django: 0010_uid_db_index two_factor: 0007_auto_20201201_1019 diff --git a/posthog/api/__init__.py b/posthog/api/__init__.py index 9d61824e95d13e..c6fd6851da9c7d 100644 --- a/posthog/api/__init__.py +++ b/posthog/api/__init__.py @@ -4,24 +4,26 @@ from posthog.batch_exports import http as batch_exports from posthog.settings import EE_AVAILABLE from posthog.warehouse.api import ( + external_data_schema, external_data_source, + modeling, saved_query, table, view_link, - external_data_schema, ) -from ..heatmaps.heatmaps_api import LegacyHeatmapViewSet, HeatmapViewSet -from .session import SessionViewSet + +from ..heatmaps.heatmaps_api import HeatmapViewSet, LegacyHeatmapViewSet from ..session_recordings.session_recording_api import SessionRecordingViewSet from . import ( - alert, activity_log, + alert, annotation, app_metrics, async_migration, authentication, comments, dead_letter_queue, + debug_ch_queries, early_access_feature, error_tracking, event_definition, @@ -46,18 +48,18 @@ property_definition, proxy_record, query, - search, scheduled_change, + search, sharing, survey, tagged_item, team, uploaded_media, user, - debug_ch_queries, ) from .dashboards import dashboard, dashboard_templates from .data_management import DataManagementViewSet +from .session import SessionViewSet @decorators.api_view(["GET", "HEAD", "POST", "PUT", "PATCH", "DELETE"]) @@ -227,6 +229,19 @@ def api_not_found(request): "environment_external_data_sources", ["team_id"], ) +projects_router.register( + r"warehouse_dag", + modeling.DataWarehouseModelDagViewSet, + "project_warehouse_dag", + ["team_id"], +) +projects_router.register( + r"warehouse_model_paths", + modeling.DataWarehouseModelPathViewSet, + "project_warehouse_model_paths", + ["team_id"], +) + projects_router.register( r"external_data_schemas", diff --git a/posthog/hogql/context.py b/posthog/hogql/context.py index 504696a4fe3e7a..e47b854f01ac74 100644 --- a/posthog/hogql/context.py +++ b/posthog/hogql/context.py @@ -1,12 +1,12 @@ from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Literal, Optional, Any +from typing import TYPE_CHECKING, Any, Literal, Optional from posthog.hogql.timings import HogQLTimings from posthog.schema import HogQLNotice, HogQLQueryModifiers if TYPE_CHECKING: - from posthog.hogql.transforms.property_types import PropertySwapper from posthog.hogql.database.database import Database + from posthog.hogql.transforms.property_types import PropertySwapper from posthog.models import Team @@ -36,8 +36,6 @@ class HogQLContext: enable_select_queries: bool = False # Do we apply a limit of MAX_SELECT_RETURNED_ROWS=10000 to the topmost select query? limit_top_select: bool = True - # How many nested views do we support on this query? - max_view_depth: int = 1 # Globals that will be resolved in the context of the query globals: Optional[dict] = None diff --git a/posthog/hogql/resolver.py b/posthog/hogql/resolver.py index 54b5f5f2987568..1ef96523d1bd9f 100644 --- a/posthog/hogql/resolver.py +++ b/posthog/hogql/resolver.py @@ -1,30 +1,30 @@ from datetime import date, datetime -from typing import Optional, Any, cast, Literal +from typing import Any, Literal, Optional, cast from uuid import UUID from posthog.hogql import ast -from posthog.hogql.ast import FieldTraverserType, ConstantType -from posthog.hogql.database.schema.persons import PersonsTable -from posthog.hogql.functions import find_hogql_posthog_function +from posthog.hogql.ast import ConstantType, FieldTraverserType from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import ( - StringJSONDatabaseField, FunctionCallTable, LazyTable, SavedQuery, + StringJSONDatabaseField, ) +from posthog.hogql.database.s3_table import S3Table +from posthog.hogql.database.schema.events import EventsTable +from posthog.hogql.database.schema.persons import PersonsTable from posthog.hogql.errors import ImpossibleASTError, QueryError, ResolutionError +from posthog.hogql.functions import find_hogql_posthog_function from posthog.hogql.functions.action import matches_action from posthog.hogql.functions.cohort import cohort_query_node -from posthog.hogql.functions.mapping import validate_function_args, HOGQL_CLICKHOUSE_FUNCTIONS, compare_types +from posthog.hogql.functions.mapping import HOGQL_CLICKHOUSE_FUNCTIONS, compare_types, validate_function_args from posthog.hogql.functions.sparkline import sparkline -from posthog.hogql.hogqlx import convert_to_hx, HOGQLX_COMPONENTS +from posthog.hogql.hogqlx import HOGQLX_COMPONENTS, convert_to_hx from posthog.hogql.parser import parse_select from posthog.hogql.resolver_utils import expand_hogqlx_query, lookup_cte_by_name, lookup_field_by_name -from posthog.hogql.visitor import CloningVisitor, clone_expr, TraversingVisitor +from posthog.hogql.visitor import CloningVisitor, TraversingVisitor, clone_expr from posthog.models.utils import UUIDT -from posthog.hogql.database.schema.events import EventsTable -from posthog.hogql.database.s3_table import S3Table # https://github.com/ClickHouse/ClickHouse/issues/23194 - "Describe how identifiers in SELECT queries are resolved" @@ -306,9 +306,6 @@ def visit_join_expr(self, node: ast.JoinExpr): if isinstance(database_table, SavedQuery): self.current_view_depth += 1 - if self.current_view_depth > self.context.max_view_depth: - raise QueryError("Nested views are not supported") - node.table = parse_select(str(database_table.query)) if isinstance(node.table, ast.SelectQuery): diff --git a/posthog/management/commands/setup_test_environment.py b/posthog/management/commands/setup_test_environment.py index 07c39f6ce6414f..d0ab299e339add 100644 --- a/posthog/management/commands/setup_test_environment.py +++ b/posthog/management/commands/setup_test_environment.py @@ -3,14 +3,14 @@ from infi.clickhouse_orm import Database from posthog.clickhouse.schema import ( + CREATE_DATA_QUERIES, CREATE_DICTIONARY_QUERIES, CREATE_DISTRIBUTED_TABLE_QUERIES, CREATE_KAFKA_TABLE_QUERIES, CREATE_MERGETREE_TABLE_QUERIES, CREATE_MV_TABLE_QUERIES, - build_query, - CREATE_DATA_QUERIES, CREATE_VIEW_QUERIES, + build_query, ) from posthog.settings import ( CLICKHOUSE_CLUSTER, @@ -103,6 +103,7 @@ def handle(self, *args, **kwargs): # :TRICKY: Create extension and function depended on by models. with connection.cursor() as cursor: cursor.execute("CREATE EXTENSION pg_trgm") + cursor.execute("CREATE EXTENSION ltree") return super().handle(*args, **kwargs) diff --git a/posthog/migrations/0463_datawarehousemodelpath_and_more.py b/posthog/migrations/0463_datawarehousemodelpath_and_more.py new file mode 100644 index 00000000000000..81ef0cdf96d3be --- /dev/null +++ b/posthog/migrations/0463_datawarehousemodelpath_and_more.py @@ -0,0 +1,74 @@ +# Generated by Django 4.2.14 on 2024-08-12 12:04 + +import django.contrib.postgres.indexes +import django.db.models.constraints +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + +import posthog.models.utils +import posthog.warehouse.models.modeling + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0462_change_replay_team_setting_defaults"), + ] + + operations = [ + migrations.RunSQL("CREATE EXTENSION ltree;", reverse_sql="DROP EXTENSION ltree;"), + migrations.CreateModel( + name="DataWarehouseModelPath", + fields=[ + ("created_at", models.DateTimeField(auto_now_add=True)), + ("updated_at", models.DateTimeField(auto_now=True, null=True)), + ( + "id", + models.UUIDField( + default=posthog.models.utils.UUIDT, editable=False, primary_key=True, serialize=False + ), + ), + ("path", posthog.warehouse.models.modeling.LabelTreeField()), + ( + "created_by", + models.ForeignKey( + blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL + ), + ), + ( + "saved_query", + models.ForeignKey( + default=None, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="posthog.datawarehousesavedquery", + ), + ), + ( + "table", + models.ForeignKey( + default=None, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to="posthog.datawarehousetable", + ), + ), + ("team", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to="posthog.team")), + ], + options={ + "indexes": [ + models.Index(fields=["team_id", "path"], name="team_id_path"), + models.Index(fields=["team_id", "saved_query_id"], name="team_id_saved_query_id"), + django.contrib.postgres.indexes.GistIndex(models.F("path"), name="model_path_path"), + ], + }, + ), + migrations.AddConstraint( + model_name="datawarehousemodelpath", + constraint=models.UniqueConstraint( + deferrable=django.db.models.constraints.Deferrable["IMMEDIATE"], + fields=("team_id", "path"), + name="unique_team_id_path", + ), + ), + ] diff --git a/posthog/warehouse/api/modeling.py b/posthog/warehouse/api/modeling.py new file mode 100644 index 00000000000000..b5a2d8953e80a2 --- /dev/null +++ b/posthog/warehouse/api/modeling.py @@ -0,0 +1,29 @@ +from rest_framework import request, response, serializers, viewsets + +from posthog.api.routing import TeamAndOrgViewSetMixin +from posthog.api.shared import UserBasicSerializer +from posthog.warehouse.models import DataWarehouseModelPath + + +class DataWarehouseModelPathSerializer(serializers.ModelSerializer): + created_by = UserBasicSerializer(read_only=True) + + class Meta: + model = DataWarehouseModelPath + + +class DataWarehouseModelPathViewSet(TeamAndOrgViewSetMixin, viewsets.ReadOnlyModelViewSet): + scope_object = "INTERNAL" + + queryset = DataWarehouseModelPath.objects.all() + serializer_class = DataWarehouseModelPathSerializer + + +class DataWarehouseModelDagViewSet(TeamAndOrgViewSetMixin, viewsets.ViewSet): + scope_object = "INTERNAL" + + def list(self, request: request.Request, *args, **kwargs) -> response.Response: + """Return this team's DAG as a set of edges and nodes""" + dag = DataWarehouseModelPath.objects.get_dag(self.team) + + return response.Response({"edges": dag.edges, "nodes": dag.nodes}) diff --git a/posthog/warehouse/api/saved_query.py b/posthog/warehouse/api/saved_query.py index 8d98def9233c84..db437861f967a5 100644 --- a/posthog/warehouse/api/saved_query.py +++ b/posthog/warehouse/api/saved_query.py @@ -1,7 +1,10 @@ from typing import Any +import structlog from django.conf import settings -from rest_framework import exceptions, filters, serializers, viewsets, response, request, status +from django.db import transaction +from rest_framework import exceptions, filters, request, response, serializers, status, viewsets +from rest_framework.decorators import action from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.shared import UserBasicSerializer @@ -11,7 +14,9 @@ from posthog.hogql.metadata import is_valid_view from posthog.hogql.parser import parse_select from posthog.hogql.printer import print_ast -from posthog.warehouse.models import DataWarehouseSavedQuery, DataWarehouseJoin +from posthog.warehouse.models import DataWarehouseJoin, DataWarehouseModelPath, DataWarehouseSavedQuery + +logger = structlog.get_logger(__name__) class DataWarehouseSavedQuerySerializer(serializers.ModelSerializer): @@ -61,25 +66,45 @@ def create(self, validated_data): except Exception as err: raise serializers.ValidationError(str(err)) - view.save() + with transaction.atomic(): + view.save() + + try: + DataWarehouseModelPath.objects.create_from_saved_query(view) + except Exception: + # For now, do not fail saved query creation if we cannot model-ize it. + # Later, after bugs and errors have been ironed out, we may tie these two + # closer together. + logger.exception("Failed to create model path when creating view %s", view.name) + return view def update(self, instance: Any, validated_data: Any) -> Any: - view: DataWarehouseSavedQuery = super().update(instance, validated_data) + with transaction.atomic(): + view: DataWarehouseSavedQuery = super().update(instance, validated_data) + + try: + view.columns = view.get_columns() + view.external_tables = view.s3_tables + except RecursionError: + raise serializers.ValidationError("Model contains a cycle") + + except Exception as err: + raise serializers.ValidationError(str(err)) + + view.save() + + try: + DataWarehouseModelPath.objects.update_from_saved_query(view) + except Exception: + logger.exception("Failed to update model path when updating view %s", view.name) - try: - view.columns = view.get_columns() - view.external_tables = view.s3_tables - except Exception as err: - raise serializers.ValidationError(str(err)) - view.save() return view def validate_query(self, query): team_id = self.context["team_id"] context = HogQLContext(team_id=team_id, enable_select_queries=True) - context.max_view_depth = 0 select_ast = parse_select(query["query"]) _is_valid_view = is_valid_view(select_ast) if not _is_valid_view: @@ -126,3 +151,64 @@ def destroy(self, request: request.Request, *args: Any, **kwargs: Any) -> respon self.perform_destroy(instance) return response.Response(status=status.HTTP_204_NO_CONTENT) + + @action(methods=["POST"], detail=True) + def ancestors(self, request: request.Request, *args, **kwargs) -> response.Response: + """Return the ancestors of this saved query. + + By default, we return the immediate parents. The `level` parameter can be used to + look further back into the ancestor tree. If `level` overshoots (i.e. points to only + ancestors beyond the root), we return an empty list. + """ + level = request.data.get("level", 1) + + saved_query = self.get_object() + saved_query_id = saved_query.id.hex + lquery = f"*{{{level},}}.{saved_query_id}" + + paths = DataWarehouseModelPath.objects.filter(team=saved_query.team, path__lquery=lquery) + + if not paths: + return response.Response({"ancestors": []}) + + ancestors = set() + for model_path in paths: + offset = len(model_path.path) - level - 1 # -1 corrects for level being 1-indexed + + if offset < 0: + continue + + ancestors.add(model_path.path[offset]) + + return response.Response({"ancestors": ancestors}) + + @action(methods=["POST"], detail=True) + def descendants(self, request: request.Request, *args, **kwargs) -> response.Response: + """Return the descendants of this saved query. + + By default, we return the immediate children. The `level` parameter can be used to + look further ahead into the descendants tree. If `level` overshoots (i.e. points to only + descendants further than a leaf), we return an empty list. + """ + level = request.data.get("level", 1) + + saved_query = self.get_object() + saved_query_id = saved_query.id.hex + + lquery = f"*.{saved_query_id}.*{{{level},}}" + paths = DataWarehouseModelPath.objects.filter(team=saved_query.team, path__lquery=lquery) + + if not paths: + return response.Response({"descendants": []}) + + descendants = set() + + for model_path in paths: + offset = model_path.path.index(saved_query_id) + level + + if offset > len(model_path.path): + continue + + descendants.add(model_path.path[offset]) + + return response.Response({"descendants": descendants}) diff --git a/posthog/warehouse/api/test/test_modeling.py b/posthog/warehouse/api/test/test_modeling.py new file mode 100644 index 00000000000000..eb62b6a1d47fe0 --- /dev/null +++ b/posthog/warehouse/api/test/test_modeling.py @@ -0,0 +1,43 @@ +from posthog.test.base import APIBaseTest +from posthog.warehouse.models import DataWarehouseModelPath, DataWarehouseSavedQuery + + +class TestDag(APIBaseTest): + def test_get_dag(self): + parent_query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + parent_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": parent_query}, + ) + child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_child", + query={"query": "select * from my_model as my_other_model"}, + ) + DataWarehouseModelPath.objects.create_from_saved_query(parent_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(child_saved_query) + + response = self.client.get( + f"/api/projects/{self.team.id}/warehouse_dag", + ) + self.assertEqual(response.status_code, 200, response.content) + dag = response.json() + + self.assertIn([parent_saved_query.id.hex, child_saved_query.id.hex], dag["edges"]) + self.assertIn(["events", parent_saved_query.id.hex], dag["edges"]) + self.assertIn(["persons", parent_saved_query.id.hex], dag["edges"]) + self.assertEqual(len(dag["edges"]), 3) + + self.assertIn([child_saved_query.id.hex, "SavedQuery"], dag["nodes"]) + self.assertIn([parent_saved_query.id.hex, "SavedQuery"], dag["nodes"]) + self.assertIn(["events", "PostHog"], dag["nodes"]) + self.assertIn(["persons", "PostHog"], dag["nodes"]) + self.assertEqual(len(dag["nodes"]), 4) diff --git a/posthog/warehouse/api/test/test_saved_query.py b/posthog/warehouse/api/test/test_saved_query.py index 4e87ecfb39a63b..80deaad72ca3ad 100644 --- a/posthog/warehouse/api/test/test_saved_query.py +++ b/posthog/warehouse/api/test/test_saved_query.py @@ -1,4 +1,7 @@ +import uuid + from posthog.test.base import APIBaseTest +from posthog.warehouse.models import DataWarehouseModelPath class TestSavedQuery(APIBaseTest): @@ -9,7 +12,7 @@ def test_create(self): "name": "event_view", "query": { "kind": "HogQLQuery", - "query": f"select event as event from events LIMIT 100", + "query": "select event as event from events LIMIT 100", }, }, ) @@ -38,7 +41,7 @@ def test_create_name_overlap_error(self): "name": "events", "query": { "kind": "HogQLQuery", - "query": f"select event as event from events LIMIT 100", + "query": "select event as event from events LIMIT 100", }, }, ) @@ -51,7 +54,7 @@ def test_saved_query_doesnt_exist(self): "name": "event_view", "query": { "kind": "HogQLQuery", - "query": f"select event as event from event_view LIMIT 100", + "query": "select event as event from event_view LIMIT 100", }, }, ) @@ -64,7 +67,7 @@ def test_view_updated(self): "name": "event_view", "query": { "kind": "HogQLQuery", - "query": f"select event as event from events LIMIT 100", + "query": "select event as event from events LIMIT 100", }, }, ) @@ -75,7 +78,7 @@ def test_view_updated(self): { "query": { "kind": "HogQLQuery", - "query": f"select distinct_id as distinct_id from events LIMIT 100", + "query": "select distinct_id as distinct_id from events LIMIT 100", }, }, ) @@ -105,7 +108,7 @@ def test_nested_view(self): "name": "event_view", "query": { "kind": "HogQLQuery", - "query": f"select event as event from events LIMIT 100", + "query": "select event as event from events LIMIT 100", }, }, ) @@ -117,8 +120,184 @@ def test_nested_view(self): "name": "outer_event_view", "query": { "kind": "HogQLQuery", - "query": f"select event from event_view LIMIT 100", + "query": "select event from event_view LIMIT 100", }, }, ) self.assertEqual(saved_view_2_response.status_code, 400, saved_view_2_response.content) + + def test_create_with_saved_query(self): + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view", + "query": { + "kind": "HogQLQuery", + "query": "select event as event from events", + }, + }, + ) + + self.assertEqual(response.status_code, 201, response.content) + saved_query_id = response.json()["id"] + paths = list(DataWarehouseModelPath.objects.filter(saved_query_id=saved_query_id).all()) + self.assertEqual(len(paths), 1) + self.assertEqual(["events", uuid.UUID(saved_query_id).hex], paths[0].path) + + def test_create_with_nested_saved_query(self): + response_1 = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view", + "query": { + "kind": "HogQLQuery", + "query": "select event as event from events", + }, + }, + ) + self.assertEqual(response_1.status_code, 201, response_1.content) + + response_2 = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view_2", + "query": { + "kind": "HogQLQuery", + "query": "select event as event from event_view", + }, + }, + ) + self.assertEqual(response_2.status_code, 201, response_1.content) + + saved_query_id_hex_1 = uuid.UUID(response_1.json()["id"]).hex + saved_query_id_hex_2 = uuid.UUID(response_2.json()["id"]).hex + + paths = [model_path.path for model_path in DataWarehouseModelPath.objects.all()] + self.assertEqual(len(paths), 3) + self.assertIn(["events"], paths) + self.assertIn(["events", saved_query_id_hex_1], paths) + self.assertIn(["events", saved_query_id_hex_1, saved_query_id_hex_2], paths) + + def test_ancestors(self): + query = """\ + select + e.event as event, + p.properties as properties + from events as e + left join persons as p on e.person_id = p.id + where e.event = 'login' + """ + + response_parent = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view", + "query": { + "kind": "HogQLQuery", + "query": query, + }, + }, + ) + + response_child = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view_2", + "query": { + "kind": "HogQLQuery", + "query": "select event as event from event_view", + }, + }, + ) + + self.assertEqual(response_parent.status_code, 201, response_parent.content) + self.assertEqual(response_child.status_code, 201, response_child.content) + + saved_query_parent_id = response_parent.json()["id"] + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_parent_id}/ancestors", + ) + + self.assertEqual(response.status_code, 200, response.content) + parent_ancestors = response.json()["ancestors"] + parent_ancestors.sort() + self.assertEqual(parent_ancestors, ["events", "persons"]) + + saved_query_child_id = response_child.json()["id"] + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_child_id}/ancestors", + ) + + self.assertEqual(response.status_code, 200, response.content) + child_ancestors = response.json()["ancestors"] + self.assertEqual(child_ancestors, [uuid.UUID(saved_query_parent_id).hex]) + + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_child_id}/ancestors", {"level": 2} + ) + + self.assertEqual(response.status_code, 200, response.content) + child_ancestors_level_2 = response.json()["ancestors"] + child_ancestors_level_2.sort() + self.assertEqual(child_ancestors_level_2, ["events", "persons"]) + + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_child_id}/ancestors", {"level": 10} + ) + + self.assertEqual(response.status_code, 200, response.content) + child_ancestors_level_10 = response.json()["ancestors"] + self.assertEqual(child_ancestors_level_10, []) + + def test_descendants(self): + query = """\ + select + e.event as event, + p.properties as properties + from events as e + left join persons as p on e.person_id = p.id + where e.event = 'login' + """ + + response_parent = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view", + "query": { + "kind": "HogQLQuery", + "query": query, + }, + }, + ) + + response_child = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/", + { + "name": "event_view_2", + "query": { + "kind": "HogQLQuery", + "query": "select event as event from event_view", + }, + }, + ) + + self.assertEqual(response_parent.status_code, 201, response_parent.content) + self.assertEqual(response_child.status_code, 201, response_child.content) + + saved_query_parent_id = response_parent.json()["id"] + saved_query_child_id = response_child.json()["id"] + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_parent_id}/descendants", + ) + + self.assertEqual(response.status_code, 200, response.content) + parent_descendants = response.json()["descendants"] + self.assertEqual(parent_descendants, [uuid.UUID(saved_query_child_id).hex]) + + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_saved_queries/{saved_query_child_id}/descendants", + ) + + self.assertEqual(response.status_code, 200, response.content) + child_ancestors = response.json()["descendants"] + self.assertEqual(child_ancestors, []) diff --git a/posthog/warehouse/models/__init__.py b/posthog/warehouse/models/__init__.py index ce98461b5cc6eb..5af80ccc4b3add 100644 --- a/posthog/warehouse/models/__init__.py +++ b/posthog/warehouse/models/__init__.py @@ -1,7 +1,8 @@ from .credential import * from .datawarehouse_saved_query import * from .external_data_job import * +from .external_data_schema import * from .external_data_source import * -from .table import * from .join import * -from .external_data_schema import * +from .modeling import * +from .table import * diff --git a/posthog/warehouse/models/modeling.py b/posthog/warehouse/models/modeling.py new file mode 100644 index 00000000000000..f72b87f4f3b373 --- /dev/null +++ b/posthog/warehouse/models/modeling.py @@ -0,0 +1,508 @@ +import collections.abc +import dataclasses +import enum +import uuid + +from django.contrib.postgres import indexes as pg_indexes +from django.core.exceptions import ObjectDoesNotExist +from django.db import connection, models, transaction + +from posthog.hogql import ast +from posthog.hogql.database.database import Database, create_hogql_database +from posthog.hogql.parser import parse_select +from posthog.models.team import Team +from posthog.models.user import User +from posthog.models.utils import ( + CreatedMetaFields, + UpdatedMetaFields, + UUIDModel, + uuid7, +) +from posthog.warehouse.models.datawarehouse_saved_query import DataWarehouseSavedQuery +from posthog.warehouse.models.table import DataWarehouseTable + +LabelPath = list[str] + + +class LabelTreeField(models.Field): + """A Django model field for a PostgreSQL label tree. + + We represent label trees in Python as a list of strings, each item + in the list being one of the labels of the underlying ltree. + """ + + description = "A PostgreSQL label tree field provided by the ltree extension" + + def db_type(self, connection): + return "ltree" + + def from_db_value(self, value, expression, connection) -> None | LabelPath: + if value is None: + return value + + return value.split(".") + + def to_python(self, value) -> None | LabelPath: + if value is None: + return value + + if isinstance(value, list): + return value + + return value.split(".") + + def get_prep_value(self, value: LabelPath) -> str: + return ".".join(value) + + +class LabelQuery(models.Lookup): + """Implement a lookup for ltree label queries using the ~ operator.""" + + lookup_name = "lquery" + + def __init__(self, *args, **kwargs): + self.prepare_rhs = False + super().__init__(*args, **kwargs) + + def as_sql(self, compiler, connection): + lhs, lhs_params = self.process_lhs(compiler, connection) + rhs, rhs_params = self.process_rhs(compiler, connection) + params = lhs_params + rhs_params + return "%s ~ %s" % (lhs, rhs), params # noqa: UP031 + + +LabelTreeField.register_lookup(LabelQuery) + + +def get_parents_from_model_query(model_query: str) -> set[str]: + """Get parents from a given query. + + The parents of a query are any names in the `FROM` clause of the query. + """ + + hogql_query = parse_select(model_query) + + if isinstance(hogql_query, ast.SelectUnionQuery): + queries = hogql_query.select_queries + else: + queries = [hogql_query] + + parents = set() + ctes = set() + + while queries: + query = queries.pop() + + if query.ctes is not None: + for name, cte in query.ctes.items(): + ctes.add(name) + + if isinstance(cte.expr, ast.SelectUnionQuery): + queries.extend(cte.expr.select_queries) + elif isinstance(cte.expr, ast.SelectQuery): + queries.append(cte.expr) + + join = query.select_from + + if join is None: + continue + + if isinstance(join.table, ast.SelectQuery): + if join.table.view_name is not None: + parents.add(join.table.view_name) + continue + + queries.append(join.table) + elif isinstance(join.table, ast.SelectUnionQuery): + queries.extend(join.table.select_queries) + + while join is not None: + parent_name = join.table.chain[0] # type: ignore + + if parent_name not in ctes and isinstance(parent_name, str): + parents.add(parent_name) + + join = join.next_join + + return parents + + +class NodeType(enum.Enum): + SAVED_QUERY = "SavedQuery" + POSTHOG = "PostHog" + TABLE = "Table" + + +NodeId = str +Node = tuple[NodeId, NodeType] +Edge = tuple[NodeId, NodeId] + + +@dataclasses.dataclass +class DAG: + """A basic DAG composed of nodes and edges.""" + + edges: set[Edge] + nodes: set[Node] + + +UPDATE_PATHS_QUERY = """\ +insert into posthog_datawarehousemodelpath ( + id, + team_id, + table_id, + saved_query_id, + path, + created_by_id, + created_at, + updated_at +) ( + select + id, + team_id, + table_id, + saved_query_id, + parent.path || subpath(model_path.path, index(model_path.path, text2ltree(%(child)s))) as path, + created_by_id, + created_at, + now() as updated_at + from + posthog_datawarehousemodelpath as model_path, + ( + select + path + from posthog_datawarehousemodelpath + where path ~ ('*.' || %(parent)s)::lquery + and team_id = %(team_id)s + ) as parent + where + model_path.path ~ ('*.' || %(child)s || '.*')::lquery + and team_id = %(team_id)s +) +on conflict (id) do +update + set path = EXCLUDED.path, + updated_at = now() +""" + +DELETE_DUPLICATE_PATHS_QUERY = """\ +delete from posthog_datawarehousemodelpath +where + team_id = %(team_id)s + and id in ( + select id + from ( + select id, row_number() over (partition by team_id, path) as row_number + from posthog_datawarehousemodelpath + ) partitioned + where partitioned.row_number > 1 +); +""" + + +class UnknownParentError(Exception): + """Exception raised when the parent for a model is not found.""" + + def __init__(self, parent: str, query: str): + super().__init__( + f"The parent name {parent} does not correspond to an existing PostHog table, Data Warehouse Table, or Data Warehouse Saved Query." + ) + self.query = query + + +class ModelPathAlreadyExistsError(Exception): + """Exception raised when trying to create paths for a model that already has some.""" + + def __init__(self, model_name: str): + super().__init__(f"Model {model_name} cannot be created as it already exists") + + +class ModelPathDoesNotExistError(Exception): + """Exception raised when trying to update paths for a model that doesn't exist.""" + + def __init__(self, model_name: str): + super().__init__(f"Model {model_name} doesn't exist") + + +class DataWarehouseModelPathManager(models.Manager["DataWarehouseModelPath"]): + """A model manager that implements some common path operations.""" + + def create_from_saved_query(self, saved_query: DataWarehouseSavedQuery) -> "list[DataWarehouseModelPath]": + """Create one or more model paths from a new `DataWarehouseSavedQuery`. + + Creating one or more model paths from a new `DataWarehouseSavedQuery` is straight-forward as we + don't have to worry about this model having its own children paths that need updating: We are + only adding a leaf node to all parents' paths. We check this model indeed does not exist to + ensure that is the case. + + Raises: + ValueError: If no paths exists for the provided `DataWarehouseSavedQuery`. + """ + return self.create_leaf_paths_from_query( + query=saved_query.query["query"], + team=saved_query.team, + saved_query_id=saved_query.id, + created_by=saved_query.created_by, + label=saved_query.id.hex, + ) + + def create_leaf_paths_from_query( + self, + query: str, + team: Team, + label: str, + saved_query_id: uuid.UUID, + created_by: User | None = None, + table_id: uuid.UUID | None = None, + ) -> "list[DataWarehouseModelPath]": + """Create all paths to a new leaf model. + + A path will be created for each parent, as extracted from the given query. + """ + base_params = { + "team": team, + "created_by": created_by, + "saved_query_id": saved_query_id, + "table_id": table_id, + } + + with transaction.atomic(): + if self.filter(team=team, saved_query_id=saved_query_id).exists(): + raise ModelPathAlreadyExistsError(saved_query_id.hex) + + parent_paths = self.get_or_create_query_parent_paths(query, team=team) + + results = self.bulk_create( + [ + DataWarehouseModelPath(id=uuid7(), path=[*model_path.path, label], **base_params) + for model_path in parent_paths + ] + ) + return results + + def get_or_create_root_path_for_posthog_source( + self, posthog_source_name: str, team: Team + ) -> tuple["DataWarehouseModelPath", bool]: + """Get a root path for a PostHog source, creating it if it doesn't exist. + + PostHog sources are well-known PostHog tables. We check against the team's HogQL database + to ensure that the source exists before creating the path. + + Raises: + ValueError: If the provided `posthog_source_name` is not a PostHog table. + + Returns: + A tuple with the model path and a `bool` indicating whether it was created or not. + """ + posthog_tables = self.get_hogql_database(team).get_posthog_tables() + if posthog_source_name not in posthog_tables: + raise ValueError(f"Provided source {posthog_source_name} is not a PostHog table") + + return self.get_or_create(path=[posthog_source_name], team=team, defaults={"saved_query": None}) + + def get_hogql_database(self, team: Team) -> Database: + """Get the HogQL database for given team.""" + return create_hogql_database(team_id=team.pk, team_arg=team) + + def get_or_create_root_path_for_data_warehouse_table( + self, data_warehouse_table: DataWarehouseTable + ) -> tuple["DataWarehouseModelPath", bool]: + """Get a root path for a `DataWarehouseTable`, creating it if it doesn't exist. + + A `DataWarehouseTable` is loaded by us into S3 or read directly from an external data source, + like our user's S3 bucket or their PostgreSQL database. + + Either way, it is a table we can consider a root node, as it's managed by data warehouse + data import workflows. + + Returns: + A tuple with the model path and a `bool` indicating whether it was created or not. + """ + table_id = data_warehouse_table.id + return self.get_or_create( + path=[table_id.hex], + team=data_warehouse_table.team, + defaults={"saved_query": None, "table": data_warehouse_table}, + ) + + def filter_all_leaf_paths(self, leaf_id: str | uuid.UUID, team: Team): + """Filter all paths to leaf node given by `leaf_id`.""" + if isinstance(leaf_id, uuid.UUID): + leaf_id = leaf_id.hex + return self.filter(team=team, path__lquery=f"*.{leaf_id}") + + def get_or_create_query_parent_paths(self, query: str, team: Team) -> list["DataWarehouseModelPath"]: + """Get a list of model paths for a query's parents, creating root nodes if they do not exist.""" + parent_paths = [] + for parent in get_parents_from_model_query(query): + try: + parent_path, _ = self.get_or_create_root_path_for_posthog_source(parent, team) + except ValueError: + pass + else: + parent_paths.append(parent_path) + continue + + try: + parent_query = DataWarehouseSavedQuery.objects.filter(team=team, name=parent).get() + except ObjectDoesNotExist: + pass + else: + parent_query_paths = list(self.filter_all_leaf_paths(parent_query.id.hex, team=team).all()) + if not parent_query_paths: + raise UnknownParentError(parent, query) + + parent_paths.extend(parent_query_paths) + continue + + try: + parent_table = DataWarehouseTable.objects.filter(team=team, name=parent).get() + except ObjectDoesNotExist: + pass + else: + parent_path, _ = self.get_or_create_root_path_for_data_warehouse_table(parent_table) + parent_paths.append(parent_path) + continue + + raise UnknownParentError(parent, query) + + return parent_paths + + def update_from_saved_query(self, saved_query: DataWarehouseSavedQuery) -> None: + """Update model paths from an existing `DataWarehouseSavedQuery`.""" + if not self.filter(team=saved_query.team, saved_query=saved_query).exists(): + raise ValueError("Provided saved query contains no paths to update.") + + self.update_paths_from_query( + query=saved_query.query["query"], + team=saved_query.team, + label=saved_query.id.hex, + saved_query_id=saved_query.id, + ) + + def update_paths_from_query( + self, + query: str, + team: Team, + label: str, + saved_query_id: uuid.UUID | None = None, + table_id: uuid.UUID | None = None, + ) -> None: + """Update all model paths from a given query. + + We parse the query to extract all its direct parents. Then, we update all the paths + that contain `label` to add an edge from parent and `label`, effectively removing the + previous parent path. + + This may lead to duplicate paths, so we have to defer constraints, until the end of + the transaction and clean them up. + """ + parents = get_parents_from_model_query(query) + posthog_tables = self.get_hogql_database(team).get_posthog_tables() + + base_params = { + "team_id": team.pk, + "saved_query_id": saved_query_id, + "table_id": table_id, + } + + with transaction.atomic(): + with connection.cursor() as cursor: + cursor.execute("SET CONSTRAINTS ALL DEFERRED") + + for parent in parents: + if parent in posthog_tables: + parent_id = parent + else: + try: + parent_query = DataWarehouseSavedQuery.objects.filter(team=team, name=parent).get() + except ObjectDoesNotExist: + try: + parent_table = DataWarehouseTable.objects.filter(team=team, name=parent).get() + except ObjectDoesNotExist: + raise UnknownParentError(parent, query) + else: + parent_id = parent_table.id.hex + else: + parent_id = parent_query.id.hex + + cursor.execute(UPDATE_PATHS_QUERY, params={**{"child": label, "parent": parent_id}, **base_params}) + + cursor.execute(DELETE_DUPLICATE_PATHS_QUERY, params={"team_id": team.pk}) + cursor.execute("SET CONSTRAINTS ALL IMMEDIATE") + + def get_longest_common_ancestor_path( + self, leaf_models: collections.abc.Iterable[DataWarehouseSavedQuery | DataWarehouseTable] + ) -> str | None: + """Return the longest common ancestor path among paths from all leaf models, if any.""" + query = "select lca(array_agg(path)) from posthog_datawarehousemodelpath where path ? %(lqueries)s" + + with connection.cursor() as cursor: + cursor.execute(query, params={"lqueries": [f"*.{leaf_model.id.hex}" for leaf_model in leaf_models]}) + row = cursor.fetchone() + + return row[0] or None + + def get_dag(self, team: Team): + """Return a DAG of all the models for the given team. + + A DAG is composed by a set of edges and a set of nodes, where each node is a tuple + of a node id and type, and each edge is a pair of two nodes. The edges are directed in the + order of the tuple elements. + + TODO: + * Should we resolve node id and node type to their underlying models? + * Edges could be indexed by node if required by callers. + * Certain paths can be redundant and could be excluded from the query. + """ + edges = set() + nodes: set[Node] = set() + node_type: NodeType + node_id: NodeId + + for model_path in self.filter(team=team).select_related("saved_query", "table").all(): + if model_path.saved_query is not None: + node_type = NodeType.SAVED_QUERY + elif model_path.table is not None: + node_type = NodeType.TABLE + else: + node_type = NodeType.POSTHOG + + for index, node_id in enumerate(model_path.path): + try: + next_node_id = model_path.path[index + 1] + except IndexError: + node: tuple[NodeId, NodeType] = (node_id, node_type) + nodes.add(node) + else: + edges.add((node_id, next_node_id)) + + return DAG(edges=edges, nodes=nodes) + + +class DataWarehouseModelPath(CreatedMetaFields, UpdatedMetaFields, UUIDModel): + """Django model to represent paths to a data warehouse model. + + A data warehouse model is represented by a saved query, and the path to it contains all + tables and views that said query is selecting from, recursively all the way to root + PostHog tables and external data source tables. + """ + + class Meta: + indexes = [ + models.Index(fields=("team_id", "path"), name="team_id_path"), + models.Index(fields=("team_id", "saved_query_id"), name="team_id_saved_query_id"), + pg_indexes.GistIndex("path", name="model_path_path"), + ] + constraints = [ + models.UniqueConstraint( + fields=("team_id", "path"), name="unique_team_id_path", deferrable=models.Deferrable.IMMEDIATE + ), + ] + + objects: DataWarehouseModelPathManager = DataWarehouseModelPathManager() + + path = LabelTreeField(null=False) + team = models.ForeignKey(Team, on_delete=models.CASCADE) + table = models.ForeignKey(DataWarehouseTable, null=True, default=None, on_delete=models.SET_NULL) + saved_query = models.ForeignKey(DataWarehouseSavedQuery, null=True, default=None, on_delete=models.SET_NULL) diff --git a/posthog/warehouse/models/test/test_modeling.py b/posthog/warehouse/models/test/test_modeling.py new file mode 100644 index 00000000000000..9ab2ab2e60616c --- /dev/null +++ b/posthog/warehouse/models/test/test_modeling.py @@ -0,0 +1,258 @@ +import pytest +from django.db.utils import ProgrammingError + +from posthog.test.base import BaseTest +from posthog.warehouse.models.datawarehouse_saved_query import DataWarehouseSavedQuery +from posthog.warehouse.models.modeling import ( + DataWarehouseModelPath, + NodeType, + UnknownParentError, + get_parents_from_model_query, +) + + +@pytest.mark.parametrize( + "query,parents", + [ + ("select * from events, persons", {"events", "persons"}), + ("select * from some_random_view", {"some_random_view"}), + ( + "with cte as (select * from events), cte2 as (select * from cte), cte3 as (select 1) select * from cte2", + {"events"}, + ), + ("select 1", set()), + ], +) +def test_get_parents_from_model_query(query: str, parents: set[str]): + """Test parents are correctly parsed from sample queries.""" + assert parents == get_parents_from_model_query(query) + + +class TestModelPath(BaseTest): + def test_create_from_posthog_root_nodes_query(self): + """Test creation of a model path from a query that reads from PostHog root tables.""" + query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": query}, + ) + + model_paths = DataWarehouseModelPath.objects.create_from_saved_query(saved_query) + paths = [model_path.path for model_path in model_paths] + + self.assertEqual(len(paths), 2) + self.assertIn(["events", saved_query.id.hex], paths) + self.assertIn(["persons", saved_query.id.hex], paths) + + def test_create_from_existing_path(self): + """Test creation of a model path from a query that reads from another query.""" + parent_query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + parent_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": parent_query}, + ) + child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_child", + query={"query": "select * from my_model as my_other_model"}, + ) + + parent_model_paths = DataWarehouseModelPath.objects.create_from_saved_query(parent_saved_query) + child_model_paths = DataWarehouseModelPath.objects.create_from_saved_query(child_saved_query) + + parent_paths = [model_path.path for model_path in parent_model_paths] + child_paths = [model_path.path for model_path in child_model_paths] + + self.assertEqual(len(parent_paths), 2) + self.assertIn(["events", parent_saved_query.id.hex], parent_paths) + self.assertIn(["persons", parent_saved_query.id.hex], parent_paths) + + self.assertEqual(len(child_paths), 2) + self.assertIn(["events", parent_saved_query.id.hex, child_saved_query.id.hex], child_paths) + self.assertIn(["persons", parent_saved_query.id.hex, child_saved_query.id.hex], child_paths) + + def test_update_path_from_saved_query(self): + """Test update of a model path from a query that reads from another query.""" + parent_query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + parent_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": parent_query}, + ) + child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_child", + query={"query": "select * from my_model as my_other_model"}, + ) + grand_child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_grand_child", + query={"query": "select * from my_model_child"}, + ) + + DataWarehouseModelPath.objects.create_from_saved_query(parent_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(child_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(grand_child_saved_query) + + child_saved_query.query = {"query": "select * from events as my_other_model"} + child_saved_query.save() + DataWarehouseModelPath.objects.update_from_saved_query(child_saved_query) + + child_refreshed_model_paths = DataWarehouseModelPath.objects.filter( + team=self.team, saved_query=child_saved_query + ).all() + child_paths = [model_path.path for model_path in child_refreshed_model_paths] + grand_child_refreshed_model_paths = DataWarehouseModelPath.objects.filter( + team=self.team, saved_query=grand_child_saved_query + ).all() + grand_child_paths = [model_path.path for model_path in grand_child_refreshed_model_paths] + + self.assertEqual(len(child_paths), 1) + self.assertIn(["events", child_saved_query.id.hex], child_paths) + self.assertEqual(len(grand_child_paths), 1) + self.assertIn(["events", child_saved_query.id.hex, grand_child_saved_query.id.hex], grand_child_paths) + + def test_get_longest_common_ancestor_path(self): + """Test resolving the longest common ancestor of two simple queries.""" + query_1 = """\ + select + events.event + from events + where events.event = 'login' + """ + query_2 = """\ + select + events.person_id as person_id + from events + where events.event = 'logout' + """ + + saved_query_1 = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model1", + query={"query": query_1}, + ) + saved_query_2 = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model2", + query={"query": query_2}, + ) + DataWarehouseModelPath.objects.create_from_saved_query(saved_query_1) + DataWarehouseModelPath.objects.create_from_saved_query(saved_query_2) + + lca = DataWarehouseModelPath.objects.get_longest_common_ancestor_path([saved_query_1, saved_query_2]) + self.assertEqual(lca, "events") + + def test_get_dag(self): + """Test the generation of a DAG with a couple simple models.""" + parent_query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + parent_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": parent_query}, + ) + child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_child", + query={"query": "select * from my_model as my_other_model"}, + ) + + DataWarehouseModelPath.objects.create_from_saved_query(parent_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(child_saved_query) + + dag = DataWarehouseModelPath.objects.get_dag(team=self.team) + + self.assertIn((parent_saved_query.id.hex, child_saved_query.id.hex), dag.edges) + self.assertIn(("events", parent_saved_query.id.hex), dag.edges) + self.assertIn(("persons", parent_saved_query.id.hex), dag.edges) + self.assertEqual(len(dag.edges), 3) + + self.assertIn((child_saved_query.id.hex, NodeType.SAVED_QUERY), dag.nodes) + self.assertIn((parent_saved_query.id.hex, NodeType.SAVED_QUERY), dag.nodes) + self.assertIn(("events", NodeType.POSTHOG), dag.nodes) + self.assertIn(("persons", NodeType.POSTHOG), dag.nodes) + self.assertEqual(len(dag.nodes), 4) + + def test_creating_cycles_raises_exception(self): + """Test cycles cannot be created just by creating queries that select from each other.""" + cycling_child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_other_model_child", + query={"query": "select * from my_model_grand_child"}, + ) + + grand_child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_grand_child", + query={"query": "select * from my_other_model_child"}, + ) + + with pytest.raises(UnknownParentError): + DataWarehouseModelPath.objects.create_from_saved_query(grand_child_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(cycling_child_saved_query) + + def test_creating_cycles_via_updates_raises_exception(self): + """Test cycles cannot be created just by updating queries that select from each other.""" + parent_query = """\ + select + events.event, + persons.properties + from events + left join persons on events.person_id = persons.id + where events.event = 'login' and person.pdi != 'some_distinct_id' + """ + parent_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model", + query={"query": parent_query}, + ) + child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_child", + query={"query": "select * from my_model"}, + ) + grand_child_saved_query = DataWarehouseSavedQuery.objects.create( + team=self.team, + name="my_model_grand_child", + query={"query": "select * from my_model_child"}, + ) + + DataWarehouseModelPath.objects.create_from_saved_query(parent_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(child_saved_query) + DataWarehouseModelPath.objects.create_from_saved_query(grand_child_saved_query) + + child_saved_query.query = {"query": "select * from my_model union all select * from my_model_grand_child"} + child_saved_query.save() + + with pytest.raises(ProgrammingError): + DataWarehouseModelPath.objects.update_from_saved_query(child_saved_query)