Skip to content

Commit

Permalink
fix(server/mongo): support missing fields with isnull/notnull [TCTC-9…
Browse files Browse the repository at this point in the history
…655] (#2289)

A join step can introduce missing fields: if the `$lookup` step does not return anything,
the right fields will be missing from the resulting documents. This causes
`{$eq: [$missing_field, null]}` to always evaluate to false since the field cannot be
looked up.

This PR fixes that behaviour by defaulting to null for missing expressions with the
isnull/isnotnull operators.

The same logic was applied to the `then` and `else` branches: an `$addFields` aggreation with a
`$cond` evalutating to an empty field just does NOT add a new field.

Signed-off-by: Luka Peschke <[email protected]>
  • Loading branch information
lukapeschke authored Nov 28, 2024
1 parent 588a23d commit ffff025
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 30 deletions.
4 changes: 4 additions & 0 deletions server/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

### Fixed

- Mongo: the `isnull` and `notnull` operators now behave correctly in case of a missing field in the `ifthenelse` step

## [0.48.3] - 2024-11-21

### Fixed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
from typing import Any, Literal

from weaverbird.backends.mongo_translator.steps.formula import build_mongo_formula_tree
from weaverbird.backends.mongo_translator.steps.types import MongoStep
from weaverbird.backends.mongo_translator.utils import build_cond_expression
from weaverbird.pipeline.formula_ast.eval import FormulaParser
from weaverbird.pipeline.steps.ifthenelse import IfThenElse, IfthenelseStep


def _default_to_null(expr: Any) -> dict[Literal["$ifNull"], list[Any]]:
"""Makes the passed expression default to NULL if not defined"""
return {"$ifNull": [expr, None]}


def transform_ifthenelse_step(step: IfThenElse) -> MongoStep:
else_expr: dict | str | int | float | bool
if isinstance(step.else_value, IfThenElse):
Expand All @@ -21,7 +28,7 @@ def transform_ifthenelse_step(step: IfThenElse) -> MongoStep:
except SyntaxError: # step is a badly formatted string
return step.then

return {"$cond": {"if": if_expr, "then": then_expr, "else": else_expr}}
return {"$cond": {"if": if_expr, "then": _default_to_null(then_expr), "else": _default_to_null(else_expr)}}


def translate_ifthenelse(step: IfthenelseStep) -> list[MongoStep]:
Expand Down
3 changes: 2 additions & 1 deletion server/src/weaverbird/backends/mongo_translator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def build_cond_expression(

else:
if cond.operator == "notnull" or cond.operator == "isnull":
return {operator_mapping[cond.operator]: [f"${cond.column}", None]}
# $ifNull allows to replace missing values with NULL
return {operator_mapping[cond.operator]: [{"$ifNull": [f"${cond.column}", None]}, None]}

else:
cond_expression = {operator_mapping[cond.operator]: [f"${cond.column}", cond.value]}
Expand Down
156 changes: 156 additions & 0 deletions server/tests/backends/fixtures/ifthenelse/after_join.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
exclude:
- athena_pypika
- bigquery_pypika
- mysql_pypika
- postgres_pypika
- redshift_pypika
- snowflake_pypika
step:
pipeline:
- name: join
type: left
'on': [[id, id]]
right_pipeline:
- name: domain
domain: joined
- name: ifthenelse
new_column: new_col
if:
column: new_value
operator: isnull
else: '[new_value]'
then: '[value]'
other_inputs:
joined:
schema:
fields:
- name: new_value
type: string
- name: id
type: integer
pandas_version: 1.4.0
data:
- new_value: A
id: 1
- new_value: A
id: 2
- new_value: A
id: 3
- new_value: A
id: 4
- new_value: A
id: 5
- new_value: A
id: 6
- new_value: A
id: 7
- new_value: A
id: 8
- new_value: A
id: 9
input:
schema:
fields:
- name: value
type: string
- name: id
type: integer
pandas_version: 1.4.0
data:
- id: 1
value: BA
- id: 2
value: BA
- id: 3
value: BA
- id: 4
value: BA
- id: 5
value: BA
- id: 6
value: BA
- id: 7
value: BA
- id: 8
value: BA
- id: 9
value: BA
- id: 10
value: B
- id: 11
value: B
- id: 12
value: B
- id: 13
value: B
- id: 14
value: B
expected:
schema:
fields:
- name: value
type: string
- name: new_value
type: string
- name: id
type: integer
- name: new_col
type: string
pandas_version: 0.20.0
data:
- new_value: A
id: 1
value: BA
new_col: A
- new_value: A
id: 2
value: BA
new_col: A
- new_value: A
id: 3
value: BA
new_col: A
- new_value: A
id: 4
value: BA
new_col: A
- new_value: A
id: 5
value: BA
new_col: A
- new_value: A
id: 6
value: BA
new_col: A
- new_value: A
id: 7
value: BA
new_col: A
- new_value: A
id: 8
value: BA
new_col: A
- new_value: A
id: 9
value: BA
new_col: A
- new_value: null
id: 10
value: B
new_col: B
- new_value: null
id: 11
value: B
new_col: B
- new_value: null
id: 12
value: B
new_col: B
- new_value: null
id: 13
value: B
new_col: B
- new_value: null
id: 14
value: B
new_col: B
86 changes: 64 additions & 22 deletions server/tests/backends/mongo_translator/steps/test_ifthenelse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,29 @@ def test_ifthenelse_basic():
then=1,
else_value=2,
)
) == [{"$addFields": {"foobar": {"$cond": {"if": {"$eq": ["$foo", 1]}, "then": 1, "else": 2}}}}]
) == [
{
"$addFields": {
"foobar": {
"$cond": {
"if": {"$eq": ["$foo", 1]},
"then": {
"$ifNull": [
1,
None,
]
},
"else": {
"$ifNull": [
2,
None,
]
},
}
}
}
}
]


