From dd9f827d7dd491d0a638a1d720326991211bb15d Mon Sep 17 00:00:00 2001 From: Daniel Bachhuber Date: Mon, 2 Dec 2024 06:37:01 -0800 Subject: [PATCH] feat(data-warehouse): Support for Experiment-optimized JOINs (#26446) Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- frontend/src/lib/api.ts | 7 +- .../scenes/data-warehouse/ViewLinkModal.tsx | 36 ++++++ .../scenes/data-warehouse/viewLinkLogic.tsx | 36 ++++++ frontend/src/types.ts | 4 + posthog/hogql/database/database.py | 4 +- posthog/hogql/printer.py | 7 +- .../experiment_trends_query_runner.py | 86 +------------ .../test_experiment_trends_query_runner.py | 34 +++-- .../insights/trends/trends_query_runner.py | 7 +- .../0524_datawarehousejoin_configuration.py | 17 +++ posthog/migrations/max_migration.txt | 2 +- .../test_session_recordings.ambr | 120 +++++++++++------- posthog/warehouse/api/test/test_view_link.py | 83 ++++++++++++ posthog/warehouse/api/view_link.py | 1 + posthog/warehouse/models/join.py | 87 +++++++++++++ 15 files changed, 374 insertions(+), 157 deletions(-) create mode 100644 posthog/migrations/0524_datawarehousejoin_configuration.py diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 4c10f7c0660e5..1701d3232a439 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -2356,7 +2356,12 @@ const api = { viewId: DataWarehouseViewLink['id'], data: Pick< DataWarehouseViewLink, - 'source_table_name' | 'source_table_key' | 'joining_table_name' | 'joining_table_key' | 'field_name' + | 'source_table_name' + | 'source_table_key' + | 'joining_table_name' + | 'joining_table_key' + | 'field_name' + | 'configuration' > ): Promise { return await new ApiRequest().dataWarehouseViewLink(viewId).update({ data }) diff --git a/frontend/src/scenes/data-warehouse/ViewLinkModal.tsx b/frontend/src/scenes/data-warehouse/ViewLinkModal.tsx index d6f9e9f083146..3004b8ee60daf 100644 --- a/frontend/src/scenes/data-warehouse/ViewLinkModal.tsx +++ b/frontend/src/scenes/data-warehouse/ViewLinkModal.tsx @@ -3,6 +3,7 @@ import './ViewLinkModal.scss' import { IconCollapse, IconExpand } from '@posthog/icons' import { LemonButton, + LemonCheckbox, LemonDivider, LemonDropdown, LemonInput, @@ -58,6 +59,8 @@ export function ViewLinkForm(): JSX.Element { sourceIsUsingHogQLExpression, joiningIsUsingHogQLExpression, isViewLinkSubmitting, + experimentsOptimized, + experimentsTimestampKey, } = useValues(viewLinkLogic) const { selectJoiningTable, @@ -66,6 +69,8 @@ export function ViewLinkForm(): JSX.Element { setFieldName, selectSourceKey, selectJoiningKey, + setExperimentsOptimized, + selectExperimentsTimestampKey, } = useActions(viewLinkLogic) const [advancedSettingsExpanded, setAdvancedSettingsExpanded] = useState(false) @@ -151,6 +156,37 @@ export function ViewLinkForm(): JSX.Element { + {'events' === selectedJoiningTableName && ( +
+ +
+
+ Optimize for Experiments + + setExperimentsOptimized(checked)} + fullWidth + label="Limit join to most recent matching event based on timestamp" + /> + +
+
+ Source Timestamp Key + + + +
+
+
+ )} {sqlCodeSnippet && (
diff --git a/frontend/src/scenes/data-warehouse/viewLinkLogic.tsx b/frontend/src/scenes/data-warehouse/viewLinkLogic.tsx index b55875358c7ed..9d2a7cd171d5b 100644 --- a/frontend/src/scenes/data-warehouse/viewLinkLogic.tsx +++ b/frontend/src/scenes/data-warehouse/viewLinkLogic.tsx @@ -41,6 +41,8 @@ export const viewLinkLogic = kea([ deleteViewLink: (table, column) => ({ table, column }), setError: (error: string) => ({ error }), setFieldName: (fieldName: string) => ({ fieldName }), + setExperimentsOptimized: (experimentsOptimized: boolean) => ({ experimentsOptimized }), + selectExperimentsTimestampKey: (experimentsTimestampKey: string | null) => ({ experimentsTimestampKey }), clearModalFields: true, })), reducers({ @@ -101,6 +103,22 @@ export const viewLinkLogic = kea([ clearModalFields: () => '', }, ], + experimentsOptimized: [ + false as boolean, + { + setExperimentsOptimized: (_, { experimentsOptimized }) => experimentsOptimized, + toggleEditJoinModal: (_, { join }) => join.configuration?.experiments_optimized ?? false, + clearModalFields: () => false, + }, + ], + experimentsTimestampKey: [ + null as string | null, + { + selectExperimentsTimestampKey: (_, { experimentsTimestampKey }) => experimentsTimestampKey, + toggleEditJoinModal: (_, { join }) => join.configuration?.experiments_timestamp_key ?? null, + clearModalFields: () => null, + }, + ], isJoinTableModalOpen: [ false, { @@ -136,6 +154,10 @@ export const viewLinkLogic = kea([ joining_table_name, joining_table_key: values.selectedJoiningKey ?? undefined, field_name: values.fieldName, + configuration: { + experiments_optimized: values.experimentsOptimized, + experiments_timestamp_key: values.experimentsTimestampKey ?? undefined, + }, }) actions.toggleJoinTableModal() @@ -156,6 +178,10 @@ export const viewLinkLogic = kea([ joining_table_name, joining_table_key: values.selectedJoiningKey ?? undefined, field_name: values.fieldName, + configuration: { + experiments_optimized: values.experimentsOptimized, + experiments_timestamp_key: values.experimentsTimestampKey ?? undefined, + }, }) actions.toggleJoinTableModal() @@ -175,6 +201,16 @@ export const viewLinkLogic = kea([ toggleEditJoinModal: ({ join }) => { actions.setViewLinkValues(join) }, + setExperimentsOptimized: ({ experimentsOptimized }) => { + if (!experimentsOptimized) { + actions.selectExperimentsTimestampKey(null) + } + }, + selectExperimentsTimestampKey: ({ experimentsTimestampKey }) => { + if (experimentsTimestampKey) { + actions.setExperimentsOptimized(true) + } + }, })), selectors({ selectedSourceTable: [ diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 42031efa47e5a..c4f4031a0e06e 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -4053,6 +4053,10 @@ export interface DataWarehouseViewLink { field_name?: string created_by?: UserBasicType | null created_at?: string | null + configuration?: { + experiments_optimized?: boolean + experiments_timestamp_key?: string | null + } } export enum DataWarehouseSettingsTab { diff --git a/posthog/hogql/database/database.py b/posthog/hogql/database/database.py index 9a990b518a7d3..94f9e1729ac41 100644 --- a/posthog/hogql/database/database.py +++ b/posthog/hogql/database/database.py @@ -409,7 +409,9 @@ def define_mappings(warehouse: dict[str, Table], get_table: Callable): from_field=from_field, to_field=to_field, join_table=joining_table, - join_function=join.join_function(), + join_function=join.join_function_for_experiments() + if "events" == join.joining_table_name and join.configuration.get("experiments_optimized") + else join.join_function(), ) if join.source_table_name == "persons": diff --git a/posthog/hogql/printer.py b/posthog/hogql/printer.py index dee9988d97c0c..37fea932f2014 100644 --- a/posthog/hogql/printer.py +++ b/posthog/hogql/printer.py @@ -23,7 +23,7 @@ ) from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import Table, FunctionCallTable, SavedQuery -from posthog.hogql.database.database import Database, create_hogql_database +from posthog.hogql.database.database import create_hogql_database from posthog.hogql.database.s3_table import S3Table from posthog.hogql.errors import ImpossibleASTError, InternalHogQLError, QueryError, ResolutionError from posthog.hogql.escape_sql import ( @@ -66,9 +66,7 @@ def team_id_guard_for_table(table_type: Union[ast.TableType, ast.TableAliasType] ) -def to_printed_hogql( - query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None, database: Optional["Database"] = None -) -> str: +def to_printed_hogql(query: ast.Expr, team: Team, modifiers: Optional[HogQLQueryModifiers] = None) -> str: """Prints the HogQL query without mutating the node""" return print_ast( clone_expr(query), @@ -77,7 +75,6 @@ def to_printed_hogql( team_id=team.pk, enable_select_queries=True, modifiers=create_default_modifiers_for_team(team, modifiers), - database=database, ), pretty=True, ) diff --git a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py index 5f5a93a84cbdb..06619c4dfeee1 100644 --- a/posthog/hogql_queries/experiments/experiment_trends_query_runner.py +++ b/posthog/hogql_queries/experiments/experiment_trends_query_runner.py @@ -3,9 +3,6 @@ from django.conf import settings from posthog.constants import ExperimentNoResultsErrorKeys from posthog.hogql import ast -from posthog.hogql.context import HogQLContext -from posthog.hogql.database.database import create_hogql_database -from posthog.hogql.database.models import LazyJoin from posthog.hogql_queries.experiments import CONTROL_VARIANT_KEY from posthog.hogql_queries.experiments.trends_statistics import ( are_results_significant, @@ -37,7 +34,7 @@ TrendsQuery, TrendsQueryResponse, ) -from typing import Any, Optional, cast +from typing import Any, Optional import threading @@ -255,86 +252,7 @@ def calculate(self) -> ExperimentTrendsQueryResponse: def run(query_runner: TrendsQueryRunner, result_key: str, is_parallel: bool): try: - # Create a new database instance where we can attach our - # custom join to the events table. It will be passed through - # and used by the query runner. - database = create_hogql_database(team_id=self.team.pk) - if self._is_data_warehouse_query(query_runner.query): - series_node = cast(DataWarehouseNode, query_runner.query.series[0]) - table = database.get_table(series_node.table_name) - table.fields["events"] = LazyJoin( - from_field=[series_node.distinct_id_field], - join_table=database.get_table("events"), - join_function=lambda join_to_add, context, node: ( - ast.JoinExpr( - table=ast.SelectQuery( - select=[ - ast.Alias(alias=name, expr=ast.Field(chain=["events", *chain])) - for name, chain in { - **join_to_add.fields_accessed, - "timestamp": ["timestamp"], - "distinct_id": ["distinct_id"], - "properties": ["properties"], - }.items() - ], - select_from=ast.JoinExpr(table=ast.Field(chain=["events"])), - ), - # ASOF JOIN finds the most recent matching event that occurred at or before each data warehouse timestamp. - # - # Why this matters: - # When a user performs an action (recorded in data warehouse), we want to know which - # experiment variant they were assigned at that moment. The most recent $feature_flag_called - # event before their action represents their active variant assignment. - # - # Example: - # Data Warehouse: timestamp=2024-01-03 12:00, distinct_id=user1 - # Events: - # 2024-01-02: (user1, variant='control') <- This event will be joined - # 2024-01-03: (user1, variant='test') <- Ignored (occurs after data warehouse timestamp) - # - # This ensures we capture the correct causal relationship: which experiment variant - # was the user assigned to when they performed the action? - join_type="ASOF LEFT JOIN", - alias=join_to_add.to_table, - constraint=ast.JoinConstraint( - expr=ast.And( - exprs=[ - ast.CompareOperation( - left=ast.Field(chain=[join_to_add.to_table, "event"]), - op=ast.CompareOperationOp.Eq, - right=ast.Constant(value="$feature_flag_called"), - ), - ast.CompareOperation( - left=ast.Field( - chain=[ - join_to_add.from_table, - series_node.distinct_id_field, - ] - ), - op=ast.CompareOperationOp.Eq, - right=ast.Field(chain=[join_to_add.to_table, "distinct_id"]), - ), - ast.CompareOperation( - left=ast.Field( - chain=[ - join_to_add.from_table, - series_node.timestamp_field, - ] - ), - op=ast.CompareOperationOp.GtEq, - right=ast.Field(chain=[join_to_add.to_table, "timestamp"]), - ), - ] - ), - constraint_type="ON", - ), - ) - ), - ) - - context = HogQLContext(team_id=self.team.pk, database=database) - - result = query_runner.calculate(context=context) + result = query_runner.calculate() shared_results[result_key] = result except Exception as e: errors.append(e) diff --git a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py index 5645566a954aa..8837bfeab8607 100644 --- a/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py +++ b/posthog/hogql_queries/experiments/test/test_experiment_trends_query_runner.py @@ -1,5 +1,4 @@ from django.test import override_settings -from posthog.hogql.errors import QueryError from posthog.hogql_queries.experiments.experiment_trends_query_runner import ExperimentTrendsQueryRunner from posthog.models.experiment import Experiment, ExperimentHoldout from posthog.models.feature_flag.feature_flag import FeatureFlag @@ -34,6 +33,7 @@ from boto3 import resource from botocore.config import Config from posthog.warehouse.models.credential import DataWarehouseCredential +from posthog.warehouse.models.join import DataWarehouseJoin from posthog.warehouse.models.table import DataWarehouseTable TEST_BUCKET = "test_storage_bucket-posthog.hogql.datawarehouse.trendquery" + XDIST_SUFFIX @@ -137,7 +137,7 @@ def create_data_warehouse_table_with_payments(self): ) distinct_id = pa.array(["user_control_0", "user_test_1", "user_test_2", "user_test_3", "user_extra"]) amount = pa.array([100, 50, 75, 80, 90]) - names = ["id", "timestamp", "distinct_id", "amount"] + names = ["id", "dw_timestamp", "dw_distinct_id", "amount"] pq.write_to_dataset( pa.Table.from_arrays([id, timestamp, distinct_id, amount], names=names), @@ -163,12 +163,22 @@ def create_data_warehouse_table_with_payments(self): team=self.team, columns={ "id": "String", - "timestamp": "DateTime64(3, 'UTC')", - "distinct_id": "String", + "dw_timestamp": "DateTime64(3, 'UTC')", + "dw_distinct_id": "String", "amount": "Int64", }, credential=credential, ) + + DataWarehouseJoin.objects.create( + team=self.team, + source_table_name=table_name, + source_table_key="dw_distinct_id", + joining_table_name="events", + joining_table_key="distinct_id", + field_name="events", + configuration={"experiments_optimized": True, "experiments_timestamp_key": "dw_timestamp"}, + ) return table_name @freeze_time("2020-01-01T12:00:00Z") @@ -494,10 +504,10 @@ def test_query_runner_with_data_warehouse_series(self): series=[ DataWarehouseNode( id=table_name, - distinct_id_field="distinct_id", - id_field="distinct_id", + distinct_id_field="dw_distinct_id", + id_field="id", table_name=table_name, - timestamp_field="timestamp", + timestamp_field="dw_timestamp", ) ] ) @@ -587,10 +597,10 @@ def test_query_runner_with_invalid_data_warehouse_table_name(self): series=[ DataWarehouseNode( id=table_name, - distinct_id_field="distinct_id", - id_field="distinct_id", + distinct_id_field="dw_distinct_id", + id_field="id", table_name=table_name, - timestamp_field="timestamp", + timestamp_field="dw_timestamp", ) ] ) @@ -610,10 +620,10 @@ def test_query_runner_with_invalid_data_warehouse_table_name(self): query=ExperimentTrendsQuery(**experiment.metrics[0]["query"]), team=self.team ) with freeze_time("2023-01-07"): - with self.assertRaises(QueryError) as context: + with self.assertRaises(KeyError) as context: query_runner.calculate() - self.assertEqual(str(context.exception), 'Unknown table "invalid_table_name".') + self.assertEqual(str(context.exception), "'invalid_table_name'") @freeze_time("2020-01-01T12:00:00Z") def test_query_runner_with_avg_math(self): diff --git a/posthog/hogql_queries/insights/trends/trends_query_runner.py b/posthog/hogql_queries/insights/trends/trends_query_runner.py index 3a3dabc69641a..668cd8b2afb48 100644 --- a/posthog/hogql_queries/insights/trends/trends_query_runner.py +++ b/posthog/hogql_queries/insights/trends/trends_query_runner.py @@ -17,7 +17,6 @@ from posthog.clickhouse import query_tagging from posthog.hogql import ast from posthog.hogql.constants import MAX_SELECT_RETURNED_ROWS, LimitContext -from posthog.hogql.context import HogQLContext from posthog.hogql.printer import to_printed_hogql from posthog.hogql.query import execute_hogql_query from posthog.hogql.timings import HogQLTimings @@ -292,7 +291,7 @@ def to_actors_query_options(self) -> InsightActorsQueryOptionsResponse: compare=res_compare, ) - def calculate(self, context: Optional[HogQLContext] = None): + def calculate(self): queries = self.to_queries() if len(queries) == 0: @@ -304,8 +303,7 @@ def calculate(self, context: Optional[HogQLContext] = None): response_hogql_query = ast.SelectSetQuery.create_from_queries(queries, "UNION ALL") with self.timings.measure("printing_hogql_for_response"): - database = context.database if context else None - response_hogql = to_printed_hogql(response_hogql_query, self.team, self.modifiers, database) + response_hogql = to_printed_hogql(response_hogql_query, self.team, self.modifiers) res_matrix: list[list[Any] | Any | None] = [None] * len(queries) timings_matrix: list[list[QueryTiming] | None] = [None] * (2 + len(queries)) @@ -332,7 +330,6 @@ def run( timings=timings, modifiers=self.modifiers, limit_context=self.limit_context, - context=context, ) timings_matrix[index + 1] = response.timings diff --git a/posthog/migrations/0524_datawarehousejoin_configuration.py b/posthog/migrations/0524_datawarehousejoin_configuration.py new file mode 100644 index 0000000000000..a0335ea867acc --- /dev/null +++ b/posthog/migrations/0524_datawarehousejoin_configuration.py @@ -0,0 +1,17 @@ +# Generated by Django 4.2.15 on 2024-11-30 09:49 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("posthog", "0523_errortrackingsymbolset_content_hash"), + ] + + operations = [ + migrations.AddField( + model_name="datawarehousejoin", + name="configuration", + field=models.JSONField(default=dict, null=True), + ), + ] diff --git a/posthog/migrations/max_migration.txt b/posthog/migrations/max_migration.txt index a9fe528284987..12da4d602a5a2 100644 --- a/posthog/migrations/max_migration.txt +++ b/posthog/migrations/max_migration.txt @@ -1 +1 @@ -0523_errortrackingsymbolset_content_hash +0524_datawarehousejoin_configuration diff --git a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr index 7b27ec3e934a4..d84718acc7d20 100644 --- a/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr +++ b/posthog/session_recordings/test/__snapshots__/test_session_recordings.ambr @@ -640,12 +640,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '422' + AND "ee_accesscontrol"."resource_id" = '421' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '422' + AND "ee_accesscontrol"."resource_id" = '421' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -847,7 +847,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -1043,7 +1044,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -1688,12 +1690,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -1895,7 +1897,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -2042,7 +2045,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -2441,12 +2445,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -2648,7 +2652,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -2701,7 +2706,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -2793,7 +2799,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -3129,12 +3136,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -3336,7 +3343,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -3532,7 +3540,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -3881,12 +3890,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -4088,7 +4097,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -4233,7 +4243,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -4597,12 +4608,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -4804,7 +4815,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -4993,7 +5005,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -5395,12 +5408,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -5602,7 +5615,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -5659,12 +5673,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -5774,7 +5788,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -6034,7 +6049,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -6091,12 +6107,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -6206,7 +6222,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -6556,12 +6573,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -6763,7 +6780,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -6915,7 +6933,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -7248,12 +7267,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -7455,7 +7474,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -7602,7 +7622,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -7997,12 +8018,12 @@ LEFT OUTER JOIN "posthog_organizationmembership" ON ("ee_accesscontrol"."organization_member_id" = "posthog_organizationmembership"."id") WHERE (("ee_accesscontrol"."organization_member_id" IS NULL AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("posthog_organizationmembership"."user_id" = 99999 AND "ee_accesscontrol"."resource" = 'project' - AND "ee_accesscontrol"."resource_id" = '429' + AND "ee_accesscontrol"."resource_id" = '428' AND "ee_accesscontrol"."role_id" IS NULL AND "ee_accesscontrol"."team_id" = 99999) OR ("ee_accesscontrol"."organization_member_id" IS NULL @@ -8204,7 +8225,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -8257,7 +8279,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" @@ -8349,7 +8372,8 @@ "posthog_datawarehousejoin"."source_table_key", "posthog_datawarehousejoin"."joining_table_name", "posthog_datawarehousejoin"."joining_table_key", - "posthog_datawarehousejoin"."field_name" + "posthog_datawarehousejoin"."field_name", + "posthog_datawarehousejoin"."configuration" FROM "posthog_datawarehousejoin" WHERE ("posthog_datawarehousejoin"."team_id" = 99999 AND NOT ("posthog_datawarehousejoin"."deleted" diff --git a/posthog/warehouse/api/test/test_view_link.py b/posthog/warehouse/api/test/test_view_link.py index d8de45348b370..4bf4f697ef4a8 100644 --- a/posthog/warehouse/api/test/test_view_link.py +++ b/posthog/warehouse/api/test/test_view_link.py @@ -12,9 +12,56 @@ def test_create(self): "source_table_key": "uuid", "joining_table_key": "id", "field_name": "some_field", + "configuration": None, }, ) self.assertEqual(response.status_code, 201, response.content) + view_link = response.json() + self.assertEqual( + view_link, + { + "id": view_link["id"], + "deleted": False, + "created_by": view_link["created_by"], + "created_at": view_link["created_at"], + "source_table_name": "events", + "source_table_key": "uuid", + "joining_table_name": "persons", + "joining_table_key": "id", + "field_name": "some_field", + "configuration": None, + }, + ) + + def test_create_with_configuration(self): + response = self.client.post( + f"/api/projects/{self.team.id}/warehouse_view_links/", + { + "source_table_name": "events", + "joining_table_name": "persons", + "source_table_key": "uuid", + "joining_table_key": "id", + "field_name": "some_field", + "configuration": {"experiments_optimized": True, "experiments_timestamp_key": "timestamp"}, + }, + ) + self.assertEqual(response.status_code, 201, response.content) + view_link = response.json() + self.assertEqual( + view_link, + { + "id": view_link["id"], + "deleted": False, + "created_by": view_link["created_by"], + "created_at": view_link["created_at"], + "source_table_name": "events", + "source_table_key": "uuid", + "joining_table_name": "persons", + "joining_table_key": "id", + "field_name": "some_field", + "configuration": {"experiments_optimized": True, "experiments_timestamp_key": "timestamp"}, + }, + ) def test_create_key_error(self): response = self.client.post( @@ -55,6 +102,42 @@ def test_create_saved_query_join_key_function(self): ) self.assertEqual(response.status_code, 400, response.content) + def test_update_with_configuration(self): + join = DataWarehouseJoin.objects.create( + team=self.team, + source_table_name="events", + source_table_key="distinct_id", + joining_table_name="persons", + joining_table_key="id", + field_name="some_field", + configuration=None, + ) + join.save() + + response = self.client.patch( + f"/api/projects/{self.team.id}/warehouse_view_links/{join.id}/", + {"configuration": {"experiments_optimized": True, "experiments_timestamp_key": "timestamp"}}, + ) + self.assertEqual(response.status_code, 200, response.content) + view_link = response.json() + self.assertEqual( + view_link, + { + "id": view_link["id"], + "deleted": False, + "created_by": view_link["created_by"], + "created_at": view_link["created_at"], + "source_table_name": "events", + "source_table_key": "distinct_id", + "joining_table_name": "persons", + "joining_table_key": "id", + "field_name": "some_field", + "configuration": {"experiments_optimized": True, "experiments_timestamp_key": "timestamp"}, + }, + ) + join.refresh_from_db() + self.assertEqual(join.configuration, {"experiments_optimized": True, "experiments_timestamp_key": "timestamp"}) + def test_delete(self): response = self.client.post( f"/api/projects/{self.team.id}/warehouse_view_links/", diff --git a/posthog/warehouse/api/view_link.py b/posthog/warehouse/api/view_link.py index e3d701bb64b99..a249dbf9d3859 100644 --- a/posthog/warehouse/api/view_link.py +++ b/posthog/warehouse/api/view_link.py @@ -25,6 +25,7 @@ class Meta: "joining_table_name", "joining_table_key", "field_name", + "configuration", ] read_only_fields = ["id", "created_by", "created_at"] diff --git a/posthog/warehouse/models/join.py b/posthog/warehouse/models/join.py index febbf0182f1ca..b24d6916e93c9 100644 --- a/posthog/warehouse/models/join.py +++ b/posthog/warehouse/models/join.py @@ -3,6 +3,7 @@ from datetime import datetime from django.db import models +from posthog.hogql import ast from posthog.hogql.ast import SelectQuery from posthog.hogql.context import HogQLContext from posthog.hogql.database.models import LazyJoinToAdd @@ -40,6 +41,7 @@ class DataWarehouseJoin(CreatedMetaFields, UUIDModel, DeletedMetaFields): joining_table_name = models.CharField(max_length=400) joining_table_key = models.CharField(max_length=400) field_name = models.CharField(max_length=400) + configuration = models.JSONField(default=dict, null=True) def soft_delete(self): self.deleted = True @@ -94,3 +96,88 @@ def _join_function( return join_expr return _join_function + + def join_function_for_experiments(self): + def _join_function_for_experiments( + join_to_add: LazyJoinToAdd, + context: HogQLContext, + node: SelectQuery, + ): + if self.joining_table_name != "events": + raise ResolutionError("experiments_optimized is only supported for events table") + + if not self.configuration.get("experiments_optimized"): + raise ResolutionError("experiments_optimized is not enabled for this join") + + timestamp_key = self.configuration.get("experiments_timestamp_key") + if not timestamp_key: + raise ResolutionError("experiments_timestamp_key is not set for this join") + + return ast.JoinExpr( + table=ast.SelectQuery( + select=[ + ast.Alias( + alias=name, + expr=ast.Field(chain=["events", *(chain if isinstance(chain, list | tuple) else [chain])]), + ) + for name, chain in { + **join_to_add.fields_accessed, + "timestamp": ["timestamp"], + "distinct_id": ["distinct_id"], + "properties": ["properties"], + }.items() + ], + select_from=ast.JoinExpr(table=ast.Field(chain=["events"])), + ), + # ASOF JOIN finds the most recent matching event that occurred at or before each data warehouse timestamp. + # + # Why this matters: + # When a user performs an action (recorded in data warehouse), we want to know which + # experiment variant they were assigned at that moment. The most recent $feature_flag_called + # event before their action represents their active variant assignment. + # + # Example: + # Data Warehouse: timestamp=2024-01-03 12:00, distinct_id=user1 + # Events: + # 2024-01-02: (user1, variant='control') <- This event will be joined + # 2024-01-03: (user1, variant='test') <- Ignored (occurs after data warehouse timestamp) + # + # This ensures we capture the correct causal relationship: which experiment variant + # was the user assigned to when they performed the action? + join_type="ASOF LEFT JOIN", + alias=join_to_add.to_table, + constraint=ast.JoinConstraint( + expr=ast.And( + exprs=[ + ast.CompareOperation( + left=ast.Field(chain=[join_to_add.to_table, "event"]), + op=ast.CompareOperationOp.Eq, + right=ast.Constant(value="$feature_flag_called"), + ), + ast.CompareOperation( + left=ast.Field( + chain=[ + join_to_add.from_table, + self.source_table_key, + ] + ), + op=ast.CompareOperationOp.Eq, + right=ast.Field(chain=[join_to_add.to_table, "distinct_id"]), + ), + ast.CompareOperation( + left=ast.Field( + chain=[ + join_to_add.from_table, + timestamp_key, + ] + ), + op=ast.CompareOperationOp.GtEq, + right=ast.Field(chain=[join_to_add.to_table, "timestamp"]), + ), + ] + ), + constraint_type="ON", + ), + ) + + return _join_function_for_experiments