Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hogql): in cohort joins with version as well #21552

Merged
merged 4 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/check_temporal_up
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ while true; do
fi

if [ $SECONDS -ge $TIMEOUT ]; then
echo "Timed out waiting for Temporal to be ready"
exit 1
echo "Timed out ($TIMEOUT sec) waiting for Temporal to be ready. Crossing fingers and trying to run tests anyway."
exit 0
fi

echo "Waiting for Temporal to be ready"
Expand Down
1 change: 0 additions & 1 deletion mypy-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ posthog/hogql/transforms/in_cohort.py:0: error: Incompatible default for argumen
posthog/hogql/transforms/in_cohort.py:0: note: PEP 484 prohibits implicit Optional. Accordingly, mypy has changed its default to no_implicit_optional=True
posthog/hogql/transforms/in_cohort.py:0: note: Use https://github.com/hauntsaninja/no_implicit_optional to automatically upgrade your codebase
posthog/hogql/transforms/in_cohort.py:0: error: Argument "is_static" to "_add_join_for_cohort" of "InCohortResolver" has incompatible type "bool | None"; expected "bool" [arg-type]
posthog/hogql/transforms/in_cohort.py:0: error: Incompatible types in assignment (expression has type "ValuesQuerySet[Cohort, tuple[int, bool | None]]", variable has type "ValuesQuerySet[Cohort, tuple[int, bool | None, str | None]]") [assignment]
posthog/hogql/transforms/in_cohort.py:0: error: Argument "is_static" to "_add_join_for_cohort" of "InCohortResolver" has incompatible type "bool | None"; expected "bool" [arg-type]
posthog/hogql/transforms/in_cohort.py:0: error: Argument "table" to "JoinExpr" has incompatible type "Expr"; expected "SelectQuery | SelectUnionQuery | Field | None" [arg-type]
posthog/hogql/transforms/in_cohort.py:0: error: List item 0 has incompatible type "SelectQueryType | None"; expected "SelectQueryType" [list-item]
Expand Down
33 changes: 20 additions & 13 deletions posthog/hogql/transforms/in_cohort.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,18 +287,19 @@ def visit_compare_operation(self, node: ast.CompareOperation):

if (isinstance(arg.value, int) or isinstance(arg.value, float)) and not isinstance(arg.value, bool):
cohorts = Cohort.objects.filter(id=int(arg.value), team_id=self.context.team_id).values_list(
"id", "is_static", "name"
"id", "is_static", "version", "name"
)
if len(cohorts) == 1:
self.context.add_notice(
start=arg.start,
end=arg.end,
message=f"Cohort #{cohorts[0][0]} can also be specified as {escape_clickhouse_string(cohorts[0][2])}",
fix=escape_clickhouse_string(cohorts[0][2]),
message=f"Cohort #{cohorts[0][0]} can also be specified as {escape_clickhouse_string(cohorts[0][3])}",
fix=escape_clickhouse_string(cohorts[0][3]),
)
self._add_join_for_cohort(
cohort_id=cohorts[0][0],
is_static=cohorts[0][1],
version=cohorts[0][2],
compare=node,
select=self.stack[-1],
negative=node.op == ast.CompareOperationOp.NotInCohort,
Expand All @@ -307,25 +308,26 @@ def visit_compare_operation(self, node: ast.CompareOperation):
raise QueryError(f"Could not find cohort with ID {arg.value}", node=arg)

if isinstance(arg.value, str):
cohorts = Cohort.objects.filter(name=arg.value, team_id=self.context.team_id).values_list(
"id", "is_static"
cohorts2 = Cohort.objects.filter(name=arg.value, team_id=self.context.team_id).values_list(
"id", "is_static", "version"
)
if len(cohorts) == 1:
if len(cohorts2) == 1:
self.context.add_notice(
start=arg.start,
end=arg.end,
message=f"Searching for cohort by name. Replace with numeric ID {cohorts[0][0]} to protect against renaming.",
fix=str(cohorts[0][0]),
message=f"Searching for cohort by name. Replace with numeric ID {cohorts2[0][0]} to protect against renaming.",
fix=str(cohorts2[0][0]),
)
self._add_join_for_cohort(
cohort_id=cohorts[0][0],
is_static=cohorts[0][1],
cohort_id=cohorts2[0][0],
is_static=cohorts2[0][1],
version=cohorts2[0][2],
compare=node,
select=self.stack[-1],
negative=node.op == ast.CompareOperationOp.NotInCohort,
)
return
elif len(cohorts) > 1:
elif len(cohorts2) > 1:
raise QueryError(f"Found multiple cohorts with name '{arg.value}'", node=arg)
raise QueryError(f"Could not find a cohort with the name '{arg.value}'", node=arg)
else:
Expand All @@ -336,6 +338,7 @@ def _add_join_for_cohort(
self,
cohort_id: int,
is_static: bool,
version: Optional[int],
select: ast.SelectQuery,
compare: ast.CompareOperation,
negative: bool,
Expand All @@ -354,11 +357,15 @@ def _add_join_for_cohort(
if must_add_join:
if is_static:
sql = "(SELECT person_id, 1 as matched FROM static_cohort_people WHERE cohort_id = {cohort_id})"
elif version is not None:
sql = "(SELECT person_id, 1 as matched FROM raw_cohort_people WHERE cohort_id = {cohort_id} AND version = {version})"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is version value supplied for the placeholder?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you raise a very good question

else:
sql = "(SELECT person_id, 1 as matched FROM raw_cohort_people WHERE cohort_id = {cohort_id} GROUP BY person_id, cohort_id, version HAVING sum(sign) > 0)"
subquery = parse_expr(
sql, {"cohort_id": ast.Constant(value=cohort_id)}, start=None
) # clear the source start position
sql,
{"cohort_id": ast.Constant(value=cohort_id), "version": ast.Constant(value=version)},
start=None, # clear the source start position
)

new_join = ast.JoinExpr(
alias=f"in_cohort__{cohort_id}",
Expand Down
Loading