def test_ifthenelse_formulas():
Expand All @@ -29,12 +51,17 @@ def test_ifthenelse_formulas():
"foobar": {
"$cond": {
"if": {"$eq": ["$foo", 1]},
"then": {"$multiply": ["$foo", 2]},
"then": {"$ifNull": [{"$multiply": ["$foo", 2]}, None]},
"else": {
"$cond": [
{"$in": [{"$add": ["$foo", 1]}, [0, None]]},
"$ifNull": [
{
"$cond": [
{"$in": [{"$add": ["$foo", 1]}, [0, None]]},
None,
{"$divide": ["$foo", {"$add": ["$foo", 1]}]},
]
},
None,
{"$divide": ["$foo", {"$add": ["$foo", 1]}]},
]
},
},
Expand Down Expand Up @@ -62,25 +89,40 @@ def test_ifthenelse_nested_else():
"foobar": {
"$cond": {
"if": {"$eq": ["$foo", 1]},
"then": {"$multiply": ["$foo", 2]},
"then": {"$ifNull": [{"$multiply": ["$foo", 2]}, None]},
"else": {
"$cond": {
"if": {"$gt": ["$foo", 2]},
"then": {
"$cond": [
{"$in": [2, [0, None]]},
None,
{"$mod": ["$foo", 2]},
]
"$ifNull": [
{
"$cond": {
"if": {"$gt": ["$foo", 2]},
"then": {
"$ifNull": [
{
"$cond": [
{"$in": [2, [0, None]]},
None,
{"$mod": ["$foo", 2]},
]
},
None,
]
},
"else": {
"$ifNull": [
{
"$cond": [
{"$in": [3, [0, None]]},
None,
{"$mod": ["$foo", 3]},
]
},
None,
],
},
}
},
"else": {
"$cond": [
{"$in": [3, [0, None]]},
None,
{"$mod": ["$foo", 3]},
]
},
}
None,
]
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,9 @@ def test_mongo_translator_pipeline(mongo_database, case_id, case_spec_file_path,
if "other_inputs" in spec and (
"join" in case_id or "append" in case_id
): # needed for join & append steps tests as we need a != collection
[
mongo_database[k].insert_many(
pd.read_json(StringIO(json.dumps(v)), orient="table").to_dict(orient="records")
)
for k, v in spec.get("other_inputs", {}).items()
]
for collection_name, raw_df in spec["other_inputs"].items():
df = pd.read_json(StringIO(json.dumps(raw_df)), orient="table")
mongo_database[collection_name].insert_many(df.to_dict(orient="records"))

# create query
steps = spec["step"]["pipeline"]
Expand Down

0 comments on commit ffff025

Please sign in to comment.