Skip to content

Commit

Permalink
Use stored fields in the actual JOIN
Browse files: browse the repository at this point in the history
  • Loading branch information
danielbachhuber committed Nov 30, 2024
1 parent 3cb7aea commit 16d6c09
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -137,7 +137,7 @@ def create_data_warehouse_table_with_payments(self):
)
distinct_id = pa.array(["user_control_0", "user_test_1", "user_test_2", "user_test_3", "user_extra"])
amount = pa.array([100, 50, 75, 80, 90])
names = ["id", "timestamp", "distinct_id", "amount"]
names = ["id", "dw_timestamp", "dw_distinct_id", "amount"]

pq.write_to_dataset(
pa.Table.from_arrays([id, timestamp, distinct_id, amount], names=names),
Expand All @@ -163,8 +163,8 @@ def create_data_warehouse_table_with_payments(self):
team=self.team,
columns={
"id": "String",
"timestamp": "DateTime64(3, 'UTC')",
"distinct_id": "String",
"dw_timestamp": "DateTime64(3, 'UTC')",
"dw_distinct_id": "String",
"amount": "Int64",
},
credential=credential,
Expand All @@ -173,11 +173,11 @@ def create_data_warehouse_table_with_payments(self):
DataWarehouseJoin.objects.create(
team=self.team,
source_table_name=table_name,
source_table_key="distinct_id",
source_table_key="dw_distinct_id",
joining_table_name="events",
joining_table_key="distinct_id",
field_name="events",
configuration={"experiments_optimized": True},
configuration={"experiments_optimized": True, "experiments_timestamp_field": "dw_timestamp"},
)
return table_name

Expand Down Expand Up @@ -504,10 +504,10 @@ def test_query_runner_with_data_warehouse_series(self):
series=[
DataWarehouseNode(
id=table_name,
distinct_id_field="distinct_id",
id_field="distinct_id",
distinct_id_field="dw_distinct_id",
id_field="id",
table_name=table_name,
timestamp_field="timestamp",
timestamp_field="dw_timestamp",
)
]
)
Expand Down Expand Up @@ -597,10 +597,10 @@ def test_query_runner_with_invalid_data_warehouse_table_name(self):
series=[
DataWarehouseNode(
id=table_name,
distinct_id_field="distinct_id",
id_field="distinct_id",
distinct_id_field="dw_distinct_id",
id_field="id",
table_name=table_name,
timestamp_field="timestamp",
timestamp_field="dw_timestamp",
)
]
)
Expand Down
15 changes: 10 additions & 5 deletions posthog/warehouse/models/join.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -97,14 +97,19 @@ def _join_function(

return _join_function

def join_function_for_experiments(
self, override_source_table_key: Optional[str] = None, override_joining_table_key: Optional[str] = None
):
def join_function_for_experiments(self):
def _join_function_for_experiments(
join_to_add: LazyJoinToAdd,
context: HogQLContext,
node: SelectQuery,
):
if not self.configuration.get("experiments_optimized"):
raise ResolutionError("experiments_optimized is not enabled for this join")

timestamp_field = self.configuration.get("experiments_timestamp_field")
if not timestamp_field:
raise ResolutionError("experiments_timestamp_field is not set for this join")

return ast.JoinExpr(
table=ast.SelectQuery(
select=[
Expand Down Expand Up @@ -147,7 +152,7 @@ def _join_function_for_experiments(
left=ast.Field(
chain=[
join_to_add.from_table,
"distinct_id",
self.source_table_key,
]
),
op=ast.CompareOperationOp.Eq,
Expand All @@ -157,7 +162,7 @@ def _join_function_for_experiments(
left=ast.Field(
chain=[
join_to_add.from_table,
"timestamp",
timestamp_field,
]
),
op=ast.CompareOperationOp.GtEq,
Expand Down

0 comments on commit 16d6c09

Please sign in to comment.