From 62a3bbb53ecaeebdfb22b90b6ea841d31b3d4c42 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 8 Sep 2024 00:33:46 +0200 Subject: [PATCH] Zyp Treatments: Add `normalize_complex_lists` option Because CrateDB can not store lists of varying objects, try to normalize them, currently biased towards strings and floats. --- src/zyp/model/treatment.py | 65 ++++++++++++++++++++++++++++- tests/transform/conftest.py | 1 + tests/transform/test_zyp_generic.py | 41 ++++++++++++++++++ tests/zyp/test_treatment.py | 23 ++++++++++ 4 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 tests/transform/test_zyp_generic.py diff --git a/src/zyp/model/treatment.py b/src/zyp/model/treatment.py index f7392c0..076b7e8 100644 --- a/src/zyp/model/treatment.py +++ b/src/zyp/model/treatment.py @@ -1,3 +1,4 @@ +import builtins import typing as t from attr import Factory @@ -13,6 +14,7 @@ class Treatment(Dumpable): convert_list: t.List[str] = Factory(list) convert_string: t.List[str] = Factory(list) convert_dict: t.List[t.Dict[str, str]] = Factory(list) + normalize_complex_lists: bool = False prune_invalid_date: t.List[str] = Factory(list) def apply(self, data: DictOrList) -> DictOrList: @@ -28,7 +30,7 @@ def apply_record(self, data: Record) -> Record: local_ignores = [] if self.ignore_complex_lists: for k, v in data.items(): - if isinstance(v, list) and v and isinstance(v[0], dict): + if self.is_list_of_dicts(v): # Skip ignoring special-encoded items. if v[0] and list(v[0].keys())[0].startswith("$"): continue @@ -39,6 +41,12 @@ def apply_record(self, data: Record) -> Record: if ignore_name in data: del data[ignore_name] + # Apply normalization for lists of objects. + if self.normalize_complex_lists: + for _, v in data.items(): + if self.is_list_of_dicts(v): + ListOfVaryingObjectsNormalizer(v).apply() + # Converge certain items to `list` even when defined differently. for to_list_name in self.convert_list: if to_list_name in data and not isinstance(data[to_list_name], list): @@ -66,3 +74,58 @@ def apply_record(self, data: Record) -> Record: del data[key] return data + + @staticmethod + def is_list_of_dicts(v: t.Any) -> bool: + return isinstance(v, list) and bool(v) and isinstance(v[0], dict) + + +@define +class NormalizerRule: + """ + Manage details of a normalizer rule. + """ + + name: str + converter: t.Callable + + +@define +class ListOfVaryingObjectsNormalizer: + """ + CrateDB can not store lists of varying objects, so try to normalize them. + """ + + data: Collection + + def apply(self): + self.apply_rules(self.get_rules(self.type_stats())) + + def apply_rules(self, rules: t.List[NormalizerRule]) -> None: + for item in self.data: + for rule in rules: + name = rule.name + if name in item: + item[name] = rule.converter(item[name]) + + def get_rules(self, statistics) -> t.List[NormalizerRule]: + rules = [] + for name, types in statistics.items(): + if len(types) > 1: + rules.append(NormalizerRule(name=name, converter=self.get_best_converter(types))) + return rules + + def type_stats(self) -> t.Dict[str, t.List[str]]: + types: t.Dict[str, t.List[str]] = {} + for item in self.data: + for key, value in item.items(): + types.setdefault(key, []).append(type(value).__name__) + return types + + @staticmethod + def get_best_converter(types: t.List[str]) -> t.Callable: + if "str" in types: + return builtins.str + if "float" in types and "int" in types and "str" not in types: + return builtins.float + return lambda x: x diff --git a/tests/transform/conftest.py b/tests/transform/conftest.py index 5b023ec..9a9f3b1 100644 --- a/tests/transform/conftest.py +++ b/tests/transform/conftest.py @@ -2,6 +2,7 @@ RESET_TABLES = [ "from.dynamodb", + "from.generic", ] diff --git a/tests/transform/test_zyp_generic.py b/tests/transform/test_zyp_generic.py new file mode 100644 index 0000000..6fe3f4b --- /dev/null +++ b/tests/transform/test_zyp_generic.py @@ -0,0 +1,41 @@ +import pytest + +from commons_codec.model import SQLOperation +from zyp.model.treatment import Treatment + + +@pytest.mark.integration +def test_normalize_list_of_objects(caplog, cratedb): + """ + Verify writing record to CrateDB, with transformations. + """ + + record_in = { + "_list_float_int": [{"abc": 42.42}, {"abc": 42}], + "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2, "abc": None}], + "_list_int_str": [{"abc": 123}, {"abc": "123"}], + } + + record_out = { + "_list_float_int": [{"abc": 42.42}, {"abc": 42.0}], + "_list_float_none": [{"id": 1, "abc": 42.42}, {"id": 2}], + "_list_int_str": [{"abc": "123"}, {"abc": "123"}], + } + + # Define CrateDB SQL DDL and DML operations (SQL+parameters). + operation_ddl = SQLOperation('CREATE TABLE "from".generic (data OBJECT(DYNAMIC))', None) + operation_dml = SQLOperation('INSERT INTO "from".generic (data) VALUES (:data)', {"data": record_in}) + + # Apply treatment to parameters. + parameters = operation_dml.parameters + Treatment(normalize_complex_lists=True).apply(parameters) + + # Insert into CrateDB. + cratedb.database.run_sql(operation_ddl.statement) + cratedb.database.run_sql(operation_dml.statement, parameters) + + # Verify data in target database. + assert cratedb.database.refresh_table("from.generic") is True + + results = cratedb.database.run_sql('SELECT * FROM "from".generic;', records=True) # noqa: S608 + assert results[0]["data"] == record_out diff --git a/tests/zyp/test_treatment.py b/tests/zyp/test_treatment.py index fad5ad7..d69c582 100644 --- a/tests/zyp/test_treatment.py +++ b/tests/zyp/test_treatment.py @@ -70,6 +70,29 @@ def test_treatment_ignore_fields(): assert transformation.apply([{"data": [{"abc": 123}]}]) == [{"data": [{}]}] +def test_treatment_normalize_complex_lists_success(): + """ + Verify normalizing lists of objects works. + """ + transformation = Treatment(normalize_complex_lists=True) + assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": 123}]}]) == [ + {"data": [{"abc": 123.42}, {"abc": 123.0}]} + ] + assert transformation.apply([{"data": [{"abc": 123}, {"abc": "123"}]}]) == [ + {"data": [{"abc": "123"}, {"abc": "123"}]} + ] + + +def test_treatment_normalize_complex_lists_passthrough(): + """ + When no normalization rule can be applied, return input 1:1. + """ + transformation = Treatment(normalize_complex_lists=True) + assert transformation.apply([{"data": [{"abc": 123.42}, {"abc": None}]}]) == [ + {"data": [{"abc": 123.42}, {"abc": None}]} + ] + + def test_treatment_convert_string(): """ Verify treating nested data to convert values into strings works.