Skip to content

Commit

Permalink
add ability to do list comprehensions in stream map expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
haleemur committed Oct 7, 2023
1 parent bab2aa9 commit 899e698
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
27 changes: 26 additions & 1 deletion singer_sdk/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,27 @@ def md5(string: str) -> str:
return hashlib.md5(string.encode("utf-8")).hexdigest() # noqa: S324


def compound_eval(
expr: str,
operators: t.Dict[str, t.Callable[[dict], dict | None]] = None,
functions: t.Dict[str, t.Callable[[dict], dict | None]] = None,
names=t.Dict[str, t.Any]
) -> t.Union[str, int, float, t.List, t.Set, t.Dict]:
"""Evaluate inline maps using the `EvalWithCompoundTypes` class
Args:
expr: expression to evaluate
operators: dictionary of operators and the functions they map to in the evaluation context
functions: dictionary of function names and definitions available in the evaluation context
names: dictionary of variable names available in the evaluation context
Returns:
result of the evaluated expression
"""
s = simpleeval.EvalWithCompoundTypes(operators=operators, functions=functions, names=names)
return s.eval(expr)


StreamMapsDict: TypeAlias = t.Dict[str, t.Union[str, dict, None]]


Expand Down Expand Up @@ -296,6 +317,7 @@ def functions(self) -> dict[str, t.Callable]:
funcs: dict[str, t.Any] = simpleeval.DEFAULT_FUNCTIONS.copy()
funcs["md5"] = md5
funcs["datetime"] = datetime
funcs["bool"] = bool
return funcs

def _eval(
Expand Down Expand Up @@ -325,7 +347,7 @@ def _eval(
# Allow access to original property value if applicable
names["self"] = record[property_name]
try:
result: str | int | float = simpleeval.simple_eval(
result: str | int | float = compound_eval(
expr,
functions=self.functions,
names=names,
Expand Down Expand Up @@ -374,6 +396,9 @@ def _eval_type(
if expr.startswith("str("):
return th.StringType()

if expr.startswith("bool("):
return th.BooleanType()

return th.StringType() if expr[0] == "'" and expr[-1] == "'" else default

def _init_functions_and_schema( # noqa: PLR0912, PLR0915, C901
Expand Down
35 changes: 35 additions & 0 deletions tests/core/test_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
PropertiesList,
Property,
StringType,
BooleanType,
OneOf,
)

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -56,6 +58,14 @@ def sample_catalog_dict() -> dict:
Property("the", StringType),
Property("brown", StringType),
).to_dict()
nested_jellybean_schema = PropertiesList(
Property("id", IntegerType),
Property("custom_fields",
ArrayType(ObjectType(
Property("id", IntegerType),
Property("value", OneOf(StringType, IntegerType, BooleanType))))
)
).to_dict()
return {
"streams": [
{
Expand All @@ -68,6 +78,11 @@ def sample_catalog_dict() -> dict:
"tap_stream_id": "foobars",
"schema": foobars_schema,
},
{
"stream": "nested_jellybean",
"tap_stream_id": "nested_jellybean",
"schema": nested_jellybean_schema
}
],
}

Expand Down Expand Up @@ -110,6 +125,10 @@ def sample_stream():
{"the": "quick"},
{"brown": "fox"},
],
"nested_jellybean": [
{"id": 123, "custom_fields": [{"id": 1, "value": "abc"}, {"id": 2, "value": 1212}, {"id": 3, "value": None}]},
{"id": 124, "custom_fields": [{"id": 1, "value": "foo"}, {"id": 2, "value": 9009}, {"id": 3, "value": True}]}
],
}


Expand All @@ -129,6 +148,12 @@ def transform_stream_maps():
"int_test": "int('0')",
"__else__": None,
},
"nested_jellybean": {
"custom_fields": "__NULL__",
"custom_field_1": 'dict([(x["id"], x["value"]) for x in custom_fields]).get(1)',
"custom_field_2": 'int(dict([(x["id"], x["value"]) for x in custom_fields]).get(2)) if dict([(x["id"], x["value"]) for x in custom_fields]).get(2) else None',
"custom_field_3": 'bool(dict([(x["id"], x["value"]) for x in custom_fields]).get(3)) if dict([(x["id"], x["value"]) for x in custom_fields]).get(3) else None',
}
}


Expand Down Expand Up @@ -185,6 +210,10 @@ def transformed_result(stream_map_config):
{"the": "quick"},
{"brown": "fox"},
],
"nested_jellybean": [
{"id": 123, "custom_field_1": "abc", "custom_field_2": 1212, "custom_field_3": None},
{"id": 124, "custom_field_1": "foo", "custom_field_2": 9009, "custom_field_3": True}
],
}


Expand All @@ -204,6 +233,12 @@ def transformed_schemas():
Property("the", StringType),
Property("brown", StringType),
).to_dict(),
"nested_jellybean": PropertiesList(
Property("id", IntegerType),
Property("custom_field_1", StringType),
Property("custom_field_2", IntegerType),
Property("custom_field_3", BooleanType)
).to_dict()
}


Expand Down

0 comments on commit 899e698

Please sign in to comment.